Syncing a bokeh plot to a video

The Python library bokeh is great for plotting all kinds of data in the browser. Bokeh includes a serve command that hosts a document with per-user state in the associated Python code. When creating a visualisation, you can register callback functions in both Python and JavaScript. I used this here to sync an HTML video element to a bokeh line plot.

Accelerometer data synced to a dashcam video (embedded in the original post).

The HTML video element:

<video src="bokeh-video-sync/static/20190912_041033_EF.mp4"
       height="530"
       id="frontcamera"></video>

The JavaScript callback:

/* Select the video with the id frontcamera */
var v = document.getElementById("frontcamera");

/* If the time of the video element updates, run the callback */
v.addEventListener("timeupdate", function () {
    /* The selector below assumes there are no other input fields in your
       code */
    var inputs = document.getElementsByTagName('input');
    for (var index = 0; index < inputs.length; ++index) {
        /* Update the input field with the new time of the video */
        inputs[index].value = v.currentTime;
        /* Trigger a change of the input field, to call the Python code */
        var event = new Event('change', {bubbles: true});
        inputs[index].dispatchEvent(event);
    }
}, true);

The Python code, with the update callback function:

import pandas

from bokeh.layouts import row
from bokeh.models import ColumnDataSource, TextInput
from bokeh.plotting import curdoc, figure


def update(_, old, new):
    """The callback function we want to invoke if the time in the video
       advances."""
    # Our data source is in milliseconds, but the callback receives seconds,
    # therefore we multiply the input by 1000
    new, old = float(new) * 1000, float(old) * 1000
    subset = df[(df['ms_since_start'] > old) & (df['ms_since_start'] < new)]
    data.stream({
        'ms': list(subset['ms_since_start']),
        'z': list(subset['z_int']),
        'x': list(subset['x_int']),
        'y': list(subset['y_int']),
    })

# Read in the data using pandas
df = pandas.read_csv('./20190912_041033_EF.acc.csv')

# Create a new bokeh figure
p = figure(plot_width=1900, plot_height=400)

# Define the columns to plot later
data = ColumnDataSource({
    'ms': [],
    'z': [],
    'x': [],
    'y': [],
})

# Plot three lines, that listen to changes in the ColumnDataSource
p.line('ms', 'z', source=data, color='red')
p.line('ms', 'x', source=data, color='white')
p.line('ms', 'y', source=data, color='lightgreen')

# Add a TextInput() that we use to pass on the current time in the video
# This is hacky, and could potentially be done in a nicer way
current_time = TextInput()
 
# If the text field current_time changes, invoke the update callback function 
current_time.on_change('value', update)

# Add all elements to the bokeh document
curdoc().add_root(row(p))
curdoc().add_root(row(current_time))

Although I like this first try at syncing plots, there is still room for improvement. It would be nicer to have the video element call the Python function directly, instead of going through an input field; this could be achieved by implementing the video element as a bokeh model. Also, the plotted line currently does not disappear when you rewind the video or start it a second time. All of this can be built on the HTML5 media events. Maybe I’ll make a proper media player for bokeh one day…
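The rewind case could be handled in the Python callback by detecting the time jumping backwards and replacing the data source instead of streaming onto it. A sketch, with a hand-rolled stand-in for bokeh’s ColumnDataSource so it runs without a browser, and only one axis for brevity:

```python
import pandas

# Stand-in for bokeh's ColumnDataSource, so the callback logic can be
# exercised without a running bokeh server
class FakeColumnDataSource:
    def __init__(self, data):
        self.data = data

    def stream(self, new_data):
        for key, values in new_data.items():
            self.data[key] = self.data[key] + list(values)

# A tiny accelerometer table in the same shape as the CSV above
df = pandas.DataFrame({'ms_since_start': [0, 500, 1000, 1500],
                       'z_int': [1, 2, 3, 4]})
data = FakeColumnDataSource({'ms': [], 'z': []})

def update(_, old, new):
    """Like the callback above, but resetting the source on a rewind."""
    new, old = float(new) * 1000, float(old) * 1000
    if new < old:
        # The video jumped backwards: rebuild the line from scratch
        # instead of streaming on top of the old one
        subset = df[df['ms_since_start'] < new]
        data.data = {'ms': subset['ms_since_start'].tolist(),
                     'z': subset['z_int'].tolist()}
    else:
        subset = df[(df['ms_since_start'] > old) &
                    (df['ms_since_start'] < new)]
        data.stream({'ms': subset['ms_since_start'].tolist(),
                     'z': subset['z_int'].tolist()})

update('value', 0, 1.2)    # video played to 1.2 s: stream new points
update('value', 1.2, 0.6)  # rewound to 0.6 s: line is rebuilt
print(data.data['ms'])     # [0, 500]
```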


Pay for the cheapest Netflix, get the most premium one

TLDR: In this post I show how to take advantage of Netflix delivering your new subscription before your payment starts. You could do this manually, but of course "it’s more fun to compute".

Netflix has an interesting upgrade flow: Once you upgrade, you get the upgraded plan for the remainder of the billing period. You only start paying your new fee at the start of the new period.

However, if you upgrade and then downgrade within the same billing period, you keep the upgraded plan for the remainder of the current billing period. At the start of the new billing period, you’ll be downgraded to your original plan again. Of course, there’s nothing stopping you from doing the same thing again. So if you repeat this every billing period, you can have the best plan for the lowest price.
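To put a number on it, with hypothetical prices (the real amounts vary per country):

```python
# Hypothetical monthly prices in cents; the real amounts differ per country
BASIC_CENTS = 799
PREMIUM_CENTS = 1599

def yearly_savings_cents():
    """Savings over a year when the upgrade/downgrade dance is repeated
    every billing period: you watch Premium but are billed for Basic."""
    return 12 * (PREMIUM_CENTS - BASIC_CENTS)

print(yearly_savings_cents() / 100)  # 96.0
```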

This raises the question: can this really be intentional? After I submitted a short bug report, Netflix replied that it is indeed intended behaviour:

We actually received a similar report previously about this one and [decided] that this is actually an intended functionality.

So… here’s the code, using Selenium:

import logging
import sys
from collections import namedtuple

from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

Configuration = namedtuple('Configuration', ['username', 'password'])

config = Configuration(username='username_here',
                       password='password_here')
options = webdriver.ChromeOptions()
options.binary_location = "./headless-chromium"
browser = webdriver.Chrome(executable_path='./chromedriver',
                           chrome_options=options)
browser.implicitly_wait(time_to_wait=10)
browser.get('https://www.netflix.com/ChangePlan')
browser.find_element_by_id('id_userLoginId').send_keys(
    config.username
)
browser.find_element_by_id('id_password').send_keys(
    config.password
)
browser.find_element_by_css_selector(
    'button.btn.login-button.btn-submit.btn-small'
).click()
try:
    message = browser.find_element_by_css_selector(
        'div.ui-message-contents'
    ).text
    logger.info('Page contains infobox (probably stating that Netflix '
                'has already been upgraded this month)')
    logger.info(message)
    logger.info('Nothing left to do')
    sys.exit(0)
except NoSuchElementException:
    # There's no infobox saying the upgrade has already been done this
    # month, so we go ahead
    current_plan = browser.find_element_by_css_selector(
        'li.selected > div > h2 > div > div > span.plan-name'
    ).text
    logger.info(f'Currently the {current_plan} plan is selected')
    plans = browser.find_elements_by_css_selector('span.plan-name')
    # Now we click the premium plan (the exact term here may be
    # language dependent)
    for plan in plans:
        if plan.text == 'Premium':
            plan.click()
    browser.find_element_by_css_selector(
        'button.btn.save-plan-button.btn-blue.btn-small'
    ).click()
    browser.find_element_by_css_selector(
        'button.btn.modal-action-button.btn-blue.btn-small'
    ).click()
    logger.info('Upgraded to Premium')
    # Now we downgrade to our original plan. After the page reload the
    # old elements are stale, so we have to look the plans up again.
    browser.get('https://www.netflix.com/ChangePlan')
    plans = browser.find_elements_by_css_selector('span.plan-name')
    for plan in plans:
        if plan.text == current_plan:
            plan.click()
    browser.find_element_by_css_selector(
        'button.btn.save-plan-button.btn-blue.btn-small'
    ).click()
    browser.find_element_by_css_selector(
        'button.btn.modal-action-button.btn-blue.btn-small'
    ).click()
    logger.info('Downgraded to the original plan again')

Of course this trick has to be deployed to AWS Lambda. We can’t be bothered to do this each month 🙈. I am working on that using Selenium and serverless Chrome.

Disclaimer: This code may or may not do what you and I expect. Run it at your own risk. In the worst case, it may actually upgrade your account, without doing the downgrade.


ETL pipeline out of Polar Flow into Runkeeper, using Serverless

TLDR: In this post, I show how to use the Serverless framework to hastily stitch together two undocumented APIs. Using this current-year-equivalent-of-a-cronjob, I export my data from Polar Flow to Runkeeper. The end result is in the repo polar-flow-to-runkeeper.

The (admittedly first-world) problem I faced recently is that I used Runkeeper in the past, but my running data now gets synced to Polar Flow. Both accounts work well enough, so I would like them to stay in sync. Neither service provides a documented API. Like most other (web) apps, however, they do have undocumented ones.

Undocumented APIs

Using undocumented APIs is becoming more and more of an alternative to web scraping, since most apps and websites now separate content and logic better: filling content into web pages is no longer done server-side, but increasingly client-side.

Fair warning: code that relies on undocumented APIs is guaranteed to fail at some point. The fact that these APIs are undocumented means they can go away, be replaced, or change behaviour without notice.

Finding out how to call the API is relatively easy: looking in the Network tab of your favourite browser’s Developer Tools will give you most of the answers. Look for requests marked as XHR with JSON responses.

Authentication can often be replicated by storing the cookies returned by a call to the login page. The requests library has a Session class that fully automates this. To authenticate to Polar Flow, all you need to do is extend requests.Session with a call that logs in to the service:

from requests import Session

class PolarFlowClient(Session):

    def login(self, username, password):
        return self.post('https://flow.polar.com/login',
                         data={"email": username,
                               "password": password,
                               "returnUrl": '/'})

After instantiating PolarFlowClient and calling its login method, you’re good to submit your API requests.
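What makes this work is the session’s cookie jar: the Set-Cookie headers from the login response are stored and sent back on every subsequent request. A quick offline sketch (the cookie here is planted by hand, purely for illustration):

```python
import requests

session = requests.Session()
# After a real login() call, Polar Flow's Set-Cookie headers end up in
# this jar automatically; here we plant one by hand to show the mechanism
session.cookies.set('session_token', 'abc123', domain='flow.polar.com')

# Any request prepared through the same session carries the cookie back
request = requests.Request('GET', 'https://flow.polar.com/training')
prepared = session.prepare_request(request)
print(prepared.headers['Cookie'])  # session_token=abc123
```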

We do something similar for Runkeeper. Check the repository to see the code for that.

Serverless

The Serverless Framework is a tool for deploying code that runs on demand. The central idea is that you slice the code you would like to run into functional units that can be called individually. This can be a very cost-effective way to run a service, without having to rent cloud machines 24/7. It also offers scheduling, which I use here to sync data periodically.

All big cloud providers have products that are compatible with Serverless functions. For this deployment, I chose AWS Lambda. Amazon’s documentation will guide you through authenticating your development machine and installing the required tooling.

I copied some code from the examples repo provided by the Serverless Framework. After modifying serverless.yml, it looked like this:

service: polar-flow-to-runkeeper  # How the service will be known
frameworkVersion: ">=1.2.0 <2.0.0"  # Provided by the example ¯\_(ツ)_/¯
plugins:
  - serverless-python-requirements  # Makes sure requirements are installed
provider:
  name: aws  # I use AWS Lambda in this project
  runtime: python3.7  # Python >3.6 for the f-strings :-)
  memorySize: 256  # Overwrite the default memory size. Default is 1024 (MB).
  timeout: 60  # It's important to set this higher than you actually expect 
               # the function to run (in seconds).
functions:
  cron:  # 'cron' is the name of the function, you could list more below
    handler: handler.run  # Path to the function
    events:
      - schedule: rate(30 minutes)  # Run every thirty minutes
                                    # (not a suggested workout schedule)
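The handler.run that serverless.yml points at is an ordinary Python function receiving the Lambda event. A minimal sketch (sync_activities is a hypothetical stand-in for the pipeline below):

```python
# handler.py -- the module and function that serverless.yml refers to

def sync_activities():
    """Hypothetical stand-in for the actual Polar Flow -> Runkeeper sync."""
    return 0  # number of activities synced

def run(event, context):
    """Invoked by AWS Lambda on every schedule event (every 30 minutes)."""
    synced = sync_activities()
    return {'synced': synced}

print(run({}, None))  # {'synced': 0}
```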

The pipeline

Syncing is as simple as looping over the entries in Polar Flow and uploading each one to Runkeeper, skipping entries that are already in our list of previously uploaded runs. That looks something like this:

# Import statements have been omitted.
# The objects flow and runkeeper are authenticated API clients.
# synced_runs is a list we keep in MongoDB (MLab has a free tier)
year = datetime.datetime.now().year
activities = flow.get('https://flow.polar.com/training/getCalendarEvents',
                      params={'start': f'01.01.{year}',
                              'end': f'31.12.{year}'}).json()
activities = filter(lambda x: x['listItemId'] not in synced_runs, 
                    activities)
for activity in activities:
    tcx_export = flow.get(
        'https://flow.polar.com/api/export/training/tcx/' +
        str(activity['listItemId'])
    )
    response = runkeeper.post(
        'https://runkeeper.com/trackMultipleFileUpload',
        data={'handleUpload': 'handleUpload'},
        files={'trackFiles': ('import.tcx', tcx_export.text,
                              'application/octet-stream')}
    )
    synced_runs.append(activity['listItemId'])
# afterwards, synced_runs is put back into the database

Now that everything has been put together, it can be deployed by running serverless deploy.

Data quality issues with Runkeeper’s importer

Using Runkeeper’s importer unfortunately means you lose some data quality:

  • Looking into the responses provided by Runkeeper shows that lots of activities can’t actually be imported (swimming, for example).

  • Runkeeper also flattens your heart rate, which sounds really scary, but just means it sets the heart rate to your average across the entire activity.

  • Given the same set of GPS points as Polar Flow, Runkeeper could calculate a different distance.
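The distance difference is not surprising: even for identical GPS points, the reported distance depends on the distance formula used and on any smoothing or subsampling applied first. A sketch using the haversine formula (coordinates made up):

```python
import math

def haversine_m(lat1, lon1, lat2, lon2):
    """Great-circle distance in metres between two GPS points."""
    r = 6371000  # mean Earth radius in metres
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlmb = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2)
    return 2 * r * math.asin(math.sqrt(a))

# Summing pairwise distances over a track: a service that smooths or
# subsamples the same points first will report a different total
track = [(52.0, 4.5), (52.001, 4.5), (52.002, 4.501)]
total = sum(haversine_m(*p, *q) for p, q in zip(track, track[1:]))
print(round(total), 'm')
```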

Final thoughts

Playing around with Serverless is cool if there’s not much at stake. The synchronization of Polar Flow and Runkeeper fits comfortably in Amazon’s free tier. The tooling is intuitive, and there are a lot of provided examples.

An important concern I have is cost. Serverless starts out cheap, but functions have great potential to become a very expensive hobby when building larger projects: eventually, you might want Serverless functions that call other Serverless functions, and you’ll need a very good overview of your system to avoid infinite or circular call loops. The only exit condition might be your credit card bottoming out.