Django ComputedField()

A very common pattern, at least in code that I’ve written (and read) is to annotate on a field that uses an expression that is based on one or more other fields. This could then be used to filter the objects, or just in some other way.

The usual method of doing this is:

from django.db import models
from django.db.models.expressions import F, Value
from django.db.models.functions import Concat


class PersonQuerySet(models.query.QuerySet):
    def with_name(self):
        return self.annotate(
            name=Concat(F('first_name'), Value(' '), F('last_name'), output_field=models.TextField()),
        )


class Person(models.Model):
    first_name = models.TextField()
    last_name = models.TextField()

    objects = PersonQuerySet.as_manager()

Yes, I’m aware of falsehoods programmers believe about names, but this is an easy-to-follow example.

In order to be able to access the name field, we must use the with_name() queryset method. This is usually okay, but if it is something that we almost always want, it can be a little tiresome. Alternatively, you could override the get_queryset() method of a custom manager, but that makes it somewhat surprising to a reader of the code. There are also some places where a custom manager will not automatically be used, or where it will be cumbersome to include the fields from a custom manager (select_related, for instance).
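For reference, a minimal sketch of that get_queryset() override, reusing the PersonQuerySet defined above (just one way to wire it up):

class PersonManager(models.Manager.from_queryset(PersonQuerySet)):
    def get_queryset(self):
        # Every queryset obtained via Person.objects now carries the annotation.
        return super().get_queryset().with_name()

The model would then use objects = PersonManager() instead of PersonQuerySet.as_manager().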

It would be much nicer if we could write the field declaratively, and have it use the normal django mechanism of defer and only to remove it from the query if required.

class Person(models.Model):
    first_name = models.TextField()
    last_name = models.TextField()
    name = ComputedField(Concat(F('first_name'), Value(' '), F('last_name'), output_field=models.TextField()))

I’ve spent some time digging around in the django source code, and have a fairly reasonable understanding of how fields work, and how queries are built up. But I did wonder how close to a working proof of concept of this type of field we could get without having to change any of the django source code. After all, I was able to backport the entire Subquery expression stuff to older versions of django after writing that. It would be nice to repeat the process here.

There are a few things you need to do to get this to work:

  • store the expression
  • prevent the field from creating a migration
  • ensure the field knows how to interpret data from the database
  • ensure the field adds the expression to its serialised version
  • prevent the field from writing data back to the database
  • inject the expression into the query instead of the field name

Let’s look at these in turn. Storing the expression happens in the field’s constructor (where we also mark the field as not editable):

class ComputedField(models.Field):
    def __init__(self, expression, *args, **kwargs):
        self.expression = expression.copy()
        kwargs.update(editable=False)
        super().__init__(*args, **kwargs)

There is already a mechanism for a field to prevent a migration operation from being generated: it can return a db_type of None.

    def db_type(self, connection):
        return None

We can delegate the responsibility of interpreting the data from the database to the output field of the expression - that’s how it works in the normal operation of expressions.

    def from_db_value(self, value, expression, connection):
        return self.expression.output_field.from_db_value(value, expression, connection)

Storing the expression in the serialised version of a field is explained in the documentation on custom fields:

    def deconstruct(self):
        name, path, args, kwargs = super().deconstruct()
        return name, path, [self.expression] + args, kwargs

Preventing the field from being included in the data we write back to the database turned out to be fairly tricky. There are a couple of mechanisms that can be used, but ultimately the only one that worked in the way I needed was something that is used by the inheritance mechanism. We have to indicate that it is a “private” field. I’m not 100% sure of what the other implications of this might be, but the outcome of making this field private is that it no longer appears in the list of local fields. There is one drawback to this, which I’ll discuss below.

    def contribute_to_class(self, cls, name, private_only=False):
        return super().contribute_to_class(cls, name, True)

So, we only have one task to complete. How do we inject the expression into the query instead of the column?

When django evaluates a queryset, it looks at the annotations, and the expressions that are in these. It will then “resolve” these expressions (which means the expression gets told which “query” is being used to evaluate it, allowing it to do whatever it needs to do to make things work).

When a regular field is encountered, it is not resolved: instead it is turned into a Col. This happens in a few different places, but the problem is that a Col should not need to know which query it belongs to: at most it needs to know what the aliased table name is. So, we don’t have a query object we can pass to the resolve_expression method of our expression.

Instead, we’ll need to use Python’s introspection to look up the stack until we find a place that has a reference to this query.

    def get_col(self, alias, output_field=None):
        import inspect

        query = None

        for frame in inspect.stack():
            if frame.function in ['get_default_columns', 'get_order_by']:
                query = frame.frame.f_locals['self'].query
                break
            if frame.function in ['add_fields', 'build_filter']:
                query = frame.frame.f_locals['self']
                break
        else:
            # Aaargh! We don't handle this one yet!
            import pdb; pdb.set_trace()

        col = self.expression.resolve_expression(query=query)
        col.target = self
        return col

So, how does this code actually work? We go through each frame in the stack, and look for a function (or method, but they are really just functions in python) that matches one of the types we know about that have a reference to the query. Then, we grab that, stop iterating and resolve our expression. We have to set the “target” of our resolved expression to the original field, which is how the Col interface works.

This moves the resolve_expression into the get_col, which is where it needs to be. The (resolved) expression is used as the faked column, and it knows how to generate its own SQL, which will be put into the query in the correct location.

And this works, almost.

There is one more situation that needs to be taken into account: when we are referencing the field through a join (the x__y lookup syntax you often see in django filters).

Because F() expressions reference the local query, we need to first turn any of these that we find in our computed field’s expression (at any level) into a Col that refers to the correct model. We need to do this before the resolve_expression takes place.

    def get_col(self, alias, output_field=None):
        import inspect

        from django.db.models.expressions import Col

        query = None

        for frame in inspect.stack():
            if frame.function in ['get_default_columns', 'get_order_by']:
                query = frame.frame.f_locals['self'].query
                break
            if frame.function in ['add_fields', 'build_filter']:
                query = frame.frame.f_locals['self']
                break
        else:
            # Aaargh! We don't handle this one yet!
            import pdb; pdb.set_trace()

        def resolve_f(expression):
            if hasattr(expression, 'get_source_expressions'):
                expression = expression.copy()
                expression.set_source_expressions([
                  resolve_f(expr) for expr in expression.get_source_expressions()
                ])
            if isinstance(expression, models.F):
                field = self.model._meta.get_field(expression.name)
                if hasattr(field, 'expression'):
                    return resolve_f(field.expression)
                return Col(alias, field)
            return expression

        col = resolve_f(self.expression).resolve_expression(query=query)
        col.target = self
        return col

There is a repo containing this, which has a bunch of tests showing how the different query types can use the computed field:

https://github.com/schinckel/django-computed-field
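For example, queries along these lines are the sort of thing the tests cover (a sketch, not the actual test cases):

Person.objects.filter(name__icontains='smith')
Person.objects.order_by('name')
Person.objects.values_list('name', flat=True)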


But wait, there is one more thing…

A very common requirement, especially if you are planning on using this column for filtering, would be to stick an index on there.

Unfortunately, that’s not currently possible: the mechanism we used to keep the field out of write queries (making it a private field) also prevents using this field in an index. In any case, function/expression indexes are not currently supported in Django.

It’s not all bad news though: Markus has a Pull Request that will enable this feature; from there we could (if db_index is set) automatically add an expression index to Model._meta.indexes in contribute_to_class, but it would also be great to be able to use index_together.

I suspect that to get that, though, we’ll need another mechanism to keep it out of the write queries, while still being a local field.

(Thanks to FunkyBob for suggestions, including suggesting the field at all).

Set-returning and row-accepting functions in Django and Postgres

Postgres set-returning functions are an awesome thing. With them, you can do fun things like unnest an array, and end up with a new row for each item in the array. For example:

from django.conf import settings
from django.contrib.postgres.fields import ArrayField
from django.db import models

class Post(models.Model):
    author = models.ForeignKey(settings.AUTH_USER_MODEL, related_name='posts', on_delete=models.CASCADE)
    tags = ArrayField(base_field=models.TextField(), null=True, blank=True)
    created_at = models.DateTimeField()
    content = models.TextField()

The equivalent SQL might be something like:

CREATE TABLE blog_post (
  id SERIAL NOT NULL PRIMARY KEY,
  author_id INTEGER NOT NULL REFERENCES auth_user (id),
  tags TEXT[],
  created_at TIMESTAMPTZ NOT NULL,
  content TEXT NOT NULL
);

We can “explode” the table so that we have one tag per row:

SELECT author_id, UNNEST(tags) AS tag, created_at, content
FROM blog_post;

To do the same sort of thing in Django, we can use a Func:

from django.db.models import F, Func

Post.objects.annotate(tag=Func(F('tags'), function='UNNEST'))

In practice, just like in the Django docs, I’ll create a convenience Func subclass:

class Unnest(Func):
    function = 'UNNEST'

    @property
    def output_field(self):
        output_fields = [x.output_field for x in self.get_source_expressions()]
        if len(output_fields) == 1:
            return output_fields[0].base_field

        return super(Unnest, self).output_field
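A usage sketch, producing one row per post/tag combination:

Post.objects.annotate(tag=Unnest(F('tags'))).values_list('id', 'tag')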

The opposite of this is aggregation: in the case of UNNEST, it’s almost ARRAY_AGG, although because of the handling of nested arrays, this doesn’t quite round-trip. We already know how to do aggregation in Django, so I will not discuss that here.

However, there is another related operation: what if you want to turn a row into something else? In my case, this was turning a row from a result into a JSON object.

SELECT id,
       to_jsonb(myapp_mymodel) - 'id' AS "json"
  FROM myapp_mymodel

This will get all of the columns except ‘id’, and put them into a new column called “json”.

But how do we get Django to output SQL that will enable us to use a Model as the argument to a function? Ultimately, we want to get to the following:

class ToJSONB(Func):
    function = 'TO_JSONB'
    output_field = JSONField()


MyModel.objects.annotate(
  json=ToJSONB(MyModel) - Value('id')
).values('id', 'json')

Our first attempt could be to use RawSQL. However, this has a couple of problems. The first is that we are writing lots of raw SQL; the second is that it won’t work so well if the table is aliased by the ORM. That is, if you use this in a join or subquery, where Django automatically assigns an alias to this table, then referring directly to the table name will not work.

MyModel.objects.annotate(json=RawSQL("to_jsonb(myapp_mymodel) - 'id'", [], output_field=JSONField()))

Instead, we need to dynamically find out what the current alias for the model is in this query, and use that. We’ll also want to figure out how to “subtract” the id key from the JSON object.

class Table(django.db.models.Expression):
    def __init__(self, model, *args, **kwargs):
        self.model = model
        self.query = None
        super(Table, self).__init__(*args, **kwargs)

    def resolve_expression(self, query, *args, **kwargs):
        clone = super(Table, self).resolve_expression(query, *args, **kwargs)
        clone.query = query
        return clone

    def as_sql(self, compiler, connection, **kwargs):
        if not self.query:
            raise ValueError('Unresolved Table expression')
        alias = self.query.table_map.get(self.model._meta.db_table, [self.model._meta.db_table])[0]
        return compiler.quote_name_unless_alias(alias), []

Okay, there’s a fair bit going on there. Let’s look through it. We’ll start with how we use it:

MyModel.objects.annotate(json=ToJSONB(Table(MyModel)))

We create a Table instance, which stores a reference to the model. Technically, all we need later on is the database table name that will be used, but we’ll keep the model for now.

When the ORM “resolves” the queryset, we grab the query object, and store a reference to that.

When the ORM asks us to generate some SQL, we look at the query object we have a reference to, and see if our model’s table name has an entry in the table_map dict: if so, we get the first entry from that, otherwise we just use the table name.

Okay, what about being able to remove the entry in the JSONB object for ‘id’?

We can’t just use the subtraction operator, because Postgres will try to convert the RHS value into JSONB first, and fail. So, we need to ensure it renders it as TEXT. We also need to wrap it in an ExpressionWrapper, so we can indicate what the output field type will be:

id_value = models.Func(models.Value('id'), template='%(expressions)s::TEXT')
MyModel.objects.annotate(
    json=ExpressionWrapper(
        ToJSONB(Table(MyModel)) - id_value, output_field=JSONField()
    )
)

I also often use a convenience Cast function, that automatically does this based on the supplied output_field, but this is a little easier to use here. Note there is a possible use for ToJSONB in a different context, where it doesn’t take a table, but some other primitive.
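Such a Cast might be sketched like this (a simplified version of the idea; newer Django versions also ship a built-in django.db.models.functions.Cast):

class Cast(models.Func):
    template = '%(expressions)s::%(db_type)s'

    def as_sql(self, compiler, connection, **extra_context):
        # Use the database type of the supplied output_field as the cast target.
        extra_context['db_type'] = self.output_field.db_type(connection)
        return super().as_sql(compiler, connection, **extra_context)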


There’s one more way we can use this construct: the geo_unique_indexer function from a previous post needs a table name, but also the name of a field to omit from the index. So, we can wrap this up nicely:

class GeoMatch(models.Func):
    function = 'geo_unique_indexer'
    output_field = JSONField()

    def __init__(self, model, *args, **kwargs):
        table = Table(model)
        pk = models.Value(model._meta.pk.db_column or model._meta.pk.name)
        super(GeoMatch, self).__init__(table, pk, *args, **kwargs)

This is really tidy: it takes the model class (or maybe an instance, I didn’t try), and builds a Table, and gets the primary key. These are just used as the arguments for the function, and then it all works.

Best match in hierarchy using Postgres

The other day, I had an issue where I needed to prevent duplicates in a subset of nullable columns for the purpose of modelling different regional breakdowns; I came up with quite a neat solution that uses JSONB functions.

What I didn’t realise until later was how this same function can be used to detect the “best match” of an object within these rows.

In this case, the best match is the row that has the most specificity (ie, the highest number of non-null values), but does not have any values that differ in the target object.

For instance, given the data (and the function defined in the aforementioned post):

{"country": "AU"}
{"country": "AU", "state": "SA"}
{"country": "NZ"}

We would want an object that is within Australia (AU) to match the first row, unless it also was within South Australia (SA), when it should match the second row.

We can use the JSONB containment operator (<@) to query this match:

SELECT geo_unique_indexer(geo_table, 'id')
  FROM geo_table
 WHERE geo_unique_indexer(geo_table, 'id') <@ '{"country": "AU", "state": "SA"}';

Okay, that’s really close: it shows all “parent” data:

{"country": "AU"}
{"country": "AU", "state": "SA"}

So, we need to ensure that only the most specific one of these is actually returned:

SELECT geo_unique_indexer(geo_table, 'id')
  FROM geo_table
 WHERE geo_unique_indexer(geo_table, 'id') <@ '{"country": "AU", "state": "SA"}'
 ORDER BY (SELECT count(*) FROM JSONB_OBJECT_KEYS(geo_unique_indexer(geo_table, 'id'))) DESC
 LIMIT 1;

And, presto, we have our best match:

{"country": "AU", "state": "SA"}

Postgres multi-column unique index

Consider something that requires a “best match” facility, based on the geographic location. There may be a set of behaviours that apply when an object is in Australia, and a different set within the USA. If this is all you have, then it’s simple to just have those behaviours based on a single country column in your database.

But now consider that in some circumstances, it’s possible to have behaviours within a single state of Australia that should override the nationwide behaviours. For instance, Western Australia may just use the Australian rules, but South Australia has a custom set of rules. Likewise, California may use a custom set of rules, but Hawaii just uses the inherited USA rules.

To ensure that a consistent set of rules is applied, there could only be a single “Australia” rule set, but there could be an “Australia+South Australia” set stored. This could also be modelled, in a slightly more complicated fashion, by having two columns, country, and state, and requiring country to be unique if state is NULL, and both country and state to be unique together.

Now, consider that there could be a whole stack more levels, and some of these levels have different names (and different orders in the hierarchy) in different countries. For instance, New Zealand has regions, but Great Britain has Nations (England, Northern Ireland, Scotland and Wales), and also counties (or council areas in Scotland).

Django Localflavor has a whole bunch of these divisions, but there are only nine “names” for these:

  • nation
  • state
  • province
  • provincial district
  • prefecture
  • region
  • county
  • department
  • municipality

So, our table has a country column, plus a nullable column for each of these divisions:

CREATE TABLE geo_table (
  id SERIAL PRIMARY KEY,
  country VARCHAR(2) NOT NULL,
  county TEXT,
  department TEXT,
  municipality TEXT,
  nation TEXT,
  prefecture TEXT,
  province TEXT,
  provincial_district TEXT,
  region TEXT,
  state TEXT
);
INSERT INTO geo_table(country, state)
VALUES ('AU', NULL), ('AU', 'SA');

So, to prevent having data in our database that could result in an inability to determine which rule set we should use for a given object, we want to have the following:

A unique constraint that applies to all columns that are not NULL

It could be possible to model this with a number of different unique constraints, but that doesn’t seem like heaps of fun to deal with.

However, postgres allows us to define “functional” indices: that is, indices that apply a function to some columns from the row, and use the result as the stored value in the index.

We can write an expression that takes the whole row, turns it into JSON, removes the primary key column, and then any columns that are NULL, and use that as the index.

SELECT jsonb_strip_nulls(
         to_jsonb(geo_table) - 'id'::TEXT
       )
  FROM geo_table;
  {"country": "AU"}
  {"country": "AU", "state": "SA"}

So, that looks pretty good, but what’s going on here?

In this case, we take the row, and turn it into a JSONB object, remove the primary key (in this case, called id), and then remove any keys that have a NULL value.

Now, we can’t just use this expression in an index, because to_jsonb (and row_to_json) are not IMMUTABLE: the value they produce for a TIMESTAMPTZ, for instance, depends on settings that may change. However, our data will only contain strings, which can be turned reliably into the same JSON value regardless of any settings. So, we’ll need to create our own IMMUTABLE function:

CREATE OR REPLACE FUNCTION geo_unique_indexer(anyelement, pk TEXT)
RETURNS jsonb AS $$

    SELECT jsonb_strip_nulls(to_jsonb($1) - pk);

$$
LANGUAGE SQL IMMUTABLE;

You’ll notice that I had to use anyelement, as it’s not possible to have an SQL function refer to a record type. I’ve also made it configurable what the primary key column is, so it doesn’t depend on using the column “id”.

Finally, we can create the index:

CREATE UNIQUE INDEX geo_unique_match ON geo_table(
  geo_unique_indexer(geo_table, 'id')
);

Okay, that looks good. Now we can try to insert some values that should not be permitted. We already have a row with just Australia:

INSERT INTO geo_table(country, state) VALUES ('AU', NULL);
ERROR:  duplicate key value violates unique constraint "geo_unique_match"
DETAIL:  Key (geo_unique_indexer(geo_table.*, 'id'::text))=({"country": "AU"}) already exists.

…and another row with both Australia and SA.

INSERT INTO geo_table(country, state) VALUES ('AU', 'SA');
ERROR:  duplicate key value violates unique constraint "geo_unique_match"
DETAIL:  Key (geo_unique_indexer(geo_table.*, 'id'::text))=({"country": "AU", "state": "SA"}) already exists.

Finally, this one should succeed:

INSERT INTO geo_table(country, state) VALUES ('AU', 'NSW');

Excellent.

Codeship PR checker

Continuous Integration (CI) is great: basically you have a system that ensures that every time a change is committed your full test suite is run, and any failures are reported back. In our case, we have tests that take around 40 minutes to run (because we have lots of tests that need to create data and then ensure that things work based on that data), so being able to have that happen while you continue work is really nice. On top of this, we use Codecov to check that coverage does not decrease on a given commit.

Our general workflow is that we create a branch (in Mercurial, branches are different to git branches: they are long-lived, and all code within a branch retains that reference, meaning it’s easy to associate code with Jira issues), and create a Pull Request when the code is ready for review. At any one time there may be several Pull Requests open, which may or may not affect the same files.

Tests are run by Codeship (our CI service) against every commit, so it’s easy to see if a given commit is valid for merging, but there’s no way to know if a commit will (a) merge cleanly, and (b) still pass tests after merging, at least until you merge it. (a) is handled by BitBucket (it shows us if a merge will apply cleanly), but (b) is still a problem.

Until yesterday.

I was able to leverage the “Test Pipelines” feature of Codeship to make a pipeline that checks if the current branch is default, and if it is not, then it attempts an automated merge, followed by running all of the tests. It doesn’t send results to Codecov, because the commit that would be covered does not exist yet, but it does report errors if it fails to merge, or fails to run tests correctly.

if [ "$CI_BRANCH" != "default" ] ; then hg merge -r default --tool internal:merge; detox; fi

Notes: we use mercurial, so the branch name is based on that, as is the merge command. We also use tox, and detox to run tests in parallel.

This catches some big issues we had, where two migrations in one app were created in different branches, and the migrations framework is unable to deal with that. It could also pick up other issues, where a merge results in code that does not work, even though both commits contained fully passing code.

It’s not quite PR testing, because if the target revision changes (ie, a different PR is merged), then it should run the merge-checking pipeline again, but I’m not sure how to trigger this.

HomeKit Pairing Issues (HAP-python)

There were quite a few changes to HAP-python that I hadn’t kept up with in my MQTT to HomeKit bridge, but after restarting my computer, I must have updated the installed version of that package, and all sorts of things stopped working.

I spent some time getting code to actually execute again, but had an issue where it was still not working. All of the code was running as expected, but HomeKit was just failing to recognise anything. So, I unpaired and attempted to re-pair.

It failed to pair.

Well, technically, it paired, but then unpaired immediately.

It turns out that if the JSON data that is sent to HomeKit is invalid (semantically, in this case: it was valid JSON data, just not quite valid HAP data), then it will unpair - if the device is already paired, it will just appear as unavailable.

I had some custom code that built up the Information Services slightly differently, but my method of ignoring the standard HAP-python code that added this seemed to no longer work, so my bridge, and all of my accessories had two Information Services.

Fixing this meant that I was able to pair correctly again.

MQTT HomeKit Bridge

Writing HomeKit devices is possible (and even simple) using tools like HAP-python. However, devices like the ESP8266 are slow to do the handshake stuff, and having to keep them awake to read temperature or other data on demand means you can’t use the deep sleep features.

These IoT devices can, however, quite easily handle publishing to an MQTT topic.

I’ve read most of the HomeKit Accessory Protocol spec (at least, the non-commercial one, but you’ll still need credentials to view that link), and I think I have a pretty good handle on it. And it occurred to me that it should be possible to bridge, in both directions, an MQTT broker and HomeKit.

Basically, you can then have a single bridge device (that you only need to register in HomeKit once), and have this connect to your MQTT broker. It can then perform two actions:

  • Listen for MQTT messages that meet certain criteria, and pass these through to HomeKit
  • Listen for HomeKit messages, and convert these into MQTT messages.

There’s a bit more to it than that: it keeps track of what devices are known, and will automatically add new devices when it detects one (via a matching MQTT topic). It could also remove devices that have not been seen for some time (or when a specific message indicates that device is no longer available).

I’ve chosen to make this as simple as possible - at this stage of my prototype there is no authentication in the MQTT broker, but that will have to change before I hook up anything other than temperature sensors. My Garage Door opener is still a standalone HomeKit device!

So, down to the nuts and bolts.

A message that matches the following pattern will be processed:

HomeKit/<device_id>/<service_type>/<characteristic_name>

For instance, I can currently see some messages that look like:

HomeKit/esp8266_12345678/TemperatureSensor/CurrentTemperature 20
HomeKit/esp8266_12345678/HumiditySensor/CurrentRelativeHumidity 58
HomeKit/123456789ABCDEF/TemperatureSensor/CurrentTemperature 20.125
HomeKit/TEST/Switch/On 1

The thing you might notice is that two of those messages have the same device id - the bridge knows this, and will add a second service to the accessory.
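As a rough illustration (this is not the bridge’s actual code), matching and parsing these topics with the paho-mqtt client library might look something like:

import paho.mqtt.client as mqtt

def on_message(client, userdata, message):
    # Topics look like HomeKit/<device_id>/<service_type>/<characteristic_name>
    _prefix, device_id, service_type, characteristic = message.topic.split('/')
    value = message.payload.decode()
    print(device_id, service_type, characteristic, value)

client = mqtt.Client()
client.on_message = on_message
client.connect('broker.local')  # placeholder broker address
client.subscribe('HomeKit/+/+/+')
client.loop_forever()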

To be honest, this solution seems too simple, but it has been working really well for me for some time now. I have configured the sensors to send retained (persistent) messages, but I think I’m going to turn that off, except in the case of things like the switch device.

The other thing I haven’t totally nutted out yet is the authentication/authorisation stuff for MQTT. I have had some thoughts at this point though:

  • A device will generate a password when it first boots (and stores this).
  • This password will be used with the device id to authenticate with the broker.
  • When the client attempts to connect, a check will be made to see if the user exists - if so, the password must match. If not, the user will be created.
  • Any user created in this manner will be able to read and write topics that match HomeKit/<user_id>/#
  • A special user (the HomeKit bridge user) must be able to read and write all HomeKit/# topics.

Now that I’ve gotten OTA working with these devices, I need some mechanism for triggering this via MQTT.

OTA firmware updates with MicroPython/ESP8266

It’s been a hard couple of days, but I’ve finally managed to get OTA (over-the-air) firmware updates working on an ESP8266 running MicroPython.

Building and flashing OTA firmware

This assumes you have a working ESP toolchain (maybe see this comment if you don’t yet), and have been able to build and flash custom micropython firmware.

The bootloader/OTA software is available at yaota, but the instructions for how to actually build and flash the initial OTA server and regular app are not complete. Here’s what I ended up having to do to get it to work.

  • Before you build, you need to generate keys:
      $ cd ota-client
      $ . gen_keys.sh
      $ python -c "import rsa_sign; rsa_sign.dump_c(rsa_sign.load_key())"
  • The last step outputs some data from the key - specifically you need the mod value, and this needs to be used in a config.h file at the root of the project. Duplicate the config.h.example file, and replace the MODULUS value with your public key’s modulus.

  • I needed to make some other changes too: until they are merged, you can see them here. They are separate pull requests, because whilst they all need to be applied to get a build to complete, they are all independent.

  • Once these patches have been applied, run make at the root of the project.

  • Flash the generated yaota8266.bin file to location 0x0 of your device.

  • Flash the micropython OTA build to location 0x3c000 of your device.

You should now be able to apply OTA updates. I also included in my ota-server the patch from ulno that allows for triggering OTA from within micropython code. I added into my micropython firmware a file ota.py, that contains:

import machine

def start():
    machine.RTC().memory('yaotaota')
    machine.reset()

This means I can trigger an OTA state by doing:

import ota
ota.start()

Performing an OTA update

In order to do this, you must have built and flashed your own yaota8266, with your own keys. Otherwise you won’t be able to sign the updates, and the ota-server binary will reject them.

Trigger an OTA state (either using the process above, or by hitting a button in the 3 seconds after the bootloader is complete).

Then, in the ota-client directory, execute the following:

$ ota_client.py live-ota <../../micropython/ports/esp8266/build/firmware-ota.bin> -a <10.9.8.160>

This will send off the 2274-ish packets to the device, and after this you should have your new firmware installed. Note you’ll need to supply your firmware file and IP address.


Notes that I came across during this process.

  • If you are able to flash both images correctly, but get errors from the micropython app, check that you aren’t flashing a non-ota build. In my case, the boot sequence was performing as expected, but when no OTA state was detected, and the micropython app was due to start, it was crashing with an error:
     Fatal exception (0):
     epc1=0x4020a8ac, epc2=0x00000000, e...
  • I was unable to get an OTA update to work on my ESP8266-01 device. This could be because it doesn’t have enough flash, or some other reason.

  • The sonoff device(s) I have require flashing using -fm dout mode: with other modes a write_flash will appear to succeed, but the binary will not be viable, and only garbage will be seen on the screen.

  • Some USB-serial converters do not provide enough current to power an ESP8266 running in anything other than flash-from-UART mode. That is, you will be able to flash them, but not boot into regular apps.

  • Having a USB-serial converter that has RTS/DTR means you can have esptool.py automatically trigger boot mode. This is nice. It’s a shame that the sonoff devices don’t expose enough pins to hook this up though.


As an aside, my micropython builds contain the mqtt client library, and automatically set up the WiFi credentials for the network they are installed in. ESP8266 devices retain their WiFi credentials and attempt to restore their connection state after restart, so that can simplify code somewhat, and you can just edit the configuration in the modules/inisetup.py module.
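For reference, the kind of station setup that ends up in there might look like this (the SSID and password are placeholders):

import network

def wifi_connect():
    # Connect the station interface to the local network.
    sta = network.WLAN(network.STA_IF)
    sta.active(True)
    if not sta.isconnected():
        sta.connect('my-ssid', 'my-password')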

Now, I just need to come up with some mechanism for triggering the OTA state in a device using MQTT, and for devices that are not continually awake (I have several sensors that use deep-sleep for some time, and then wake up, publish an MQTT message and then go back to sleep).

More JSONB querying

Occasionally, I get emails from people regarding specific queries in Postgres, usually because I have blogged about JSONB querying before.

Today, I got one: rather than just reply, I thought I’d blog about how queries could be written to solve this problem.

Our table can be a single column with JSONB data for the purposes of this.

CREATE TABLE priority (data JSONB);

We also need a bit of data to query:

INSERT INTO priority (data) VALUES (
'{
  "id": "02e32a14-904c-4153-a32b-fe8d1f1bbbe1",
  "entity": "activity",
  "fields": {
    "subject": [
      {"val": "Subject", "priority": 7}
    ]
  },
  "recordStatusType": "active"
}'), (
'{
  "id": "b33498b2-32f6-4575-b2cd-9e9a1ae2059d",
  "entity": "activity",
  "fields": {
    "subject": [
      {"val": "Subject", "priority": 4}
    ]
  },
  "recordStatusType": "active"
}'), (
'{
  "id": "a2d327d2-7668-4dc0-ae1d-d6144130e3ec",
  "entity": "activity",
  "fields": {
    "object": [],
    "subject": [
      {"val": "Object", "priority": 1},
      {"val": "Target", "priority": 7}
    ]
  }
}'), (
'{
  "id": "3bc8b536-00af-4fc7-881e-b88b620ac436",
  "entity": "activity",
  "fields": {
    "object": [
      {"val": "Object", "priority": 9}
    ]
  }
}'
);

The problem requires selection of the data rows where priority is greater than 5.

I’ve extended the data provided: I’m not sure if there will be multiple “fields”, but I assume so. I also assume that a match for any priority within a subject field will be required.

Let’s start with a simpler version: get the records where the first fields->subject priority is greater than 5 (I’ll return just the id, to make it simpler):

SELECT data->'id'
  FROM priority
 WHERE (data#>>'{fields,subject,0,priority}')::INTEGER > 5;

 "02e32a14-904c-4153-a32b-fe8d1f1bbbe1"

This uses the #>> operator, which does a path lookup and returns a string value that we then cast to an integer for the comparison. Note that the path lookup differs from Postgres’ normal array indexing, in that it uses 0 as the first index, rather than 1.

But, we want to query for all rows where any subject field has a priority greater than 5.

We’ll want to use jsonb_array_elements (which is the JSONB equivalent of unnest). We can use that to get the subject fields themselves:

SELECT jsonb_array_elements(data#>'{fields,subject}') FROM priority;

Note this uses the #> operator, because we still want JSONB data:

       jsonb_array_elements
──────────────────────────────────
 {"val": "Subject", "priority": 7}
 {"val": "Subject", "priority": 4}
 {"val": "Object", "priority": 1}
 {"val": "Target", "priority": 7}
(4 rows)

We can get a bit further too:

SELECT jsonb_array_elements(data#>'{fields,subject}')->'priority' FROM priority;

Indeed, we can get all the way to our boolean test:

SELECT (jsonb_array_elements(data#>'{fields,subject}')->>'priority')::INTEGER > 5 FROM priority;
 ?column?
─────────
 t
 f
 f
 t
(4 rows)

But we want the data rows themselves, not just the matching subject fields, so this on its own is not that useful. Instead, we can use the fact that jsonb_array_elements returns a set, and use that as a subquery in our WHERE clause, using the value < ANY(subquery) construct:

SELECT data->'id'
  FROM priority
 WHERE 5 < ANY(SELECT (jsonb_array_elements(data#>'{fields,subject}')->>'priority')::INTEGER)

This means that we want only the records where 5 is less than any of the priority values in subject fields.

                ?column?
────────────────────────────────────────
 "02e32a14-904c-4153-a32b-fe8d1f1bbbe1"
 "a2d327d2-7668-4dc0-ae1d-d6144130e3ec"

I hope this helps, Paulo!

Django multitenancy using Postgres Row Level Security

Quite some time ago, I did some experiments in using Postgres Row Level Security (RLS) from within Django.

It occurred to me that this philosophy could be used to model a multi-tenant application.

The main problem with django-boardinghouse is that you have to apply migrations to multiple schemata. With many tenants, this can take a long time. It’s not easy to do this in a way that would be conducive to having limited downtime.

On the other hand, RLS means that the database restricts which rows of specific tables need to be shown in a given circumstance. Normally, examples of RLS show this by using a different user, but this is not necessary.

In fact, in most modern web applications, a single database user is used for all connections. This has some big benefits (in that a connection to the database can belong to a pool, and be shared by different requests). Luckily, there are other ways to have RLS applied.

One method is to use Postgres’ session variables. This is outlined quite well in Application users vs. Row Level Security. I’m just going to use simple session variables, as the facility for doing this will be encapsulated, and based on a key in the Django session - which users cannot set directly. If someone has access to this (or access to setting a Postgres session variable directly), then they have enough access to do whatever they want.

There are some caveats: specifically, the Postgres user must not be a SUPERUSER, but that’s easy to sort out. We’ll be able to continue to use PgBouncer or similar, but only if we use session pooling (not transaction pooling).


Now, mirroring the previous post, we have a few things that need to happen:

  • We will need some middleware that sets the (postgres) session variable.
  • We may want to have some mechanism for switching tenants (unless a user is tied to a single tenant).
  • We must have a Tenant model of some sort (because we’ll be using foreign keys to this to indicate a given row belongs to a given tenant).
  • We’ll want to be able to enable/force/disable RLS for a given table.
  • We should be able to detect the USING clause (and WITH CHECK clause) for a given table.
  • We must allow the user to overwrite the USING/WITH CHECK clauses for a given table.

It turns out this is much simpler than all of the things that django-boardinghouse needs to do.

It also turns out that we can cascade the USING/WITH CHECK clauses for dependent tables, but we’ll get to that. I’m not sure how well that will perform, but it might be reasonable.


Since all good projects need a clever name, I’ve chosen django-occupation for this one (as a play on multi-tenancy). Thus, you may see the name occupation used in a few places. Also, this will be a strictly Django 2.0+ (and therefore Python3) app!

Let’s start with the easy bits:

# occupation/middleware.py
from django.db import connection


def ActivateTenant(get_response):
    def middleware(request):
        connection.cursor().execute(
            'SET occupation.active_tenant = %s',
            [request.session.get('active_tenant', '')]
        )
        return get_response(request)
    return middleware

This middleware will set a session variable. Importantly, it always sets this variable, because the rules we will be creating later rely on this being present: exceptions will result from a missing current_setting. Setting it to an empty string will mean that no rows will be returned when no tenant is selected, which is acceptable.

The code for switching tenants is a bit more complicated, and it probably needs to be. It will need some method of detecting if the given user is indeed permitted to switch to the target tenant, which could be dependent on a range of other things. For instance, in our multi-tenant application, an employee needs to be currently (or in the future) employed in order to get access, but some users may get access for other reasons (ie, a Payroll company).

We can use a view that specifically handles this, but with django-boardinghouse I also came up with a middleware that can handle this. There are, in that project, three mechanisms for switching tenants: a query parameter, an HTTP header, and a raw view. The rationale for this was that a URL (containing a query parameter) could be used to have a permanent link to an object (which works across tenants). The drawback is that it does leak some information (about the tenant id). In practice, having this as a UUID may be nice.

Having a view that switches tenant makes doing a switch (and getting a success code if it works) easy, and having a header might make it easier for an API to switch.
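A minimal sketch of such a view (the visible_tenants relation here is an assumption, not part of django-occupation):

from django.http import HttpResponse, HttpResponseForbidden

def switch_tenant(request, tenant_id):
    # Only switch if this user is actually permitted to see the target tenant.
    if request.user.visible_tenants.filter(pk=tenant_id).exists():
        request.session['active_tenant'] = tenant_id
        return HttpResponse('Switched to tenant {}'.format(tenant_id))
    return HttpResponseForbidden('Not permitted to access that tenant')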

Anyway, we can ignore this requirement for now.

I’ve used the same “swappable” concept in django-boardinghouse that django.contrib.auth uses for swappable user models. This has some nice side effects, but an understanding of how this works is not necessary for understanding what is about to happen next. Instead, let’s look at the definition of some models. Please keep in mind that this is a simplified example, and some parts have been omitted for clarity.

class School(models.Model):
    "This is our Tenant model."
    name = models.CharField(max_length=128, unique=True)

    def __str__(self):
        return self.name


class Student(models.Model):
    name = models.CharField(max_length=128)
    student_number = models.CharField(max_length=16)
    school = models.ForeignKey('School', related_name='students', on_delete=models.CASCADE)

    class Meta:
        unique_together = (
            ('school', 'student_number'),
        )

    def __str__(self):
        return self.name


class Subject(models.Model):
    name = models.CharField(unique=True, max_length=64)

    def __str__(self):
        return self.name

GRADES = [
  # ...
]


class Enrolment(models.Model):
    student = models.ForeignKey(Student, related_name='enrolments', on_delete=models.CASCADE)
    subject = models.ForeignKey(Subject, related_name='enrolments', on_delete=models.CASCADE)
    grade = models.CharField(choices=GRADES, max_length=3, null=True, blank=True)

    def __str__(self):
        if self.grade:
            return '{student} studied {subject}. Grade was {grade}.'.format(
                student=self.student.name,
                subject=self.subject.name,
                grade=self.get_grade_display(),
            )
        return '{student} is enrolled in {subject}.'.format(
            student=self.student.name,
            subject=self.subject.name,
        )

Okay, wall of code done. There are a few things to note about these models:

  • School is the tenant model.
  • Student has a direct relationship to the tenant model. This is a candidate for RLS.
  • Subject has no relationship to the tenant model. This is a non-tenant (ie, global) model. All instances will be visible to all users.
  • Enrolment has a chained relationship to the tenant model. Because of this, it’s likely that this will also be an RLS model (if the prior models in the chain have RLS restrictions).

Now a digression into some mechanics of RLS.

Enabling RLS for a given table is quite simple. We’ll also do a FORCE, because we are probably the table owner, and without FORCE, table owners can see all rows.

ALTER TABLE school_student ENABLE ROW LEVEL SECURITY;
ALTER TABLE school_student FORCE ROW LEVEL SECURITY;

In the case of a student, the user should only be able to view them if they are currently viewing the school the student belongs to:

CREATE POLICY access_tenant_data ON school_student
USING (school_id::TEXT = current_setting('occupation.active_tenant'))
WITH CHECK (school_id::TEXT = current_setting('occupation.active_tenant'))

Notice that we used the current_setting('occupation.active_tenant') that we configured before. As I mentioned, this policy will throw an exception if the setting is not set, so our middleware sets it to an empty string - which should not match any rows.

The other thing that may look out of place is that we are coercing the school_id to a TEXT. This is because current_setting() returns a text value, even if it was set using a number.

So, what does this actually do?

It restricts the query to only rows that match the USING clause (in the case of a SELECT, UPDATE or DELETE), and then ensures that any rows that are being written (in the case of UPDATE or INSERT) meet the same restriction. This prevents a user accidentally (or on purpose) writing a row that they could not currently view.

So, that’s the SQL. Can we generate this in a nice way from our Django models?

CREATE_POLICY = '''
CREATE POLICY access_tenant_data ON {table}
USING ({fk}::TEXT = current_setting('occupation.active_tenant'))
WITH CHECK ({fk}::TEXT = current_setting('occupation.active_tenant'))'''

def build_policy_clause(model):
    for field in model._meta.fields:
        if field.related_model is School:
            return CREATE_POLICY.format(fk=field.db_column, table=model._meta.db_table)

Again, this is simplified. It only works for a direct link, and naïvely assumes the db_column exists. In practice there’s more to it than that. But that will do for now.

So, given our knowledge of our models, we don’t need to enable RLS for our Subject model, but we want to enable it for our Enrolment model. In fact, we will need to - otherwise a user would be able to load up an Enrolment object, but not be able to see the related Student.

In fact, we use this relation (and the fact that the restriction is already applied to all queries) to make our policy for that table simpler:

CREATE POLICY access_tenant_data ON school_enrolment
USING (student_id IN (SELECT id FROM school_student))
WITH CHECK (student_id IN (SELECT id FROM school_student))

Notably, this sort of CHECK happens every time Postgres writes a FOREIGN KEY reference: we need to repeat it because FK references are not subject to RLS, but we basically want to make it so they are.

Interestingly, because of the cascading nature of this configuration, we don’t need to include the current_setting call at all, because that happens in the inner query.

However, it does concern me that this will result in more work in the database. I’ll have to run some tests on larger data sets to see how this performs.

Building up the SQL to use there is slightly more complicated: we need to look at every foreign key on the model and see which of them can trace a chain up to the tenant model. Then we’d need a clause in the USING/WITH CHECK for each of those foreign keys.

I do have some code that does this, but it’s not very pretty.
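A rough sketch of that chain-tracing (not the actual code; it ignores reverse relations, with only a simple guard against cycles):

def tenant_paths(model, seen=()):
    # Return lists of foreign key names linking `model` up to the School tenant model.
    paths = []
    for field in model._meta.fields:
        related = field.related_model
        if related is None or related in seen:
            continue
        if related is School:
            paths.append([field.name])
        else:
            for tail in tenant_paths(related, seen + (model,)):
                paths.append([field.name] + tail)
    return paths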

Also, I’d like to be able to come up with a way that generates this SQL using more of the ORM, but I’m not sure it’s really necessary, since the resulting code is quite simple.

As for applying these changes - the two options are to write a RunSQL call for each required statement directly into the migration file, or to have a custom migration operation that executes the SQL. I’m not sure which way I’ll go with that just yet.
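The first option might end up looking something like this in a hand-written migration (a sketch only; the dependency name is a placeholder, and the SQL is the policy from earlier):

from django.db import migrations

CREATE_POLICY = '''
CREATE POLICY access_tenant_data ON school_student
USING (school_id::TEXT = current_setting('occupation.active_tenant'))
WITH CHECK (school_id::TEXT = current_setting('occupation.active_tenant'))'''

class Migration(migrations.Migration):
    dependencies = [('school', '0001_initial')]  # placeholder dependency

    operations = [
        migrations.RunSQL(
            sql=[
                'ALTER TABLE school_student ENABLE ROW LEVEL SECURITY',
                'ALTER TABLE school_student FORCE ROW LEVEL SECURITY',
                CREATE_POLICY,
            ],
            reverse_sql='DROP POLICY access_tenant_data ON school_student',
        ),
    ]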


I do have a proof of concept for this up and running (code is available at django-occupation). There are still some things I want to figure out.

  • Cross-tenant queries are a thing in my domain: what is the best mechanism for doing this? Should there be a Postgres session variable that disables the restriction entirely, or could we enumerate the permitted tenants? The latter would allow restricted cross-tenant queries.
  • Just how well does this perform at scale?
  • How much of this stuff is not really related to multi-tenancy, and could be extracted out into a more generic RLS package?