Query Zen is no queries at all

Performing no queries is always going to be faster than performing a query.

Today I had two instances of the same problem: I have two tables, one of which essentially stores data calculated from the other (and from data in other tables, or from a process that runs application code), and so cannot be determined purely within the database.

In one case, we have an audit logging table (which is purely handled within postgres) and another related table that stores a string representation of what the audited object looked like according to the application at that point in time, which needs to be calculated after the fact in Django.

The other case stores some cached values that can be calculated in the database: basically some metadata about a shift, according to the location that the shift is at. Changes to the shift table will cause this value to be updated automatically; however, we have several million existing shifts that do not currently have this value, and we need to create the annotation for all of them.

In both cases, we have a celery task that will create a (relatively small, to prevent locks and other performance issues) number of the related objects, but only for those that don’t already have one. The tricky bit is that we need to trigger another instance of the celery task if we still have remaining objects in the database that don’t yet have the related item.

@app.task
def update_missing_items(batch_size=100):
    missing_items = AuditLog.objects.filter(instance_repr=None)
    InstanceRepr.objects.bulk_create([
      InstanceRepr(
        audit_log=log,
        # ...
      ) for log in missing_items[:batch_size]
    ])

    if missing_items.exists():
      update_missing_items.apply_async(kwargs={'batch_size': batch_size}, countdown=1)

Since we have some 15 million audit logs (so far), it turns out that this missing_items.exists() call was taking several seconds to run. I tried to write an optimised version, but was not able to improve the performance.

Then it occurred to me (thanks mlt- on #postgres) that we can look at the number of items we created, and see if it was the same as the batch_size. If it was smaller than the batch size, then we know we are up to date, and don’t need to reschedule our task.

@app.task
def update_missing_items(batch_size=100):
    missing_items = AuditLog.objects.filter(instance_repr=None)
    created = InstanceRepr.objects.bulk_create([
      InstanceRepr(
        audit_log=log,
        # ...
      ) for log in missing_items[:batch_size]
    ])

    if len(created) == batch_size:
      update_missing_items.apply_async(kwargs={'batch_size': batch_size}, countdown=1)

Bingo: since we needed to execute the query to fetch the objects to begin with, we are now doing no extra work to see if we need to run our task again.


The other situation can be handled entirely in the database; however, a single query over several million rows would block other things from happening, so we want to run the update in batches. There is a trigger on the table so that new or updated rows will already have a value, which actually makes it the same problem, but in SQL:

WITH step_1 AS (
  SELECT shift_id, ...
    FROM shifts
    JOIN ... ON (...)
    LEFT OUTER JOIN annotations USING (shift_id)
    WHERE annotations.shift_id IS NULL
    LIMIT 1000
), step_2 AS (
  ...
),
..., step_N AS (
  ...
)
INSERT INTO annotations (shift_id, ...) SELECT * FROM step_N;

There’s actually a bunch more to this, but it’s not really important: building up the annotations hits a couple of other tables, and I’ve used a CTE because each value is based on a previous annotation.

We can put this query into a task too, but we need some way of determining how many inserts we did. Luckily, Postgres has the RETURNING clause on an INSERT. It would be really nice if we could do:

WITH step_1 AS (...)
INSERT INTO annotations (shift_id, ...)
SELECT * FROM step_N
RETURNING COUNT(*)

Alas, that’s not possible. However, we can just extend our CTE:

WITH step_1 AS (
  SELECT shift_id, ...
    FROM shifts
    ...
    LEFT OUTER JOIN annotations USING (shift_id)
    WHERE annotations.shift_id IS NULL
    -- NOTE: the LIMIT value is a parameter!
    LIMIT %s
),
...,
step_N AS (...),
inserts AS (
  INSERT INTO annotations(shift_id, ...)
  SELECT * FROM step_N
  RETURNING shift_id
)
SELECT COUNT(*) FROM inserts

Then, we can write our celery task:

from django.db import connection

@app.task
def update_missing_annotations(batch_size):
    with connection.cursor() as cursor:
        cursor.execute(QUERY, [batch_size])
        if cursor.fetchone()[0] == batch_size:
            update_missing_annotations.apply_async(kwargs={'batch_size': batch_size}, countdown=1)
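
To kick the whole thing off, we just queue the first task ourselves; the batch size here is arbitrary:

update_missing_annotations.delay(batch_size=1000)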

Partially failing tests

$ ack unittest.expected | grep expect | wc -l
        12

I’m not proud to say it, but we have about (exactly?) a dozen tests in our system that for one reason or another are currently failing. These were determined to not be enough to block development, so they are still there. As sort-of shown in the command line above, it’s possible to mark these tests as “Expected to Fail”. From time to time, I’ll go back and revisit these, and see if there’s anything that we can fix in the short term.

Today, however, I was working on something that was going to touch some of the same code as my big query-performance branch, so I decided to work from there. I merged it all in, wrote some new tests, and updated the queries.

But it turns out that the query performance branch is missing some bits that are required for one of the queries. I didn’t realise this until I’d written the tests, and the migrations to update the queries.

class TestNewThing(TestCase):
    def test_new_thing(self):
        # complex set up that creates a bunch of things that are used by all assertions.

        # This assertion should be passing...and we want to be notified if it fails in the future.
        self.assertEqual('foo', bar)

        # This assertion will fail, but we don't care right now.
        self.assertEqual('baz', qux)

I was able to remove the migration easily enough, but I didn’t really want to lose the tests.

I could have just commented them out, but I thought maybe I’d just try to mark those parts as expectedFailure:

class TestNewThing(TestCase):
    def test_new_thing(self):
        # complex set up that creates a bunch of things that are used by all assertions.

        # This assertion should be passing...and we want to be notified if it fails in the future.
        self.assertEqual('foo', bar)

        with unittest.expectedFailure:
          # This assertion will fail, but we don't care right now.
          self.assertEqual('baz', qux)

However, this doesn’t work.

Hmm, I thought, what if we put that into a separate method, not prefixed with test_:

class TestNewThing(TestCase):
    def test_new_thing(self):
        # complex set up that creates a bunch of things that are used by all assertions.

        # This assertion should be passing...and we want to be notified if it fails in the future.
        self.assertEqual('foo', bar)

        self.optional_tests()

    @unittest.expectedFailure
    def optional_tests(self):
        # This assertion will fail, but we don't care right now.
        self.assertEqual('baz', qux)

And this actually works. If you have a failure in the inner method, this will not trigger a failed build, but if you have a failure in the actual test method, then it will.

Hallelujah

I have been listening to Malcolm Gladwell’s podcast Revisionist History. It’s been quite entertaining, but I sometimes do feel that he is a little less than honest with some of his opinions. I know there are people who have strong opinions about Gladwell, but I’m not really one of them. I have read some of his stuff, and find myself sometimes in agreement with his hypothesis, but not always. I think not blindly accepting what he talks about is prudent: but the same is true of everyone.

In episode Hallelujah, the discussion starts with a foray into Elvis Costello, but at some point then pivots and talks about the Leonard Cohen song, covered by almost everyone, but most famously Jeff Buckley. Gladwell’s point is that some songs (or works of art, or other works of genius) are “fully formed”, but others take iteration, and in some cases, a bunch of it.

Specifically, he states that the song Hallelujah had this really long, really unlikely string of events that all needed to happen in order for the song to become recognised as the magical entity that it is. I’m not denying that, but he does miss the point that there are probably many more chains of events that could have resulted in songs just as good as this; maybe even better.

We only know about Hallelujah because in our universe, there was a completed chain of events that led to the song being released by Buckley, and then his unfortunate demise. I also wonder if perhaps the song may still have become as famous as it did even if he had not drowned: my iTunes library tells me another song of his was in the Triple J Hottest 100 the year before he died.

I’m not doubting at all that Leonard Cohen, Elvis Costello or Jeff Buckley were all musical geniuses: but this selection bias hides the fact that there could be many more musical geniuses out there who wrote songs that faded into oblivion.

I’m still glad we got Hallelujah, though.

Sonoff Touch LAN Mode and MQTT

I’ve had a couple of the Sonoff Basic devices for quite some time. It’s fairly easy to solder some header pins onto these which makes flashing the firmware somewhat of a non event, but it’s still a bit of a pain.

The other thing I bought (again, some time ago, but a bit after the Sonoff Basic) is a Sonoff Touch. This is an in-wall light switch replacement, which means you can replace your existing light switches with something that you can control over WiFi. They actually look pretty nice, too.

I wasn’t so keen on mucking around with soldering them, partly because you need to use a 90° header. However, the other day I learned that there is a way to control them (and the Basic) without having to connect to the iTead servers.

When the devices are unable to connect to a remote server (yes, they basically keep a connection to this remote server open 100% of the time, which was part of the rationale behind flashing the firmware), they go into LAN mode.

When they are in LAN mode, they will respond to WebSocket connections over port 8081, making it easy to control them directly.

In my router (running LEDE), I can set a specific range of IP addresses to be unable to connect out to the internet, and then all I need to do is make sure the devices get one of these IP addresses.

The configuration process is something like:

  • Touch the switch toggle for 7 seconds. This puts it into pairing mode, where it acts as an Access Point (AP).
  • Connect to the new WiFi network ITEAD_100000xxxxxx.
  • Get the MAC address of the device at 10.10.7.1
  • Tell the router to reserve an IP address in the required range for this MAC address.
  • POST data to the device (10.10.7.1) with a JSON object that contains the WiFi credentials. This will trigger the device to disconnect the AP, and connect to the WiFi network. It’s also possible at this time to tell it to connect to a different server (which I may do instead at some point, but this method was quicker for now).
  • Once the device is connected to your WiFi, send JSON messages over a WebSocket connection to it (at its fixed IP address).

I’m hoping at some point to automate this, but it’s meaningless to do so until I get a bunch more devices.


So, on to the software.

Ultimately, the plan is to control these devices using HomeKit. I started writing a direct bridge (similar to my MQTT HomeKit bridge), but then decided it would be simpler to just bridge to MQTT - I could then use the correct topic names and values to allow it to interact with that MQTT HomeKit bridge.

There’s really only two things to do:

  • Connect to the Sonoff device, and wait for events from there as to the switch state. Push these changes to our MQTT topic.
  • Connect to the MQTT broker, and subscribe to our topic. When we get events, push these to the Sonoff device.

I attempted to play around with asyncio to get this to work, but I can’t remember enough about how to use it, so I went for an easier (for me) solution.

At this stage, it’s just a single Sonoff being controlled.

import json
import time
import enum

from websocket import create_connection
from paho.mqtt import client as mqtt

API_KEY = 'bba2e54d-7202-4a75-bd26-307597a1dd7d'
TOPIC = 'HomeKit/sonoff-{}/Lightbulb/On'


class State(enum.Enum):
    ON = 'on'
    OFF = 'off'

    @classmethod
    def parse(cls, data):
        if data in [cls.ON.value, True, 'true', 1, '1']:
            return cls.ON
        elif data in [cls.OFF.value, False, 'false', 0, '0']:
            return cls.OFF
        value = json.loads(data)['params']['switch']
        if value == cls.ON.value:
            return cls.ON
        return cls.OFF

    def __invert__(self):
        if self == State.ON:
            return State.OFF
        return State.ON

    def __bool__(self):
        return self == State.ON


class Sonoff:
    def __init__(self, host):
        self._state = None
        timestamp = str(time.time()).replace('.', '')
        self.ws = create_connection('ws://{}:8081/'.format(host))
        self.ws.send(json.dumps({
            'action': 'userOnline',
            'ts': timestamp,
            'version': 6,
            'apikey': API_KEY,
            'sequence': timestamp,
            'userAgent': 'HomeKit'
        }))
        self.deviceid = json.loads(self.ws.recv())['deviceid']
        print('Connected to {}'.format(self.deviceid))

        self.client = mqtt.Client()
        self.client.on_connect = self.mqtt_init
        self.client.on_message = self.handle_mqtt_message
        self.client.connect('mqtt.lan', 1883, 60)

        self.state = State.parse(self.ws.recv())
        print('Current state is {}'.format(self._state.name))

    @property
    def topic(self):
        return TOPIC.format(self.deviceid)

    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, value: State):
        if value == self.state:
            return
        timestamp = (str(time.time())).replace('.', '')
        self.ws.send(json.dumps({
            "action": "update",
            "deviceid": "nonce",
            "apikey": "nonce",
            "selfApikey": "nonce",
            "params": {
                "switch": value.value
            },
            "sequence": timestamp,
            "userAgent": "app"
        }))
        self._state = value
        self.client.publish(self.topic, int(bool(self.state)), retain=1)

    def on(self):
        self.state = State.ON

    def off(self):
        self.state = State.OFF

    def toggle(self):
        self.state = ~self.state

    def wait_for_ws(self):
        result = self.ws.recv()
        if 'switch' in result:
            self.state = State.parse(result)

    def handle_mqtt_message(self, client, userdata, message):
        self.state = State.parse(message.payload.decode())

    def mqtt_init(self, client, userdata, flags, rc):
        client.subscribe(self.topic)
        print("Subscribed to {}".format(self.topic))

    def start(self):
        self.client.loop_start()
        try:
            while True:
                time.sleep(0.01)
                self.wait_for_ws()
        finally:
            self.client.loop_stop()


if __name__ == '__main__':
    sonoff = Sonoff('10.1.10.140')
    sonoff.start()

I’m still not totally happy with the State stuff: I think I’ll use a simpler mapping there. But this works, and integrates nicely with my MQTT HomeKit bridge.
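
Something like this is probably what I mean by a simpler mapping (just a sketch, not what the code above uses):

SWITCH_VALUE = {True: 'on', False: 'off'}
PAYLOAD_VALUE = {
    'on': True, 'true': True, '1': True,
    'off': False, 'false': False, '0': False,
}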

Postgres ALTER TABLE ... USING

Helped out a person in #django IRC some time ago, and learned something new about Postgres.

I’ve done data type migrations in the past (where you change a database column in some way, and need to alter the data that is already in that column); however, I’ve always created a function to do the conversion.

It turns out you can just write an expression, and that works too:

  ALTER TABLE defect_defect
 ALTER COLUMN risk_rating
          SET DATA TYPE INTEGER
              USING CASE WHEN risk_rating = 'HIGH'   THEN 3
                         WHEN risk_rating = 'MEDIUM' THEN 2
                         WHEN risk_rating = 'LOW'    THEN 1
                     END;

Of course, this still would cause issues if you had code running from the old version (that expected the text values). However, it is nice to know.
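
If you wanted to apply this from a Django migration rather than in psql, a RunSQL operation can carry the same statement. A rough sketch (the app label and dependency here are made up):

from django.db import migrations


class Migration(migrations.Migration):
    dependencies = [
        ('defect', '0001_initial'),  # hypothetical: depend on your app's latest migration
    ]

    operations = [
        migrations.RunSQL(
            """
            ALTER TABLE defect_defect
            ALTER COLUMN risk_rating
                     SET DATA TYPE INTEGER
                         USING CASE WHEN risk_rating = 'HIGH'   THEN 3
                                    WHEN risk_rating = 'MEDIUM' THEN 2
                                    WHEN risk_rating = 'LOW'    THEN 1
                               END
            """,
            reverse_sql=migrations.RunSQL.noop,
        ),
    ]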

Keyset Pagination in Django

Pagination is great. Nothing worse than having an HTML page that renders 25000 rows in a table.

Django Pagination is also great. It makes it super easy to declare that a view (that inherits from MultipleObjectMixin) should paginate its results:

class List(ListView):
    queryset = Foo.objects.order_by('bar', '-baz')
    paginate_by = 10
    template_name = 'foo.html'

Django pagination uses the LIMIT/OFFSET method. This is fine for smaller offsets, but once you start getting beyond a few pages, it can perform really badly. This is because the database needs to fetch all of the previous rows, even though it discards them.

Using Keyset Pagination allows for better performing “next page” fetches, at the cost of not being able to randomly fetch a page. That is, if you know the last element from page N-1, then you may fetch page N, but otherwise you really can’t.
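
For a single (unique) sort column, the idea in Django terms is roughly this; a sketch only, using the Foo model from above:

def next_page(last_bar, page_size=10):
    # Fetch the rows that come after the last 'bar' value we saw on the previous page.
    return Foo.objects.order_by('bar').filter(bar__gt=last_bar)[:page_size]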

Keyset Pagination, sometimes called the Seek Method, has been documented by Markus Winand and Joe Nelson. If you are not familiar with the concept, I strongly suggest you read the articles above.

Django’s pagination is somewhat pluggable: you may switch out the paginator in a Django ListView, for instance, allowing you to do things like switch the Page class, or how the various parts are computed. I used it recently to allow for a different query to be used when calculating the total number of objects in a queryset, to vastly improve performance of a particular paginated queryset.

However, there are limits. Both the view and the paginator expect (nay, demand) an integer page number, which, as we shall see shortly, will not work in this case. I also feel like the view is overreaching its remit by casting the page number to an integer, as I’ll discuss below.

In order to get consistent results with any type of pagination, you must ensure that the ordering on the queryset is stable: that is, there are no rows that will be ‘tied’. Doing otherwise will mean that the database will “break the tie”, and not always in the same order. I’ve seen a bug that was extremely hard to track down that was caused by exactly this problem (and that was just with OFFSET pagination).

Often, to ensure stable ordering, the primary key is used as the “last” sort column. This is perfectly valid, but is not always necessary.
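
For example, with the queryset from above, appending the primary key guarantees a stable ordering:

Foo.objects.order_by('bar', '-baz', 'pk')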

Because in many cases we will need to sort by multiple columns, we’ll need some mechanism for passing through to the paginator the “last value” in a given page for each of these columns. For instance, if we are ordering by timestamp and then group, we would need to pass through both the timestamp and the group of the last object. Because I like to use GET forms to allow me to paginate filtered results, I’ll want to have all of the values combined into one query parameter. If you were constructing links instead, you could look at having these as different parameters. However, you’d still need to be careful, because you aren’t filtering all results on these parameters. Having them serialised into a single parameter (using JSON) means that they are all in the one place, and you can just use that for the filtering to get the page results.
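
As a sketch of that serialisation (the exact token format used by django-keyset-pagination may differ):

import json

def page_token(last_obj, ordering=('timestamp', 'group')):
    # Pack the last row's value for each ordering column into a single query parameter.
    values = {field.lstrip('-'): str(getattr(last_obj, field.lstrip('-'))) for field in ordering}
    return json.dumps(values)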

I’ve built a working implementation of keyset pagination, at least for forwards traversal, at django-keyset-pagination.

We can see from this that there really is not that much that we needed to do. We use a different Page object, which enables us to change what the next_page_number will generate. When I figure out how, it will also allow us to work out the previous_page_number.

Likewise, we needed to change how we validate a page number, and how we fetch results for a page. That method, _get_page(number), is the one that does most of the work.

Ultimately, we wind up with a filter that looks like:

  WHERE (A < ?) OR (A = ? AND B > ?) OR (A = ? AND B = ? AND C < ?)

The direction of the test (< vs >) depends upon the sorting of that column, but hopefully you get the idea.
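
Building that filter with Q objects might look something like this; again a sketch, not the actual library code:

from django.db.models import Q

def keyset_q(ordering, last_values):
    # ordering: e.g. ['timestamp', '-group']; last_values: values from the last row of the previous page.
    clauses = Q()
    ties = {}
    for field in ordering:
        name = field.lstrip('-')
        op = 'lt' if field.startswith('-') else 'gt'
        clauses |= Q(**ties, **{'{}__{}'.format(name, op): last_values[name]})
        ties[name] = last_values[name]
    return clauses

Then queryset.filter(keyset_q(ordering, last_values)) gives the next page, honouring the per-column sort direction.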

In order to enable the query planner to be able to use an index effectively (if one exists), we need to adjust this to (thanks Markus):

WHERE A <= ? AND (
  (A < ?) OR (A = ? AND B > ?) ...
)

It’s also possible, in Postgres at least, to use a ROW() constructor comparison to order rows. However, this only works if the direction of each column ordering is the same, which in my use case it was not. I have a proof of concept of using ROW() constructors, but I need to figure out how to detect if they are available to the database in use.

In order to use the new paginator, we need to work around some issues in the Django class based views: namely that they force the page number to be an integer (or the special string last), neither of which is acceptable in this case:

from django.core.paginator import InvalidPage
from django.http import Http404
from django.utils.translation import gettext as _


class PaginateMixin(object):
    "Make pagination work for non integer page numbers"
    def paginate_queryset(self, queryset, page_size):
        # This is very similar to how django currently (2.1) does it: I may submit a PR to use this
        # mechanism instead, as it is more flexible.
        paginator = self.get_paginator(
            queryset, page_size, orphans=self.get_paginate_orphans(),
            allow_empty_first_page=self.get_allow_empty()
        )
        page_kwarg = self.page_kwarg
        page = self.kwargs.get(page_kwarg) or self.request.GET.get(page_kwarg) or 1

        try:
            page_number = paginator.validate_number(page)
        except ValueError:
            raise Http404(_('Page could not be parsed.'))

        try:
            page = paginator.page(page_number)
            return (paginator, page, page.object_list, page.has_other_pages())
        except InvalidPage as e:
            raise Http404(
                _('Invalid page (%(page_number)s): %(message)s') % {
                    'page_number': page_number,
                    'message': str(e)
                }
            )

There’s really only one change: instead of just casting the page number to an integer, we let the paginator handle that.

Okay, once all that is done, we can use our paginator:

class List(PaginateMixin, ListView):
    paginator_class = KeysetPaginator
    paginate_by = 10

    def get_queryset(self):
        return Foo.objects.order_by('timestamp', 'bar', 'baz')

We’ll need to change our template rendering to only render a next page link or button, rather than trying to render them for each page. We also don’t have any way to return to the previous page: I’m still working through a mechanism for that.


This post was originally written using the ROW() constructor, and this part of the post discussed the shortcomings. Now that has been resolved, the main shortcoming is that it is not yet possible to traverse to the previous page of results. In many cases that may not be necessary (we could use a browser’s back button, or rely on the fact that if it’s infinite scrolling the data is already in the document), however I would like to investigate how hard it is to actually get the previous page.

Django ComputedField()

A very common pattern, at least in code that I’ve written (and read) is to annotate on a field that uses an expression that is based on one or more other fields. This could then be used to filter the objects, or just in some other way.

The usual method of doing this is:

from django.db import models
from django.db.models.expressions import F, Value
from django.db.models.functions import Concat


class PersonQuerySet(models.query.QuerySet):
    def with_name(self):
        return self.annotate(
            name=Concat(F('first_name'), Value(' '), F('last_name'), output_field=models.TextField()),
        )


class Person(models.Model):
    first_name = models.TextField()
    last_name = models.TextField()

    objects = PersonQuerySet.as_manager()

Yes, I’m aware of falsehoods programmers believe about names, but this is an easy-to-follow example.

In order to be able to access the name field, we must use the with_name() queryset method. This is usually okay, but if it is something that we almost always want, it can be a little tiresome. Alternatively, you could override the get_queryset() method of a custom manager, but that makes it somewhat surprising to a reader of the code. There are also some places where a custom manager will not automatically be used, or where it will be cumbersome to include the fields from a custom manager (select_related, for instance).

It would be much nicer if we could write the field declaratively, and have it use the normal django mechanism of defer and only to remove it from the query if required.

class Person(models.Model):
    first_name = models.TextField()
    last_name = models.TextField()
    name = ComputedField(Concat(F('first_name'), Value(' '), F('last_name'), output_field=models.TextField()))

I’ve spent some time digging around in the django source code, and have a fairly reasonable understanding of how fields work, and how queries are built up. But I did wonder how close to a working proof of concept of this type of field we could get without having to change any of the django source code. After all, I was able to backport the entire Subquery expression stuff to older versions of django after writing that. It would be nice to repeat the process here.

There are a few things you need to do to get this to work:

  • store the expression
  • prevent the field from creating a migration
  • ensure the field knows how to interpret data from the database
  • ensure the field adds the expression to its serialised version
  • prevent the field from writing data back to the database
  • inject the expression into the query instead of the field name

Storing the expression on the field is the easy part; we also mark the field as not editable:

class ComputedField(models.Field):
    def __init__(self, expression, *args, **kwargs):
        self.expression = expression.copy()
        kwargs.update(editable=False)
        super().__init__(*args, **kwargs)

There is already a mechanism for a field to prevent a migration operation from being generated: it can return a db_type of None.

    def db_type(self, connection):
        return None

We can delegate the responsibility of interpreting the data from the database to the output field of the expression - that’s how it works in the normal operation of expressions.

    def from_db_value(self, value, expression, connection):
        return self.expression.output_field.from_db_value(value, expression, connection)

Storing the expression in the serialised version of a field is explained in the documentation on custom fields:

    def deconstruct(self):
        name, path, args, kwargs = super().deconstruct()
        return name, path, [self.expression] + args, kwargs

Preventing the field from being included in the data we write back to the database turned out to be fairly tricky. There are a couple of mechanisms that can be used, but ultimately the only one that worked in the way I needed was something that is used by the inheritance mechanism. We have to indicate that it is a “private” field. I’m not 100% sure of what the other implications of this might be, but the outcome of making this field private is that it no longer appears in the list of local fields. There is one drawback to this, which I’ll discuss below.

    def contribute_to_class(self, cls, name, private_only=False):
        return super().contribute_to_class(cls, name, True)

So, we only have one task to complete. How do we inject the expression into the query instead of the column?

When django evaluates a queryset, it looks at the annotations, and the expressions that are in these. It will then “resolve” these expressions (which means the expression gets told which “query” is being used to evaluate it, allowing it to do whatever it needs to do to make things work).

When a regular field is encountered, it is not resolved: instead it is turned into a Col. This happens in a few different places, but the problem is that a Col should not need to know which query it belongs to: at most it needs to know what the aliased table name is. So, we don’t have a query object we can pass to the resolve_expression method of our expression.

Instead, we’ll need to use Python’s introspection to look up the stack until we find a place that has a reference to this query.

    def get_col(self, alias, output_field=None):
        import inspect

        query = None

        for frame in inspect.stack():
            if frame.function in ['get_default_columns', 'get_order_by']:
                query = frame.frame.f_locals['self'].query
                break
            if frame.function in ['add_fields', 'build_filter']:
                query = frame.frame.f_locals['self']
                break
        else:
            # Aaargh! We don't handle this one yet!
            import pdb; pdb.set_trace()

        col = self.expression.resolve_expression(query=query)
        col.target = self
        return col

So, how does this code actually work? We go through each frame in the stack, and look for a function (or method, but they are really just functions in python) that matches one of the types we know about that have a reference to the query. Then, we grab that, stop iterating and resolve our expression. We have to set the “target” of our resolved expression to the original field, which is how the Col interface works.

This moves the resolve_expression into the get_col, which is where it needs to be. The (resolved) expression is used as the faked column, and it knows how to generate its own SQL, which will be put into the query in the correct location.

And this works, almost.

There is one more situation that needs to be taken into account: when we are referencing the field through a join (the x__y lookup syntax you often see in django filters).

Because F() expressions reference the local query, we need to first turn any of these that we find in our computed field’s expression (at any level) into a Col that refers to the correct model. We need to do this before the resolve_expression takes place.

    def get_col(self, alias, output_field=None):
        query = None

        for frame in inspect.stack():
            if frame.function in ['get_default_columns', 'get_order_by']:
                query = frame.frame.f_locals['self'].query
                break
            if frame.function in ['add_fields', 'build_filter']:
                query = frame.frame.f_locals['self']
                break
        else:
            # Aaargh! We don't handle this one yet!
            import pdb; pdb.set_trace()

        def resolve_f(expression):
            if hasattr(expression, 'get_source_expressions'):
                expression = expression.copy()
                expression.set_source_expressions([
                  resolve_f(expr) for expr in expression.get_source_expressions()
                ])
            if isinstance(expression, models.F):
                field = self.model._meta.get_field(expression.name)
                if hasattr(field, 'expression'):
                    return resolve_f(field.expression)
                return Col(alias, field)
            return expression

        col = resolve_f(self.expression).resolve_expression(query=query)
        col.target = self
        return col

There is a repo containing this, which has a bunch of tests showing how the different query types can use the computed field:

https://github.com/schinckel/django-computed-field
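
As a taste of usage (assuming the Person model from earlier, with name declared as a ComputedField), filtering and ordering both resolve the expression into the query:

Person.objects.filter(name='John Smith')
Person.objects.order_by('name')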


But wait, there is one more thing…

A very common requirement, especially if you are planning on using this column for filtering, would be to stick an index on there.

Unfortunately, that’s not currently possible: the mechanism for preventing the field name from being in the write queries, making it a private field, prevents using this field in an index. Anyway, function/expression indexes are not currently supported in Django.

It’s not all bad news though: Markus has a Pull Request that will enable this feature; from there we could (if db_index is set) automatically add an expression index to Model._meta.indexes in contribute_to_class, but it would also be great to be able to use index_together.

I suspect to get that, though, we’ll need another mechanism to prevent it being in the write queries, but still be a local field.

(Thanks to FunkyBob for suggestions, including suggesting the field at all).

Set-returning and row-accepting functions in Django and Postgres

Postgres set-returning functions are an awesome thing. With them, you can do fun things like unnesting an array, and end up with a new row for each item in the array. For example:

class Post(models.Model):
    author = models.ForeignKey(AUTH_USER_MODEL, related_name='posts')
    tags = ArrayField(base_field=TextField(), null=True, blank=True)
    created_at = models.DateTimeField()
    content = models.TextField()

The equivalent SQL might be something like:

CREATE TABLE blog_post (
  id SERIAL NOT NULL PRIMARY KEY,
  author_id INTEGER NOT NULL REFERENCES auth_user (id),
  tags TEXT[],
  created_at TIMESTAMPTZ NOT NULL,
  content TEXT NOT NULL
);

We can “explode” the table so that we have one tag per row:

SELECT author_id, UNNEST(tags) AS tag, created_at, content
FROM blog_post;

To do the same sort of thing in Django, we can use a Func:

from django.db.models import F, Func

Post.objects.annotate(tag=Func(F('tags'), function='UNNEST'))

In practice, just like in the Django docs, I’ll create a convenience function:

class Unnest(Func):
    function = 'UNNEST'

    @property
    def output_field(self):
        output_fields = [x.output_field for x in self.get_source_expressions()]
        if len(output_fields) == 1:
            return output_fields[0].base_field

        return super(Unnest, self).output_field

The opposite of this is aggregation: in the case of UNNEST, it’s almost ARRAY_AGG, although because of the handling of nested arrays, this doesn’t quite round-trip. We already know how to do aggregation in Django, so I will not discuss that here.

However, there is another related operation: what if you want to turn a row into something else? In my case, this was turning a row from a result into a JSON object.

SELECT id,
       to_jsonb(myapp_mymodel) - 'id' AS "json"
  FROM myapp_mymodel

This will get all of the columns except ‘id’, and put them into a new column called “json”.

But how do we get Django to output SQL that will enable us to use a Model as the argument to a function? Ultimately, we want to get to the following:

class ToJSONB(Func):
    function = 'TO_JSONB'
    output_field = JSONField()


MyModel.objects.annotate(
  json=ToJSONB(MyModel) - Value('id')
).values('id', 'json')

Our first attempt could be to use RawSQL. However, this has a couple of problems. The first is that we are writing lots of raw SQL; the second is that it won’t work so well if the table is aliased by the ORM. That is, if you use this in a join or subquery, where Django automatically assigns an alias to this table, then referring directly to the table name will not work.

MyModel.objects.annotate(json=RawSQL("to_jsonb(myapp_mymodel) - 'id'", [], output_field=JSONField()))

Instead, we need to dynamically find out what the current alias for the model is in this query, and use that. We’ll also want to figure out how to “subtract” the id key from the JSON object.

class Table(django.db.models.Expression):
    def __init__(self, model, *args, **kwargs):
        self.model = model
        self.query = None
        super(Table, self).__init__(*args, **kwargs)

    def resolve_expression(self, query, *args, **kwargs):
        clone = super(Table, self).resolve_expression(query, *args, **kwargs)
        clone.query = query
        return clone

    def as_sql(self, compiler, connection, **kwargs):
        if not self.query:
            raise ValueError('Unresolved Table expression')
        alias = self.query.table_map.get(self.model._meta.db_table, [self.model._meta.db_table])[0]
        return compiler.quote_name_unless_alias(alias), []

Okay, there’s a fair bit going on there. Let’s look through it. We’ll start with how we use it:

MyModel.objects.annotate(json=ToJSONB(Table(MyModel)))

We create a Table instance, which stores a reference to the model. Technically, all we need later on is the database table name that will be used, but we’ll keep the model for now.

When the ORM “resolves” the queryset, we grab the query object, and store a reference to that.

When the ORM asks us to generate some SQL, we look at the query object we have a reference to, and see if our model’s table name has an entry in the table_map dict: if so, we get the first entry from that, otherwise we just use the table name.

Okay, what about being able to remove the entry in the JSONB object for ‘id’?

We can’t just use the subtraction operator, because Postgres will try to convert the RHS value into JSONB first, and fail. So, we need to ensure it renders it as TEXT. We also need to wrap it in an ExpressionWrapper, so we can indicate what the output field type will be:

id_value = models.Func(models.Value('id'), template='%(expressions)s::TEXT')
MyModel.objects.annotate(
    json=ExpressionWrapper(
        ToJSONB(Table(MyModel)) - id_value, output_field=JSONField()
    )
)

I also often use a convenience Cast function, that automatically does this based on the supplied output_field, but this is a little easier to use here. Note there is a possible use for ToJSONB in a different context, where it doesn’t take a table, but some other primitive.
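
As an aside, Django’s built-in Cast (django.db.models.functions.Cast, available since 1.10) produces an equivalent CAST('id' AS text) expression:

from django.db.models import TextField, Value
from django.db.models.functions import Cast

id_value = Cast(Value('id'), output_field=TextField())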


There’s one more way we can use this construct: the geo_unique_indexer function from a previous post needs a table name, but also the name of a field to omit from the index. So, we can wrap this up nicely:

class GeoMatch(models.Func):
    function = 'geo_unique_indexer'
    output_field = JSONField()

    def __init__(self, model, *args, **kwargs):
        table = Table(model)
        pk = models.Value(model._meta.pk.db_column or model._meta.pk.name)
        super(GeoMatch, self).__init__(table, pk, *args, **kwargs)

This is really tidy: it takes the model class (or maybe an instance, I didn’t try), and builds a Table, and gets the primary key. These are just used as the arguments for the function, and then it all works.

Best match in hierarchy using Postgres

The other day, I had an issue where I needed to prevent duplicates in a subset of nullable columns for the purpose of modelling different regional breakdowns; I came up with quite a neat solution that uses JSONB functions.

What I didn’t realise until later was how this same function can be used to detect the “best match” of an object within these rows.

In this case, the best match is the row that has the most specificity (ie, the highest number of non-null values), but does not have any values that differ in the target object.

For instance, given the data (and the function defined in the aforementioned post):

{"country": "AU"}
{"country": "AU", "state": "SA"}
{"country": "NZ"}

We would want an object that is within Australia (AU) to match the first row, unless it also was within South Australia (SA), when it should match the second row.

We can use the json containment operators to query this match:

SELECT geo_unique_indexer(geo_table, 'id')
  FROM geo_table
 WHERE geo_unique_indexer(geo_table, 'id') <@ '{"country": "AU", "state": "SA"}';

Okay, that’s really close: it shows all “parent” data:

{"country": "AU"}
{"country": "AU", "state": "SA"}

So, we need to ensure that only the most specific one of these is actually returned:

SELECT geo_unique_indexer(geo_table, 'id')
  FROM geo_table
 WHERE geo_unique_indexer(geo_table, 'id') <@ '{"country": "AU", "state": "SA"}'
 ORDER BY (SELECT count(*) FROM JSONB_OBJECT_KEYS(geo_unique_indexer(geo_table, 'id'))) DESC
 LIMIT 1;

And, presto, we have our best match:

{"country": "AU", "state": "SA"}
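
If you need to run this lookup from Django, a raw cursor sketch along these lines should do the job (table, column and function names as used in this post):

import json

from django.db import connection

BEST_MATCH = """
    SELECT id
      FROM geo_table
     WHERE geo_unique_indexer(geo_table, 'id') <@ %s::jsonb
     ORDER BY (SELECT COUNT(*) FROM jsonb_object_keys(geo_unique_indexer(geo_table, 'id'))) DESC
     LIMIT 1
"""

def best_match(target):
    # target is a dict like {"country": "AU", "state": "SA"}
    with connection.cursor() as cursor:
        cursor.execute(BEST_MATCH, [json.dumps(target)])
        row = cursor.fetchone()
        return row and row[0]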

Postgres multi-column unique index

Consider something that requires a “best match” facility, based on the geographic location. There may be a set of behaviours that apply when an object is in Australia, and a different set within the USA. If this is all you have, then it’s simple to just have those behaviours based on a single country column in your database.

But now consider that in some circumstances, it’s possible to have behaviours within a single state of Australia that should override the nationwide behaviours. For instance, Western Australia may just use the Australian rules, but South Australia has a custom set of rules. Likewise, California may use a custom set of rules, but Hawaii just uses the inherited USA rules.

To ensure that a consistent set of rules is applied, there could only be a single “Australia” rule set, but there could be an “Australia+South Australia” set stored. This could also be modelled, in a slightly more complicated fashion, by having two columns, country, and state, and requiring country to be unique if state is NULL, and both country and state to be unique together.

Now, consider that there could be a whole stack more levels, and some of these levels have different names (and different orders in the hierarchy) in different countries. For instance, New Zealand has regions, but Great Britain has Nations (England, Northern Ireland, Scotland and Wales), but also counties (or council areas in Scotland).

Django Localflavor has a whole bunch of these divisions, but there are only nine “names” for these:

  • nation
  • state
  • province
  • provincial district
  • prefecture
  • region
  • county
  • department
  • municipality

So our table ends up with a column for each of these, plus the country:

CREATE TABLE geo_table (
  id SERIAL PRIMARY KEY,
  country VARCHAR(2) NOT NULL,
  county TEXT,
  department TEXT,
  municipality TEXT,
  nation TEXT,
  prefecture TEXT,
  province TEXT,
  provincial_district TEXT,
  region TEXT,
  state TEXT
);
INSERT INTO geo_table(country, state)
VALUES ('AU', NULL), ('AU', 'SA');

So, to prevent having data in our database that could result in an inability to determine which rule set we should use for a given object, we want to have the following:

A unique constraint that applies to all columns that are not NULL

It could be possible to model this with a number of different unique constraints, but that doesn’t seem like heaps of fun to deal with.
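
For what it’s worth, some of these can be expressed as partial unique constraints; in modern Django (2.2 or later) that might look like the sketch below, but you need one constraint per combination of levels, which gets out of hand quickly (the model and constraint names here are made up):

from django.db import models
from django.db.models import Q


class GeoRule(models.Model):
    country = models.CharField(max_length=2)
    state = models.TextField(null=True)
    # ...plus the other level columns...

    class Meta:
        constraints = [
            models.UniqueConstraint(
                fields=['country'],
                condition=Q(state__isnull=True),
                name='unique_country_only',
            ),
            models.UniqueConstraint(
                fields=['country', 'state'],
                condition=Q(state__isnull=False),
                name='unique_country_state',
            ),
            # ...and so on, for every combination of non-null columns.
        ]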

However, postgres allows us to define “functional” indices, that is, they apply a function to some columns from the row, and use that as the stored value in the index.

We can write an expression that takes the whole row, turns it into JSON, removes the primary key column, and then any columns that are NULL, and use that as the index.

SELECT jsonb_strip_nulls(
         to_jsonb(geo_table) - 'id'::TEXT
       )
  FROM geo_table;
  {"country": "AU"}
  {"country": "AU", "state": "SA"}

So, that looks pretty good, but what’s going on here?

In this case, we take the row, and turn it into a JSONB object, remove the primary key (in this case, called id), and then remove any keys that have a NULL value.

Now, we can’t just use this expression in an index, because to_jsonb (and row_to_json) are not IMMUTABLE: the value they produce for a TIMESTAMPTZ depends on settings that can change between sessions. However, our data will only contain strings, which are turned into the same JSON value regardless of any such settings, so we can wrap the expression in our own IMMUTABLE function:

CREATE OR REPLACE FUNCTION geo_unique_indexer(anyelement, pk TEXT)
RETURNS jsonb AS $$

    SELECT jsonb_strip_nulls(to_jsonb($1) - pk);

$$
LANGUAGE SQL IMMUTABLE;

You’ll notice that I had to use anyelement, as it’s not possible to have an SQL function refer to a record type. I’ve also made it configurable what the primary key column is, so it doesn’t depend on using the column “id”.

Finally, we can create the index:

CREATE UNIQUE INDEX geo_unique_match ON geo_table(
  geo_unique_indexer(geo_table, 'id')
);

Okay, that looks good. Now we can try to insert some values that should not be permitted. We already have a row with just Australia:

INSERT INTO geo_table(country, state) VALUES ('AU', NULL);
ERROR:  duplicate key value violates unique constraint "geo_unique_match"
DETAIL:  Key (geo_unique_indexer(geo_table.*, 'id'::text))=({"country": "AU"}) already exists.

…and another row with both Australia and SA.

INSERT INTO geo_table(country, state) VALUES ('AU', 'SA');
ERROR:  duplicate key value violates unique constraint "geo_unique_match"
DETAIL:  Key (geo_unique_indexer(geo_table.*, 'id'::text))=({"country": "AU", "state": "SA"}) already exists.

Finally, this one should succeed:

INSERT INTO geo_table(country, state) VALUES ('AU', 'NSW');

Excellent.