Scheduling Celery Tasks in the (far) future

We used to make use of the fact that a celery task can be scheduled at some time in the future to auto-punch-out staff members who failed to punch out 24 hours after their shift started. This was as simple as scheduling a task with countdown=86400 (that is, an eta 24 hours in the future).

However, as Adam points out here (see number 5), this is not a great idea. For one, it will bog down your celery workers (and make them use a bunch more memory). Secondly, something Adam doesn’t mention is that if your queue is corrupted, then all of those future tasks will never be executed.

Discussing this in IRC today, I thought of a simple mechanism for scheduling these tasks and processing them at some point after they are due.

We will start with a model to store our task:

import importlib

from django.db import models


class ScheduledTask(models.Model):
    task_name = models.TextField()
    task_args = models.JSONField(default=list)
    task_kwargs = models.JSONField(default=dict)
    due_date = models.DateTimeField()

    objects = ScheduledTaskQuerySet.as_manager()

    @property
    def task(self):
        # Resolve the dotted path back into the actual celery task object.
        module, task = self.task_name.rsplit('.', 1)
        return getattr(importlib.import_module(module), task)

    def execute(self):
        self.task.apply_async(args=self.task_args, kwargs=self.task_kwargs)

We have a custom queryset defined that allows us to see which tasks are due using a nice queryset method:

class ScheduledTaskQuerySet(models.query.QuerySet):
    def due(self):
        # Only the tasks whose due date has passed.
        return self.filter(due_date__lte=datetime.datetime.utcnow())

    def schedule(self, task_name, when, *args, **kwargs):
        self.create(task_name=task_name, task_args=args, task_kwargs=kwargs, due_date=when)

Finally, we need a task that will enqueue the due tasks:

@app.task
def enqueue_due_tasks():
    for task in ScheduledTask.objects.due():
        task.execute()
        task.delete()
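
Something still needs to run enqueue_due_tasks regularly; a minimal celery beat entry would do. This is a sketch - the dotted path to the task is an assumption about where you define it:

app.conf.beat_schedule = {
    'enqueue-due-tasks': {
        # Hypothetical dotted path to wherever enqueue_due_tasks is defined.
        'task': 'myapp.tasks.enqueue_due_tasks',
        # Once a minute; tune this to however much scheduling slop is acceptable.
        'schedule': 60.0,
    },
}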

As it stands, with this code, to schedule a task, you need to create a model instance:

ScheduledTask.objects.schedule(
    'foo.tasks.bar',
    datetime.datetime(2525, 11, 11, 9, 30),
    'x',
    y='z'
)

But, it would be nice if we could use the task to schedule itself:

foo.tasks.bar.schedule(args=[], kwargs={}, eta=X, countdown=Y)

Or, even better:

foo.tasks.bar.s(arg, kwarg=value).schedule(datetime.timedelta(days=365))

The first one we should be able to do by using custom tasks (and implementing a schedule method):

class Scheduled(celery.Task):
    def schedule(self, *, args=None, kwargs=None, eta=None, countdown=None):
        if not eta and not countdown:
            raise ValueError('One of eta and countdown must be supplied')
        if eta and countdown:
            raise ValueError('Only one of eta and countdown must be supplied')

        args = args or ()
        kwargs = kwargs or {}

        if eta:
            ScheduledTask.objects.schedule(self.name, eta, *args, **kwargs)
        else:
            ScheduledTask.objects.schedule(
                self.name,
                datetime.datetime.utcnow() + datetime.timedelta(seconds=countdown),
                *args, **kwargs
            )

Then, as long as a task is defined as using the base class, we can schedule it:

@app.task(base=Scheduled)
def task_name(x, y=None):
    pass
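
Scheduling it then looks something like this (an untested sketch; the argument values are placeholders):

task_name.schedule(args=['x'], kwargs={'y': 'z'}, countdown=86400)

# Or with an absolute due date:
task_name.schedule(args=['x'], kwargs={'y': 'z'}, eta=datetime.datetime(2525, 11, 11, 9, 30))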

But what about mucking with the .s() or .signature() calls? Now we are getting a bit experimental, but it still might be fun:

from celery.canvas import Signature

def schedule(self, when=None):
    if when:
        if isinstance(when, datetime.timedelta):
            when = datetime.datetime.utcnow() + when
    else:
        # Signature.options is a dict, so use .get() rather than attribute access.
        if self.options.get('countdown'):
            when = datetime.datetime.utcnow() + datetime.timedelta(seconds=self.options.get('countdown'))
        elif self.options.get('eta'):
            when = self.options.get('eta')

    ScheduledTask.objects.create(
        task_name=self.task,
        task_args=self.args,
        task_kwargs=self.kwargs,
        due_date=when,
    )

Signature.schedule = schedule
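
With that duck-punch in place, usage would look something like this (equally untested; foo.tasks.bar is the hypothetical task from earlier):

foo.tasks.bar.s('x', y='z').schedule(datetime.timedelta(days=365))

# Or lean on an option already set on the signature:
foo.tasks.bar.s('x', y='z').set(countdown=86400).schedule()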

This is totally mostly untested, and I’m not sure it’s a really good idea.

A better idea might be to have a backend that doesn’t even tell the workers about the tasks until after they are due…that way you would not have to duck-punch celery at all.

Logging WiFi client Signal Strength

One of the cool things about the old macOS version of Airport Utility is that it would show you the WiFi signal of the various connected clients. At some point, they removed this feature, bringing it down to where the iOS app is in terms of functionality.

Whilst I still have a Mac that can (only) run that version next to me on the desk, mostly I don’t use that, and having to switch computers and open it up is a pain. It would be really nice if I could get that data in a more configurable format (possibly with the client data from multiple base stations).

It turns out that this data is available over SNMP, but you need the old version of Airport Utility to enable it. And, as we shall see shortly, there is another caveat too.

Before we get to that though, my main router is actually an OpenWRT device, so it might be nice to grab the handful of clients that connect to that (it being in the garage and all).

We can get a list of the WiFi interfaces using iwinfo | awk '/ESSID/ {print $1}'. In my case, that gives me wlan0 and wlan1, but the values are not relevant, other than the fact we will need them later.

Now, for each WiFi interface, we want to get the access point’s own MAC address, which we can get similarly: iwinfo $iw info | awk '/Access Point:/ {print $3}'.

It might be nice to also get the “connection speed”, although in some cases this is not known. In those cases, we want to use null, since we’ll have a numeric value otherwise.

  iwinfo $iw assoclist | awk '/expected/ {if ($3 == "unknown") print "null"
  else
   print $3
  }'

Okay, now we are ready to get the signal/noise/MAC data:

  iwinfo $iw assoclist \
    | awk -v rate=$RATE -v mac=$MAC '/dBm/ { \
      print "mosquitto_pub -t \"wifi-signal/" substr($1, 1) "\"", \
      " -m \x27{ \
        \"signal\":" $2 ", \
        \"noise\":" $5 ", \
        \"rate\":" rate ", \
        \"station\":\"" mac "\" \
      }\x27 -h mqtt.lan"}' \
    | bash

There’s a bit going on in there, so let’s unpack it:

We get the list of associated clients, and pipe that through awk. To make things easier, we pass our previously captured rate and MAC environment variables in as awk variables, then grab the relevant columns and build it all up into a shell command that will look something like:

mosquitto_pub -t "wifi-signal/98:AF:65:E3:16:F1"  -m '{
        "signal":-70,
        "noise":-102,
        "rate":null,
        "station":"B0:95:75:E8:C8:32"
      }' -h mqtt.lan

Because awk has no way to escape a single quote, we need to use \x27 to get it to emit one of them.

Finally, this is passed to bash to execute the command(s).

And, putting this all together:

#! /bin/sh

for iw in $(iwinfo | awk '/ESSID/ {print $1}')
do
  MAC=$(iwinfo $iw info | awk '/Access Point:/ {print $3}')
  RATE=$(iwinfo $iw assoclist | awk '/expected/ {if ($3 == "unknown") print "null"
else
   print $3
 }')
  iwinfo $iw assoclist \
    | awk -v rate=$RATE -v mac=$MAC '/dBm/ { \
      print "mosquitto_pub -t \"wifi-signal/" substr($1, 1) "\"", \
      " -m \x27{ \
        \"signal\":" $2 ", \
        \"noise\":" $5 ", \
        \"rate\":" rate ", \
        \"station\":\"" mac "\" \
      }\x27 -h mqtt.lan"}' \
    | bash
done

Okay, so that’s the easy one. What about getting the data from SNMP?

Since this code is not running on a Mac, I can’t rely on the MIB data being available. Instead, we’ll use the raw numeric OIDs. It turns out that we can get all of the data we need (and then some!) using one command:

snmpwalk -v 2c -c public hostname.local iso.3.6.1.4.1.63.501.3.2.2.1

This outputs a whole bunch of data - the rows we care about have the values 1, 6, 7 and 8 in position 13, so we can extract those. But we want to retain a bit more context, so we can sort the data and ensure a consistent ordering: MAC address, signal, noise and then rate for each client. To ensure a stable sort, we grab the last two parts of the MIB (which seemed to be enough in my case to generate a unique value), and then the 13th column. Then we want the “value” that was returned for this row. If it was a STRING value, it will already have quotes around it.

  snmpwalk -v 2c -c public $station $WIFI_DATA \
    | awk -F. 'match($31, /: /) {print $30, substr($31, 0, 3), $13, substr($31, RSTART + 2)}' \
    | sort \
    | grep '^.. .. [1678] '

This builds up the data in the correct format; from here we need to put each client’s data on a single line:

  snmpwalk ...
    | <as-above>
    | awk '/^.. .. 1/{ if (NR!=1) print "";}{printf $4 " "}'

And then turn it into a string that we’ll be able to execute with bash (like before). Putting it all together:

  MAC=$(/usr/sbin/arp -a $station | awk '{print $4}')

  snmpwalk -v 2c -c public $station iso.3.6.1.4.1.63.501.3.2.2.1 \
    | awk -F. 'match($31, /: /) {print $30, substr($31, 0, 3), $13, substr($31, RSTART + 2)}' \
    | sort \
    | grep '^.. .. [1678] ' \
    | awk '/^.. .. 1/ {if (NR!=1) print "";}{printf $4 " "}' \
    | awk -v mac=$MAC '{if ($2 != "") \
      print "mosquitto_pub -t \"wifi-signal/" substr($1, 2), \
        "-m \x27{\
          \"signal\":" $2 ", \
          \"noise\":" $3 ", \
          \"rate\":" $4 ", \
          \"station\": \"" mac "\" \
    }\x27 -h mqtt.lan" \
    }' | bash

Note that I also needed to check that there was a signal value - one of the stations was reporting this as empty, so we skip over any that don’t have a signal value reported.

But… this doesn’t quite work. I mean, it works, but for some reason (after having left it running every minute overnight), it never updates the signal/noise/rate data. It seems that the Airport Express/Extreme SNMP data is not refreshed.

You can, however, force it to update by asking for the number of connected clients!

snmpget -v 2c -c public $station iso.3.6.1.4.1.63.501.3.2.1.0

Thus, our full script can look like:

#! /bin/bash

stations="study.lan
bedroom.lan
dining-room.lan"

CLIENT_COUNT=iso.3.6.1.4.1.63.501.3.2.1.0
WIFI_DATA=iso.3.6.1.4.1.63.501.3.2.2.1

for station in ${stations}
do
  MAC=$(/usr/sbin/arp -a $station | awk '{print $4}')

  snmpget -v 2c -c public $station $CLIENT_COUNT > /dev/null

  snmpwalk -v 2c -c public $station $WIFI_DATA \
    | awk -F. 'match($31, /: /) {print $30, substr($31, 0, 3), $13, substr($31, RSTART + 2)}' \
    | sort \
    | grep '^.. .. [1678] ' \
    | awk '/^.. .. 1/ {if (NR!=1) print "";}{printf $4 " "}' \
    | awk -v mac=$MAC '{if ($2 != "") \
      print "mosquitto_pub -t \"wifi-signal/" substr($1, 2), \
        "-m \x27{\
          \"signal\":" $2 ", \
          \"noise\":" $3 ", \
          \"rate\":" $4 ", \
          \"station\": \"" mac "\" \
    }\x27 -h mqtt.lan" \
    }' | bash
done

These scripts could be more configurable - I’ve hard-coded the MQTT broker name, for instance, but that will be fine for my purposes. I’m also not actually doing anything with this data yet - I will eventually look at pulling this into Home Assistant or somewhere else, and graphing the data.

Locking rows in PostgreSQL

Someone asked a question today in #postgresql about locking rows for a long running task, where multiple tasks could be updating rows. There is a flag in each row that indicates whether a row has been processed, but not one indicating if it is currently being processed.

One solution is to use advisory locks to mark that we are currently working on a given row. The advantage of this over having a column that indicates it is currently being processed (or mis-using the “processed” column by setting it before we’ve finished building the data) is that if our task that generates the data fails, the lock is released when the session is closed.

So, let’s look at the things we need to make this happen. We’ll start with something that represents our long-running process:

CREATE OR REPLACE FUNCTION expensive_operation(INTEGER)
RETURNS JSONB AS $$

  BEGIN
    PERFORM pg_sleep(10);
    RETURN JSONB_BUILD_OBJECT(
        'foo', 'bar',
        'baz', $1
    );
  END;

$$ LANGUAGE plpgsql;

And we need our table to put the tasks in (and some unprocessed rows):

CREATE TABLE tasks (
    id SERIAL PRIMARY KEY,
    processed BOOLEAN DEFAULT FALSE,
    data JSONB
);

INSERT INTO tasks (data) VALUES (NULL), (NULL), (NULL);

Finally, we are ready to process our tasks.

WITH next_task AS (
  SELECT t.id, pg_advisory_lock(t.id)
    FROM (
      SELECT id
        FROM tasks
       WHERE NOT processed
         AND id NOT IN (SELECT objid
                          FROM pg_locks
                         WHERE locktype = 'advisory')
       LIMIT 1
    ) t
)
UPDATE tasks
   SET data = expensive_operation(id),
       processed = true
 WHERE id IN (SELECT id FROM next_task)
   AND NOT processed;

The CTE will get a task.id that has not been processed (and is not currently being processed). It will take an advisory lock on that value, which marks the task as in-progress.

Then, when it has the lock, it performs the expensive operation and updates the data (and the processed status).
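
Driving this from a Python worker might look roughly like the following sketch (assuming psycopg2 and a database named tasks; the query is just the one above pasted into a string):

import psycopg2

PROCESS_NEXT_TASK = """
WITH next_task AS (
  SELECT t.id, pg_advisory_lock(t.id)
    FROM (
      SELECT id
        FROM tasks
       WHERE NOT processed
         AND id NOT IN (SELECT objid
                          FROM pg_locks
                         WHERE locktype = 'advisory')
       LIMIT 1
    ) t
)
UPDATE tasks
   SET data = expensive_operation(id),
       processed = true
 WHERE id IN (SELECT id FROM next_task)
   AND NOT processed;
"""

conn = psycopg2.connect(dbname='tasks')
conn.autocommit = True

with conn.cursor() as cursor:
    while True:
        cursor.execute(PROCESS_NEXT_TASK)
        if cursor.rowcount == 0:
            break  # Nothing unprocessed and unlocked right now.
        # pg_advisory_lock() takes session-level locks, so release them
        # between tasks rather than holding them until we disconnect.
        cursor.execute('SELECT pg_advisory_unlock_all()')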

I’m not totally sure if the AND NOT processed at the bottom of the query is necessary - it seems to me that there could be a situation where two workers try to get the lock at the same time (and therefore are both asking for the same lock), and one of them gets it. The other one, when it finally gets the lock, would now see that row as processed, and so would not update anything.

I’m happy to see a better way to do this.

RobotFramework, Chromedriver and Docker

One of my team implemented RobotFramework support for automated browser testing of our platform a while ago. At the time, we were using Codeship Basic, and I built a helper to run a robot test suite within a tox environment. It was all good, because chromedriver and all its dependencies were already installed.

But time passes, and we needed to move to Codeship Pro. Which has some neater features, but required me to build docker images for everything. We already use docker for deployment, but I didn’t really want to build a bunch of distinct images just for testing that re-implemented the same stuff that we have in our deployment images. Even just appending new stuff to them means that things could turn out to be a pain in the arse to manage.

And getting chromedriver installed into a docker image is not neat.

I did find a docker image that just has an instance of chromedriver, and exposes that. But getting that to work with robot was still a bunch of work. After much experimentation, I was able to get the connections between everything to work.

First, we need to have the chromedriver container running:

$ docker run -p 4444:4444 -e CHROMEDRIVER_WHITELISTED_IPS='' robcherry/docker-chromedriver:latest

Then, there are a few moving parts that need to be in place to get things to work. Using my djangobot management command (which I had to extend a bit here), a single command can be used to spin up a Django runserver command, apply migrations (if necessary), and then run the robot commands. The trick is you need to teach Robot to speak to the remote WebDriver instance, which then in turn speaks to the running django webserver.

First, the RobotFramework commands; my resource.robot file which is referenced by all of my robot test suites contains:

*** Variables ***

${HOSTNAME}         127.0.0.1
${PORT}             8000
${SCHEME}           http
${SERVER}           ${SCHEME}://${HOSTNAME}:${PORT}
${BROWSER}          headlesschrome
${TIMEOUT}          30
${REMOTE_URL}

*** Settings ***

Documentation   A resource file with reusable keywords and variables.
Library         SeleniumLibrary             timeout=${TIMEOUT}      implicit_wait=1
Library         Collections
Library         DebugLibrary
Library         DateTime
Library         String
Library         djangobot.DjangoLibrary     ${HOSTNAME}     ${PORT}

*** Keywords ***

Create Remote Webdriver
    ${chrome_options} =     Evaluate    sys.modules['selenium.webdriver'].ChromeOptions()    sys, selenium.webdriver
    Call Method    ${chrome_options}   add_argument    headless
    Call Method    ${chrome_options}   add_argument    disable-gpu
    Call Method    ${chrome_options}   add_argument    no-sandbox
    ${options}=     Call Method     ${chrome_options}    to_capabilities

    Create Webdriver    Remote   command_executor=${REMOTE_URL}    desired_capabilities=${options}
    Open Browser    ${SERVER}   ${BROWSER}  remote_url=${REMOTE_URL}    desired_capabilities=${options}

Start Session
    Run Keyword If  '${REMOTE_URL}'    Create Remote Webdriver
    Run Keyword If  '${REMOTE_URL}' == ''    Open Browser    ${SERVER}   ${BROWSER}

    Set Window Size     2048  2048
    Fetch Url       login
    Add Cookie      robot   true

    Register Keyword To Run On Failure    djangobot.DjangoLibrary.Dump Error Data

End Session
    Close Browser

Logout
    Fetch Url     logout

Notice that the Start Session keyword determines which type of browser to open - either a local or remote one.

Thus, each *.robot file starts with:

*** Settings ***
Resource                resource.robot
Suite Setup             Start Session
Suite Teardown          End Session
Test Setup              Logout

Because the requests will no longer be coming from localhost, you need to ensure that your runserver is listening on the interface the requests will be coming from. If you can’t detect this, and your machine is not exposed to an insecure network, then you can use 0.0.0.0 to get the django devserver to listen on all interfaces. You will also need to supply the hostname that you will be using for the requests (which won’t be localhost anymore), and ensure this is in your Django settings.ALLOWED_HOSTS.
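For example, assuming the hostname used below, the relevant settings change is just (a sketch):

# settings.py
ALLOWED_HOSTS = ['localhost', '127.0.0.1', 'mymachine.local']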

In my case, I needed to make my robot command allow all this, but ultimately I can now do:

$ ./manage.py robot --runserver 0 \
                    --listen 0.0.0.0 \
                    --hostname mymachine.local \
                    --remote-url http://localhost:4444 \
                    --include tag

This runs against the database I already have prepared, but in my codeship-steps.yml I needed to do a bit more, and hook it up to the other containers:

coverage run --branch --parallel \
    /app/manage.py robot --migrate \
                         --server-url=http://web:8000 \
                         --remote-url=http://chromedriver:4444 \
                         --tests-dir=/app/robot_tests/ --output-dir=/coverage/robot_results/ \
                         --exclude skip  --exclude expected-failure

Now, if only Codeship’s jet tool actually cached multi-stage builds correctly.

Maybe I need to try this.

Update value only if present

We have a bunch of integrations with external systems, and in most of these cases we are unable to use OAuth or other mechanisms that don’t require us to store a username/password pair. So, we have to store that information (encrypted, because we need to use the value, rather than just storing a hashed value to compare an incoming value against).

Because this data is sensitive, we do not want to show this value to the user, but we do need to allow them to change it. As such, we end up with a form that usually contains a username and a password field, and sometimes a URL field:

class ConfigForm(forms.ModelForm):
    class Meta:
        model = ExternalSystem
        fields = ('username', 'password', 'url')

But this would show the password to the user. We don’t want to do that, but we do want to allow them to include a new password if it has changed.

In the past, I’ve done this on a per-form basis by overriding the clean_password method:

class ConfigForm(forms.ModelForm):
    class Meta:
        model = ExternalSystem
        fields = ('username', 'password', 'url')

    def clean_password(self):
        return self.cleaned_data.get('password') or self.instance.password

But this requires implementing that method on every form. As I mentioned before, we have a bunch of these. And on at least one, we’d missed this method. We could subclass a base form class that implements this method, but I think there is a nicer way.

It should be possible to have a field that handles this. The methods that look interesting are clean, and has_changed. Specifically, it would be great if we could just override has_changed:

class WriteOnlyField(forms.CharField):
    def has_changed(self, initial, data):
        return bool(data) and initial != data

However, it turns out this is not used until the form is re-rendered (or perhaps not at all by default; it’s most likely my own code that calls this to get a list of changed fields to mark as changed, as a UI affordance).

The clean method in a CharField does not have access to the initial value, and there really is not a nice way to get this value attached to the field (other than doing it in the has_changed method, which is not called).

But it turns out this behaviour (apply changes only when a value is supplied) is the same behaviour that is used by FileField: and as such, it gets a special if statement in the form cleaning process, and is passed both the initial and the new values.
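
For reference, the special-casing in Django’s form cleaning looks roughly like this (paraphrased from BaseForm._clean_fields; the exact code varies between versions):

# Roughly what BaseForm._clean_fields() does for each field:
if isinstance(field, FileField):
    initial = self.get_initial_for_field(field, name)
    value = field.clean(value, initial)
else:
    value = field.clean(value)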

So, we can leverage this and get a field class that does what we want:

class WriteOnlyField(forms.CharField, forms.FileField):
    def clean(self, value, initial):
        return value or initial

    def has_changed(self, initial, data):
        return bool(data) and initial != data

We can even go a bit further, and rely on the behaviour of forms.PasswordInput() to hide the value on an unbound form:

class WriteOnlyField(forms.CharField, forms.FileField):
    def __init__(self, *args, **kwargs):
        defaults = {
            'widget': forms.PasswordInput(),
            'help_text': _('Leave blank if unchanged'),
        }
        defaults.update(**kwargs)
        return super().__init__(*args, **defaults)

    def clean(self, value, initial):
        return value or initial

    def has_changed(self, initial, data):
        return bool(data) and initial != data

Then we just need to override that field on our form definition:

class ConfigForm(forms.ModelForm):
    password = WriteOnlyField()

    class Meta:
        model = ExternalSystem
        fields = ('username', 'password', 'url')
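
In use, that should behave something like this sketch (the instance and field values are placeholders):

instance = ExternalSystem.objects.get(pk=1)
form = ConfigForm(
    instance=instance,
    data={'username': 'new-user', 'password': '', 'url': 'https://api.example.com/'},
)
assert form.is_valid()
# Password was left blank, so the stored value is retained rather than cleared.
assert form.cleaned_data['password'] == instance.password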

Please note that this technique should not be used in the situation where you don’t need the user to be able to change a value, but instead just want to render the value. In that case, please omit the field from the form, and just use `` instead - you can even put that in a disabled text input widget if you really want it to look like the other fields.


I also use a JavaScript affordance on all password fields that default to hiding the value, but allows clicking on a control to toggle the visibility of the value: UIkit Password Field.

Preventing Model Overwrites in Django and Postgres

I had an idea tonight while helping someone in #django. It revolved around using a postgres trigger to prevent overwrites with stale data.

Consider the following model:

class Person(models.Model):
    first_name = models.TextField()
    last_name = models.TextField()

If we had two users attempting to update a given instance at around the same time, Django would fetch whatever it had in the database when they did the GET request to fetch the form, and display that to them. It would also use whatever they sent back to save the object. In that case, the last update wins. Sometimes, this is what is required, but it does mean that one user’s changes would be completely overwritten, even if they had only changed something that the subsequent user did not change.

There are a couple of solutions to this problem. One is to use something like django-model-utils FieldTracker to record which fields have been changed, and only write those back using instance.save(update_fields=...). If you are using a django Form (and you probably should be), then you can also inspect form.changed_data to see what fields have changed.

However, that may not always be the best behaviour. Another solution would be to refuse to save something that had changed since they initially fetched the object, and instead show them the changes, allow them to update to whatever it should be now, and then resubmit. After which time, someone else may have made changes, but then the process repeats.

But how can we know that the object has changed?

One solution could be to use a trigger (and an extra column).

class Person(models.Model):
    first_name = models.TextField()
    last_name = models.TextField()
    _last_state = models.UUIDField()

And in our database trigger:

CREATE EXTENSION "uuid-ossp";

CREATE OR REPLACE FUNCTION prevent_clobbering()
RETURNS TRIGGER AS $prevent_clobbering$

BEGIN
  IF NEW._last_state != OLD._last_state THEN
    RAISE EXCEPTION 'Object was changed';
  END IF;
  NEW._last_state = uuid_generate_v4();
  RETURN NEW;
END;

$prevent_clobbering$
LANGUAGE plpgsql STRICT IMMUTABLE;

CREATE TRIGGER prevent_clobbering
BEFORE UPDATE ON person_person
FOR EACH ROW EXECUTE PROCEDURE prevent_clobbering();

You’d also want to have some level of handling in Django to capture the exception, and re-display the form. You can’t use the form/model validation handling for this, as it needs to happen during the save.

To make this work would also require the _last_state column to have a DEFAULT uuid_generate_v4(), so that newly created rows would get a value.
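
Catching the error in the application might look roughly like this (a sketch; with psycopg2, the RAISE EXCEPTION should surface as django.db.InternalError, but it’s worth confirming which class your driver raises):

from django.db import InternalError

def save_form(form):
    # Hypothetical helper: attempt the save, and report a conflict if the
    # trigger rejected the write because _last_state no longer matched.
    try:
        form.save()
        return True
    except InternalError:
        form.add_error(None, 'This record was changed by someone else. Review the changes and resubmit.')
        return False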


This is only a prototype at this stage, but does work as a mechanism for preventing overwrites. As usual, there’s probably more work in the application server, and indeed in the UI, that would be required for displaying stale/updated values.

What this does have going for it is that it’s happening at the database level. There is no way that an update could happen (unless the request coming in happened to guess what the new UUID was going to be).

What about drawbacks? Well, there is a bit more storage in the UUID, and we need to generate a new one each time we save a row. We could instead have something that compares the other columns looking for changes.

Perhaps we could even have the hash of the previous row’s value stored in this field - that way it would not matter that there had been N changes, what matters is the value the user saw before they entered their changes.

Another drawback is that it’s hard-coded to a specific column. We could rewrite the function to allow defining the column when we create the trigger:

CREATE TRIGGER prevent_clobbering
BEFORE UPDATE ON person_person
FOR EACH ROW EXECUTE PROCEDURE prevent_clobbering('_last_state');

But that requires a bit more work in the function itself:

CREATE OR REPLACE FUNCTION prevent_clobbering()
RETURNS TRIGGER AS $prevent_clobbering$

BEGIN
  IF to_jsonb(NEW)->TG_ARGV[0] != to_jsonb(OLD)->TG_ARGV[0] THEN
    RAISE EXCEPTION 'Object was changed';
  END IF;
  NEW._last_state = uuid_generate_v4();
  RETURN NEW;
END;

$prevent_clobbering$
LANGUAGE plpgsql STRICT IMMUTABLE;

Django properties from expressions, or ComputedField part 2

I’ve discussed the concept of a ComputedField in the past. On the weekend, a friend pointed me towards SQL Alchemy’s Hybrid Attributes. The main difference here is that in a ComputedField, the calculation is always done in the database. Thus, if a change is made to the model instance (and it is not yet saved), then the ComputedField will not change its value. Let’s look at an example from that original post:

class Person(models.Model):
    first_name = models.TextField()
    last_name = models.TextField()
    display_name = ComputedField(
        Concat(F('first_name'), Value(' '), F('last_name')),
        output_field=models.TextField()
    )

We can use this to query, or as an attribute:

Person.objects.filter(display_name__startswith='foo')
Person.objects.first().display_name

But, if we make changes, we don’t see them until we re-query:

person = Person(first_name='Fred', last_name='Jones')
person.display_name  # This is not set

So, it got me thinking. Is it possible to turn a django ORM expression into python code that can execute and have the same output?

And, perhaps the syntax SQL Alchemy uses is nicer?

class Person(models.Model):
    first_name = models.TextField()
    last_name = models.TextField()

    @shared_property
    def display_name(self):
        return Concat(
            F('first_name'),
            Value(' '),
            F('last_name'),
            output_field=models.TextField(),
        )

The advantage to using the decorator approach is that you could have a more complex expression - but perhaps that is actually a disadvantage. It might be nice to ensure that the code can be turned into a python function, after all.


The first step is to get the expression we need to convert to a python function. Writing a python decorator will give us access to the “function” object - we can just call it; as long as it does not refer to self at all, this can be done without an instance (passing None for self):

class shared_property(object):
    def __init__(self, function):
        expression = function(None)

This gives us the expression object. Because this is a python object, we can just look at it directly, and turn that into an AST. Having a class for parsing this makes things a bit simpler. Let’s look at a parser that can handle this expression.

import ast


class Parser:
    def __init__(self, function):
        # Make a copy, in case this expression is used elsewhere, and we change it.
        expression = function(None).copy()
        # Keep a reference: we'll need the output_field from it later on.
        self.expression = expression
        tree = self.build_expression(expression)
        # Need to turn this into code...
        self.code = compile(tree, mode='eval', filename=function.__code__.co_filename)

    def build_expression(self, expression):
        # Dynamically find the method we need to call to handle this expression.
        return getattr(self, 'handle_{}'.format(expression.__class__.__name__.lower()))(expression)

    def handle_concat(self, concat):
        # A Concat() contains only one source expression: ConcatPair().
        return self.build_expression(*concat.get_source_expressions())

    def handle_concatpair(self, pair):
        left, right = pair.get_source_expressions()
        return ast.BinOp(
            left=self.build_expression(left),
            op=ast.Add(),
            right=self.build_expression(right),
        )

    def handle_f(self, f):
        # Probably some more work here around transforms/lookups...
        # Set this, because without it we get errors. Will have to
        # figure out a better way to handle this later...
        f.contains_aggregate = False
        return ast.Attribute(
            value=ast.Name(id='self'),
            attr=f.name,
        )

    def handle_value(self, value):
        if value.value is None:
            return ast.Name(id='None')

        # bool is a subclass of int, so check it before the numeric types.
        if isinstance(value.value, bool):
            return ast.Name(id=str(value.value))

        if isinstance(value.value, str):
            return ast.Str(s=value.value)

        if isinstance(value.value, (int, float)):
            return ast.Num(n=value.value)

        # ... others?
        raise ValueError('Unable to handle {}'.format(value))

There’s a bit more “noise” required in there (every node must have a ctx, plus a lineno and col_offset, and compile needs a filename), but including all of that would make the code harder to follow.

So, we have our expression, and we have turned that into an equivalent python expression, and compiled it…except it won’t compile. We need to wrap it in an ast.Expression(), and then we can compile it (and call it).

Roughly, we’ll end up with a code object that does:

self.first_name + (' ' + self.last_name)

We can call this with our context set:

eval(code, {'self': instance})
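
As a standalone illustration of those steps (building a node, wrapping it in ast.Expression, compiling and evaluating it), with none of the Django machinery involved - note this uses ast.Constant, where the Parser above uses the older ast.Str spelling:

import ast

# Build `self.first_name + ' '` by hand, including the ctx "noise".
node = ast.BinOp(
    left=ast.Attribute(value=ast.Name(id='self', ctx=ast.Load()), attr='first_name', ctx=ast.Load()),
    op=ast.Add(),
    right=ast.Constant(value=' '),
)
tree = ast.Expression(body=node)
ast.fix_missing_locations(tree)  # fill in the lineno/col_offset noise
code = compile(tree, '<expression>', 'eval')

class Instance:
    first_name = 'Fred'

result = eval(code, {'self': Instance()})
assert result == 'Fred '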

But, before we head down that route (I did, but you don’t need to), it’s worth noticing that not all ORM expressions can be mapped directly onto a single python expression. For instance, if we added an optional preferred_name field to our model, our display_name expression may look like:

@shared_property
def display_name(self):
    return Case(
        When(preferred_name__isnull=True, then=Concat(F('first_name'), Value(' '), F('last_name'))),
        When(preferred_name__exact=Value(''), then=Concat(F('first_name'), Value(' '), F('last_name'))),
        default=Concat(F('first_name'), Value(' ('), F('preferred_name'), Value(') '), F('last_name')),
        output_field=models.TextField()
    )

Since this will roughly translate to:

@property
def display_name(self):
    if all([self.preferred_name is None]):
        return self.first_name + ' ' + self.last_name
    elif all([self.preferred_name == '']):
        return self.first_name + ' ' + self.last_name
    else:
        return self.first_name + ' (' + self.preferred_name + ') ' + self.last_name

Whilst this is still a single ast node, it is not an expression (and cannot easily be turned into an expression - although in this case we could use a dict lookup based on self.preferred_name, but that’s not always going to work). Instead, we’ll need to change our code to generate a statement that contains a function definition, and then evaluate that to get the function object in the context. Then, we’ll have a callable that we can call with our model instance to get our result.

There are a few hitches along the way though. The first is turning our method into both a private field and a property. That is the relatively straightforward part:

class shared_property:
    def __init__(self, function):
        self.parsed = Parser(function)
        context = {}
        eval(self.parsed.code, context)
        self.callable = context[function.__code__.co_name]

    def __get__(self, instance, cls=None):
        # Magic Descriptor method: this method will be called when this property
        # is accessed on the instance.
        if instance is None:
            return self
        return self.callable(instance)

    def contribute_to_class(self, cls, name, private_only=False):
        # Magic Django method: this is called by django on class instantiation, and allows
        # us to add our field (and ourself) to the model. Mostly this is the same as
        # a normal Django Field class would do, with the exception of setting concrete
        # to false, and using the output_field instead of ourself.
        field = self.parsed.expression.output_field
        field.set_attributes_from_name(name)
        field.model = cls
        field.concrete = False
        # This next line is important - it's the key to having everything work when querying.
        field.cached_col = ExpressionCol(self.parsed.expression)
        cls._meta.add_field(field, private=True)
        if not getattr(cls, field.attname, None):
            setattr(cls, field.attname, self)

There are a few things to note in that last method.

  • We use the output_field from the expression as the added field.
  • We mark this field as a private, non-concrete field. This prevents django from writing it back to the database, but it also means it will not appear in a .values() unless we explicitly ask for it. That’s actually fine, because we want the python property to execute instead of just using the value the database gave us.
  • The cached_col attribute is used when generating queries - we’ll look more at that now.

When I previously wrote the ComputedField implementation, the place I was not happy with was the get_col() method/the cached_col attribute. Indeed, to get that to work, I needed to use inspect to sniff up the stack to find a query instance to resolve the expression.

This time around though, I took a different approach. I was not able to use the regular resolve_expression path, because fields are assumed not to require access to the query to resolve to a Col expression. Instead, we can delay the resolve until we have something that gives us the query object.

class ExpressionCol:
    contains_aggregate = False
    def __init__(self, expression):
        self.expression = expression
        self.output_field = expression.output_field

    def get_lookup(self, name):
        return self.output_field.get_lookup(name)

    def get_transform(self, name):
        return self.output_field.get_transform(name)

    def as_sql(self, compiler, connection):
        resolved = self.expression.resolve_expression(compiler.query)
        return resolved.as_sql(compiler, connection)

    def get_db_converters(self, connection):
        return self.output_field.get_db_converters(connection) + \
               self.expression.get_db_converters(connection)

This doesn’t need to be a full Expression subclass, because it mostly delegates things to the output field, but when it is turned into SQL, it can resolve the expression before then using that resolved expression to build the SQL.

So, let’s see how this works now (without showing the new Nodes that are handled by the Parser):

Person.objects.filter(display_name__startswith='Bob')

Yeah, that correctly limits the queryset. How about the ability to re-evaluate without a db round trip?

person = Person(first_name='Fred', last_name='Jones')
person.display_name  # -> 'Fred Jones'
person.preferred_name = 'Jonesy'
person.display_name  # -> 'Fred (Jonesy) Jones'

Success!


This project is not done yet: I have improved the Parser (as implied) to support more expressions, but there is still a bit more to go. It did occur to me (but not until I was writing this post) that the ComputedField(expression) version may actually be nicer. As hinted, that requires the value to be an expression, rather than a function call. It would be possible to create a function that references self, for instance, and breaks in all sorts of ways.

Redirect all DNS traffic to the pi.hole

This is more to remind me than anything else, but I figured out how to configure my firewall to redirect all DNS traffic (except from the pihole itself) to the pihole.

My pihole has an IP address of 10.1.1.3:

iptables -t nat -A PREROUTING -i br-lan ! -s 10.1.1.3 -p tcp --dport 53 -j DNAT --to 10.1.1.3
iptables -t nat -A PREROUTING -i br-lan ! -s 10.1.1.3 -p udp --dport 53 -j DNAT --to 10.1.1.3
iptables -t nat -A POSTROUTING -j MASQUERADE

In OpenWrt, this needs to be pasted into Network → Firewall → Custom Rules, and then you may need to reboot the router.

It is likely that a reboot is not necessary: the MASQUERADE line made me think I was still hitting the external DNS server, but it was transparently being handled by my pihole.

Certificate Expiry Dates without extra software

I’ve got my Home Assistant set up and running, and have obtained a Lets Encrypt certificate to allow me to serve it all over HTTPS.

One of the things that you can do is set it up to notify you about expiring certificates. However, this requires the installation of a specific package. Since I’m running Home Assistant in a docker image, I can’t really do this.

However, the tools you need to determine a certificate’s expiry date are already in most systems (otherwise how would they be able to tell if the certificate from a site is still valid?).

echo | \
  openssl s_client -connect example.com:443 2>/dev/null | \
  openssl x509 -noout -dates

This gives the very useful:

notBefore=Nov 28 00:00:00 2018 GMT
notAfter=Dec  2 12:00:00 2020 GMT

We can manipulate this using some other commands to get just the expiry date:

echo | \
  openssl s_client -connect example.com:443 2>/dev/null | \
  openssl x509 -noout -dates | \
  tail -n 1 | \
  cut -d '=' -f 2

Now, we want to turn this into a number of days from today. Bash can do arithmetic; we just need to get the values into the right format. In this case, we’ll get date to give us an epoch value, and divide the difference by 3600 * 24.

echo $(( ($(date +%s --date "2020-12-02 12:00:00") - $(date +%s)) / (3600 * 24) ))

That gives us 158 days from the day I wrote this. Now let’s substitute our command for the fixed date:

echo $((
  (
    $(date +%s --date "$(echo | \
  openssl s_client -connect example.com:443 2>/dev/null | \
  openssl x509 -noout -dates | \
  tail -n 1 | \
  cut -d '=' -f 2)") - $(date +%s)
  ) / (3600 * 24)
))

Okay, we still get our 158. That’s a good sign.

Now, to put this into a Home Assistant sensor, we need to edit our configuration.yaml. Note that I needed to change the date parsing format inside the docker container to %b %d %H:%M:%S %Y GMT.

sensor:
  - platform: command_line
    name: SSL Certificate Expiry
    unit_of_measurement: days
    scan_interval: 10800
    command: echo $((
      (
        $(date +%s --date "$(echo | openssl s_client -connect example.com:443 2>/dev/null
                                  | openssl x509 -noout -dates
                                  | tail -n 1
                                  | cut -d '=' -f 2)"
                   -D "%b %d %H:%M:%S %Y GMT") - $(date +%s) ) / (3600 * 24) ))

This should give us a sensor that we can then use to create an automation, as seen in the original post.

Don’t forget to change the domain to your Home Assistant hostname!

Smart Devices Aren't (or why connected devices suck)

I love tinkering with gadgets. I’ve put a bunch of sensors around my house, so I can see the temperature in various places, and have a couple of smart light and power devices too. At the moment, they are limited to my laundry (where the hard-wired switch is in the wrong place, due to the moving of a door), my workbench (because the overhead lights there run from a power point, so it was trivial to put in a smart switch), and the lounge room (where I had room in the light fitting to put a Sonoff Mini).

In all of these cases I have taken great care to ensure that the physical switches still toggle the light, with the exception of the laundry, where the switch is not really accessible; there I have an Ikea bulb connected to an Ikea dimmer.

In my study, I have a desk lamp that has a smart (dimmable) bulb in it, and it irks me no end that I have to use a smart device or computer to turn it on or off. I will be getting some more of the Ikea dimmers to alleviate this, but in the mean time, it’s a pain to turn on or off.

Having said that, I love the option of being able to automate power and lighting, or turn things off from a distance. I just don’t like that being the only way.

I installed Home Assistant on the weekend. But, in order to fit that onto my Raspberry Pi, I needed to use a bigger Micro SD card.

Which meant I needed to clone the old one.

Which took several hours.

I’d already installed Home Assistant before running out of space, and had converted a couple of my esphome devices to use the API instead of just MQTT for connection, including the lounge room light.

Now, it turns out, by default there is an “auto reboot if API not found in 15 minutes” setting, which meant that during the four or five hours it took to create an image of the Micro SD, verify this, copy to a new SD card, and then verify that, my lights (and a powerboard in my office) would flick off every 15 minutes. Likewise, if they cannot connect to a WiFi access point they will power cycle. I believe this second one can be resolved using a Captive AP setting that will mean if they can’t connect to a network, they will create their own.

Which really got me thinking. Smart devices should continue to work in every way possible when they don’t have access to the local network, or the internet. In my case, my smart devices do not have access to the internet anyway, because they don’t need to. However, the point is the same.

In situations where a network connection, or even worse, a working connection to a server that you don’t control, is no longer available, you don’t want your lights or, god forbid, your coffee machine to not be able to perform their simple task.

This worries me somewhat about the current trends in smart homes. At some point, companies will stop supporting their devices (this has already happened), and they will become less useful than their dumb counterparts. And add further to our global waste problems.

But having a significant system outage (even an intentional one, like in my case), made me think about other aspects of my home automation as well.

I’ve been using NodeRED for a couple of automation tasks. One of them was to have different grind lengths for my coffee grinder, and making this available to Siri.

However, with the device running NodeRED not operating, I was no longer able to rely on this.

I was heading this way philosophically before, but (OMG NO COFFEE) this just cemented something else in my mind. Automations, where they don’t rely on interaction between multiple devices, should live on the local device where possible. Further to this, where the interaction between devices is required for the automation (like the PIR sensor in the laundry I have that turns on the Ikea lightbulb), the devices should connect directly to one another, without requiring some other mechanism to trigger the automation.

In my case, I have a physical button that I press to trigger a long grind. But, the grind only stops if the NodeRED server tells it to. And, I had no way to (when NodeRED was not running), trigger a short grind.

I was able to fix this: I now have a short press triggering a long grind, and a long press triggering a short grind. That seems backwards, but since I mostly do a long grind in the morning before I’ve had time to properly wake up, I want that to be the easiest one to trigger…


Having to program this in my esphome firmware instead of NodeRED made for an interesting exercise. Because we need to turn off the device after a period of time, but need to be aware of other events that have happened in the meantime, we need to use scripts.

script:
  - id: short_grind
    then:
      - switch.turn_on: relay
      - delay: 13s
      - switch.turn_off: relay
  - id: long_grind
    then:
      - switch.turn_on: relay
      - delay: 17s
      - switch.turn_off: relay

Whenever our relay turns on, we want to start our long grind script, so that even if the relay was triggered some other way than through the script, it will turn off after 17s if not before. Whenever it turns off, we want to stop any instances of our scripts running. We can also use Template Switches to have logical devices we can use to trigger the different scripts, either from Home Assistant, or from button presses:

switch:
  - platform: gpio
    id: relay
    pin: GPIO2
    restore_mode: ALWAYS_OFF
    on_turn_on:
      - script.execute: long_grind
    on_turn_off:
      - script.stop: short_grind
      - script.stop: long_grind
  - platform: template
    name: "Grind a Single"
    optimistic: true
    id: grind_a_single
    icon: mdi:coffee-outline
    turn_on_action:
      - script.execute: short_grind
      - script.wait: short_grind
      - switch.template.publish:
          id: grind_a_single
          state: OFF
    turn_off_action:
      - switch.turn_off: relay
  - platform: template
    name: "Grind a Double"
    optimistic: true
    id: grind_a_double
    icon: mdi:coffee
    turn_on_action:
      - script.execute: long_grind
      - script.wait: long_grind
      - switch.template.publish:
          id: grind_a_double
          state: OFF
    turn_off_action:
      - switch.turn_off: relay

Both of these template switches will also turn off the grinder when toggled off if they are currently on.

There’s only one more bit of logic that’s required, and that’s the handling of the physical button. I wanted this to trigger either length based on the amount of time that the button is held down for, but I also want a UX affordance of knowing when you have held it down long enough to trigger the alternate action. Finally, if it’s on, any type of press should turn it off, and not trigger a new grind.

binary_sensor:
  - platform: gpio
    pin:
      number: GPIO14
      inverted: true
      mode: INPUT_PULLUP
    on_press:
      - light.turn_on:
          id: led
          transition_length: 0s
      - delay: 500ms
      - light.turn_off:
          id: led
          transition_length: 0s
    on_click:
      - max_length: 350ms
        then:
          - if:
              condition:
                - switch.is_on: relay
              then:
                - switch.turn_off: relay
              else:
                - script.execute: long_grind
      - min_length: 500ms
        max_length: 2s
        then:
          - if:
              condition:
                - switch.is_on: relay
              then:
                - switch.turn_off: relay
              else:
                - script.execute: short_grind

Remember that the turning off of the relay will stop any running scripts.

So, now, when you hold down the button, when the light turns off, you can release it and it will trigger a short grind. If you just tap the switch and release it immediately, it will trigger a long grind. Any button press when the grinder is already running will turn it off.