Apple Watch Stand Hours

As of watchOS 6/iOS 13, I’ve noticed some inconsistencies between stand hours and stand minutes in Health/Activity.

Specifically, there are often hours where, inside Health.app, I can see that I have at least one stand minute, but when I view in Activity or Health Stand Hours, there is no stand hour.

This morning, I think I figured it out.

I got up, as I normally do, at 6:55. This is my life hack to ensure I get a stand hour at 6-7 am. I went about my normal routine, making sure I moved enough before 7am to trigger the stand hour. It did.

At around 7:15, I happened to look at my watch again. I still only had one stand hour. I know I had moved around in the previous 15 minutes, so I went into stand minutes. Yep, there was a stand minute after 7am.

At about 7:35, I glanced at my watch again, and noticed there was a second stand hour.

I live in a place that has a non-integer number of hours offset from UTC (+09:30, +10:30 DST). I believe that stand hours are now calculated based on UTC and then offset for the current timezone, whereas it used to actually use the hour within the local timezone.

My workaround is to make sure I have a stand minute within both the first and second half-hours of every hour (which can be helped by having half-hourly chimes enabled on the watch).

Speeding up Postgres using a RAM disk?

I happened to come across Speeding up Django unit tests with SQLite, keepdb and /dev/shm today. I can’t quite do that, because we use a bunch of Postgres specific things in our project, but it did make me think “Can we run Postgres on a RAM disk?”

Following up on this, it turns out that using a TABLESPACE in postgres that is on a RAM disk is a really bad idea, but it should be possible to run a separate database cluster (on a different port) that has the whole cluster on the RAM disk.

It’s possible to create a RAM disk on macOS, and init a cluster there:

#! /bin/sh

PG_CTL=/Applications/Postgres.app/Contents/Versions/10/bin/pg_ctl

new_disk=$(hdid -nomount ram://1280000)
newfs_hfs $new_disk
mkdir /tmp/ramdisk
mount -t hfs $new_disk /tmp/ramdisk
mkdir /tmp/ramdisk/pg

$PG_CTL initdb -D /tmp/ramdisk/pg

This needs to be run once, after bootup (and probably before Postgres.app starts its servers). You’d also need to do the setup in Postgres.app to point it at the new cluster (alternatively, you could just start a server on the port you want in the script above).
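
If you do start a server on that RAM-disk cluster (say, on port 5433), pointing your Django project at it is just a settings change. A rough sketch, where the port, user and database name are assumptions rather than anything fixed:

# settings.py (sketch): the port and credentials are assumptions; use
# whatever you configured when starting the RAM-disk cluster.
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': 'project',
        'USER': 'postgres',
        'HOST': 'localhost',
        'PORT': '5433',  # the RAM-disk cluster, not the regular one
    }
}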

But, is it faster? Let’s look at the results. The “Initial run” is the first run of tests after deleting the test database.

Database location    Initial run    Keepdb runs
Disk                 6m39s          0m16s, 0m19s, 0m15s
RAM                  6m11s          0m26s, 0m17s, 0m15s

So, it’s probably not actually worth it. I’m guessing that the variance is more to do with just how busy my machine was at each of the times I ran the test.

Postgres ENUM types in Django

Postgres has the ability to create custom types. There are several kinds of CREATE TYPE statement:

  • composite types
  • domain types
  • range types
  • base types
  • enumerated types

I’ve used a metaclass based on Django’s Model classes to do Composite Types in the past, and it’s been working fairly well. For the thing I’m currently working on, an Enumerated Type made sense, because there are four possible values, and having a human readable version of them is going to be nicer than using a lookup table.

In the first iteration, I used just a TEXT column to store the data. However, when I then started to use an enum.Enum class for handling the values in python, I discovered that it was actually storing str(value) in the database, rather than value.value.

So, I thought I would implement something similar to my Composite Type class. Not long after starting, I realised that I could make a cleaner implementation (and easier to declare) using a decorator:

@register_enum(db_type='change_type')
class ChangeType(enum.Enum):
    ADDED = 'added'
    CHANGED = 'changed'
    REMOVED = 'removed'
    CANCELLED = 'cancelled'


ChangeType.choices = [
    (ChangeType.ADDED, _('hours added')),
    (ChangeType.REMOVED, _('hours subtracted')),
    (ChangeType.CHANGED, _('start/finish changed with no loss of hours')),
    (ChangeType.CANCELLED, _('shift cancelled')),
]

Because I’m still on an older version of Python/Django, I could not use the brand new Enumeration types, so in order to make things a bit easier, I then annotate onto the class some extra helpers. It’s important to do this after declaring the class, because otherwise the attributes you define will become “members” of the enumeration. When I move to Django 3.0, I’ll probably try to update this register_enum decorator to work with those classes.

So, let’s get down to business with the decorator. I spent quite some time trying to get it to work using wrapt, before realising that I didn’t actually need to use it. In this case, the decorator is only valid for decorating classes, and we just add things onto the class (and register some things), so it can just return the new class, rather than having to muck around with docstrings and names.

from django.db import ProgrammingError
from psycopg2.extensions import (
    new_array_type,
    new_type,
    QuotedString,
    register_adapter,
    register_type,
)

known_types = set()


CREATE_TYPE = 'CREATE TYPE {0} AS ENUM ({1})'
SELECT_OIDS = 'SELECT %s::regtype::oid AS "oid", %s::regtype::oid AS "array_oid"'


class register_enum(object):
    def __init__(self, db_type, managed=True):
        self.db_type = db_type
        self.array_type = '{}[]'.format(db_type)
        self.managed = managed

    def __call__(self, cls):
        # Tell psycopg2 how to turn values of this class into db-ready values.
        register_adapter(cls, lambda value: QuotedString(value.value))

        # Store a reference to this instance's "register" method, which allows
        # us to do the magic to turn database values into this enum type.
        known_types.add(self.register)

        self.values = [
            member.value
            for member in cls.__members__.values()
        ]

        # We need to keep a reference to the new class around, so we can use it later.
        self.cls = cls

        return cls

    def register(self, connection):
        with connection.cursor() as cursor:
            try:
                cursor.execute(SELECT_OIDS, [self.db_type, self.array_type])
                oid, array_oid = cursor.fetchone()
            except ProgrammingError:
                if not self.managed:
                    return
                # The type does not exist yet: create it, then fetch its oids.
                cursor.execute(self.create_enum(connection), self.values)
                cursor.execute(SELECT_OIDS, [self.db_type, self.array_type])
                oid, array_oid = cursor.fetchone()

            custom_type = new_type(
                (oid,),
                self.db_type,
                lambda data, cursor: self.cls(data) if data else None
            )
            custom_array = new_array_type(
                (array_oid,),
                self.array_type,
                custom_type
            )
            register_type(custom_type, cursor.connection)
            register_type(custom_array, cursor.connection)

    def create_enum(self, connection):
        qn = connection.ops.quote_name
        return CREATE_TYPE.format(
            qn(self.db_type),
            ', '.join(['%s' for value in self.values])
        )

I’ve extracted out the create_enum method, because it’s then possible to use this in a migration (but I’m not totally happy with the code that generates this migration operation just yet). I also have other code that dynamically creates classes for a ModelField and FormField as attributes on the Enum subclass, but that complicates it a bunch.
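
One piece not shown above is how the register callables collected in known_types actually get invoked. A minimal sketch of one way to wire that up, using Django’s connection_created signal (the receiver name is mine):

from django.db.backends.signals import connection_created


def register_known_enums(connection=None, **kwargs):
    # Register every known enum type on each newly created database connection.
    for register in known_types:
        register(connection)


connection_created.connect(register_known_enums)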

Testing WizardView in Django

Sometimes, you just have to resort to a multi-step view to simplify data entry for a user. Django had an “included” package for this, django.contrib.formtools, until some years ago, when it moved into its own repository. It’s still very useful, although there is a bit of a learning curve.

I have some multi-step wizard views, for things where there are a bunch of different (sometimes dependent, but not necessarily) things we need to get from the user, and commit in an all-or-nothing way.

Writing tests for these views is quite cumbersome, but I’ve recently come up with a couple of things that can make this less painful. The reason it’s somewhat difficult is that the wizard and the current step’s form both have form data, so to ensure there are no clashes between field names, the forms are all prefixed. However, prefixes on forms mean that the data you pass in must also be prefixed in the correct way.

class TestWizard(TestCase):
    def test_wizard_view(self):
        # user has been created...

        self.client.force_login(user)

        # url is the url for this wizard view.

        response = self.client.get(url)
        self.assertEqual(200, response.status_code)

        response = self.client.post(url, {
            'wizard-current_step': 'first',
            'first-field_a': 'value',
            'first-field_b': 'value',
            'first-field_c': 'value',
        })
        self.assertEqual(200, response.status_code)
        # Because django by default returns a 200 on a form validation error,
        # we need another way to check this. If this is a re-rendering of the
        # same form it will be bound, otherwise it will not be.
        self.assertFalse(response.context['form'].is_bound)
        self.assertEqual(
            response.context['wizard']['management_form']['current_step'].value(),
            'second'
        )

        # This next one has a formset instead of a form!
        response = self.client.post(url, {
          'wizard-current_step': 'second',
          'second-0-field_a': 'a',
          'second-0-field_b': 'a',
          'second-1-field_a': 'a',
          'second-1-field_b': 'a',
          'second-INITIAL_FORMS': '0',
          'second-TOTAL_FORMS': '2'
        })
        self.assertEqual(200, response.status_code)
        self.assertFalse(response.context['form'].is_bound)
        # Okay, I'm bored with this already!

It would be much nicer if we could just do something like:

class TestWizard(TestCase):
    def test_wizard_view(self):
        # ...
        self.client.force_login(user)

        test_wizard(self, url, 'wizard', [
          ('first', {'field_a': 'value', 'field_b': 'value', ...}),
          ('second', [
            {'field_a': 'a', 'field_b': 'b'},
          {'field_a': 'a', 'field_b': 'a'},
          ])
        ])

The last part, the formset handling stuff is something I have touched on before, and I’m not sure if I’ll get to that bit now, but the first part is still useful.

def test_wizard(test, url, name, data):
    """
    Using the supplied TestCase, execute the wizard view installed at
    the "url", having the given name, with each item of data.
    """

    test.assertEqual(200, test.client.get(url).status_code)
    for step, step_data in data:
        step_data = {
            '{}-{}'.format(step, key): value
            for key, value in step_data.items()
        }
        step_data['{}-current_step'.format(name)] = step
        response = test.client.post(url, step_data, follow=True)
        test.assertEqual(200, response.status_code)
        if 'form' in response.context:
            test.assertFalse(response.context['form'].errors)

We might come back to the formset thing later.


But what about when we have a bunch of steps where we already have initial data, and we mostly just want to click next in each page of the wizard, possibly altering just a value here or there?

def just_click_next(test, url, step_data):
    response = test.client.get(url)
    while response.status_code != 302:
        form = response.context['form']
        test.assertFalse(form.errors)

        wizard = response.context['wizard']
        current_step = wizard['steps'].current

        # If we have a formset, then we need to do custom handling.
        if hasattr(form, 'forms'):
            current_step_data = step_data.get(current_step, [])
            data = {
                '{}-{}'.format(current_step, key): value
                for key, value in form.management_form.initial.items()
            }
            for i, _form in enumerate(form.forms):
                current_form_data = current_step_data[i] if len(current_step_data) > i else {}
                data.update({
                    '{}-{}'.format(_form.prefix, field): current_form_data.get(
                        field,
                        _form.initial.get(field)
                    )
                    for field in _form.fields
                    if field in current_form_data or field in _form.initial
                })
        else:
            current_step_data = step_data.get(current_step, {})
            data = {
                '{}-{}'.format(form.prefix, field): current_step_data.get(
                    field,
                    form.initial.get(field)
                )
                for field in form.fields
                if field in current_step_data or field in form.initial
            }

        # Add the wizard management form step data.
        data['{}-current_step'.format(wizard['management_form'].prefix)] = current_step

        response = test.client.post(url, data)

    return response

Those paying attention may notice that this one actually does handle formsets in a fairly nice way.

So how do we use it?

class TestViews(TestCase):
    def test_new_revision_wizard(self):
        # Setup required for wizard goes here.
        just_click_next(self, url, {
            'date': {'start_date': '2019-01-01'},
            'confirm': {'confirm': 'on'}
        })
        # Make assertions about the wizard having been run.

This replaces some 80 lines of a test, which becomes important when we need to run the same test with a bunch of subtly different preconditions.

Subquery and Subclasses

Being able to use correlated subqueries in the Django ORM arrived in 1.11, and I also backported it to 1.8.

Quite commonly, I am asked questions about how to use these, so here is an attempt to document them further.

There are three classes that are supplied with Django, but it’s easy to write extensions using subclassing.

Let’s first look at an example of how you might want to use the included classes. We’ll consider a set of temperature sensors, each with a location and a code, both of which are unique. These sensors will log their current temperature at some sort of interval: maybe it’s regular, maybe it varies between devices. We want to keep every reading, but want to only allow one reading for a given sensor+timestamp.

class Sensor(models.Model):
    location = models.TextField(unique=True)
    code = models.TextField(unique=True)


class Reading(models.Model):
    sensor = models.ForeignKey(Sensor, related_name='readings', on_delete=models.CASCADE)
    timestamp = models.DateTimeField()
    temperature = models.DecimalField(max_digits=6, decimal_places=3)

    class Meta:
        unique_together = (('sensor', 'timestamp'),)

Some of the things we might want to do for a given sensor:

  • Get the most recent temperature
  • Get the average temperature over a given period
  • Get the maximum temperature over a given period
  • Get the minimum temperature over a given period

If we start with a single sensor instance, we can do each of these without having to use Subquery and friends:

from django.db.models import Avg, Min, Max

most_recent_temperature = sensor.readings.order_by('-timestamp').first().temperature
period_readings = sensor.readings.filter(
    timestamp__gte=start,
    timestamp__lte=finish,
).aggregate(
    average=Avg('temperature'),
    minimum=Min('temperature'),
    maximum=Max('temperature'),
)

We could also get the minimum or maximum using ordering, like we did with the most_recent_temperature.

If we want to do the same for a set of sensors, mostly we can still achieve this (note how similar the code is to the block above):

sensor_readings = Reading.objects.filter(
  timestamp__gte=start,
  timestamp__lte=finish
).values('sensor').annotate(
  average=Avg('temperature'),
  minimum=Min('temperature'),
  maximum=Max('temperature'),
)

We might get something like:

[
    {
        'sensor': 1,
        'average': 17.5,
        'minimum': 11.3,
        'maximum': 25.9
    },
    {
        'sensor': 2,
        'average': 19.63,
        'minimum': 13.6,
        'maximum': 24.33
    },
]

However, it’s not obvious how we would get all of the sensors, and their current temperature in a single query.

Subquery to the rescue!

from django.db.models.expressions import Subquery, OuterRef

current_temperature = Reading.objects.filter(sensor=OuterRef('pk'))\
                                     .order_by('-timestamp')\
                                     .values('temperature')[:1]

Sensor.objects.annotate(
    current_temperature=Subquery(current_temperature)
)

What’s going on here is that we are filtering the Reading objects inside our subquery to only those associated with the sensor in the outer query. This uses the special OuterRef class, which will, when the queryset is “resolved”, build the association. It does mean that if we tried to inspect the current_temperature queryset on its own, we would get an error that it is unresolved.

We then order the filtered readings by newest timestamp first; this, coupled with the slice at the end will limit us to a single row. This is required because the database will reject a query that results in multiple rows being returned for a subquery.

Additionally, we may only have a single column in our subquery: that’s achieved by the .values('temperature').

But maybe there is a problem here: we actually want to know when the reading was taken, as well as the temperature.

We can do that a couple of ways. The simplest is to use two Subqueries:

current_temperature = Reading.objects.filter(sensor=OuterRef('pk'))\
                                     .order_by('-timestamp')[:1]

Sensor.objects.annotate(
    current_temperature=Subquery(current_temperature.values('temperature')),
    last_reading_at=Subquery(current_temperature.values('timestamp')),
)

However, this will do two subqueries at the database level. Since these subqueries will be performed separately for each row, each additional correlated subquery will result in more work for the database, with possible performance implications.

What about if we are using Postgres, and are okay with turning the temperature and timestamp pair into a JSONB object?

from django.db.models.expressions import Func, F, Value, OuterRef, Subquery
from django.contrib.postgres.fields import JSONField


class JsonBuildObject(Func):
    function = 'jsonb_build_object'
    output_field = JSONField()


last_temperature = Reading.objects.filter(sensor=OuterRef('pk'))\
                                  .order_by('-timestamp')\
                                  .annotate(
                                      json=JsonBuildObject(
                                          Value('timestamp'), F('timestamp'),
                                          Value('temperature'), F('temperature'),
                                      )
                                   ).values('json')[:1]

Sensor.objects.annotate(
    last_temperature=Subquery(last_temperature)
)

Now, your Sensor instances would have an attribute last_temperature, which will be a dict with the timestamp and temperature of the last reading.


There is also a supplied Exists subquery that can be used to force the database to emit an EXISTS statement. This could be used to set a boolean field on our sensors to indicate they have data from within the last day:

recent_readings = Reading.objects.filter(
    sensor=OuterRef('pk'),
    timestamp__gte=datetime.datetime.utcnow() - datetime.timedelta(1)
)
Sensor.objects.annotate(
    has_recent_readings=Exists(recent_readings)
)

Sometimes we’ll have values from multiple rows that we will want to annotate on from the subquery. This can’t be done directly: you will need to aggregate those values in some way. Postgres has a neat feature where you can use an ARRAY() constructor and wrap a subquery in that:

SELECT foo,
       bar,
       ARRAY(SELECT baz
               FROM qux
              WHERE qux.bar = base.bar
              ORDER BY fizz
              LIMIT 5) AS baz
  FROM base

We can build this type of structure using a subclass of Subquery.

from django.contrib.postgres.fields import ArrayField
from django.core.exceptions import FieldError
from django.db.models.expressions import Subquery

class SubqueryArray(Subquery):
    template = 'ARRAY(%(subquery)s)'

    @property
    def output_field(self):
        output_fields = [x.output_field for x in self.get_source_expressions()]

        if len(output_fields) > 1:
            raise FieldError('More than one column detected')

        return ArrayField(base_field=output_fields[0])

And now we can use this where we’ve used a Subquery, but we no longer need to slice to a single row:

json_reading = JsonBuildObject(
    Value('timestamp'), F('timestamp'),
    Value('temperature'), F('temperature'),
)

last_five_readings = Reading.objects.filter(
    sensor=OuterRef('pk')
).order_by('-timestamp').annotate(
    json=json_reading
).values('json')[:5]

Sensor.objects.annotate(last_five_readings=SubqueryArray(last_five_readings))

Each sensor instance would now have up to 5 dicts in a list in its last_five_readings attribute.

We could get this data in a slightly different way: let’s say instead of an array, we want a dict keyed by a string representation of the timestamp:

sensor.last_five_readings = {
    '2019-01-01T09:12:35Z': 15.35,
    '2019-01-01T09:13:35Z': 14.33,
    '2019-01-01T09:14:35Z': 14.90,
    ...
}

There is a Postgres aggregate we can use there to do that, too:

class JsonObjectAgg(Subquery):
    template = '(SELECT json_object_agg("_j"."key", "_j"."value") FROM (%(subquery)s) "_j")'
    output_field = JSONField()


last_five_readings = Reading.objects.filter(
    sensor=OuterRef('pk')
).order_by('-timestamp').annotate(
    key=F('timestamp'),
    value=F('temperature'),
).values('key', 'value')[:5]

Sensor.objects.annotate(last_five_readings=JsonObjectAgg(last_five_readings))

Indeed, we can wrap any aggregate in a similar way: for instance, to get the number of rows in a subquery:

class SubqueryCount(Subquery):
    template = '(SELECT count(*) FROM (%(subquery)s) _count)'
    output_field = models.IntegerField()

Since other aggregates need to operate on a single field, we’ll need something that ensures there is a single column in our .values(), extracts it, and uses that field in the query.

class SubquerySum(Subquery):
    template = '(SELECT SUM(%(field)s) FROM (%(subquery)s) _sum)'

    def as_sql(self, compiler, connection, template=None, **extra_context):
        if 'field' not in extra_context and 'field' not in self.extra:
            if len(self.queryset._fields) > 1:
                raise FieldError('You must provide the field name, or have a single column')
            extra_context['field'] = self.queryset._fields[0]
        return super(SubquerySum, self).as_sql(
          compiler, connection, template=template, **extra_context
        )
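
Usage might then look something like this (summing temperatures is contrived, but it shows the single-column .values() requirement; the output_field is an assumption):

Sensor.objects.annotate(
    temperature_sum=SubquerySum(
        Reading.objects.filter(sensor=OuterRef('pk')).values('temperature'),
        output_field=models.DecimalField(max_digits=12, decimal_places=3),
    )
)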

As I mentioned, it’s possible to write a subclass like that for any aggregate function, although it would be far nicer if there was a way to write that purely in the ORM. Maybe one day…

Expression Exclusion Constraints

Today I was working with a junior developer, and was lucky enough to be able to explain exclusion constraints to them. I got partway through it before I realised that the Django model we were working on did not have a range field, but instead had a start and a finish.

class Leave(models.Model):
    person = models.ForeignKey(
        'person.Person',
        related_name='approved_leave',
        on_delete=models.CASCADE,
    )
    start = models.DateTimeField()
    finish = models.DateTimeField()

It turns out that this is not a problem. You can use any expression in a constraint:

ALTER TABLE leave_leave
ADD CONSTRAINT prevent_overlapping_leave
EXCLUDE USING gist(person_id WITH =, TSTZRANGE(start, finish) WITH &&)

Whilst we have application-level validation in place to prevent this, there is a code path that allows it (hence the desire to implement this). Because this is an exclusion constraint, we won’t be able to use the NOT VALID syntax, but will instead have to either fix the invalid data, or use a WHERE clause to only apply the constraint to “new” data.

ALTER TABLE leave_leave
ADD CONSTRAINT prevent_overlapping_leave
EXCLUDE USING gist(person_id WITH =, TSTZRANGE(start, finish) WITH &&)
WHERE start > '2019-07-19';

The other benefit of this is that it creates an index that includes TSTZRANGE(start, finish), which could be used for querying, but also will ensure that start <= finish for all rows.

Infinite Recursion in Postgres

I’m not sure how useful it is, but it turns out it is possible to write a query that generates an infinite number of rows in postgres:

WITH RECURSIVE year AS (
  SELECT 2000 AS year

  UNION ALL

  SELECT year + 1 FROM year
)
SELECT *
  FROM year;

As it stands this really isn’t useful, because the query will never complete, and so it will never start returning rows to the user. However, if you don’t know in advance how many rows you will be generating, you could do something like:

WITH RECURSIVE year AS (
  SELECT 2000 AS year

  UNION ALL

  SELECT year + 1 FROM year
)
SELECT *
  FROM year
 LIMIT %s;
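
If you wanted to consume that from Django, a minimal sketch might look like this (the limit of 25 is arbitrary):

from django.db import connection

BOUNDED_YEARS = '''
WITH RECURSIVE year AS (
  SELECT 2000 AS year

  UNION ALL

  SELECT year + 1 FROM year
)
SELECT year FROM year LIMIT %s
'''

with connection.cursor() as cursor:
    cursor.execute(BOUNDED_YEARS, [25])
    years = [year for (year,) in cursor.fetchall()]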

Again, this may not seem to be that useful, as you could just use a generate_series(2000, 2000 + %s, 1). But I’m currently working on something that doesn’t always have a fixed count or interval (implementing RRULE repeats in SQL), and I think that maybe this might just be useful…

Merging Adjacent Ranges in Postgres

Previously, I detailed a solution to split/trim/replace overlapping items in a table. Subsequently, I decided I needed to merge all adjacent items that could be merged. In this case, that was with two other fields (only one of which was subject to the exclusion constraint) being identical in adjacent periods.

CREATE EXTENSION IF NOT EXISTS btree_gist;

CREATE TABLE team_membership (
  membership_id SERIAL,
  player_id INTEGER,
  team_id INTEGER,
  period DATERANGE,
  CONSTRAINT prevent_overlapping_memberships EXCLUDE USING gist(player_id WITH =, period WITH &&)
);

Before we can implement the plpgsql trigger function, we need to tell Postgres how to aggregate ranges:

CREATE AGGREGATE sum(anyrange) (
  stype = anyrange,
  sfunc = range_union
);

We should note at this point that range_union (the function behind the + operator, hence the reason I’ve called the aggregate SUM) will fail with an error if the two ranges being combined do not overlap or touch. We must make sure that in any query where we use it, all of the ranges will overlap (and I believe they must also be in “order”, so that as we perform the union on each range in a “reduce” manner we never end up with non-contiguous ranges).

So, let’s look at the trigger function. Initially, I wrote this as two queries:

CREATE OR REPLACE FUNCTION merge_adjacent()
  RETURNS TRIGGER AS $$
  BEGIN
    NEW.period = (SELECT SUM(period) FROM (SELECT NEW.period UNION ALL ...));
    DELETE FROM team_membership ...;
    RETURN NEW;
  END;
  $$ LANGUAGE plpgsql STRICT;

This required me to duplicate the WHERE clauses, and was messy.

Then I remembered you can use the RETURNING clause, and use a CTE, with a SELECT INTO:

CREATE OR REPLACE FUNCTION merge_adjacent()
  RETURNS TRIGGER AS $$

  BEGIN
    WITH matching AS (
      DELETE FROM team_membership mem
            WHERE mem.player_id = NEW.player_id
              AND mem.team_id = NEW.team_id
              AND (mem.period -|- NEW.period OR mem.period && NEW.period)
        RETURNING period
    )
    SELECT INTO NEW.period (
      SELECT SUM(period ORDER BY period) FROM (
        SELECT NEW.period
         UNION ALL
        SELECT period FROM matching
      ) _all
    );
    RETURN NEW;
  END;

  $$ LANGUAGE plpgsql STRICT;

CREATE TRIGGER merge_adjacent
BEFORE INSERT OR UPDATE ON team_membership
FOR EACH ROW EXECUTE PROCEDURE merge_adjacent();

The other thing to note about this construct is that it will only work on “already merged” data: if you had ranges:

[2019-01-01, 2019-01-04)
[2019-01-04, 2019-02-02)
# Note there is a gap here...
[2019-05-01, 2019-05-11)
[2019-05-11, 2020-01-01)

and you added in a value to the missing range:

INSERT INTO range (period) VALUES ('[2019-02-02, 2019-05-01)')

You would not merge all of the ranges, only those immediately adjacent. That is, you would wind up with rows:

[2019-01-01, 2019-01-04)
[2019-01-04, 2019-05-11)
[2019-05-11, 2020-01-01)

However, if this trigger is active on the table you would never get to the stage where your data was adjacent but not merged.

Graphs in Django and Postgres

I have written a bunch of posts about dealing with trees in Postgres and Django, and Funkybob used some of this to start the package django-closure-view.

Today, someone was looking for similar functionality, but for a graph. Specifically, a Directed Acyclic Graph. Now, not every graph, or even every DAG is a tree, but every tree is a DAG.

So, the difference between a tree and a graph in this context is that a given node may have an arbitrary number of parents. But, and this is worth noting now, none of a node’s ancestors may also be among its descendants: that is what keeps the graph acyclic.

The first part of this tells us that we can no longer just use a simple self-relation in our model to store the relationship: because there could be multiple parents. Instead, we will need to have a many-to-many relation to store that.

from django.db import models


class Node(models.Model):
    node_id = models.AutoField(primary_key=True)
    name = models.TextField(unique=True)
    parents = models.ManyToManyField(
        'self',
        related_name='children',
        symmetrical=False,
    )

We can put some meaningful data into this graph to make it a little more obvious if our queries are sane:

django, pytz, sqlparse, asgiref = Node.objects.bulk_create([
    Node(name='django'),
    Node(name='pytz'),
    Node(name='sqlparse'),
    Node(name='asgiref'),
])

django.parents.add(pytz, sqlparse, asgiref)

graph_demo, psycopg2 = Node.objects.bulk_create([
    Node(name='graph_demo'),
    Node(name='psycopg2')
])

graph_demo.parents.add(psycopg2, django)

Let’s have a bit of a look at some of the queries we might need to think about.

-- All root nodes
SELECT node_id, name
  FROM graph_node
  LEFT OUTER JOIN graph_node_parents ON (node_id = from_node_id)
 WHERE to_node_id IS NULL;

As expected, this gives us back all packages that have no dependencies (parents):

 node_id │   name
─────────┼──────────
       6 │ psycopg2
       2 │ pytz
       4 │ asgiref
       3 │ sqlparse
(4 rows)

And now, all packages which are not depended upon by any other package (that is, packages with no children):

SELECT node_id, name
  FROM graph_node
  LEFT OUTER JOIN graph_node_parents ON (node_id = to_node_id)
 WHERE from_node_id IS NULL;

We should only have one package here: graph_demo.

From each of these, we can build up a recursive query to get all descendants, or all ancestors of each root/leaf node.

WITH RECURSIVE ancestors AS (
  SELECT node_id, '{}'::INTEGER[] AS ancestors
    FROM graph_node
    LEFT OUTER JOIN graph_node_parents ON (node_id = from_node_id)
   WHERE to_node_id IS NULL

   UNION

  SELECT node.from_node_id,
         ancestors.ancestors || ancestors.node_id
    FROM ancestors
   INNER JOIN graph_node_parents node
           ON (ancestors.node_id = to_node_id)
) SELECT * FROM ancestors;

From here, we can annotate on the names to double check:

WITH RECURSIVE ancestors AS (
  SELECT node_id, '{}'::INTEGER[] AS ancestors
    FROM graph_node
    LEFT OUTER JOIN graph_node_parents ON (node_id = from_node_id)
   WHERE to_node_id IS NULL

   UNION

  SELECT node.from_node_id,
         ancestors.ancestors || ancestors.node_id
    FROM ancestors
   INNER JOIN graph_node_parents node
           ON (ancestors.node_id = to_node_id)
)
SELECT node_id,
       node.name,
       ancestors,
       ARRAY(SELECT name
               FROM unnest(ancestors) node_id
              INNER JOIN graph_node USING (node_id)
       ) AS ancestor_names
  FROM ancestors
  INNER JOIN graph_node node USING (node_id);

So that has given us all ancestor chains: but what about if we just want the closure table: all ancestor/descendant pairs?

WITH RECURSIVE closure_table AS (
  SELECT from_node_id AS descendant,
         to_node_id AS ancestor
    FROM graph_node_parents

   UNION

  SELECT descendant,
         to_node_id AS ancestor
    FROM closure_table
   INNER JOIN graph_node_parents ON (from_node_id = ancestor)
)
SELECT * FROM closure_table

Okay, that was even easier than the previous query.

Once we have our closure table query, then we can look at preventing cycles.

CREATE OR REPLACE RECURSIVE VIEW
graph_closure_table (descendant, ancestor) AS (

  SELECT from_node_id AS descendant,
         to_node_id AS ancestor
    FROM graph_node_parents

   UNION

  SELECT descendant,
         to_node_id AS ancestor
    FROM graph_closure_table
   INNER JOIN graph_node_parents ON (from_node_id = ancestor)
);

And we can now use this in a function to prevent cycles

CREATE OR REPLACE FUNCTION prevent_cycles()
RETURNS TRIGGER AS $$

BEGIN
  IF EXISTS(SELECT 1
              FROM graph_closure_table
             WHERE ancestor = NEW.from_node_id
               AND descendant = NEW.to_node_id
           ) THEN
    RAISE EXCEPTION 'cycle detected';
  END IF;
  RETURN NEW;
END;

$$ LANGUAGE plpgsql STRICT;

CREATE TRIGGER prevent_cycles
BEFORE UPDATE OR INSERT ON graph_node_parents
FOR EACH ROW EXECUTE PROCEDURE prevent_cycles();

And this will prevent us from being able to set an invalid dependency relationship: ie, one that would trigger a cycle:

>>> django.parents.add(graph_demo)
Traceback (most recent call last):
  File "...django/db/backends/utils.py", line 84, in _execute
    return self.cursor.execute(sql, params)
psycopg2.errors.RaiseException: cycle detected
CONTEXT:  PL/pgSQL function prevent_cycles() line 9 at RAISE

It’s not totally ideal, but it does show how it protects against saving invalid relationships.


Interestingly, if we drop that constraint, we can still run the closure table query: it doesn’t give us an infinite loop, because the view uses a UNION instead of a UNION ALL: it’s going to drop any rows that are already in the output when it deals with each row - and since there are not an infinite number of combinations for a given set of dependencies, it will eventually return data.
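
Since the closure table exists as a view, it is also queryable from the ORM. A sketch, assuming the graph_closure_table view created above (the helper name is mine):

def ancestors(node):
    # All nodes that `node` (transitively) depends upon.
    return Node.objects.raw(
        '''
        SELECT node.*
          FROM graph_node node
         INNER JOIN graph_closure_table ct ON (node.node_id = ct.ancestor)
         WHERE ct.descendant = %s
        ''',
        [node.node_id],
    )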

So, where from here? I’m not sure. This was just something that I thought about while answering a question in IRC, and I felt like I needed to explore the idea.

Handling overlapping values

One of the things that I enjoy most about Postgres is the rich types. Using these types can help reduce the amount of validation that the application needs to do.

Take for instance anything which contains a start date and a finish date. If you model this using two fields, then you also need to include validation about start <= finish (or perhaps start < finish, depending upon your requirements).

If you use a date range instead, then the database will do this validation for you. It is not possible to create a range value that is “backwards”. Sure, you’ll also need to do application-level (and probably client-side) validation, but there is something nice about having a reliable database that ensures you cannot possibly have invalid data.

Django is able to make good use of range types, and most of my new code seemingly has at least one range type: often a valid_period. So much so that I have a Mixin and a QuerySet that make dealing with these easier:

class ValidPeriodMixin(models.Model):
    valid_period = DateRangeField()

    class Meta:
        abstract = True

    @property
    def start(self):
        if self.valid_period.lower_inc:
            return self.valid_period.lower
        elif self.valid_period.lower is not None:
            return self.valid_period.lower + datetime.timedelta(1)

    @property
    def finish(self):
        if self.valid_period.upper_inc:
            return self.valid_period.upper
        elif self.valid_period.upper is not None:
            return self.valid_period.upper - datetime.timedelta(1)

    @property
    def forever(self):
        return self.valid_period.lower is None and self.valid_period.upper is None

    def get_valid_period_display(self):
        if self.forever:
            message = _('Always applies')
        elif self.start is None:
            message = _('{start} \u2192 no end date')
        elif self.finish is None:
            message = _('no start date \u2192 {finish}')
        else:
            message = _('{start} \u2192 {finish}')

        return message.format(
            start=self.start,
            finish=self.finish,
        )


def ensure_date_range(period):
    """
    If we have a 2-tuple of dates (or strings that are valid dates),
    ensure we turn that into a DateRange instance. This is because
    otherwise Django may mis-interpret this.
    """
    if not isinstance(period, DateRange):
        return DateRange(period[0] or None, period[1] or None, '[]')
    return period


class OverlappingQuerySet(models.query.QuerySet):
    def overlapping(self, period):
        return self.filter(valid_period__overlap=ensure_date_range(period))

    def on_date(self, date):
        return self.filter(valid_period__contains=date)

    def today(self):
        return self.on_date(datetime.date.today())

As you may notice from this, it is possible to do some filtering based on range types: specifically, you can use the && Postgres operator using .filter(field__overlap=value), and the containment operators (<@ and @>) using .filter(field__contains=value) and .filter(field__contained_by=value). There are also other operators we will see a bit later using other lookups.


If you have a legacy table that stores a start and a finish, you would need to have a validator on the model (or forms that write to the model) that ensures start < finish, as mentioned above. Also, there is no way (without extra columns) to tell if the upper and lower values should be inclusive or exclusive of the bounds. In Postgres, we write range values using a notation like a mathematical range: using ‘[’, ‘]’ and ‘(‘, ‘)’ to indicate inclusive and exclusive bounds.

SELECT '[2019-01-01,2020-01-01)'::DATERANGE AS period;

One caveat when dealing with discrete range types (like dates and integers) is that Postgres will, if it is able to, convert the range to a normalised value: it will store (2019-01-01,2019-12-31] as [2019-01-02,2020-01-01). This can become a problem when showing the value back to the user, because depending upon context, it’s likely that you will want to use inclusive bounds when showing and editing the values.

You can manage this by using a form field subclass that detects an exclusive upper bound and subtracts one “unit” accordingly:

import datetime

from django.contrib.postgres.forms.ranges import (
    DateRangeField, IntegerRangeField
)


class InclusiveRangeMixin(object):
    _unit_value = None

    def compress(self, values):
        range_value = super().compress(values)
        if range_value:
            return self.range_type(
                range_value.lower,
                range_value.upper,
                bounds='[]'
            )

    def prepare_value(self, value):
        value = super().prepare_value(value)
        value = [
            field.clean(val)
            for field, val in zip(self.fields, value)
        ]
        if value[1] is not None:
            value[1] = value[1] - self._unit_value
        return value


class InclusiveDateRangeField(
    InclusiveRangeMixin, DateRangeField
):
    _unit_value = datetime.timedelta(1)


class InclusiveIntegerRangeField(
    InclusiveRangeMixin, IntegerRangeField
):
    _unit_value = 1

Back on to the topic of storing two values instead of a range: it’s possible to add an expression index on the table that uses DATERANGE:

CREATE INDEX thing_period_idx
          ON thing_thing (DATERANGE(start, finish));

You would be able to annotate on this value, do some querying, and it should use the index, allowing you to build querysets like:

Thing.objects.annotate(
    period=Func(
      F('start'),
      F('finish'),
      function='DATERANGE',
      output_field=DateRangeField())
).filter(period__overlap=other_period)

Range types show their full power when used with exclusion constraints. These allow you to prevent writing rows that violate the constraint. For instance, consider this model (and some largely irrelevant other models, Team and Player):

class TeamMembership(ValidPeriodMixin):
    player = models.ForeignKey(
        Player,
        related_name='team_memberships',
        on_delete=models.CASCADE,
    )
    team = models.ForeignKey(
        Team,
        related_name='player_memberships',
        on_delete=models.CASCADE,
    )

A player may only belong to one team at a time: that is, we may not have any overlapping valid_periods for a player.

You can do this using an exclusion constraint, but it does need the btree_gist extension installed:

CREATE EXTENSION IF NOT EXISTS btree_gist;

ALTER TABLE team_teammembership
        ADD CONSTRAINT prevent_overlapping_team_memberships
    EXCLUDE USING gist(player_id WITH =, valid_period WITH &&)
 DEFERRABLE INITIALLY DEFERRED;

Since this type of constraint is not yet supported in Django, you’ll have to do it in a RunSQL migration.
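
Something like this sketch, perhaps (assuming btree_gist is already installed; the app label, dependency and reverse SQL are illustrative):

from django.db import migrations


class Migration(migrations.Migration):

    dependencies = [
        ('team', '0001_initial'),  # hypothetical dependency
    ]

    operations = [
        migrations.RunSQL(
            sql='''
                ALTER TABLE team_teammembership
                        ADD CONSTRAINT prevent_overlapping_team_memberships
                    EXCLUDE USING gist(player_id WITH =, valid_period WITH &&)
                 DEFERRABLE INITIALLY DEFERRED;
            ''',
            reverse_sql='''
                ALTER TABLE team_teammembership
                       DROP CONSTRAINT prevent_overlapping_team_memberships;
            ''',
        ),
    ]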

From here, we can attempt to write conflicting data, but the database will forbid it. You will still need to write code that checks before writing - this enables you to return a ValidationError to the user when you detect this conflict in a form, but having the exclusion constraint means that we can avoid the race condition where:

  • Check for overlapping ranges
  • Other process creates a range that will overlap
  • Save our data

You could possibly also use select_for_update in this context, but I prefer adding database constraints.

Note that the DEFERRABLE INITIALLY DEFERRED clause is important: it allows you, within a transaction, to write conflicting data, and it’s only when the transaction commits that the constraint is checked. This makes rewriting a bunch of values in one transaction much simpler: if you do not have this flag enabled then you will need to ensure you update them in an order that maintains no overlaps at each stage. I’m pretty confident this is always possible, but it’s a bunch of work (and it is possible that you might need to write some rows multiple times to maintain that).
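
In practice, that just means wrapping the rewrite in a transaction and writing the rows as you go. A sketch, where player is an existing Player instance and adjust_period is a hypothetical helper:

from django.db import transaction

with transaction.atomic():
    # Intermediate states may overlap each other; the deferred exclusion
    # constraint is only checked when the transaction commits.
    for membership in player.team_memberships.all():
        membership.valid_period = adjust_period(membership.valid_period)  # hypothetical
        membership.save()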


So, now we can store range values (with database validation), and prevent overlapping data (with database validation).

What about a process that enables us to say “this row should replace, trim or split any that overlap with it”? I’m glad you asked.

It turns out that, given two rows where one should “supersede” the other, there are five different conditions we need to take into account:

  • The rows do not overlap: no action required
  • The new row completely covers the old row: remove the old row
  • The old row has bounds that exceed the new row in both directions: split the old row into two rows
  • The old row has a lower bound that is smaller than the new row: trim the old row at the upper end
  • The old row has an upper bound that is larger than the new row: trim the old row at the lower end

It turns out we can perform this query with the Django range field lookups:

class OverlappingQuerySet(models.query.QuerySet):
    def with_overlap_type(self, period):
        period = ensure_date_range(period)
        return self.annotate(
            overlap_type=Case(
                # The objects do not overlap.
                When(~Q(valid_period__overlap=period),
                     then=Value(None)),
                # The existing value is covered by the new value
                When(valid_period__contained_by=period,
                     then=Value('replace')),
                # The existing value has no values
                # less than the new value
                When(valid_period__not_lt=period,
                     then=Value('trim:lower')),
                # The existing value has no values
                # greater than the new value
                When(valid_period__not_gt=period,
                     then=Value('trim:upper')),
                # The existing value contains the new value
                When(valid_period__contains=period,
                     then=Value('split')),
                output_field=models.TextField()
            )
        )

This works because a CASE WHEN stops evaluating when it finds a match: technically a trim:lower value could also match on containment (split), so we need to test that one earlier.

We are going to have to (possibly) perform multiple queries when writing back the data. If there are any that need to be “removed”, they will need a DELETE. Any that have a “trim” operation will require an UPDATE.

new_instance = Thing(valid_period=('2019-01-01', '2019-02-09'))
overlapping = Thing.objects.overlapping(
  new_instance.valid_period
).with_overlap_type(new_instance.valid_period)

overlapping.filter(overlap_type='replace').delete()
overlapping.filter(
    overlap_type__in=('trim:upper', 'trim:lower')
).update(
    valid_period=models.F('valid_period') - new_instance.valid_period
)

But the tricky part is that any that are “split” will require at least two operations: either a DELETE followed by an INSERT (that inserts two rows), or a single UPDATE and a single INSERT. We also need to read the values first if we are going to manipulate them in python. Instead, we can look at how to do it in raw SQL, with the benefit that we can perform this in a single operation.

WITH new_period AS (
  SELECT %s AS new_period
),
split AS (
  SELECT thing_id,
         valid_period,
         other_field,
         new.new_period
    FROM thing_thing old
    INNER JOIN new_period new ON (
          LOWER(old.valid_period) < LOWER(new.new_period)
      AND UPPER(old.valid_period) > UPPER(new.new_period)
    )
), new_rows AS (
  SELECT other_field,
         DATERANGE(LOWER(valid_period),
                   LOWER(new_period)) AS valid_period
    FROM split

   UNION ALL

  SELECT other_field,
         DATERANGE(UPPER(new_period),
                   UPPER(valid_period)) AS valid_period
    FROM split
),
removed AS (
  DELETE FROM thing_thing
   WHERE thing_id IN (SELECT thing_id FROM split)
)
INSERT INTO thing_thing (other_field, valid_period)
SELECT other_field, valid_period FROM new_rows;

This is less than ideal, because we need to enumerate all of the fields (instead of just other_field), so this code is not especially reusable as-is.

Let’s look at alternatives:

# Fetch the existing items.
splits = list(overlapping.filter(overlap_type='split').values())
to_create = []
to_delete = []
for overlap in splits:
    to_delete.append(overlap.pop('thing_id'))
    valid_period = overlap.pop('valid_period')
    to_create.append(Thing(
        valid_period=(valid_period.lower, new_instance.valid_period.lower),
        **overlap
    ))
    to_create.append(Thing(
        valid_period=(new_instance.valid_period.upper, valid_period.upper),
        **overlap
    ))
overlapping.filter(pk__in=to_delete).delete()
Thing.objects.bulk_create(to_create)

We can stick all of that into a queryset method, to make it easier to manage.

import copy


class OverlappingQuerySet(models.query.QuerySet):
    def trim_overlapping(self, period):
        """
        Trim/split/remove all overlapping objects.

        * Remove objects in the queryset that are
          "covered" by the period.
        * Split objects that completely cover the
          new period with overlap at both sides
        * Trim objects that intersect with the new
          period and extend in one direction or the
          other, but not both.

        This will do a single query to trim objects that need
        trimming, another query that fetches those that need
        splitting, a single delete query to remove all
        split/replaced objects, and finally an optional query
        to create replacement objects for those split.

        That means this method _may_ perform 3 or 4 queries.

        This particular algorithm should work without a
        transaction needing to be present, but in practice
        this action and the creation of the new object should be
        in the same transaction, so they can all roll back
        if anything goes wrong.
        """
        period = ensure_date_range(period)

        overlapping = self.overlapping(period)\
                          .with_overlap_type(period)

        # Easy first: update those that we can just update.
        overlapping.filter(
            overlap_type__startswith='trim'
        ).update(
            valid_period=models.F('valid_period') - period
        )

        # Create the new objects for each of the ones that
        # extend either side of the new value.
        # There will always be two of them: one for the lower
        # section, and one for the upper section.
        to_create = []
        for instance in overlapping.filter(overlap_type='split'):
            # Setting the primary key to None will trigger a new
            # instance.
            instance.pk = None
            # We need to create two instances, each with a different
            # valid_period.
            valid_period = instance.valid_period
            # The one _before_ the new value.
            instance.valid_period = DateRange(
                valid_period.lower, period.lower, bounds='[)'
            )
            to_create.append(instance)
            # And a new copy to go _after_ the new value.
            instance = copy.deepcopy(instance)
            instance.valid_period = DateRange(
                period.upper, valid_period.upper, bounds='(]'
            )
            to_create.append(instance)


        # Now clean up any that we need to get rid of.
        overlapping.filter(
            overlap_type__in=('replace', 'split')
        ).delete()

        # And finally add back in any replacement objects
        # that extended either side of the new value.
        if to_create:
            self.model._default_manager.bulk_create(to_create)
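
Usage then stays pleasantly small; a sketch, reusing the Thing model from earlier:

from django.db import transaction

new_instance = Thing(valid_period=('2019-03-01', '2019-04-01'))

with transaction.atomic():
    Thing.objects.trim_overlapping(new_instance.valid_period)
    new_instance.save()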

Yeah, I think that will do for now.