Celery Once

Within our audit logs, which happen at the database level, we also want to be able to show the user a representation of what the object “looks like”: basically, what str(instance) returns. This is supplementary information, and often makes it easier to identify which object has been edited; otherwise, the audit logs only show the fields that changed on the object in that event.

However, because this happens in Python/Django, and not in the database, we need some mechanism for fetching audit logs that don’t yet have this string representation, and then updating them. This is a perfect use case for celery: we can have a periodic task that looks for audit logs missing this related object, and creates the missing representations.

There are a few things that can cause problems:

  • If we run the task infrequently, then there can at times be a large number of new audit logs, which can cause this task to take a long time to run: blocking other tasks, or perhaps even timing out. We should limit the number of objects that may be handled in a given task.
  • If we run the task infrequently, there can be a big delay between an action being logged, and the representation of that action being created. This can make audit logs hard to read.
  • If our batch size is too small, then we never clear the backlog (or we need to run more frequently).
  • If we run too frequently, we end up spending all of our time checking for missing objects (we have tens of millions at last count), and the tasks stack up.
  • If we have multiple instances of the task running at the same time, then creating the objects in whichever task finishes second can fail, because the first task has already created them. At that point, we have done a bunch of work for nothing.

The ideal behaviour is to queue a task to run somewhat frequently: perhaps every 2-5 minutes, and do a batch of a reasonable size. Then, if there are still any more objects to process, re-queue a task immediately.
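
For the scheduled part, a celery beat entry along these lines would do the job (this is just a sketch: the entry name, module path and three-minute interval are assumptions, not our actual configuration):

from datetime import timedelta

app.conf.beat_schedule = {
    'fix-missing-instances': {
        # Assumed dotted path to the task defined below.
        'task': 'audit.tasks.fix_missing_instances',
        'schedule': timedelta(minutes=3),
        'kwargs': {'batch_size': 1000},
    },
}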

But there is still the problem here of multiple tasks running at the same time. If a task detects more missing objects and requeues itself, and this keeps happening up until the next scheduled task starts, we may well end up with the race condition described above. It is possible that we are creating objects faster than we can process them, but in our case that only happens rarely, not all of the time.

There is a project, celery-once, that will prevent multiple instances of the same task being queued at the same time. This feels like it will do exactly what we want!

@app.task(base=QueueOnce)
def fix_missing_instances(batch_size=1000):
    missing = AuditLog.objects.filter(instance_repr=None)
    InstanceRepr.objects.bulk_create([
        InstanceRepr(
            audit_log=audit_log,
            instance_repr=audit_log.build_instance_repr(),
        ) for audit_log in missing[:batch_size]
    ])

    if missing.exists():
        fix_missing_instances.apply_async(kwargs={'batch_size': batch_size}, countdown=1)

But, alas, this won’t quite work. The task is still “in the queue”, just marked as “reserved”, so celery_once will not add a new item to the queue.

As an aside, there’s actually a better pattern for knowing if there are more objects to process: we can compare the number of objects we just created to our batch size, and if they are the same, then we probably need to process more.

But, back to the first issue: we tried delaying our task (using the countdown argument), but this was not enough.

We can sidestep the problem using a second task, which just queues the first task:

@app.task(bind=True, base=QueueOnce)
def fix_missing_instances(self, batch_size=1000):
    missing = AuditLog.objects.filter(instance_repr=None)
    created = InstanceRepr.objects.bulk_create([
        InstanceRepr(
            audit_log=audit_log,
            instance_repr=audit_log.build_instance_repr(),
        ) for audit_log in missing[:batch_size]
    ])

    if len(created) == batch_size and not self.request.called_directly:
        requeue_fix_missing_instances.apply_async(kwargs={'batch_size': batch_size}, countdown=1)


@app.task
def requeue_fix_missing_instances(batch_size):
    fix_missing_instances.delay(batch_size=batch_size)

Note that I’ve also applied the “optimisation” mentioned above, so it no longer does a separate .exists() query after processing.

I also think there is a bug somewhere in celery-once related to handling the keys entry of the once dict, which can be used to limit which kwargs indicate that a task is already running. I haven’t been able to isolate this and write a test case/PR yet; in our case, we don’t really care about the task args anyway.
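
For reference, this is roughly how that option is meant to be used according to the celery-once README (a sketch): an empty keys list means the lock ignores the task arguments entirely, which suits us here.

@app.task(base=QueueOnce, once={'keys': []})
def fix_missing_instances(batch_size=1000):
    # Lock on the task name only: a second copy will not be queued,
    # regardless of what batch_size it was called with.
    ...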

Opening Hours Redux

A few years ago, I wrote up some stuff about Postgres Composite Types in Django. Holy cow, that appears to be 5 years ago.

Anyway, it’s come up a bit recently on #postgresql on IRC, and I thought I might expand a little on how I’m currently using that concept, and some ideas that could be used to do more.

The composite type itself is quite straightforward: we store two values representing the opening time, and then the length of time that the business is open. This allows us to model things that go over midnight without having to worry about a bunch of checks about (start > finish), and whatever that means.

CREATE TYPE open_period AS (
  start TIME,
  length INTERVAL
);

We could have used a DOMAIN type to limit the length to less than or equal to 24 hours; however, I’ll omit that for now.
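
Hypothetically, it might look something like this (a sketch only, and not used in the rest of this post; I also haven’t verified how strictly domain constraints are enforced when the domain is used as a composite type attribute):

CREATE DOMAIN open_length AS INTERVAL
  CHECK (VALUE > INTERVAL '0 hours' AND VALUE <= INTERVAL '24 hours');

The length attribute of open_period would then be declared as open_length rather than a plain INTERVAL.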

From there, we can use the new type wherever we would use any other type: including in an array.

CREATE TABLE stores (
  store_id SERIAL PRIMARY KEY,
  name TEXT,
  default_opening_hours open_period[7]
);

Nothing new here since the last post.

However, let’s look at coming up with a mechanism that prevents subsequent days from overlapping with one another. Since we have all of these in an array, we can write a single function that ensures the values are acceptable together. There are a couple of different approaches we could use. One would be to “materialise” the open periods, and then compare them to one another.

CREATE OR REPLACE FUNCTION materialise(open_period, DATE)
RETURNS TSRANGE AS $$

  SELECT TSRANGE(
    ($2 || 'T' || $1.start || 'Z')::TIMESTAMP,
    ($2 || 'T' || $1.start || 'Z')::TIMESTAMP + $1.length
  );

$$ LANGUAGE SQL STRICT IMMUTABLE;



CREATE OR REPLACE FUNCTION materialise(open_period)
RETURNS TSRANGE AS $$

  SELECT materialise($1, '1979-01-01'::DATE);

$$ LANGUAGE SQL STRICT IMMUTABLE;

We have a version there that takes a specific day, but also one that just uses a fixed reference date. That may be useful later…

…but right now we want to be able to apply subsequent days to each item in the array, and then look for overlaps.

WITH default_opening_hours AS (
  SELECT UNNEST(ARRAY[
    ('09:00', '08:00')::open_period,  -- Monday, but we won't really use that today.
    ('09:00', '08:00')::open_period,
    ('09:00', '08:00')::open_period,
    ('09:00', '12:00')::open_period,
    ('09:00', '08:00')::open_period,
    ('10:00', '07:00')::open_period,
    ('11:00', '06:00')::open_period
  ]) AS hours
), materialised_opening_hours AS (
  SELECT materialise(hours, (now() + INTERVAL '1 day' * row_number() OVER ())::DATE) AS hours
    FROM default_opening_hours
), overlapping_periods AS (
  SELECT hours && LEAD(hours, 1) OVER () AS overlap
    FROM materialised_opening_hours
)
SELECT * FROM overlapping_periods WHERE overlap;

We don’t (at this point in time) really mind whether the weekdays that the open periods refer to are the correct weekdays: we just need to ensure that we have 7 consecutive days, with each open_period materialised to the correct value based on its offset from the first day.

This is pretty close: it will find any overlaps between consecutive days, except where the finish of the last day overlaps with the start of the first day of the following week. We can cheat a little to make that work:

WITH default_opening_hours AS (
  SELECT UNNEST(ARRAY[
    ('09:00', '08:00')::open_period,
    ('09:00', '08:00')::open_period,
    ('09:00', '08:00')::open_period,
    ('09:00', '12:00')::open_period,
    ('09:00', '08:00')::open_period,
    ('10:00', '07:00')::open_period,
    ('11:00', '06:00')::open_period
  ]) AS hours
), materialised_opening_hours AS (
  SELECT materialise(hours, (now() + INTERVAL '1 day' * row_number() OVER ())::DATE) AS hours
    FROM default_opening_hours

   UNION ALL

  SELECT materialise((SELECT hours FROM default_opening_hours LIMIT 1),
                     (now() + INTERVAL '8 days')::DATE
  )
), overlapping_periods AS (
  SELECT hours && LEAD(hours, 1) OVER () AS overlap
    FROM materialised_opening_hours
)
SELECT * FROM overlapping_periods WHERE overlap;

Let’s put a couple of values in there to see that the overlaps are detected:

WITH default_opening_hours AS (
  SELECT UNNEST(ARRAY[
    ('09:00', '08:00')::open_period,
    ('09:00', '08:00')::open_period,
    ('09:00', '28:00')::open_period,
    ('09:00', '12:00')::open_period,
    ('09:00', '08:00')::open_period,
    ('10:00', '07:00')::open_period,
    ('11:00', '24:00')::open_period
  ]) AS hours
), materialised_opening_hours AS (
  SELECT materialise(hours, (now() + INTERVAL '1 day' * row_number() OVER ())::DATE) AS hours
    FROM default_opening_hours

   UNION ALL

  SELECT materialise((SELECT hours FROM default_opening_hours LIMIT 1),
                     (now() + INTERVAL '8 days')::DATE)
), overlapping_periods AS (
  SELECT hours && LEAD(hours, 1) OVER () AS overlap
    FROM materialised_opening_hours
)
SELECT * FROM overlapping_periods WHERE overlap;
 overlap
─────────
 t
 t
(2 rows)

Now, we can bundle this up into a function that we can then use in a CHECK CONSTRAINT (as we cannot use a subquery directly in a check constraint):

CREATE OR REPLACE FUNCTION find_subsequent_day_overlaps(open_period[])
RETURNS BOOLEAN AS $$
  SELECT NOT EXISTS (
      WITH materialised_opening_hours AS (
        SELECT materialise(hours, (now() + INTERVAL '1 day' * row_number() OVER ())::DATE) AS hours
          FROM unnest($1) hours

         UNION ALL

        SELECT materialise($1[1], (now() + INTERVAL '8 days')::DATE)
      ), overlapping_periods AS (
        SELECT hours && LEAD(hours, 1) OVER () AS overlap FROM materialised_opening_hours
      )
      SELECT * FROM overlapping_periods WHERE overlap
    )
$$ LANGUAGE SQL STRICT IMMUTABLE;
ALTER TABLE stores
ADD CONSTRAINT prevent_default_opening_hours_overlap
CHECK (find_subsequent_day_overlaps(default_opening_hours));

And, now to check:

INSERT INTO stores (name, default_opening_hours) VALUES
(
  'John Martins',
  ARRAY[
    ('09:00', '08:00')::open_period,
    ('09:00', '08:00')::open_period,
    ('09:00', '08:00')::open_period,
    ('09:00', '12:00')::open_period,
    ('09:00', '08:00')::open_period,
    ('10:00', '07:00')::open_period,
    ('11:00', '06:00')::open_period
  ]
);

And with invalid data:

INSERT INTO stores (name, default_opening_hours) VALUES (
  'Foo',
  ARRAY[('09:00', '08:00')::open_period,
        ('09:00', '08:00')::open_period,
        ('09:00', '08:00')::open_period,
        ('09:00', '12:00')::open_period,
        ('09:00', '08:00')::open_period,
       ('10:00', '07:00')::open_period,
       ('11:00', '24:00')::open_period]);

…which throws an exception:

ERROR:  new row for relation "stores" violates check constraint "prevent_default_opening_hours_overlap"
DETAIL:  Failing row contains (2, Foo, {"(09:00:00,08:00:00)","(09:00:00,08:00:00)","(09:00:00,08:00:00...).

Righto, what other things might we want to do with these composite types?

Some businesses have a concept of “Day Parts”: within a single day, we may want to look at a subset of that day. For instance, sales during Breakfast may have a different set of Key Performance Indicators than those during Lunch or Tea. So, we may want to store something like:

+------------+------------+-------------+
| Day Period | Start time | Finish time |
+============+============+=============+
| Breakfast  |    06:00   |     10:00   |
| Lunch      |    11:00   |     14:00   |
| Tea        |    16:00   |     21:00   |
+------------+------------+-------------+

Again, it might make sense to store these as an open_period instead, because they could go over midnight. We’ll also want the name to be unique per store, but that’s something we can do with a plain old unique index:

CREATE TABLE day_parts (
  day_part_id SERIAL PRIMARY KEY,
  store_id INTEGER REFERENCES stores(store_id),
  name TEXT,
  period OPEN_PERIOD
);
CREATE UNIQUE INDEX distinct_name_per_day_period ON day_parts (store_id, name);

We can use an exclusion constraint to prevent overlaps, however you may need to enable support first:

CREATE EXTENSION btree_gist;

Now, let’s see the exclusion constraint:

ALTER TABLE day_parts
ADD CONSTRAINT prevent_overlapping_day_parts
EXCLUDE USING gist(
  materialise(period) WITH &&,
  store_id WITH =
);

It turns out that this is actually easier to implement than the check over the values in the array!


The other thing we may want to do is annotate the relevant Day Part onto an object of some sort. To do this, we will need to materialise all of the day parts for the given day(s), and see which one our timestamp falls within. We will expand on a couple of things here: specifically, we need to store the timezone within which our store is located. To make things easier to follow, we will write all of the DDL afresh, partly because this example will not use the concept of default opening hours.

CREATE TABLE stores (
  store_id SERIAL PRIMARY KEY,
  name TEXT UNIQUE NOT NULL,
  timezone TEXT NOT NULL CHECK (now() AT TIME ZONE timezone IS NOT NULL)
  -- Note we validate that this column contains a valid timezone by
  -- attempting to coerce now() to that timezone: this will report
  -- back an error if the timezone name is not recognised.
);

CREATE TABLE day_parts (
  day_part_id SERIAL PRIMARY KEY,
  store_id INTEGER REFERENCES stores (store_id),
  name TEXT,
  period OPEN_PERIOD,
  CONSTRAINT prevent_overlapping_day_parts EXCLUDE USING gist(
    materialise(period) WITH &&,
    store_id WITH =
  )
);

CREATE UNIQUE INDEX distinct_name_per_day_period ON day_parts(store_id, name);

CREATE TABLE transactions (
  transaction_id SERIAL PRIMARY KEY,
  store_id INTEGER REFERENCES stores (store_id),
  timestamp TIMESTAMPTZ,
  amount NUMERIC
);

And now add some data:

INSERT INTO stores (name, timezone)
     VALUES ('John Martins', 'Australia/Adelaide');

INSERT INTO day_parts (store_id, name, period)
     VALUES (1, 'Morning',   ('09:00', '02:00')),
            (1, 'Lunch',     ('11:00', '03:00')),
            (1, 'Afternoon', ('14:00', '03:00')),
            (1, 'Evening',   ('17:00', '04:00'));


INSERT INTO transactions (store_id, timestamp, amount)
     VALUES (1, '2019-05-27T01:25:22', '33.77'),
            (1, '2019-05-27T04:33:47', '724.75'),
            (1, '2019-05-27T06:00:42', '47.48'),
            (1, '2019-05-27T08:33:12', '3.44');

The first thing we want to do is show each transaction at the local time it was in the store when the transaction was completed:

SELECT transactions.*,
       transactions.timestamp AT TIME ZONE stores.timezone AS local_time
  FROM transactions
 INNER JOIN stores USING (store_id);

 transaction_id │ store_id │       timestamp        │ amount │     local_time
────────────────┼──────────┼────────────────────────┼────────┼─────────────────────
              1 │        1 │ 2019-05-27 01:25:22+00 │  33.77 │ 2019-05-27 10:55:22
              2 │        1 │ 2019-05-27 04:33:47+00 │ 724.75 │ 2019-05-27 14:03:47
              3 │        1 │ 2019-05-27 06:00:42+00 │  47.48 │ 2019-05-27 15:30:42
              4 │        1 │ 2019-05-27 08:33:12+00 │   3.44 │ 2019-05-27 18:03:12

Next, we want to annotate on which day part corresponds to that local time:

SELECT trans.*,
       day_part.name AS day_part
  FROM (
    SELECT transactions.*,
           transactions.timestamp AT TIME ZONE stores.timezone AS local_time
      FROM transactions
     INNER JOIN stores USING (store_id)
  ) trans
  LEFT OUTER JOIN LATERAL (
    SELECT materialise(day_parts.period, trans.local_time::DATE) AS day_part,
           day_parts.name
      FROM day_parts
     WHERE day_parts.store_id = trans.store_id
  ) day_part ON (day_part @> local_time);

 transaction_id │ store_id │       timestamp        │ amount │     local_time      │ day_part
────────────────┼──────────┼────────────────────────┼────────┼─────────────────────┼───────────
              1 │        1 │ 2019-05-27 01:25:22+00 │  33.77 │ 2019-05-27 10:55:22 │ Morning
              2 │        1 │ 2019-05-27 04:33:47+00 │ 724.75 │ 2019-05-27 14:03:47 │ Afternoon
              3 │        1 │ 2019-05-27 06:00:42+00 │  47.48 │ 2019-05-27 15:30:42 │ Afternoon
              4 │        1 │ 2019-05-27 08:33:12+00 │   3.44 │ 2019-05-27 18:03:12 │ Evening

From there, we could look at aggregation within day parts, or comparisons between different days, but only the same day part.
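
For example, aggregating transactions per day part is just a GROUP BY over the same lateral join (a sketch, using the schema and materialise function from above):

SELECT day_part.name AS day_part,
       COUNT(*) AS transaction_count,
       SUM(trans.amount) AS total_amount
  FROM (
    SELECT transactions.*,
           transactions.timestamp AT TIME ZONE stores.timezone AS local_time
      FROM transactions
     INNER JOIN stores USING (store_id)
  ) trans
  LEFT OUTER JOIN LATERAL (
    SELECT materialise(day_parts.period, trans.local_time::DATE) AS day_part,
           day_parts.name
      FROM day_parts
     WHERE day_parts.store_id = trans.store_id
  ) day_part ON (day_part.day_part @> trans.local_time)
 GROUP BY day_part.name;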


Those of you paying attention may notice that I used TSRANGE instead of TSTZRANGE in the materialise functions. Can we look at a version of these functions that accepts a timezone as well as a date (and open_period), and gives back a TSTZRANGE?

CREATE OR REPLACE FUNCTION materialise(open_period, DATE, timezone TEXT)
RETURNS TSTZRANGE AS $$

  SELECT TSTZRANGE(
    ($2 || 'T' || $1.start)::TIMESTAMP AT TIME ZONE timezone,
    (($2 || 'T' || $1.start)::TIMESTAMP + $1.length) AT TIME ZONE timezone
  );

$$ LANGUAGE SQL STRICT IMMUTABLE;

Now we can rewrite our last query:

SELECT transactions.*,
       day_part.name AS day_part
  FROM transactions
  LEFT OUTER JOIN LATERAL (
    SELECT materialise(day_parts.period, transactions.timestamp::DATE, stores.timezone) AS day_part,
           day_parts.name
      FROM day_parts
      INNER JOIN stores USING (store_id)
     WHERE day_parts.store_id = transactions.store_id
  ) day_part ON (day_part.day_part @> transactions.timestamp);

 transaction_id │ store_id │       timestamp        │ amount │ day_part
────────────────┼──────────┼────────────────────────┼────────┼──────────
              1 │        1 │ 2019-05-27 01:25:22+00 │  33.77 │ Morning
              2 │        1 │ 2019-05-27 04:33:47+00 │ 724.75 │ Afternoon
              3 │        1 │ 2019-05-27 06:00:42+00 │  47.48 │ Afternoon
              4 │        1 │ 2019-05-27 08:33:12+00 │   3.44 │ Evening

Although, I think this version makes per-day aggregation a bit harder, because you’d still need to work out the “local” timestamp to group transactions onto the same day; then again, that’s actually part of materialising the store’s full open period anyway.

Postgres Generated Columns

A little while ago, I wrote about creating a nice way to have a Django ComputedField. It is pretty neat, except it needs to do some black magic to sniff up the stack to work around a limitation in the way a Ref/Col works in Django.

The way it works is that you define the expression in Python, and it evaluates it in the database, allowing you to query based on this, and have it automatically annotated on.

What it doesn’t do, however, is actually store that value in the database. Indeed, if you are actually querying on this column, you’d probably want to have a functional index that uses the same expression, so that the database can do a reasonable job of improving query times on that column.
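
For example, with a hypothetical person table that has first_name and last_name columns (much like the one created below), a functional index over the same expression might look like this:

CREATE INDEX person_computed_full_name_idx ON person (
  (COALESCE(first_name, '') || ' ' || COALESCE(last_name, ''))
);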

New in Postgres 12 is a feature that really piqued my interest: Generated Columns.

These are basically what the ComputedField does, but at the database level. And, instead of it being an expression that is evaluated at query time, it is instead an expression that is evaluated at write time, and stored in an actual column (that could then have an index applied to it).

Let’s have a look at an example:

CREATE TABLE person (
  person_id integer PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
  first_name TEXT,
  last_name TEXT,
  full_name TEXT GENERATED ALWAYS AS (
    COALESCE(first_name, '') || ' ' || COALESCE(last_name, '')
  ) STORED
);

Again, I’m aware I’m failing to note at least one of the falsehoods programmers believe about names.

Notes about this:

  • I’ve used the similar (preferred) syntax for generating the primary key.
  • You must have the keyword STORED at the end of the column definition: or more specifically, the syntax must be <column> <type> GENERATED ALWAYS AS (<expression>) STORED.
  • You may only refer to other columns within the same row: similar to how a functional index would work.
  • You may not refer to other generated columns: that would likely require parsing the expressions to determine which one to calculate first. I’d love to see postgres implement that at some point though!

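And because full_name is now a real, stored column, indexing it no longer needs an expression at all; a plain index will do:

CREATE INDEX person_full_name_idx ON person (full_name);
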
So, let’s have a look at that with some data:

INSERT INTO person (first_name, last_name)
VALUES
    ('alice', 'aardvark'),
    ('bob', 'burger'),
    ('chuck', NULL),
    (NULL, 'darris');

And when we query it:

SELECT * FROM person;
 person_id │ first_name │ last_name │   full_name
 ------------------------------------------------------
         1 │ alice      │ aardvark  │ alice aardvark
         2 │ bob        │ burger    │ bob burger
         3 │ chuck      │ <NULL>    │ chuck
         4 │ <NULL>     │ darris    │  darris
(4 rows)

Oh, bother. We didn’t want the space before ‘darris’ (or the one you can’t see, after ‘chuck’). We’ll have to fix that in a sec.

So, what happens when we try to write to the full_name column?

UPDATE person SET first_name = 'dave', full_name='foo' WHERE first_name IS NULL;
ERROR:  column "full_name" can only be updated to DEFAULT
DETAIL:  Column "full_name" is a generated column.

Okay, that’s nice to know. If the write were silently ignored, we could have just used a custom Django field and discarded the value; since it is an error, we’ll need something similar to how ComputedField prevents writing values. I’ll have to investigate that further.

But, back onto the fact that I forgot to trim any leading or trailing spaces. It turns out that there is no way to alter the expression used by a generated column. Which, when you think a little more about it, sort-of makes sense: at the very least, the database would need to rewrite the value in every row where the new expression gives a different result to the old one.

Instead, you need to drop the column, and re-add it with the correct expression. You’ll almost certainly want to do this in a transaction:

BEGIN;
ALTER TABLE person DROP COLUMN full_name;
ALTER TABLE person ADD COLUMN full_name TEXT
      GENERATED ALWAYS AS (TRIM(
        COALESCE(first_name, '') || ' ' ||
        COALESCE(last_name, '')
      )) STORED;
COMMIT;

And now we can query our table again:

SELECT * FROM person;
 person_id │ first_name │ last_name │   full_name
 ------------------------------------------------------
         1 │ alice      │ aardvark  │ alice aardvark
         2 │ bob        │ burger    │ bob burger
         3 │ chuck      │ <NULL>    │ chuck
         4 │ <NULL>     │ darris    │ darris
(4 rows)

Sweet.

Form and Formset

Sometimes, you’ll have an object that you want to save, and, at the same time, some related objects that should also be updated, created and/or deleted.

Django has really nice tools for doing both of these operations (ModelForm for the individual instance, and InlineFormSet for the group of related objects). Both of these are really well documented. However, it is nice to be able to encapsulate these operations into a single functional unit.

We can leverage the fact that all request data is passed to a form class when it is instantiated, along with some nice use of the django cached_property decorator to make this really quite neat.

Let’s consider this model structure: we have a Person, and each Person may have zero or more Addresses. Every Person has a name, and an optional date of birth. All of the fields for the address are required:

class Person(models.Model):
    name = models.TextField()
    date_of_birth = models.DateField(null=True, blank=True)


class Address(models.Model):
    person = models.ForeignKey(Person, related_name='addresses', on_delete=models.CASCADE)
    street = models.TextField()
    suburb = models.TextField()
    postcode = models.TextField()
    country = django_countries.fields.CountryField()

We can have a view for updating the Person model instance that is very simple:

class PersonForm(forms.ModelForm):
    class Meta:
        model = Person
        fields = ('name', 'date_of_birth')
        widgets = {
            'name': forms.TextInput(),
            'date_of_birth': forms.DateInput(),
        }


class UpdatePerson(UpdateView):
    model = Person
    form_class = PersonForm

Likewise, we can have a view for updating a person’s addresses:

AddressFormSet = inlineformset_factory(
    Person,
    Address,
    fields=('street', 'suburb', 'postcode', 'country'),
)


class UpdateAddresses(UpdateView):
    model = Person
    form_class = AddressFormSet

As mentioned above, we’d like to have a page where a Person’s name, date of birth and addresses may be modified in one go, rather than having to have two separate pages.

from django.utils.functional import cached_property
from django.utils.translation import ugettext as _


class PersonForm(forms.ModelForm):
    class Meta:
        model = Person
        fields = ('name', 'date_of_birth')
        widgets = {
            'name': forms.TextInput(),
            'date_of_birth': forms.DateInput(),
        }

    @cached_property
    def addresses(self):
        return inlineformset_factory(
            Person, Address, fields=('street', 'suburb', 'postcode', 'country')
        )(
            data=self.data or None,
            files=self.files or None,
            instance=self.instance,
            prefix='address',
        )

    def clean(self):
        # Just in case we are subclassing some other form that does something in `clean`.
        super().clean()
        if not self.addresses.is_valid():
            self.add_error(None, _('Please check the addresses'))

    def save(self, commit=True):
        result = super().save(commit=commit)
        self.addresses.save(commit=commit)
        return result


class UpdatePerson(UpdateView):
    model = Person
    form_class = PersonForm

So, how does this work?

When the form.addresses attribute is first accessed, the cached_property decorator generates a new formset class from the factory, instantiates it with the arguments shown, and caches the result on the form instance. Every subsequent access during that request-response cycle returns the same formset instance, so validation and saving both operate on the one object.

Within our template, we can just render the formset normally, however, we may want to use some fancy javascript to make it dynamic. In this case, I’ll just use the default rendering as seen in the django formset documentation.

<form action="{% url 'person:update' form.instance.pk %}"
      method="POST">
  {% csrf_token %}
  {{ form }}
  {{ form.addresses }}

  <button type="submit">
    {% trans 'Save' %}
  </button>
</form>

Query Zen is no queries at all

Performing no queries is always going to be faster than performing a query.

Today I had two instances of the same problem: I have two tables, one of which essentially stores data calculated from the other (and from data in other tables, or from a process that runs application code, and so cannot be determined purely within the database).

In one case, we have an audit logging table (which is purely handled within postgres) and another related table that stores a string representation of what the audited object looked like according to the application at that point in time, which needs to be calculated after the fact in Django.

The other case stores some cached values that can be calculated in the database: basically some metadata about a shift, according to the location the shift is at. Changes to the shift table cause this value to be updated automatically; however, we have several million existing shifts that do not yet have the value, and we need to create items for all of them.

In both cases, we have a celery task that will create a (relatively small, to prevent locks and other performance issues) number of the related objects, but only for those that don’t already have one. The tricky bit is that we need to trigger another instance of the celery task if we still have remaining objects in the database that don’t yet have the related item.

@app.task
def update_missing_items(batch_size=100):
    missing_items = AuditLog.objects.filter(instance_repr=None)
    InstanceRepr.objects.bulk_create([
      InstanceRepr(
        audit_log=log,
        # ...
      ) for log in missing_items[:batch_size]
    ])

    if missing_items.exists():
      update_missing_items.apply_async(kwargs={'batch_size': batch_size}, countdown=1)

Since we have some 15 million audit logs (so far), it turns out that this missing_items.exists() query was taking several seconds to run. I tried to write an optimised version, but was not able to improve the performance.

Then, it occurred to me (thanks mlt- on #postgres), that we can look at the number of items we created, and see if it was the same as the batch_size. If it was smaller than the batch size, then we know we are up to date, and don’t need to reschedule our task.

@app.task
def update_missing_items(batch_size=100):
    missing_items = AuditLog.objects.filter(instance_repr=None)
    created = InstanceRepr.objects.bulk_create([
      InstanceRepr(
        audit_log=log,
        # ...
      ) for log in missing_items[:batch_size]
    ])

    if len(created) == batch_size:
      update_missing_items.apply_async(kwargs={'batch_size': batch_size}, countdown=1)

Bingo: since we needed to execute the query to fetch the objects to begin with, we are now doing no extra work to see if we need to run our task again.


The other situation can be done in the database, however a single query of several million rows will block other things from happening, so we want to run the update in batches. There is a trigger on the table so that new or updated rows will already have a value, which actually makes it the same problem, but in SQL:

WITH step_1 AS (
  SELECT shift_id, ...
    FROM shifts
    JOIN ... ON (...)
    LEFT OUTER JOIN annotations USING (shift_id)
    WHERE annotations.shift_id IS NULL
    LIMIT 1000
), step_2 AS (
  ...
),
..., step_N AS (
  ...
)
INSERT INTO annotations (shift_id, ...) SELECT * FROM step_N;

There’s actually a bunch more to this, but it’s not really important: building up the annotations hits a couple of other tables, and I’ve used a CTE because each value is based on a previous annotation.

We can put this query into a task too, but we need some way of determining how many inserts we did. Luckily, Postgres has the RETURNING clause on an INSERT. It would be really nice if we could do:

WITH step_1 AS (...)
INSERT INTO annotations (shift_id, ...)
SELECT * FROM step_N
RETURNING COUNT(*)

Alas, that’s not possible. However, we can just extend our CTE:

WITH step_1 AS (
  SELECT shift_id, ...
    FROM shifts
    ...
    LEFT OUTER JOIN annotations USING (shift_id)
    WHERE annotations.shift_id IS NULL
    -- NOTE: the LIMIT value is a parameter!
    LIMIT %s
),
...,
step_N AS (...),
inserts AS (
  INSERT INTO annotations(shift_id, ...)
  SELECT * FROM step_N
  RETURNING shift_id
)
SELECT COUNT(*) FROM inserts

Then, we can write our celery task:

from django.db import connection

@app.task
def update_missing_annotations(batch_size):
    with connection.cursor() as cursor:
        cursor.execute(QUERY, [batch_size])
        if cursor.fetchone()[0] == batch_size:
            update_missing_annotations.apply_async(kwargs={'batch_size': batch_size}, countdown=1)

Partially failing tests

$ ack unittest.expected | grep expect | wc -l
        12

I’m not proud to say it, but we have about (exactly?) a dozen tests in our system that for one reason or another are currently failing. These were determined to not be enough to block development, so they are still there. As sort-of shown in the command line above, it’s possible to mark these tests as “Expected to Fail”. From time to time, I’ll go back and revisit these, and see if there’s anything that we can fix in the short term.

Today, however, I was working on something, and since it was going to touch some of the same code that my big query performance branch is already doing, I decided to work from there. I merged it all in, and wrote some new tests, and updated the queries.

But it turns out that the query performance branch is missing some bits that are required for one of the queries. I didn’t realise this until I’d written the tests, and the migrations to update the queries.

class TestNewThing(TestCase):
    def test_new_thing(self):
        # complex set up that creates a bunch of things that are used by all assertions.

        # This assertion should be passing...and we want to be notified if it fails in the future.
        self.assertEqual('foo', bar)

        # This assertion will fail, but we don't care right now.
        self.assertEqual('baz', qux)

I was able to remove the migration easily enough, but I didn’t really want to lose the tests.

I could have just commented them out, but I thought maybe I’d just try to mark those parts as expectedFailure:

class TestNewThing(TestCase):
    def test_new_thing(self):
        # complex set up that creates a bunch of things that are used by all assertions.

        # This assertion should be passing...and we want to be notified if it fails in the future.
        self.assertEqual('foo', bar)

        with unittest.expectedFailure:
          # This assertion will fail, but we don't care right now.
          self.assertEqual('baz', qux)

However, this doesn’t work: unittest.expectedFailure is a decorator intended to be applied to a test method, not a context manager.

Hmm, I thought, what if we put that into a separate method, not prefixed with test_:

class TestNewThing(TestCase):
    def test_new_thing(self):
        # complex set up that creates a bunch of things that are used by all assertions.

        # This assertion should be passing...and we want to be notified if it fails in the future.
        self.assertEqual('foo', bar)

        self.optional_tests()

    @unittest.expectedFailure
    def optional_tests(self):
        # This assertion will fail, but we don't care right now.
        self.assertEqual('baz', qux)

And this actually works. If you have a failure in the inner method, this will not trigger a failed build, but if you have a failure in the actual test method, then it will.

Hallelujah

I have been listening to Malcolm Gladwell’s podcast Revisionist History. It’s been quite entertaining, but I sometimes do feel that he is a little less than honest with some of his opinions. I know there are people who have strong opinions about Gladwell, but I’m not really one of them. I have read some of his stuff, and find myself sometimes in agreement with his hypothesis, but not always. I think not blindly accepting what he talks about is prudent: but the same is true of everyone.

In episode Hallelujah, the discussion starts with a foray into Elvis Costello, but at some point then pivots and talks about the Leonard Cohen song, covered by almost everyone, but most famously Jeff Buckley. Gladwell’s point is that some songs (or works of art, or other works of genius) are “fully formed”, but others take iteration, and in some cases, a bunch of it.

Specifically, he states that the song Hallelujah had this really long, really unlikely string of events that all needed to happen in order for the song to become recognised as the magical entity that it is. I’m not denying that, but he does miss the point that there are probably many more chains of events that could have resulted in songs just as good as this; maybe even better.

We only know about Hallelujah because in our universe, there was a completed chain of events that lead to the song being released by Buckley, and then his unfortunate demise. I also wonder if perhaps the song may still have become as famous as it did even if he had not drowned: my iTunes library tells me another song of his was in the Triple J Hottest 100 the year before he died.

I’m not doubting at all that Leonard Cohen, Elvis Costello or Jeff Buckley were all musical geniuses: but this selection bias hides the fact that there could be many more musical geniuses out there that wrote songs that did fade into oblivion.

I’m still glad we got Hallelujah, though.

Sonoff Touch LAN Mode and MQTT

I’ve had a couple of the Sonoff Basic devices for quite some time. It’s fairly easy to solder some header pins onto these which makes flashing the firmware somewhat of a non event, but it’s still a bit of a pain.

The other thing I bought (again, some time ago, but a bit after the Sonoff Basic) is a Sonoff Touch. This is an in-wall light switch replacement, which means you can replace your existing light switches with something that you can control over WiFi. They actually look pretty nice, too.

I wasn’t so keen on mucking around with soldering them, partly because you need to use a 90° header. However, the other day I learned that there is a way to control them (and the Basic) without having to connect to the iTead servers.

When the devices are unable to connect to a remote server (yes, they basically keep a connection to this remote server open 100% of the time, which was part of the rationale behind flashing the firmware), they go into LAN mode.

When they are in LAN mode, they will respond to WebSocket connections over port 8081, making it easy to control them directly.

In my router (running LEDE), I can set a specific range of IP addresses to be unable to connect out to the internet, and then all I need to do is make sure the devices get one of these IP addresses.

The configuration process is something like:

  • Touch the switch toggle for 7 seconds. This puts it into pairing mode, where it acts as an Access Point (AP).
  • Connect to the new WiFi network ITEAD_100000xxxxxx.
  • Get the MAC address of the device at 10.10.7.1
  • Tell the router to reserve an IP address in the required range for this MAC address.
  • POST data to the device (10.10.7.1) with a JSON object that contains the WiFi credentials. This will trigger the device to disconnect the AP, and connect to the WiFi network. It’s also possible at this time to tell it to connect to a different server (which I may do instead at some point, but this method was quicker for now).
  • Once connected to your WiFi, send JSON messages over a WebSocket connection to the device (at its fixed IP address).

I’m hoping at some point to automate this, but it’s meaningless to do so until I get a bunch more devices.
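
When I do automate it, the credentials step from the list above should boil down to a single HTTP request, something like the following. This is only a sketch: the endpoint path and JSON keys are assumptions based on third-party write-ups of the stock firmware’s pairing mode, and may well differ between firmware versions.

import requests

# Every field name here is an assumption: check against your firmware version.
requests.post('http://10.10.7.1/ap', json={
    'version': 4,
    'ssid': 'my-wifi-network',
    'password': 'my-wifi-password',
    'serverName': '10.1.1.1',   # the "different server" mentioned above
    'port': 8443,
})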


So, on to the software.

Ultimately, the plan is to control these devices using HomeKit. I started writing a direct bridge (similar to my MQTT HomeKit bridge), but then decided it would be simpler to just bridge to MQTT - I could then use the correct topic names and values to allow it to interact with that MQTT HomeKit bridge.

There’s really only two things to do:

  • Connect to the Sonoff device, and wait for events from there as to the switch state. Push these changes to our MQTT topic.
  • Connect to the MQTT broker, and subscribe to our topic. When we get events, push these to the Sonoff device.

I attempted to play around with asyncio to get this to work, but I can’t remember enough about how to use it, so I went for an easier (for me) solution.

At this stage, it’s just a single Sonoff being controlled.

import json
import time
import enum

from websocket import create_connection
from paho.mqtt import client as mqtt

API_KEY = 'bba2e54d-7202-4a75-bd26-307597a1dd7d'
TOPIC = 'HomeKit/sonoff-{}/Lightbulb/On'


class State(enum.Enum):
    ON = 'on'
    OFF = 'off'

    @classmethod
    def parse(cls, data):
        if data in [cls.ON.value, True, 'true', 1, '1']:
            return cls.ON
        elif data in [cls.OFF.value, False, 'false', 0, '0']:
            return cls.OFF
        value = json.loads(data)['params']['switch']
        if value == cls.ON.value:
            return cls.ON
        return cls.OFF

    def __invert__(self):
        if self == State.ON:
            return State.OFF
        return State.ON

    def __bool__(self):
        return self == State.ON


class Sonoff:
    def __init__(self, host):
        self._state = None
        timestamp = str(time.time()).replace('.', '')
        self.ws = create_connection('ws://{}:8081/'.format(host))
        self.ws.send(json.dumps({
            'action': 'userOnline',
            'ts': timestamp,
            'version': 6,
            'apikey': API_KEY,
            'sequence': timestamp,
            'userAgent': 'HomeKit'
        }))
        self.deviceid = json.loads(self.ws.recv())['deviceid']
        print('Connected to {}'.format(self.deviceid))

        self.client = mqtt.Client()
        self.client.on_connect = self.mqtt_init
        self.client.on_message = self.handle_mqtt_message
        self.client.connect('mqtt.lan', 1883, 60)

        self.state = State.parse(self.ws.recv())
        print('Current state is {}'.format(self._state.name))

    @property
    def topic(self):
        return TOPIC.format(self.deviceid)

    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, value: State):
        if value == self.state:
            return
        timestamp = (str(time.time())).replace('.', '')
        self.ws.send(json.dumps({
            "action": "update",
            "deviceid": "nonce",
            "apikey": "nonce",
            "selfApikey": "nonce",
            "params": {
                "switch": value.value
            },
            "sequence": timestamp,
            "userAgent": "app"
        }))
        self._state = value
        self.client.publish(self.topic, int(bool(self.state)), retain=1)

    def on(self):
        self.state = State.ON

    def off(self):
        self.state = State.OFF

    def toggle(self):
        self.state = ~self.state

    def wait_for_ws(self):
        result = self.ws.recv()
        if 'switch' in result:
            self.state = State.parse(result)

    def handle_mqtt_message(self, client, userdata, message):
        self.state = State.parse(message.payload.decode())

    def mqtt_init(self, client, userdata, flags, rc):
        client.subscribe(self.topic)
        print("Subscribed to {}".format(self.topic))

    def start(self):
        self.client.loop_start()
        try:
            while True:
                time.sleep(0.01)
                self.wait_for_ws()
        finally:
            self.client.loop_stop()


if __name__ == '__main__':
    sonoff = Sonoff('10.1.10.140')
    sonoff.start()

I’m still not totally happy with the State stuff: I think I’ll use a simpler mapping there. But this works, and integrates nicely with my MQTT HomeKit bridge.

Postgres ALTER TABLE ... USING

Helped out a person in #django IRC some time ago, and learned something new about Postgres.

I’ve had to do data type migrations in the past (where you change a database column’s type in some way, and need to convert the data that is already in that column), however I’d always written a function to do the conversion.

It turns out you can just write an expression, and that works too:

  ALTER TABLE defect_defect
 ALTER COLUMN risk_rating
          SET DATA TYPE INTEGER
              USING CASE WHEN risk_rating = 'HIGH'   THEN 3
                         WHEN risk_rating = 'MEDIUM' THEN 2
                         WHEN risk_rating = 'LOW'    THEN 1
                    END;

Of course, this still would cause issues if you had code running from the old version (that expected the text values). However, it is nice to know.

Keyset Pagination in Django

Pagination is great. Nothing worse than having an HTML page that renders 25000 rows in a table.

Django Pagination is also great. It makes it super easy to declare that a view (that inherits from MultipleObjectMixin) should paginate its results:

class List(ListView):
    queryset = Foo.objects.order_by('bar', '-baz')
    paginate_by = 10
    template_name = 'foo.html'

Django pagination uses the LIMIT/OFFSET method. This is fine for smaller offsets, but once you start getting beyond a few pages, it can perform really badly. This is because the database needs to fetch all of the previous rows, even though it discards them.

Using Keyset Pagination allows for better performing “next page” fetches, at the cost of not being able to randomly fetch a page. That is, if you know the last element from page N-1, then you may fetch page N, but otherwise you really can’t.

Keyset Pagination, sometimes called the Seek Method, has been documented by Markus Winand and Joe Nelson. If you are not familiar with the concept, I strongly suggest you read the articles above.

Django’s pagination is somewhat pluggable: you may switch out the paginator in a Django ListView, for instance, allowing you to do things like switch the Page class, or how the various parts are computed. I used it recently to allow for a different query to be used when calculating the total number of objects in a queryset, to vastly improve performance of a particular paginated queryset.
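
As an illustration of that pluggability, here is the shape of that change (a sketch: estimate_count is a hypothetical helper, not something from Django or from that project):

from django.core.paginator import Paginator
from django.utils.functional import cached_property


class EstimatedCountPaginator(Paginator):
    @cached_property
    def count(self):
        # Swap the (potentially expensive) COUNT(*) over the full queryset
        # for some cheaper estimate. estimate_count is hypothetical.
        return estimate_count(self.object_list)

The view then only needs paginator_class = EstimatedCountPaginator to pick it up.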

However, there are limits. Both the view and the paginator expect (nay, demand) an integer page number, which, as we shall see shortly, will not work in this case. I also feel like the view is over-reaching its remit by casting the page number to an integer, as I’ll discuss below.

In order to get consistent results with any type of pagination, you must ensure that the ordering on the queryset is stable: that is, there are no rows that will be ‘tied’. Doing otherwise will mean that the database will “break the tie”, and not always in the same order. I’ve seen a bug that was extremely hard to track down that was caused by exactly this problem (and that was just with OFFSET pagination).

Often, to ensure stable ordering, the primary key is used as the “last” sort column. This is perfectly valid, but is not always necessary.

Because in many cases we will need to sort by multiple columns, we’ll need some mechanism for passing through to the paginator the “last value” in a given page for each of these columns. For instance, if we are ordering by timestamp and then group, we would need to pass through both the timestamp and the group of the last object. Because I like to use GET forms to allow me to paginate filtered results, I’ll want to have all of the values combined into one query parameter. If you were constructing links instead, you could look at having these as different parameters. However, you’d still need to be careful, because you aren’t filtering all results on these parameters. Having them serialised into a single parameter (using JSON) means that they are all in the one place, and you can just use that for the filtering to get the page results.

I’ve built a working implementation of keyset pagination, at least for forwards traversal, at django-keyset-pagination.

We can see from this that there really is not that much that we needed to do. We use a different Page object, which enables us to change what next_page_number will generate. When I figure out how, it will also allow us to work out the previous_page_number.

Likewise, we needed to change how we validate a page number, and how we fetch results for a page. That method, _get_page(number), is the one that does most of the work.

Ultimately, we wind up with a filter that looks like:

  WHERE (A < ?) OR (A = ? AND B > ?) OR (A = ? AND B = ? AND C < ?)

The direction of the test (< vs >) depends upon the sorting of that column, but hopefully you get the idea.
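
To make that concrete, here is a sketch (not the actual code from django-keyset-pagination) of how such a filter can be built up with Q objects, given the ordering and the values taken from the last object on the previous page:

from django.db.models import Q


def keyset_filter(ordering, last_values):
    # ordering is something like ['-timestamp', 'group']; last_values holds
    # the matching values from the final object on the previous page.
    clauses = Q()
    for index, field in enumerate(ordering):
        # All earlier columns must match the last-seen values exactly...
        equal = {
            prior.lstrip('-'): last_values[i]
            for i, prior in enumerate(ordering[:index])
        }
        # ...and this column must be strictly beyond the last-seen value,
        # in whichever direction it is sorted.
        lookup = '{}__{}'.format(
            field.lstrip('-'), 'lt' if field.startswith('-') else 'gt'
        )
        clauses |= Q(**equal, **{lookup: last_values[index]})
    return clauses

Fetching the next page is then roughly Foo.objects.filter(keyset_filter(ordering, last_values)).order_by(*ordering)[:page_size].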

In order to enable the query planner to be able to use an index effectively (if one exists), we need to adjust this to (thanks Markus):

WHERE A <= ? AND (
  (A < ?) OR (A = ? AND B > ?) ...
)

It’s also possible, in Postgres at least, to use a ROW() constructor comparison to order rows. However, this only works if the direction of each column ordering is the same, which in my use case it was not. I have a proof of concept of using ROW() constructors, but I need to figure out how to detect if they are available to the database in use.
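
For the record, the row constructor comparison looks something like this (a sketch with placeholder values); note that it assumes every column is sorted in the same, ascending, direction, which is precisely the condition that did not hold in my case:

SELECT *
  FROM foo
 WHERE (timestamp, bar, baz) > ('2019-05-27 10:55:22', 'last-bar', 'last-baz')
 ORDER BY timestamp, bar, baz
 LIMIT 10;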

In order to use the new paginator, we need to work around some issues in the Django class based views: namely, that they force the page number to be an integer (or the special string last), neither of which is acceptable in this case:

from django.core.paginator import InvalidPage
from django.http import Http404
from django.utils.translation import ugettext as _


class PaginateMixin(object):
    "Make pagination work for non integer page numbers"
    def paginate_queryset(self, queryset, page_size):
        # This is very similar to how django currently (2.1) does it: I may submit a PR to use this
        # mechanism instead, as it is more flexible.
        paginator = self.get_paginator(
            queryset, page_size, orphans=self.get_paginate_orphans(),
            allow_empty_first_page=self.get_allow_empty()
        )
        page_kwarg = self.page_kwarg
        page_number = self.kwargs.get(page_kwarg) or self.request.GET.get(page_kwarg) or 1

        try:
            page_number = paginator.validate_number(page_number)
        except ValueError:
            raise Http404(_('Page could not be parsed.'))

        try:
            page = paginator.page(page_number)
            return (paginator, page, page.object_list, page.has_other_pages())
        except InvalidPage as e:
            raise Http404(
                _('Invalid page (%(page_number)s): %(message)s') % {
                    'page_number': page_number,
                    'message': str(e)
                }
            )

There’s really only one change: instead of just casting the page number to an integer, we let the paginator handle that.

Okay, once all that is done, we can use our paginator:

class List(PaginateMixin, ListView):
    paginator_class = KeysetPaginator
    paginate_by = 10

    def get_queryset(self):
        return Foo.objects.order_by('timestamp', 'bar', 'baz')

We’ll need to change our template rendering to only render a next page link or button, rather than trying to render them for each page. We also don’t have any way to return to the previous page: I’m still working through a mechanism for that.


This post was originally written using the ROW() constructor, and this part of the post discussed the shortcomings. Now that has been resolved, the main shortcoming is that it is not yet possible to traverse to the previous page of results. In many cases that may not be necessary (we could use a browser’s back button, or rely on the fact that if it’s infinite scrolling the data is already in the document), however I would like to investigate how hard it is to actually get the previous page.