The edge cases 2to3 might not fix

A year after Python 2 was officially deprecated, 2to3 is still my favourite tool for porting Python 2 code to Python 3. Only recently, when using it on a legacy code base, I found one of the edge cases 2to3 will not fix for you. Consider this function, which 2to3 leaves completely untouched. It worked fine in Python 2, but raises RecursionError in Python 3. (It is of questionable quality; I didn’t write it originally.)

from collections import OrderedDict

def safe_escape(value):
    if isinstance(value, dict):
        value = OrderedDict([
            (safe_escape(k), safe_escape(v)) for k, v in value.items()
        ])
    elif hasattr(value, '__iter__'):
        value = [safe_escape(v) for v in value]
    elif isinstance(value, str):
        value = value.replace('<', '%3C')
        value = value.replace('>', '%3E')
        value = value.replace('"', '%22')
        value = value.replace("'", '%27')
    return value

But why? In Python 2, strings don’t have an __iter__ method, but in Python 3 they do. So in Python 3 the hasattr(value, '__iter__') condition is true when value is a string, and the isinstance(value, str) branch is never reached. The list comprehension iterates over each character of the string and calls safe_escape on it (the recursion part). But… each character is itself a one-character string with an __iter__ attribute, so the recursion never bottoms out and quickly hits the maximum recursion depth set by your Python interpreter.

In this function it was easy to fix of course:

  1. Either the order of the two elifs can be swapped
  2. or we exclude strings from the iter-check (elif hasattr(value, '__iter__') and not isinstance(value, str))
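
As a minimal sketch of the second option (the function above with only the extra isinstance guard added, not necessarily the code that ended up in the legacy code base), the Python 3 version could look like this:

```python
from collections import OrderedDict


def safe_escape(value):
    """Recursively escape <, >, " and ' in strings nested in dicts and iterables."""
    if isinstance(value, dict):
        value = OrderedDict(
            (safe_escape(k), safe_escape(v)) for k, v in value.items()
        )
    # In Python 3, str has __iter__, so strings are excluded explicitly here.
    elif hasattr(value, '__iter__') and not isinstance(value, str):
        value = [safe_escape(v) for v in value]
    elif isinstance(value, str):
        value = value.replace('<', '%3C')
        value = value.replace('>', '%3E')
        value = value.replace('"', '%22')
        value = value.replace("'", '%27')
    return value


print(hasattr('abc', '__iter__'))      # True on Python 3
print(safe_escape('<a href="x">'))     # %3Ca href=%22x%22%3E
print(safe_escape({'<k>': ["it's"]}))
```

Note that bytes objects are also iterable in Python 3, so with this guard they would still be turned into a list of integers; whether that matters depends on what the callers pass in.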

The more labour-intensive way of fixing it would be rewriting it entirely, since all it really does is recursively URL-encode values (but only four characters). Maybe there’s a (bad) reason it only URL-encodes these four characters, so that was a can of worms I didn’t want to open.

Anyway, the main lesson for me was: even though Python 2 is gone, you might still need to remember its quirks.


Lift or slope classifier

The goal of this notebook is to separate ski lifts from ski slopes, using a set of features and an external dataset with the ski lifts of the world (OpenSnowMap). This shouldn’t be too difficult a task, but maybe just difficult enough to justify some feature engineering and training a classifier.

Separating ski lifts from slopes is useful, since the activity’s statistics (average speed, heart rate etc.) can be improved by removing the ski lifts from the data.

My secondary goal is trying out datalore.io.

import json
import os
import struct

from base64 import b64encode
from datetime import timedelta
from pprint import pprint

import iso8601
import numpy
import pandas
import untangle
import geopandas

from cachier import cachier
from shapely.geometry import Point, MultiPoint
from sklearn.linear_model import LogisticRegressionCV, LogisticRegression
from sklearn.model_selection import cross_val_score
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import PolynomialFeatures, StandardScaler
from IPython.core.display import HTML
from shapely.ops import nearest_points
from haversine import haversine, Unit

I’m using the output of a Polar Vantage V sports watch. The output is a GPX file with all the GPS registrations, and a CSV file. This CSV file has the added boolean column Lift, which is hand-labeled data (True for ski-lift; False for snowboarding or schnapps). These files are loaded in the code block below and will be the training set.

For this, I’m using the untangle library, which converts XML into native Python objects.

Parsing the relevant parts from OpenSnowMap is quite heavy, so I serialise it into lifts.jsonl so it doesn’t need to run each time.

[Download lifts.jsonl]

The dataset consists of nodes and ways. A way can carry the tag “aerialway”, which seems to mark the ski lifts. For each way that has this tag, I collect all its nodes and save them in JSON lines format. The XML parsing is done using untangle again.

if not os.path.exists('data/lifts.jsonl'):
    opensnowmap = untangle.parse('data/planet_pistes.osm')
    nodes = {node['id']: (node['lat'], node['lon']) for node in opensnowmap.osm.node}
    with open('data/lifts.jsonl', 'w') as w:
        lifts = []
        for way in opensnowmap.osm.way:
            try:
                for tag in way.tag:
                    if tag['k'] == 'aerialway':
                        lifts.append([nodes[nd['ref']] for nd in way.nd])
                        w.write(json.dumps(lifts[-1]))
                        w.write('\n')
                        break
            except AttributeError:
                continue
if os.path.exists('data/lifts.jsonl'):
    lifts = []
    lift_points = []
    with open('data/lifts.jsonl', 'r') as f:
        for i, line in enumerate(f.readlines()):
            skilift = json.loads(line)
            lifts.append(skilift)
            for lat, lon in skilift:
                lift_points.append({'lift_id': i,
                                    'lift_point': Point(float(lat), float(lon))})
    lift_points = geopandas.GeoDataFrame(lift_points)

In the code blocks below, I add some crafted features.

  • speed differences between smoothed and current speed (how constant is the speed)
  • altitude changes (going up is more likely to be a ski lift, although not all ski lifts go up)
  • distance to closest known ski lift
  • (smoothed) alignment with closest known ski lift (TODO)
  • curviness (sinuosity index (of the last 10 seconds))
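
To make the "distance to closest known ski lift" feature concrete, here is a dependency-free sketch. It swaps in a hand-written haversine formula and a brute-force search over all lift points, rather than the haversine and shapely libraries used in this notebook. The function names, the Earth radius constant and the coordinates are assumptions for illustration only.

```python
import math

EARTH_RADIUS_M = 6_371_008.8  # mean Earth radius in metres (assumed constant)


def haversine_m(a, b):
    """Great-circle distance in metres between two (lat, lon) pairs in degrees."""
    lat1, lon1, lat2, lon2 = map(math.radians, (*a, *b))
    h = (math.sin((lat2 - lat1) / 2) ** 2
         + math.cos(lat1) * math.cos(lat2) * math.sin((lon2 - lon1) / 2) ** 2)
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(h))


def distance_to_nearest_lift(position, lift_positions):
    """Brute-force minimum distance from position to any known lift point."""
    return min(haversine_m(position, p) for p in lift_positions)


# Hypothetical coordinates, for illustration only.
lift_positions = [(47.0, 11.0), (47.01, 11.0)]
print(distance_to_nearest_lift((47.011, 11.0), lift_positions))
```

The brute-force search is linear in the number of lift points per GPS sample, so for the full OpenSnowMap dataset a spatial index would be the more realistic choice.
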
def sinuosity_index(window):
    source = list(window)
    window = []
    for latlon in source:
        lat, lon = numpy.frombuffer(bytes(latlon), dtype=numpy.float32)
        window.append((lat, lon))
    last_lat, last_lon = window[0]
    first_lat, first_lon = previous_lat, previous_lon = window.pop()
    distance = 0.
    for lat, lon in window:
        distance += haversine((previous_lat, previous_lon), (lat, lon), unit=Unit.METERS)
        previous_lat, previous_lon = lat, lon
    try:
        sinuosity = (haversine((first_lat, first_lon), (last_lat, last_lon), unit=Unit.METERS)) / distance
    except ZeroDivisionError:
        sinuosity = 0.
    return sinuosity
skilifts_multipoint = MultiPoint(lift_points['lift_point'].tolist())

def distance_to_lift(row):
    query, result = nearest_points(Point(float(row['lat']), float(row['lon'])), 
                                   skilifts_multipoint)
    row['Distance to ski lift (meters)'] = haversine(
        (query.x, query.y), 
        (result.x, result.y), 
        unit=Unit.METERS
    )
    return row
class ReadSnowboardingDataset(TransformerMixin, BaseEstimator):

    def __init__(self, sinuosity_window=10, altitude_window=15):
        self.sinuosity_window = sinuosity_window
        self.altitude_window = altitude_window

    def fit(self, X):
        return self.transform(X)

    def transform(self, X):
        snowboarding_datasets = []
        for snowboarding_filename in X:
            snowboarding = pandas.read_csv(snowboarding_filename, skiprows=2)

            trackpoints = untangle.parse(snowboarding_filename.replace(".csv", ".gpx"))
            start_time = iso8601.parse_date(trackpoints.gpx.metadata.time.cdata)
            trackpoints = pandas.DataFrame(
                [
                    {'lat': trackpoint['lat'],
                     'lon': trackpoint['lon'],
                     'latlon': numpy.frombuffer(
                         bytes(numpy.float32(trackpoint['lat'])) + 
                         bytes(numpy.float32(trackpoint['lon'])),
                         dtype=numpy.float64,
                      ),
                     'timestamp': iso8601.parse_date(trackpoint.time.cdata)}
                    for trackpoint in trackpoints.gpx.trk.trkseg.trkpt
                ]
            )

            snowboarding['timestamp'] = snowboarding['Time'].apply(str)
            del snowboarding['Time']

            # TODO proper formatting of timedelta or timestamp using proper utils
            trackpoints['timestamp'] = trackpoints['timestamp'] - start_time
            trackpoints['timestamp'] = trackpoints['timestamp'].apply(
                lambda x: str(x).split('days ')[1][0:8]
            ).apply(str)

            snowboarding = snowboarding.merge(trackpoints, on='timestamp')
            
            snowboarding = snowboarding.apply(distance_to_lift, axis=1)

            snowboarding['Sinuosity index'] = snowboarding['latlon'].rolling(self.sinuosity_window).apply(
                sinuosity_index, raw=True
            )

            # TODO alignment with closest ski lift

            snowboarding['Altitude change (m)'] = snowboarding['Altitude (m)'].diff()
            snowboarding['Altitude change smoothed (m)'] = snowboarding['Altitude change (m)']\
                .rolling(self.altitude_window).mean()
            snowboarding['Speed smoothed (km/h)'] = snowboarding['Speed (km/h)']\
                .rolling(self.altitude_window).mean()

            snowboarding['Absolute speed difference between smoothed and current (km/h)'] = \
                snowboarding['Speed (km/h)'] - snowboarding['Speed smoothed (km/h)']

            snowboarding['Relative speed difference between smoothed and current (km/h)'] = \
                snowboarding['Absolute speed difference between smoothed and current (km/h)'] / \
                snowboarding['Speed (km/h)']

            snowboarding_datasets.append(snowboarding)
        return pandas.concat(snowboarding_datasets)

Q: What sorcery is happening with the latlon field?

A: pandas currently makes it hard to apply a function to a rolling window over a non-numeric Series [1]. The same goes for rolling calculations that need multiple columns [2]. Therefore, I mash two 32-bit floats into a single 64-bit float, and unpack it again in the function sinuosity_index.
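
The packing trick in isolation can be sketched as below. This version uses the standard library's struct module instead of numpy.frombuffer, and the helper names pack_latlon and unpack_latlon are made up for this illustration. The packed value is a valid but meaningless float64; only its bytes matter.

```python
import struct


def pack_latlon(lat, lon):
    """Store two 32-bit floats in the 8 bytes of a single 64-bit float."""
    return struct.unpack('<d', struct.pack('<ff', lat, lon))[0]


def unpack_latlon(packed):
    """Recover the two 32-bit floats from a packed 64-bit float."""
    return struct.unpack('<ff', struct.pack('<d', packed))


packed = pack_latlon(46.5, 11.25)
print(packed)                  # a nonsensical-looking number
print(unpack_latlon(packed))   # (46.5, 11.25)
```

The price of the trick is precision: each coordinate is rounded to float32, which is roughly metre-level resolution at these latitudes.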

class SplitFeaturesClass(TransformerMixin, BaseEstimator):

    def fit(self, X):
        return self.transform(X)

    def transform(self, X):
        snowboarding_selection = X[
            ['Altitude change smoothed (m)',
            'Speed (km/h)',
            'Absolute speed difference between smoothed and current (km/h)',
            'Relative speed difference between smoothed and current (km/h)',
            'Distance to ski lift (meters)',
            'Sinuosity index',
            'Lift']].dropna()

        movement_features = snowboarding_selection[
            ['Altitude change smoothed (m)',
            'Speed (km/h)',
            'Absolute speed difference between smoothed and current (km/h)',
            'Relative speed difference between smoothed and current (km/h)',
            'Distance to ski lift (meters)',
            'Sinuosity index']]\
            .replace([numpy.inf, -numpy.inf], 0.)
        is_lift = snowboarding_selection['Lift']

        return movement_features, is_lift

For this project, I wanted the complexity to be in the feature engineering step, and then just fit a very simple model (logistic regression). In the code block below, the data is split into the features and the target classes (X and y respectively in scikit-learn terms).

snowboarding_pipeline = Pipeline([
    ('read_snowboarding_dataset', ReadSnowboardingDataset()),
    ('split_features_class', SplitFeaturesClass()),
])
snowboarding_filenames = ['./data/Bart_Broere_2020-02-05_14-56-24.csv']
features, is_lift = snowboarding_pipeline.transform(snowboarding_filenames)
# features = PolynomialFeatures(degree=2, interaction_only=True).fit_transform(features)

model = LogisticRegressionCV(max_iter=10000)
model.fit(X=features, y=is_lift)

cross_validated_scores = cross_val_score(model,
                                         X=features,
                                         y=is_lift,
                                         cv=4)

pprint(list(cross_validated_scores))
print(numpy.mean(cross_validated_scores))
[1.0, 0.993849938499385, 0.9876998769987699, 0.9408866995073891]
0.980609128751386
for column, weight in zip(features.columns, list(model.coef_[0])):
    print(f"{column}: {weight}")
Altitude change smoothed (m): 3.5423181866480538
Speed (km/h): 0.21813147529809632
Absolute speed difference between smoothed and current (km/h): -0.026170639562450405
Relative speed difference between smoothed and current (km/h): 0.22554038490129336
Distance to ski lift (meters): 0.0046837551993429835
Sinuosity index: 0.21784616373482674
model.intercept_[0]
-3.9772632313517495

Although the classifier’s performance is already quite good, its robustness could probably be improved with more labeled data. Currently the classifier assigns the most weight to the altitude change (are we going up?). Adding labeled examples of downhill ski lifts would hopefully shift weight to features like how constant the speed is. This does require more data labeling, which is boring. Searching for better hyperparameters (like the window for curviness) only makes sense with more training data; with the limited training set available now, the hyperparameters can’t be searched reliably.
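
To see how the printed weights turn into a prediction, the sketch below applies the logistic function to a weighted sum by hand. The weights and intercept are copied (rounded) from the output above; the two feature rows are invented values for illustration, not samples from the dataset.

```python
import math

# Weights and intercept copied (rounded) from the fitted model's output.
WEIGHTS = {
    'Altitude change smoothed (m)': 3.5423,
    'Speed (km/h)': 0.2181,
    'Absolute speed difference between smoothed and current (km/h)': -0.0262,
    'Relative speed difference between smoothed and current (km/h)': 0.2255,
    'Distance to ski lift (meters)': 0.0047,
    'Sinuosity index': 0.2178,
}
INTERCEPT = -3.9773


def lift_probability(features):
    """Logistic regression by hand: sigmoid(intercept + sum of weight * value)."""
    z = INTERCEPT + sum(WEIGHTS[name] * value for name, value in features.items())
    return 1 / (1 + math.exp(-z))


# Hypothetical samples: steadily going up near a lift vs. descending fast.
going_up = dict(zip(WEIGHTS, [2.0, 15.0, 0.0, 0.0, 5.0, 1.0]))
going_down = dict(zip(WEIGHTS, [-3.0, 40.0, 5.0, 0.125, 200.0, 0.8]))

print(lift_probability(going_up))
print(lift_probability(going_down))
```

With these numbers the altitude term dominates the sum, which matches the observation that the classifier mostly asks whether we are going up.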

Ski lifts definitely go down sometimes

Oh, and I like datalore.io, but I sometimes miss my debugger.

Running a PyPI registry for Windows using GitHub Actions and GitHub Pages

“This page is not a pip package index.” https://www.lfd.uci.edu/~gohlke/pythonlibs/

python -m pip install --extra-index-url https://pypi.bartbroe.re <yourpackagehere>

When trying to build Python projects for Windows, I often end up on Christoph Gohlke’s collection of Python wheels for Windows. Most of the time, I can download the package I’m looking for, compiled for Windows, and continue my day. But wouldn’t it be nice if these packages were exposed in a proper Python package index?

The standard for a simple Python package index is really easy. You need one HTML-page with all the packages, containing HTML a-tags for each package, linking to subpages per package. Each of these subpages again should contain a-tags for each provided wheel.

To turn the web page into a package index, you would only need to scrape it, find the packages, find the wheels, and build the new set of html pages.
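
A minimal sketch of that last step, generating the HTML pages, could look like this. It is not the actual scraper from this project: the function names and the input format (a dict from package name to a list of (filename, url) pairs) are assumptions. The name normalisation follows the rule from PEP 503, the simple repository API.

```python
import html
import re


def normalize(name):
    """Normalise a project name as PEP 503 prescribes."""
    return re.sub(r"[-_.]+", "-", name).lower()


def page(links):
    """Wrap a list of <a> tags in a bare-bones HTML document."""
    return "<!DOCTYPE html>\n<html><body>\n" + "\n".join(links) + "\n</body></html>\n"


def build_simple_index(wheels_by_package):
    """Return {relative_path: html_text} for a simple package index."""
    pages = {}
    root_links = []
    for name, wheels in sorted(wheels_by_package.items()):
        slug = normalize(name)
        root_links.append(f'<a href="{slug}/">{html.escape(name)}</a>')
        wheel_links = [
            f'<a href="{html.escape(url)}">{html.escape(filename)}</a>'
            for filename, url in wheels
        ]
        pages[f"{slug}/index.html"] = page(wheel_links)
    pages["index.html"] = page(root_links)
    return pages


pages = build_simple_index({
    "ad3": [(
        "ad3-2.2.1-cp36-cp36m-win_amd64.whl",
        "https://download.lfd.uci.edu/pythonlibs/s2jqpv5t/"
        "ad3-2.2.1-cp36-cp36m-win_amd64.whl",
    )],
})
print(pages["index.html"])
print(pages["ad3/index.html"])
```

Writing each entry of the returned dict to a directory that GitHub Pages serves would then be enough for pip to use it as an extra index.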

But… the download URLs were obfuscated with JavaScript.

function dl1(ml, mi) {
    var ot = "https://download.lfd.uci.edu/pythonlibs/";
    for (var j = 0; j < mi.length; j++) ot += String.fromCharCode(ml[mi.charCodeAt(j) - 47]);
    location.href = ot;
}

function dl(ml, mi) {
    mi = mi.replace('&lt;', '<');
    mi = mi.replace('&#62;', '>');
    mi = mi.replace('&#38;', '&');
    setTimeout(function (l) {
        dl1(ml, mi)
    }, 1500, 1);
}

dl([101,53,106,110,46,105,118,50,115,104,97,100,99,49,116,54,108,51,119,95,112,52,109,113,45,47], 
   "761FC50=H9:@G6363&lt;G;C@&#62;G;C@&#62;EGA42B9E:&#62;D3A8?");
// this triggers a download: https://download.lfd.uci.edu/pythonlibs/s2jqpv5t/ad3-2.2.1-cp36-cp36m-win_amd64.whl

This code, reconstructed in our Python scraper, looks like this:

ml = [101, 53, 106, 110, 46, 105, 118, 50, 115, 104, 97, 100, 99, 
      49, 116, 54, 108, 51, 119, 95, 112, 52, 109, 113, 45, 47]
mi = "761FC50=H9:@G6363&lt;G;C@&#62;G;C@&#62;EGA42B9E:&#62;D3A8?"


def deobfuscate_download_url(ml, mi):    
    mi = mi.replace('&lt;', '<')
    mi = mi.replace('&#62;', '>')
    mi = mi.replace('&#38;', '&')
    output = ''
    for i in range(len(mi)):
        output += chr(ml[ord(mi[i]) - 47])
    return output

print("https://download.lfd.uci.edu/pythonlibs/" + deobfuscate_download_url(ml, mi))
# https://download.lfd.uci.edu/pythonlibs/s2jqpv5t/ad3-2.2.1-cp36-cp36m-win_amd64.whl

And… the server seemed to be checking the User Agent in the request, so we tell it we are Mozilla/5.0 and not something like python-requests/{package version} {runtime}/{runtime version} {uname}/{uname -r}.
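
Setting that header could look like the sketch below. It uses urllib from the standard library so that it has no dependencies, whereas the scraper itself installs requests; the helper name build_request is an assumption for this illustration.

```python
import urllib.request

PAGE_URL = "https://www.lfd.uci.edu/~gohlke/pythonlibs/"


def build_request(url, user_agent="Mozilla/5.0"):
    """Create a GET request that identifies itself with a browser-like User-Agent."""
    return urllib.request.Request(url, headers={"User-Agent": user_agent})


request = build_request(PAGE_URL)
# urllib stores header names capitalised, so the key becomes "User-agent".
print(request.get_header("User-agent"))

# Fetching the page would then be (left commented out to avoid a network call):
# with urllib.request.urlopen(request) as response:
#     page_html = response.read().decode("utf-8", errors="replace")
```

With requests, the equivalent would be passing the same dict as the headers argument of requests.get.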

Now we have a scraper that can find all packages and wheels in this page, and we build our own package index from this.

Using GitHub Actions, I scheduled a periodic run of the scraper, which commits back to its own repository. This has the advantage that we can host the package index with GitHub Pages, which makes this entire thing a free operation.

This is the GitHub Actions workflow that periodically runs:

name: Update PyPi registry
on: 
  schedule:
    - cron:  '25 */4 * * *' # every four hours, at minute 25
jobs:
  build:
    name: Update registry
    runs-on: ubuntu-latest
    steps:
    - uses: actions/[email protected]
    - name: Set up Python 3.7
      uses: actions/[email protected]
      with:
        python-version: 3.7
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install requests
    - name: Remove old package index
      run: |
        mv docs/CNAME ./CNAME
        rm -rf docs/*
        mv ./CNAME docs/CNAME
    - name: Scrape Christoph Gohlke
      run: |
        python scrape.py
    - name: Commit files
      run: |
        git config --local user.email "[email protected]"
        git config --local user.name "PyPi updater"
        git add *
        git commit -m "Update PyPi registry" -a
    - name: Push changes
      uses: ad-m/[email protected]
      with:
        github_token: ${{ secrets.GITHUB_TOKEN }}

I’m hosting this on pypi.bartbroe.re, until it eventually breaks, so it’s usable with:

python -m pip install --extra-index-url https://pypi.bartbroe.re <yourpackagehere>

Decompiling and deobfuscating a Zend Guard protected code base

Up until PHP version 5.6, Zend Guard could be used to obfuscate / compile your PHP source code. It is possible to deobfuscate these code bases.

The easiest way to achieve deobfuscation is by using a PHP runtime that caches opcodes, and translating these cache entries back to source code. The repository Zend-Decoder by Tools2 on GitHub hooks into the lighttpd xcache opcode cache, and does exactly this.

For my own convenience, I have written a Dockerfile that sets up this workflow. Combined with some bash one liners it’s possible to deobfuscate an entire code base.

Steps to get this running are:

  1. Get the codebase with the Dockerfile: git clone https://github.com/bartbroere/zend-decoder
  2. Obtain a copy of ZendGuardLoader.so and place it in the cloned repository
  3. Build the container: docker build -t zenddecoder .
  4. Run the container, with the code base as a bind mount, and drop into a shell: docker run -v /path/to/your/codebase:/src -it zenddecoder /bin/bash
  5. Now it’s possible to deobfuscate your entire code base with one-liners like this:
    for f in $(find /src/ -name '*.php'); do php index.php "$f" > "${f::-4}.dec.php"; done