Cleaning up a required process when you quit

Some time ago, I moved a bunch of our devops tools into a Makefile. I’m not sure exactly what the benefit was then, but there have been some challenges.

One of these related to needing to run multiple processes when performing a task, and then cleaning up the dependent processes when the task finishes. Specifically, we run a websocket server in production that uses Redis for PubSub. We can safely assume the developer is running Redis already, since the rest of the platform also depends upon it. However, to collect the changes from Postgres and publish them into Redis, we use a custom Django management command. All this does is listen for Postgres notifications and forward them to Redis.

$ ./manage.py relay

Mostly we don’t keep this running in development, unless you need to be running the websocket code locally for testing. It is a hassle to remember to start a process and then kill it when you are done, so it would be convenient to start the relay command, then start our websocket worker, and clean up both processes when we quit the worker.

This can be done in a Makefile command:

websocket:
    $(shell trap 'kill 0' SIGINT ; \
        ./manage.py relay & \
        gunicorn \
          -k geventwebsocket.gunicorn.workers.GeventWebsocketWorker \
          --workers=1 \
          --reload \
          websocket:application \
    )

The trap command performs the magic - it will kill the background command when the gunicorn worker is killed with Ctrl-C.

I have some other commands which start the background process, and then use kill -0 $(shell ps ax | grep ... | head -n 1 | cut -d ' ' -f 1) to ensure they kill the process on quit, but this sometimes does not work: I should get around to changing them over to this…

ESPHome variables from Home Assistant

One of my key tenets of Home Automation is that as much as possible should be done locally. Whilst with Home Assistant that means “in the local network”, I feel that “in the device” is even better. However, sometimes values should still be configurable (from within Home Assistant), but the device should still work even if there is not (and has never been) a connection to Home Assistant since boot.

For instance, I have a sensor light. The device contains a PIR sensor, with configurable delay, sensitivity and lux settings. This in turn drives a relay connected to a pair of PAR bulbs, and additionally has a green LED. This device usually only has the ESP8266 controlling whether the thing should ever turn on at all, but I hacked it so that it also exposes the PIR sensor and allows triggering the relay.

But the delay between the motion no longer being detected and the light turning off should be configurable. I don’t want to have to re-flash the device if I decide to change this value, so I wanted the value to be fetched from Home Assistant (if it is set there).

This turned out to work really well. There are a few parts that need to be set up in the YAML (this is not the complete file):

esphome:
  name: ${device_name}
  platform: ESP8266
  board: esp01_1m
  on_boot:
    - logger.log:
        level: DEBUG
        format: 'Light ON time is set to %d'
        args: ['id(light_on_time)']

globals:
  - id: light_on_time
    type: int
    restore_value: true
    initial_value: '30'

sensor:
  - platform: homeassistant
    id: on_time
    entity_id: input_number.${device_name}
    on_value:
      then:
        - globals.set:
            id: light_on_time
            value: !lambda 'return int(x);'

It really is that simple. The first boot will set the light_on_time variable to 30. Then, when it connects to Home Assistant, it will look for an input_number.<device_name> entity (matching the device name). If it finds one (or is ever told about that value changing), it will commit the new value to the flash, and this will be restored after a reboot.

There is one other thing we could do here: avoid writing the value to the flash if it has not changed, preventing wear (the flash only supports a limited number of writes):

sensor:
  - platform: homeassistant
    id: on_time
    entity_id: input_number.${device_name}
    on_value:
      then:
        if:
          condition:
            lambda: 'return id(light_on_time) != int(x);'
          then:
            - logger.log:
                level: DEBUG
                format: 'Light ON time changed from %d to %d seconds'
                args: ['id(light_on_time)', 'int(x)']
            - globals.set:
                id: light_on_time
                value: !lambda 'return int(x);'

With regards to a similar problem, detecting if it is dark enough to turn the light on should be something like “a bit before sunset to a bit after sunrise”. I could set the lux threshold on the device, but it would be nice to have motion detection work during the day too.

Here, we can have a global variable sun_is_down that defaults to being true, and is only set to false when the sun comes up. However, we would also want this to trigger when first connecting to Home Assistant and the sun is already up.

We can use a wait_until in our on_boot handler to trigger this:

globals:
  - id: sun_is_down
    type: bool
    restore_value: false
    initial_value: 'true'

esphome:
  ...
  on_boot:
    - wait_until:
        api.connected:
    - if:
        condition:
          sun.is_above_horizon:
        then:
          - logger.log:
              level: DEBUG
              format: 'Sun is up, light will not turn on.'
          - globals.set:
              id: sun_is_down
              value: 'false'
        else:
          - logger.log:
              level: DEBUG
              format: 'Sun is down, light will turn on.'

sun:
  on_sunrise:
    - elevation: 10°
      then:
        - logger.log:
            format: 'The sun is now up, light will not turn on.'
        - globals.set:
            id: sun_is_down
            value: 'false'
  on_sunset:
    - elevation: 10°
      then:
        - logger.log:
            format: 'The sun is now down, light will turn on.'
        - globals.set:
            id: sun_is_down
            value: 'true'

I’ve used a 10° elevation so that the light will turn on a bit before sunset, and will still turn on a bit after sunrise, when motion is detected. I haven’t figured out a way to apply the same offset to the sun.is_above_horizon: check.

Mute the rest of this track

Working from home means that I don’t use my AirPods as much as I used to: I’m happy to mostly listen to the music my partner (who has worked from home for over a decade) wants to listen to. Lately, that has been NonStop80s.

We play this through the IKEA Symphonisk stereo pair in the dining room, several metres away from the office. The sound is good, and it’s nice not having it come out of the computer speaker.

Because I’m a nut, I have all of this hooked up to Home Assistant, and have one nice automation that will turn off the music that is playing on that speaker when the TV in that room is turned on.

Sometimes the tracks that are played on NonStop80s are really bad. For instance, Snooker Loopy, by Chas and Dave is an absolute piece of shit song. If it weren’t a streaming station, I’d just hit skip.

What would be nice is to be able to have the speakers stop playing, and start playing again when the next track comes on.

Enter “Mute the rest of this track”.

alias: Mute the rest of this track
sequence:
  - condition: state
    entity_id: media_player.dining_room_sonos
    state: playing
  - condition: template
    value_template: ''
  - service: media_player.volume_mute
    data:
      is_volume_muted: true
    entity_id: media_player.dining_room_sonos
  - delay: '1'
  - wait_for_trigger:
      - platform: state
        entity_id: media_player.dining_room_sonos
    timeout: ''
  - service: media_player.volume_mute
    data:
      is_volume_muted: false
    entity_id: media_player.dining_room_sonos
mode: single
icon: 'mdi:volume-mute'
description: Mute the media player until the media_title attribute changes.
fields:
  timeout:
    description: Amount of time to stay muted if the track does not change
    example: 300

This is a script that you can use in Home Assistant to do just that. It will (iff the dining_room_sonos is unmuted and currently playing) mute that speaker until something else on the speaker changes.

This could be a track change, or a volume change, or anything else. It’s deliberately loose, so that if it was muted, and someone changes the volume, it unmutes immediately.

After a configurable (default 300 seconds) timeout, it will unmute.

I was trying to make it so that you could provide the named media_player entity, but there still seem to be places where you can’t use templates (but need to provide the string directly). I was able to get the conditions to use the variable, but was not able to figure out how to use them in the service calls…and to be honest, I don’t really need it to apply to other players.

Scheduling Celery Tasks in the (far) future

We used to make use of the fact that a celery task can be scheduled at some time in the future to auto-punch-out staff members who failed to punch out 24 hours after their shift started. This was as simple as scheduling the task with a countdown of 86400 seconds (24 hours).
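
In Celery terms, that looked something like this (the task and argument names here are illustrative, not the real ones):

auto_punch_out.apply_async(args=[shift.pk], countdown=86400)  # 24 hours from now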

However, as Adam points out here (see number 5), this is not a great idea. For one, it will bog down your celery workers (and make them use a bunch more memory). Secondly, something Adam doesn’t mention is that if your queue is corrupted, then all of those future tasks will not be executed.

Discussing this in IRC today, I thought of a simple mechanism for scheduling these tasks and processing them at some point after they are due.

We will start with a model to store our task:

import importlib


class ScheduledTask(models.Model):
    task_name = models.TextField()
    task_args = models.JSONField(default=list)
    task_kwargs = models.JSONField(default=dict)
    due_date = models.DateTimeField()

    objects = ScheduledTaskQuerySet.as_manager()

    @property
    def task(self):
        module, task = self.task_name.rsplit('.', 1)
        return getattr(importlib.import_module(module), task)

    def execute(self):
        self.task.apply_async(args=self.task_args, kwargs=self.task_kwargs)

We have a custom queryset defined that allows us to see which tasks are due (and to schedule new ones) using some nice queryset methods:

class ScheduledTaskQuerySet(models.query.QuerySet):
    def due(self):
        return self.filter(due_date__lte=datetime.datetime.utcnow())

    def schedule(self, task, when, *args, **kwargs):
        return self.create(task_name=task, task_args=args, task_kwargs=kwargs, due_date=when)

Finally, we need a task that will enqueue the due tasks:

@app.task
def enqueue_due_tasks():
    for task in ScheduledTask.objects.due():
        task.execute()
        task.delete()
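
Something still needs to run enqueue_due_tasks regularly. A minimal sketch using Celery beat (the entry name and one-minute interval are just assumptions, and it assumes the tasks module is foo.tasks):

app.conf.beat_schedule = {
    'enqueue-due-tasks': {
        'task': 'foo.tasks.enqueue_due_tasks',
        'schedule': 60.0,  # sweep for due tasks once a minute
    },
}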

As it stands, with this code, to schedule a task, you need to create a model instance:

ScheduledTask.objects.schedule(
    'foo.tasks.bar',
    datetime.datetime(2525, 11, 11, 9, 30),
    'x',
    y='z'
)

But, it would be nice if we could use the task to schedule itself:

foo.tasks.bar.schedule(args=[], kwargs={}, eta=X, countdown=Y)

Or, even better:

foo.tasks.bar.s(arg, kwarg=value).schedule(datetime.timedelta(days=365))

The first one we should be able to do by using custom tasks (and implementing a schedule method):

class Scheduled(celery.Task):
    def schedule(self, *, args=None, kwargs=None, eta=None, countdown=None):
        if not eta and not countdown:
            raise ValueError('One of eta and countdown must be supplied')
        if eta and countdown:
            raise ValueError('Only one of eta and countdown may be supplied')
        args = args or ()
        kwargs = kwargs or {}
        if eta:
            ScheduledTask.objects.schedule(self.name, eta, *args, **kwargs)
        else:
            ScheduledTask.objects.schedule(
                self.name,
                datetime.datetime.utcnow() + datetime.timedelta(seconds=countdown),
                *args, **kwargs
            )

Then, as long as a task is defined as using the base class, we can schedule it:

@app.task(base=Scheduled)
def task_name(x, y=None):
    pass
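
With that in place, a call like the following stores a row in the table instead of putting anything on the broker:

task_name.schedule(args=[1], kwargs={'y': 2}, countdown=86400)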

But what about mucking with the .s() or .signature() calls? Now we are getting a bit experimental, but it still might be fun:

from celery.canvas import Signature

def schedule(self, when=None):
    if when:
        if isinstance(when, datetime.timedelta):
            when = datetime.datetime.utcnow() + when
    else:
        if self.options.get('countdown'):
            when = datetime.datetime.utcnow() + datetime.timedelta(seconds=self.options['countdown'])
        elif self.options.get('eta'):
            when = self.options['eta']

    ScheduledTask.objects.create(
        task_name=self.task,
        task_args=self.args,
        task_kwargs=self.kwargs,
        due_date=when,
    )

Signature.schedule = schedule

This is totally mostly untested, and I’m not sure it’s a really good idea.

A better idea might be to have a backend that doesn’t even tell the workers about the tasks until after they are due…that way you would not have to duck-punch celery at all.

Logging WiFi client Signal Strength

One of the cool things about the old macOS version of AirPort Utility is that it would show you the WiFi signal strength of the various connected clients. At some point, they removed this feature, bringing it down to the same level of functionality as the iOS app.

Whilst I still have a Mac that can (only) run that version next to me on the desk, mostly I don’t use that, and having to switch computers and open it up is a pain. It would be really nice if I could get that data in a more configurable format (possibly with the client data from multiple base stations).

It turns out that this data is available over SNMP, but you need the old version of AirPort Utility to enable it. And, as we shall see shortly, there is another caveat too.

Before we get to that though, my main router is actually an OpenWRT device, so it might be nice to grab the handful of clients that connect to that (it being in the garage and all).

We can get a list of the WiFi interfaces using iwinfo | awk '/ESSID/ {print $1}'. In my case, that gives me wlan0 and wlan1; the actual names are not important, other than that we will need them later.

Now, for each WiFi interface, we want the MAC address of the access point, which we can get similarly: iwinfo $iw info | awk '/Access Point:/ {print $3}'.

It might be nice to also get the “connection speed”, although in some cases this is not known. In those cases, we want to use null, since we’ll have a numeric value otherwise.

  iwinfo $iw assoclist | awk '/expected/ {if ($3 == "unknown") print "null"
  else
   print $3
  }'

Okay, now we are ready to get the signal/noise/MAC data:

  iwinfo $iw assoclist \
    | awk -v rate=$RATE -v mac=$MAC '/dBm/ { \
      print "mosquitto_pub -t \"wifi-signal/" substr($1, 1) "\"", \
      " -m \x27{ \
        \"signal\":" $2 ", \
        \"noise\":" $5 ", \
        \"rate\":" rate ", \
        \"station\":\"" mac "\" \
      }\x27 -h mqtt.lan"}' \
    | bash

There’s a fair bit going on there, so let’s unpack it:

We get the list of associated clients and pipe that through awk. To make things easier, we pass the rate and MAC values we captured earlier in as awk variables, then grab the relevant columns and build it all up into a shell command that will look something like:

mosquitto_pub -t "wifi-signal/98:AF:65:E3:16:F1"  -m '{
        "signal":-70,
        "noise":-102,
        "rate":null,
        "station":"B0:95:75:E8:C8:32"
      }' -h mqtt.lan

Because we can’t escape a single quote inside the single-quoted awk program, we use \x27 to get awk to emit one.

Finally, this is passed to bash to execute the command(s).

And, putting this all together:

#! /bin/sh

for iw in $(iwinfo | awk '/ESSID/ {print $1}')
do
  MAC=$(iwinfo $iw info | awk '/Access Point:/ {print $3}')
  RATE=$(iwinfo $iw assoclist | awk '/expected/ {if ($3 == "unknown") print "null"
else
   print $3
 }')
  iwinfo $iw assoclist \
    | awk -v rate=$RATE -v mac=$MAC '/dBm/ { \
      print "mosquitto_pub -t \"wifi-signal/" substr($1, 1) "\"", \
      " -m \x27{ \
        \"signal\":" $2 ", \
        \"noise\":" $5 ", \
        \"rate\":" rate ", \
        \"station\":\"" mac "\" \
      }\x27 -h mqtt.lan"}' \
    | bash
done

Okay, so that’s the easy one. What about getting the data from SNMP?

Since this code is not running on a Mac, I can’t rely on the MIB data being available. Instead, we’ll use the raw numeric OIDs. It turns out that we can get all of the data we need (and then some!) using one command:

snmpwalk -v 2c -c public hostname.local iso.3.6.1.4.1.63.501.3.2.2.1

This outputs a whole bunch of data - the rows we care about have the values 1, 6, 7 and 8 in position 13, so we extract those. We also want to retain a bit more context, so that we can sort the data into a consistent ordering for each client: MAC address, signal, noise and then rate. To get a stable sort, we take the last two parts of the OID (which in my case was enough to generate a unique value per client), and then the 13th field. Finally, we take the “value” that was returned for the row - if it was a STRING value, it will already have quotes around it.

  snmpwalk -v 2c -c public $station $WIFI_DATA \
    | awk -F. 'match($31, /: /) {print $30, substr($31, 0, 3), $13, substr($31, RSTART + 2)}' \
    | sort \
    | grep '^.. .. [1678] '

This builds up the data in the correct format; from here, we need to put each client’s data on a single line:

  snmpwalk ...
    | <as-above>
    | awk '/^.. .. 1/{ if (NR!=1) print "";}{printf $4 " "}'

And then turn it into a string that we’ll be able to execute with bash (like before). Putting it all together:

  MAC=$(/usr/sbin/arp -a $station | awk '{print $4}')

  snmpwalk -v 2c -c public $station iso.3.6.1.4.1.63.501.3.2.2.1 \
    | awk -F. 'match($31, /: /) {print $30, substr($31, 0, 3), $13, substr($31, RSTART + 2)}' \
    | sort \
    | grep '^.. .. [1678] ' \
    | awk '/^.. .. 1/ {if (NR!=1) print "";}{printf $4 " "}' \
    | awk -v mac=$MAC '{if ($2 != "") \
      print "mosquitto_pub -t \"wifi-signal/" substr($1, 2), \
        "-m \x27{\
          \"signal\":" $2 ", \
          \"noise\":" $3 ", \
          \"rate\":" $4 ", \
          \"station\": \"" mac "\" \
    }\x27 -h mqtt.lan" \
    }' | bash

Note that I also needed to check that there was a signal value - one of the stations was reporting this as empty, so we skip over any that don’t have a signal value reported.

But… this doesn’t quite work. I mean, it works, but for some reason (after having left it running every minute overnight), it never updates the signal/noise/rate data. It seems that the AirPort Express/Extreme SNMP data is not refreshed.

You can, however, force it to update by asking for the number of connected clients!

snmpget -v 2c -c public $station iso.3.6.1.4.1.63.501.3.2.1.0

Thus, our full script can look like:

#! /bin/bash

stations="study.lan
bedroom.lan
dining-room.lan"

CLIENT_COUNT=iso.3.6.1.4.1.63.501.3.2.1.0
WIFI_DATA=iso.3.6.1.4.1.63.501.3.2.2.1

for station in ${stations}
do
  MAC=$(/usr/sbin/arp -a $station | awk '{print $4}')

  snmpget -v 2c -c public $station $CLIENT_COUNT > /dev/null

  snmpwalk -v 2c -c public $station $WIFI_DATA \
    | awk -F. 'match($31, /: /) {print $30, substr($31, 0, 3), $13, substr($31, RSTART + 2)}' \
    | sort \
    | grep '^.. .. [1678] ' \
    | awk '/^.. .. 1/ {if (NR!=1) print "";}{printf $4 " "}' \
    | awk -v mac=$MAC '{if ($2 != "") \
      print "mosquitto_pub -t \"wifi-signal/" substr($1, 2), \
        "-m \x27{\
          \"signal\":" $2 ", \
          \"noise\":" $3 ", \
          \"rate\":" $4 ", \
          \"station\": \"" mac "\" \
    }\x27 -h mqtt.lan" \
    }' | bash
done

These scripts could be more configurable - I’ve hard-coded the MQTT broker name, for instance, but that will be fine for my purposes. I’m also not actually doing anything with this data yet - I will eventually look at pulling this into Home Assistant or somewhere else, and graphing the data.

Locking rows in PostgreSQL

Someone asked a question today in #postgresql about locking rows for a long-running task, where multiple workers could be updating rows. There is a flag in each row that indicates whether a row has been processed, but not one indicating whether it is currently being processed.

One solution is to use advisory locks to mark that we are currently working on a given row. The advantage of this over having a column that indicates it is currently being processed (or mis-using the “processed” column by setting it before we’ve finished building the data) is that if our task that generates the data fails, the lock is released when the session is closed.

So, let’s look at the things we need to make this happen. We’ll start with something that represents our long-running process:

CREATE OR REPLACE FUNCTION expensive_operation(INTEGER)
RETURNS JSONB AS $$

  BEGIN
    PERFORM pg_sleep(10);
    RETURN JSONB_BUILD_OBJECT(
        'foo', 'bar',
        'baz', $1
    );
  END;

$$ LANGUAGE plpgsql;

And we need our table to put the tasks in (and some unprocessed rows):

CREATE TABLE tasks (
    id SERIAL PRIMARY KEY,
    processed BOOLEAN DEFAULT FALSE,
    data JSONB
);

INSERT INTO tasks (data) VALUES (NULL), (NULL), (NULL);

Finally, we are ready to process our tasks.

WITH next_task AS (
  SELECT t.id, pg_advisory_lock(t.id)
    FROM (
      SELECT id
        FROM tasks
       WHERE NOT processed
         AND id NOT IN (SELECT objid
                          FROM pg_locks
                         WHERE locktype = 'advisory')
       LIMIT 1
    ) t
)
UPDATE tasks
   SET data = expensive_operation(id),
       processed = true
 WHERE id IN (SELECT id FROM next_task)
   AND NOT processed;

The CTE will get a task.id that has not been processed (and is not currently being processed). It will take an advisory lock on that value, which marks the task as in-progress.

Then, when it has the lock, it performs the expensive operation and updates the data (and the processed status).
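
From the application side, a worker could drive this with something like the following sketch (Python with psycopg2; this is my illustration, not part of the original discussion). The important part is that each run uses its own session, so the advisory lock is released even if the expensive operation blows up:

import psycopg2

CLAIM_AND_PROCESS = """
WITH next_task AS (
  SELECT t.id, pg_advisory_lock(t.id)
    FROM (
      SELECT id
        FROM tasks
       WHERE NOT processed
         AND id NOT IN (SELECT objid FROM pg_locks WHERE locktype = 'advisory')
       LIMIT 1
    ) t
)
UPDATE tasks
   SET data = expensive_operation(id),
       processed = true
 WHERE id IN (SELECT id FROM next_task)
   AND NOT processed
"""

def process_one(dsn):
    connection = psycopg2.connect(dsn)
    try:
        with connection, connection.cursor() as cursor:
            cursor.execute(CLAIM_AND_PROCESS)
            return cursor.rowcount  # 1 if a task was processed, 0 if nothing was due
    finally:
        # Closing the session releases the advisory lock, even if
        # expensive_operation() failed part-way through.
        connection.close()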

I’m not totally sure if the AND NOT processed at the bottom of the query is necessary - it seems to me that there could be a situation where two workers try to get the lock at the same time (and therefore are both asking for the same lock), and one of them gets it. The other one, when it finally gets the lock, would now see that row as processed, and so would not update anything.

I’m happy to see a better way to do this.

RobotFramework, Chromedriver and Docker

One of my team implemented RobotFramework support for automated browser testing of our platform a while ago. At the time, we were using Codeship Basic, and I built a helper to run a robot test suite within a tox environment. It was all good, because chromedriver and all its dependencies were already installed.

But time passes, and we needed to move to Codeship Pro, which has some neater features, but required me to build docker images for everything. We already use docker for deployment, but I didn’t really want to build a bunch of distinct images just for testing that re-implemented the same stuff that we have in our deployment images. Even just appending new stuff to them means that things could turn out to be a pain in the arse to manage.

And getting chromedriver installed into a docker image is not neat.

I did find a docker image that just has an instance of chromedriver, and exposes that. But getting that to work with robot was still a bunch of work. After much experimentation, I was able to get the connections between everything to work.

First, we need to have the chromedriver container running:

$ docker run -p 4444:4444 -e CHROMEDRIVER_WHITELISTED_IPS='' robcherry/docker-chromedriver:latest

Then, there are a few moving parts that need to be in place to get things to work. Using my djangobot management command (which I had to extend a bit here), a single command can be used to spin up a Django runserver command, apply migrations (if necessary), and then run the robot commands. The trick is you need to teach Robot to speak to the remote WebDriver instance, which then in turn speaks to the running django webserver.

First, the RobotFramework commands; my resource.robot file which is referenced by all of my robot test suites contains:

*** Variables ***

${HOSTNAME}         127.0.0.1
${PORT}             8000
${SCHEME}           http
${SERVER}           ${SCHEME}://${HOSTNAME}:${PORT}
${BROWSER}          headlesschrome
${TIMEOUT}          30
${REMOTE_URL}

*** Settings ***

Documentation   A resource file with reusable keywords and variables.
Library         SeleniumLibrary             timeout=${TIMEOUT}      implicit_wait=1
Library         Collections
Library         DebugLibrary
Library         DateTime
Library         String
Library         djangobot.DjangoLibrary     ${HOSTNAME}     ${PORT}

*** Keywords ***

Create Remote Webdriver
    ${chrome_options} =     Evaluate    sys.modules['selenium.webdriver'].ChromeOptions()    sys, selenium.webdriver
    Call Method    ${chrome_options}   add_argument    headless
    Call Method    ${chrome_options}   add_argument    disable-gpu
    Call Method    ${chrome_options}   add_argument    no-sandbox
    ${options}=     Call Method     ${chrome_options}    to_capabilities

    Create Webdriver    Remote   command_executor=${REMOTE_URL}    desired_capabilities=${options}
    Open Browser    ${SERVER}   ${BROWSER}  remote_url=${REMOTE_URL}    desired_capabilities=${options}

Start Session
    Run Keyword If  '${REMOTE_URL}'    Create Remote Webdriver
    Run Keyword If  '${REMOTE_URL}' == ''    Open Browser    ${SERVER}   ${BROWSER}

    Set Window Size     2048  2048
    Fetch Url       login
    Add Cookie      robot   true

    Register Keyword To Run On Failure    djangobot.DjangoLibrary.Dump Error Data

End Session
    Close Browser

Logout
    Fetch Url     logout

Notice that the Start Session keyword determines which type of browser to open - either a local or remote one.

Thus, each *.robot file starts with:

*** Settings ***
Resource                resource.robot
Suite Setup             Start Session
Suite Teardown          End Session
Test Setup              Logout

Because the requests will no longer be coming from localhost, you need to ensure that your runserver is listening on the interface the requests will be coming from. If you can’t detect this, and your machine is not exposed to an insecure network, then you can use 0.0.0.0 to get the django devserver to listen on all interfaces. You will also need to supply the hostname that you will be using for the requests (which won’t be localhost anymore), and ensure this is in your Django settings.ALLOWED_HOSTS.
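
For example, the relevant settings might look something like this (the hostname is just the example used below):

# settings.py: allow requests addressed to the name the browser container will use.
ALLOWED_HOSTS = ['localhost', '127.0.0.1', 'mymachine.local']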

In my case, I needed to make my robot command allow all this, but ultimately I can now do:

$ ./manage.py robot --runserver 0 \
                    --listen 0.0.0.0 \
                    --hostname mymachine.local \
                    --remote-url http://localhost:4444 \
                    --include tag

This runs against the database I already have prepared, but in my codeship-steps.yml I needed to do a bit more, and hook it up to the other containers:

coverage run --branch --parallel \
    /app/manage.py robot --migrate \
                         --server-url=http://web:8000 \
                         --remote-url=http://chromedriver:4444 \
                         --tests-dir=/app/robot_tests/ --output-dir=/coverage/robot_results/ \
                         --exclude skip  --exclude expected-failure

Now, if only Codeship’s jet tool actually cached multi-stage builds correctly.

Maybe I need to try this.

Update value only if present

We have a bunch of integrations with external systems, and in most of these cases we are unable to use OAuth or other mechanisms that don’t require us to store a username/password pair. So, we have to store that information (encrypted, because we need to use the value, rather than just being able to store a hashed value to compare an incoming value with).

Because this data is sensitive, we do not want to show this value to the user, but we do need to allow them to change it. As such, we end up with a form that usually contains a username and a password field, and sometimes a URL field:

class ConfigForm(forms.ModelForm):
    class Meta:
        model = ExternalSystem
        fields = ('username', 'password', 'url')

But this would show the password to the user. We don’t want to do that, but we do want to allow them to include a new password if it has changed.

In the past, I’ve done this on a per-form basis by overriding the clean_password method:

class ConfigForm(forms.ModelForm):
    class Meta:
        model = ExternalSystem
        fields = ('username', 'password', 'url')

    def clean_password(self):
        return self.cleaned_data.get('password') or self.instance.password

But this requires implementing that method on every form. As I mentioned before, we have a bunch of these. And on at least one, we’d missed this method. We could subclass a base form class that implements this method, but I think there is a nicer way.

It should be possible to have a field that handles this. The methods that look interesting are clean, and has_changed. Specifically, it would be great if we could just override has_changed:

class WriteOnlyField(forms.CharField):
    def has_changed(self, initial, data):
        return bool(data) and initial != data

However, it turns out this is not used until the form is re-rendered (or perhaps not at all by default; it’s quite likely only called because my own code uses it to get a list of changed fields to mark as changed as a UI affordance).

The clean method in a CharField does not have access to the initial value, and there really is not a nice way to get this value attached to the field (other than doing it in the has_changed method, which is not called).

But it turns out this behaviour (apply changes only when a value is supplied) is the same behaviour that FileField uses, and as such it gets a special if statement in the form cleaning process, where it is passed both the initial and the new values.

So, we can leverage this and get a field class that does what we want:

class WriteOnlyField(forms.CharField, forms.FileField):
    def clean(self, value, initial):
        return value or initial

    def has_changed(self, initial, data):
        return bool(data) and initial != data

We can even go a bit further, and rely on the behaviour of forms.PasswordInput() to hide the value on an unbound form:

class WriteOnlyField(forms.CharField, forms.FileField):
    def __init__(self, *args, **kwargs):
        defaults = {
            'widget': forms.PasswordInput(),
            'help_text': _('Leave blank if unchanged'),
        }
        defaults.update(**kwargs)
        return super().__init__(*args, **defaults)

    def clean(self, value, initial):
        return value or initial

    def has_changed(self, initial, data):
        return bool(data) and initial != data

Then we just need to override that field on our form definition:

class ConfigForm(forms.ModelForm):
    password = WriteOnlyField()

    class Meta:
        model = ExternalSystem
        fields = ('username', 'password', 'url')
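
To illustrate the intended behaviour, here is a sketch (external_system is an assumed existing ExternalSystem instance):

form = ConfigForm(instance=external_system, data={
    'username': external_system.username,
    'password': '',                     # left blank by the user
    'url': external_system.url,
})
if form.is_valid():
    updated = form.save()
    # The stored password is retained, because clean() returned the initial value.
    assert updated.password == external_system.password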

Please note that this technique should not be used in the situation where you don’t need the user to be able to change a value, but instead just want to render the value. In that case, please omit the field from the form, and just use `` instead - you can even put that in a disabled text input widget if you really want it to look like the other fields.


I also use a JavaScript affordance on all password fields that defaults to hiding the value, but allows clicking on a control to toggle the visibility of the value: UIkit Password Field.

Preventing Model Overwrites in Django and Postgres

I had an idea tonight while helping someone in #django. It revolved around using a postgres trigger to prevent overwrites with stale data.

Consider the following model:

class Person(models.Model):
    first_name = models.TextField()
    last_name = models.TextField()

If we had two users attempting to update a given instance at around the same time, Django would fetch whatever it had in the database when they did the GET request to fetch the form, and display that to them. It would also use whatever they sent back to save the object. In that case, the last update wins. Sometimes, this is what is required, but it does mean that one user’s changes would be completely overwritten, even if they had only changed something that the subsequent user did not change.

There are a couple of solutions to this problem. One is to use something like django-model-utils FieldTracker to record which fields have been changed, and only write those back using instance.save(update_fields=...). If you are using a django Form (and you probably should be), then you can also inspect form.changed_data to see what fields have changed.
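
For the form-based variant, that might look something like this sketch (it assumes the instance already exists, since update_fields cannot be used for an insert, and that the form field names match the model field names):

if form.is_valid():
    person = form.save(commit=False)
    # Only write back the columns this form actually changed.
    person.save(update_fields=form.changed_data)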

However, that may not always be the best behaviour. Another solution would be to refuse to save something that had changed since they initially fetched the object, and instead show them the changes, allow them to update to whatever it should be now, and then resubmit. After which time, someone else may have made changes, but then the process repeats.

But how can we know that the object has changed?

One solution could be to use a trigger (and an extra column).

class Person(models.Model):
    first_name = models.TextField()
    last_name = models.TextField()
    _last_state = models.UUIDField()

And in our database trigger:

CREATE EXTENSION "uuid-ossp";

CREATE OR REPLACE FUNCTION prevent_clobbering()
RETURNS TRIGGER AS $prevent_clobbering$

BEGIN
  IF NEW._last_state != OLD._last_state THEN
    RAISE EXCEPTION 'Object was changed';
  END IF;
  NEW._last_state = uuid_generate_v4();
  RETURN NEW;
END;

$prevent_clobbering$
LANGUAGE plpgsql STRICT IMMUTABLE;

CREATE TRIGGER prevent_clobbering
BEFORE UPDATE ON person_person
FOR EACH ROW EXECUTE PROCEDURE prevent_clobbering();

You’d also want to have some level of handling in Django to capture the exception, and re-display the form. You can’t use the form/model validation handling for this, as it needs to happen during the save.
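
A sketch of what that handling could look like in a class-based view (the exact exception class depends on the database driver and how the trigger raises, so treat this as an assumption to verify):

from django.db import InternalError, transaction
from django.views.generic import UpdateView

class PersonUpdateView(UpdateView):
    model = Person
    fields = ['first_name', 'last_name']

    def form_valid(self, form):
        try:
            # The atomic block means we can keep using the connection
            # after the trigger aborts the UPDATE.
            with transaction.atomic():
                return super().form_valid(form)
        except InternalError:
            form.add_error(None, 'This record was changed by someone else. Review the current values and resubmit.')
            return self.form_invalid(form)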

To make this work would also require the _last_state column to have a DEFAULT uuid_generate_v4(), so that newly created rows would get a value.


This is only a prototype at this stage, but it does work as a mechanism for preventing overwrites. As usual, there’s probably more work required in the application server, and indeed in the UI, for displaying stale/updated values.

What this does have going for it is that it’s happening at the database level. There is no way that an update could happen (unless the request coming in happened to guess what the new UUID was going to be).

What about drawbacks? Well, there is a bit more storage in the UUID, and we need to generate a new one each time we save a row. We could instead have something that compares the other columns, looking for changes.

Perhaps we could even have the hash of the previous row’s value stored in this field - that way it would not matter that there had been N changes, what matters is the value the user saw before they entered their changes.

Another drawback is that it’s hard-coded to a specific column. We could rewrite the function to allow defining the column when we create the trigger:

CREATE TRIGGER prevent_clobbering
BEFORE UPDATE ON person_person
FOR EACH ROW EXECUTE PROCEDURE prevent_clobbering('_last_state');

But that requires a bit more work in the function itself:

CREATE OR REPLACE FUNCTION prevent_clobbering()
RETURNS TRIGGER AS $prevent_clobbering$

BEGIN
  IF to_jsonb(NEW)->TG_ARGV[0] != to_jsonb(OLD)->TG_ARGV[0] THEN
    RAISE EXCEPTION 'Object was changed';
  END IF;
  NEW._last_state = uuid_generate_v4();
  RETURN NEW;
END;

$prevent_clobbering$
LANGUAGE plpgsql STRICT IMMUTABLE;

Django properties from expressions, or ComputedField part 2

I’ve discussed the concept of a ComputedField in the past. On the weekend, a friend pointed me towards SQL Alchemy’s Hybrid Attributes. The main difference here is that in a ComputedField, the calculation is always done in the database. Thus, if a change is made to the model instance (and it is not yet saved), then the ComputedField will not change its value. Let’s look at an example from that original post:

class Person(models.Model):
    first_name = models.TextField()
    last_name = models.TextField()
    display_name = ComputedField(
        Concat(F('first_name'), Value(' '), F('last_name')),
        output_field=models.TextField()
    )

We can use this to query, or as an attribute:

Person.objects.filter(display_name__startswith='foo')
Person.objects.first().display_name

But, if we make changes, we don’t see them until we re-query:

person = Person(first_name='Fred', last_name='Jones')
person.display_name  # This is not set

So, it got me thinking. Is it possible to turn a django ORM expression into python code that can execute and have the same output?

And, perhaps the syntax SQL Alchemy uses is nicer?

class Person(models.Model):
    first_name = models.TextField()
    last_name = models.TextField()

    @shared_property
    def display_name(self):
        return Concat(
            F('first_name'),
            Value(' '),
            F('last_name'),
            output_field=models.TextField(),
        )

The advantage to using the decorator approach is that you could have a more complex expression - but perhaps that is actually a disadvantage. It might be nice to ensure that the code can be turned into a python function, after all.


The first step is to get the expression we need to convert to a python function. Writing a python decorator will give us access to the “function” object - we can just call this; as long as it does not refer to self at all, this can be done without an instance:

class shared_property(object):
    def __init__(self, function):
        expression = function(None)

This gives us the expression object. Because this is a python object, we can just look at it directly, and turn that into an AST. Having a class for parsing this makes things a bit simpler. Let’s look at a parser that can handle this expression.

import ast


class Parser:
    def __init__(self, function):
        # Make a copy, in case this expression is used elsewhere, and we change it.
        # Keep a reference: contribute_to_class and the cached_col need it later.
        self.expression = function(None).copy()
        tree = self.build_expression(self.expression)
        # Need to turn this into code...
        self.code = compile(tree, mode='eval', filename=function.__code__.co_filename)

    def build_expression(self, expression):
        # Dynamically find the method we need to call to handle this expression.
        return getattr(self, 'handle_{}'.format(expression.__class__.__name__.lower()))(expression)

    def handle_concat(self, concat):
        # A Concat() contains only one source expression: ConcatPair().
        return self.build_expression(*concat.get_source_expressions())

    def handle_concatpair(self, pair):
        left, right = pair.get_source_expressions()
        return ast.BinOp(
            left=self.build_expression(left),
            op=ast.Add(),
            right=self.build_expression(right),
        )

    def handle_f(self, f):
        # Probably some more work here around transforms/lookups...
        # Set this, because without it we get errors. Will have to
        # figure out a better way to handle this later...
        f.contains_aggregate = False
        return ast.Attribute(
            value=ast.Name(id='self'),
            attr=f.name,
        )

    def handle_value(self, value):
        if value.value is None:
            return ast.Name(id='None')

        # bool is a subclass of int, so check it before the numeric types.
        if isinstance(value.value, bool):
            return ast.Name(id=str(value.value))

        if isinstance(value.value, str):
            return ast.Str(s=value.value)

        if isinstance(value.value, (int, float)):
            return ast.Num(n=value.value)

        # ... others?
        raise ValueError('Unable to handle {}'.format(value))

There’s a bit more “noise” required in there (every node needs things like a ctx, lineno and col_offset, and compile() needs a filename), but including those would make it harder to follow.

So, we have our expression, and we have turned that into an equivalent python expression, and compiled it…except it won’t compile. We need to wrap it in an ast.Expression(), and then we can compile it (and call it).
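
That wrapping step looks something like this (a sketch; the helper name is mine):

import ast

def compile_expression(node, filename='<shared_property>'):
    tree = ast.Expression(body=node)
    # Fill in any lineno/col_offset values that were not set explicitly.
    ast.fix_missing_locations(tree)
    return compile(tree, filename=filename, mode='eval')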

Roughly, we’ll end up with a code object that does:

self.first_name + (' ' + self.last_name)

We can call this with our context set:

eval(code, {'self': instance})

But, before we head down that route (I did, but you don’t need to), it’s worth noticing that not all ORM expressions can be mapped directly onto a single python expression. For instance, if we added an optional preferred_name field to our model, our display_name expression may look like:

@shared_property
def display_name(self):
    return Case(
        When(preferred_name__isnull=True, then=Concat(F('first_name'), Value(' '), F('last_name'))),
        When(preferred_name__exact=Value(''), then=Concat(F('first_name'), Value(' '), F('last_name'))),
        default=Concat(F('first_name'), Value(' ('), F('preferred_name'), Value(') '), F('last_name')),
        output_field=models.TextField()
    )

Since this will roughly translate to:

@property
def display_name(self):
    if all([self.preferred_name is None]):
        return self.first_name + ' ' + self.last_name
    elif all([self.preferred_name == '']):
        return self.first_name + ' ' + self.last_name
    else:
        return self.first_name + ' (' + self.preferred_name + ') ' + self.last_name

Whilst this is still a single ast node, it is not an expression (and cannot easily be turned into an expression - although in this case we could use a dict lookup based on self.preferred_name, but that’s not always going to work). Instead, we’ll need to change our code to generate a statement that contains a function definition, and then evaluate that to get the function object in the context. Then, we’ll have a callable that we can call with our model instance to get our result.

There are a few hitches along the way though. The first is turning our method into both a private field and a property. That is the relatively straightforward part:

class shared_property:
    def __init__(self, function):
        self.parsed = Parser(function)
        context = {}
        eval(self.parsed.code, context)
        self.callable = context[function.__code__.co_name]

    def __get__(self, instance, cls=None):
        # Magic Descriptor method: this method will be called when this property
        # is accessed on the instance.
        if instance is None:
            return self
        return self.callable(instance)

    def contribute_to_class(self, cls, name, private_only=False):
        # Magic Django method: this is called by Django on class instantiation, and allows
        # us to add our field (and ourself) to the model. Mostly this is the same as
        # a normal Django Field class would do, with the exception of setting concrete
        # to false, and using the output_field instead of ourself.
        field = self.parsed.expression.output_field
        field.set_attributes_from_name(name)
        field.model = cls
        field.concrete = False
        # This next line is important - it's the key to having everything work when querying.
        field.cached_col = ExpressionCol(self.parsed.expression)
        cls._meta.add_field(field, private=True)
        if not getattr(cls, field.attname, None):
            setattr(cls, field.attname, self)

There are a few things to note in that last method.

  • We use the output_field from the expression as the added field.
  • We mark this field as a private, non-concrete field. This prevents django from writing it back to the database, but it also means it will not appear in a .values() unless we explicitly ask for it. That’s actually fine, because we want the python property to execute instead of just using the value the database gave us.
  • The cached_col attribute is used when generating queries - we’ll look more at that now.

When I previously wrote the ComputedField implementation, the place I was not happy was with the get_col() method/the cached_col attribute. Indeed, to get that to work, I needed to use inspect to sniff up the stack to find a query instance to resolve the expression.

This time around though, I took a different approach. I was not able to use the regular resolve_expression path, because fields are assumed not to require access to the query to resolve to a Col expression. Instead, we can delay the resolve until we have something that gives us the query object.

class ExpressionCol:
    contains_aggregate = False
    def __init__(self, expression):
        self.expression = expression
        self.output_field = expression.output_field

    def get_lookup(self, name):
        return self.output_field.get_lookup(name)

    def get_transform(self, name):
        return self.output_field.get_transform(name)

    def as_sql(self, compiler, connection):
        resolved = self.expression.resolve_expression(compiler.query)
        return resolved.as_sql(compiler, connection)

    def get_db_converters(self, connection):
        return self.output_field.get_db_converters(connection) + \
               self.expression.get_db_converters(connection)

This doesn’t need to be a full Expression subclass, because it mostly delegates things to the output field; when it is turned into SQL, it resolves the expression first, and then uses the resolved expression to build the SQL.

So, let’s see how this works now (without showing the new Nodes that are handled by the Parser):

Person.objects.filter(display_name__startswith='Bob')

Yeah, that correctly limits the queryset. How about the ability to re-evaluate without a db round trip?

person = Person(first_name='Fred', last_name='Jones')
person.display_name  # -> 'Fred Jones'
person.preferred_name = 'Jonesy'
person.display_name  # -> 'Fred (Jonesy) Jones'

Success!


This project is not done yet: I have improved the Parser (as implied) to support more expressions, but there is still a bit more to go. It did occur to me (but not until I was writing this post) that the ComputedField(expression) version may actually be nicer. As hinted, that requires the value to be an expression, rather than a function call. It would be possible to create a function that references self, for instance, and breaks in all sorts of ways.