Versioning complex database migrations

Recently, I’ve been writing lots of raw SQL code that is either a complex VIEW, or a FUNCTION. Much of the time these will be used as the “source” for a Django model, but not always. Sometimes, there are complex functions that need to be run as a trigger in Postgres, or even a rule to execute when attempting a write operation on a view.

Anyway, these definitions are all code, and should be stored within the project they belong to. Using Django’s migrations you can apply them at the appropriate time, using a RunSQL operation.

However, you don’t really want to have the raw SQL in the migration file. Depending upon the text editor, it may not syntax highlight correctly, and finding the correct definition can be difficult.

Similarly, you don’t want to just have a single file, because to recreate the database migration sequence, it needs to apply the correct version at the correct time (otherwise, other migrations may fail to apply).

Some time ago, I adopted a policy of manually versioning these files. I had a naming pattern that seemed to be working well:

special_app/
  migrations/
    __init__.py
    0001_initial.py
    0002_update_functions.py
  sql/
    foo.function.0001.sql
    foo.function.0002.sql
    foo.trigger.0001.sql
    bar.view.0001.sql

The contents of the SQL files are irrelevant, and the migrations mostly so. There is a custom migration operation I wrote that loads the content from a file:

    LoadSQLScript('special_app', 'foo.function', 1)

The mechanics of how it works are not important.

So, this had been working well for several months, but I had a nagging feeling that the workflow was not ideal. This came to a head in my mind when I recognised that doing code review on database function/view changes was next to impossible.

See, the problem is that there is a completely new file each time you create a new version of a definition, so the diff shows the whole definition as added rather than just the lines that changed.

Instead, I’ve come up with a similar, but different solution. You still need to have versioned scripts for the purpose of historical migrations, but the key difference is that you don’t actually write these. Instead, you have something that notices that the “current” version of a definition is different to the latest version snapshot. You then also have a tool that copies the current version to a new snapshot, and creates a migration.

You can still modify a snapshot (for instance, if you’ve already created one, but it’s only in your current development tree), but mostly you don’t need to.
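The check and the copy step could be sketched roughly like this. It is not the code from my internal app: the function names are made up for illustration, and it assumes the current definition lives at <name>.sql next to snapshots named <name>.<NNNN>.sql.

```python
import re
import shutil
from pathlib import Path

# Matches snapshot names such as "iso8601.function.0002.sql".
SNAPSHOT = re.compile(r'^(?P<name>.+)\.(?P<version>\d{4})\.sql$')


def latest_snapshot(sql_dir, name):
    """Return (version, path) of the newest snapshot, or (0, None) if none."""
    found = []
    for path in Path(sql_dir).iterdir():
        match = SNAPSHOT.match(path.name)
        if match and match.group('name') == name:
            found.append((int(match.group('version')), path))
    return max(found, default=(0, None))


def is_out_of_date(sql_dir, name):
    """True when the current definition differs from its latest snapshot."""
    current = Path(sql_dir) / '{}.sql'.format(name)
    _, snapshot = latest_snapshot(sql_dir, name)
    return snapshot is None or current.read_text() != snapshot.read_text()


def make_snapshot(sql_dir, name):
    """Copy the current definition to the next numbered snapshot."""
    version, _ = latest_snapshot(sql_dir, name)
    current = Path(sql_dir) / '{}.sql'.format(name)
    target = Path(sql_dir) / '{}.{:04d}.sql'.format(name, version + 1)
    shutil.copyfile(str(current), str(target))
    return target
```

Because only the current file is ever edited by hand, a code review sees a normal diff of that file, and the snapshot is just a copy.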

$ ./manage.py check
System check identified some issues:

WARNINGS:
?: (sql_helpers.W002) The versioned migration file for core: iso8601.function.sql is out of date,
and needs to be updated or a new version created.

Oh, thanks for that. Checking the file, I see that it does indeed need a new version:

$ ./manage.py make_sql_migrations core
...
Copied <project>/core/sql/iso8601.function.sql to version 0002

You still need to ensure that any dependencies between SQL files are dealt with appropriately (for instance, a function that relies on a newly added column to a view needs to have that view’s definition updated before the function can be updated). But this is a much smaller problem, and something that your database should complain about when you try to apply the migrations.

I haven’t packaged this up yet: it’s currently only an internal app, as I want to use it for a while and see what else shakes out. I was pretty sure the way I was doing it before was “the best way” just after I thought that up, so we’ll see what happens.

JavaScript Array Widget

I’ve been making more and more use of the django.contrib.postgres classes, and will often store data in an ArrayField where appropriate.

There are two form fields that are supplied with Django for handling these types: one of which has the array values in a single text input (comma separated), and the other has a different text input element for each value.

However, the latter does not really work that well with a dynamic length array (it could work with up to N items, but in my case, I really don’t often have an N).

It would be possible to build functionality similar to the Django Admin’s formset handling, but this turns out to be a lot of mucking around.

It might be simpler to just have the simple array field rendered, and have JS bolted on that builds the dynamic list of text inputs based on this.

In this instance, I am actually storing the state in the widgets themselves: this means it’s relatively easy to add in the ability to re-order. I’ve done this with the Sortable library.

Django Dynamic Formsets

Django forms are one of the most important parts of the stack: they enable us to write declarative code that will validate user input, and ensure we protect ourselves from malicious input.

Formsets are an extension of this: they deal with a set of homogeneous forms, and will ensure that all of the forms are valid independently (and possibly do some inter-form validation, but that’s a topic for a later day).

The Django Admin contains an implementation of a dynamic formset: that is, it handles adding and removing forms from a formset, and maintains the management form accordingly. This post details an alternative implementation.


A Formset contains a Form (and has zero or more instances of that Form). It also contains a “Management Form”, which has metadata about the formset: the number of instances of the form that were provided initially, the number that were submitted by the user, and the maximum number of forms that should be accepted.

A Formset has a “prefix”, which is prepended to each element within the management form:

<input type="hidden" name="prefix-INITIAL_FORMS" value="...">
<input type="hidden" name="prefix-TOTAL_FORMS" value="...">
<input type="hidden" name="prefix-MIN_NUM_FORMS" value="...">
<input type="hidden" name="prefix-MAX_NUM_FORMS" value="...">

Each Form within the Formset uses the prefix, plus its index within the list of forms. For instance, if we have a Formset that contains three forms, each containing a single “name” field, we would have something similar to:

<input type="text" name="prefix-0-name" value="Alice">
<input type="text" name="prefix-1-name" value="Bob">
<input type="text" name="prefix-2-name" value="Carol">

Note that the form’s prefix is <formset_prefix>-<form_index>.

To make a Formset dynamic, we just need to be able to add (and possibly remove, but there’s a little more complexity there) extra forms. The management form needs to be updated to reflect this, and we need to ensure that the new form’s fields are named appropriately.

A Formset also contains an empty_form. This is an unbound form, where the form’s “index” is set to __prefix__. Thus, the empty form for the above formset might look somewhat like:

<input type="text" name="prefix-__prefix__-name" value="">

We can leverage this to allow us to have simpler code: instead of having to duplicate elements and remove the value, we can just duplicate the empty form, and replace the string __prefix__ with whatever the index of the newly created form should be.
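The substitution itself is tiny. As a sketch of the idea only (written in Python rather than as browser code, with a hypothetical add_form helper and the management data held in a plain dict), it amounts to:

```python
def add_form(empty_form_html, management, prefix='prefix'):
    """Return the markup for a new form, plus updated management data.

    `management` maps management form field names to their string values.
    """
    total_key = '{}-TOTAL_FORMS'.format(prefix)
    # The new form takes the next free index, which is the current total.
    index = int(management[total_key])
    html = empty_form_html.replace('__prefix__', str(index))
    updated = dict(management)
    updated[total_key] = str(index + 1)
    return html, updated
```

The new form’s index is the old total, so the only piece of state that has to change is the TOTAL_FORMS value.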

Here’s an implementation that has no dependencies, but does make some assumptions:

Multi-table Inheritance and the Django Admin

Django’s admin interface is a great way to be able to interact with your models without having to write any view code, and, within limits, it’s useful in production too. However, it can quickly get very crowded when you register lots of models.

Consider the situation where you are using Django’s multi-table inheritance:

from django.db import models

from model_utils.managers import InheritanceManager

class Sheep(models.Model):
    sheep_id = models.AutoField(primary_key=True)
    tag_id = models.CharField(max_length=32)
    date_of_birth = models.DateField()
    sire = models.ForeignKey('sheep.Ram', blank=True, null=True, related_name='progeny')
    dam = models.ForeignKey('sheep.Ewe', blank=True, null=True, related_name='progeny')

    objects = InheritanceManager()

    class Meta:
        verbose_name_plural = 'sheep'

    def __str__(self):
        return '{}: {}'.format(self._meta.verbose_name, self.tag_id)


class Ram(Sheep):
    sheep = models.OneToOneField(Sheep, parent_link=True)

    class Meta:
        verbose_name = 'ram'
        verbose_name_plural = 'rams'


class Ewe(Sheep):
    sheep = models.OneToOneField(Sheep, parent_link=True)

    class Meta:
        verbose_name = 'ewe'
        verbose_name_plural = 'ewes'

Ignore the fact there is no specialisation on those child models: in practice you’d normally have some.

Also note that I’ve manually included the primary key, and the parent link fields. This has been done so that the actual columns in the database match, and in this case will all be sheep_id. This will make writing joins slightly simpler, and avoids the (not specific to Django) ORM anti-pattern of “always have a column named id”.

We can use the models like this, but it might be nice to have all sheep in the one admin changelist, and just allow filtering by subclass model.

First, we’ll put some extra stuff onto the parent model, to make obtaining the subclasses simpler. Some of these will use a new decorator, which creates a class version of the @property decorator.

class classproperty(property):
    def __get__(self, cls, owner):
        return self.fget.__get__(None, owner)()


class Sheep(models.Model):
    # Fields, etc. defined as above.

    @classproperty
    @classmethod
    def SUBCLASS_OBJECT_CHOICES(cls):
        "All known subclasses, keyed by a unique name per class."
        return {
            rel.name: rel.related_model
            for rel in cls._meta.related_objects
            if rel.parent_link
        }

    @classproperty
    @classmethod
    def SUBCLASS_CHOICES(cls):
        "Available subclass choices, with nice names."
        return [
            (name, model._meta.verbose_name)
            for name, model in cls.SUBCLASS_OBJECT_CHOICES.items()
        ]

    @classmethod
    def SUBCLASS(cls, name):
        "Given a subclass name, return the subclass."
        return cls.SUBCLASS_OBJECT_CHOICES.get(name, cls)

Note that we don’t need to enumerate the subclasses: adding a new subclass later in development will automatically add it to these properties, even though in this case it would be unlikely to happen.
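To see the decorator on its own, here is a small sketch without Django. The classes are stand-ins, and they use Python’s __subclasses__() where the model code above uses _meta.related_objects; that swap is purely for illustration.

```python
class classproperty(property):
    def __get__(self, instance, owner):
        # self.fget is a classmethod object: bind it to the class and call it.
        return self.fget.__get__(None, owner)()


class Sheep(object):
    verbose_name = 'sheep'

    @classproperty
    @classmethod
    def SUBCLASS_CHOICES(cls):
        "Direct subclasses, as (name, verbose_name) pairs."
        return [
            (sub.__name__.lower(), sub.verbose_name)
            for sub in cls.__subclasses__()
        ]


class Ram(Sheep):
    verbose_name = 'ram'


class Ewe(Sheep):
    verbose_name = 'ewe'
```

Sheep.SUBCLASS_CHOICES is read as an attribute, with no call parentheses, and it picks up any subclass defined later.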

From these, we can write some nice neat stuff to enable using these in the admin.

from django import forms
from django.conf.urls import url
from django.contrib import admin
from django.utils.translation import ugettext as _

from .models import Sheep


class SubclassFilter(admin.SimpleListFilter):
    title = _('gender')
    parameter_name = 'gender'

    def lookups(self, request, model_admin):
        return Sheep.SUBCLASS_CHOICES

    def queryset(self, request, queryset):
        if self.value():
            return queryset.exclude(**{self.value(): None})
        return queryset


@admin.register(Sheep)
class SheepAdmin(admin.ModelAdmin):
    list_display = [
        'tag_id',
        'date_of_birth',
        'gender'
    ]
    list_filter = [SubclassFilter]

    def get_queryset(self, request):
        return super(SheepAdmin, self).get_queryset(request).select_subclasses()

    def gender(self, obj):
        return obj._meta.verbose_name

    def get_form(self, request, obj=None, **kwargs):
        if obj is None:
            Model = Sheep.SUBCLASS(request.GET.get('gender'))
        else:
            Model = obj.__class__

        # When we change the selected gender in the create form, we want to reload the page.
        RELOAD_PAGE = "window.location.search='?gender=' + this.value"
        # We should also grab all existing field values, and pass them as query string values.

        class ModelForm(forms.ModelForm):
            if not obj:
                gender = forms.ChoiceField(
                    choices=[('', _('Please select...'))] + Sheep.SUBCLASS_CHOICES,
                    widget=forms.Select(attrs={'onchange': RELOAD_PAGE})
                )

            class Meta:
                model = Model
                exclude = ()

        return ModelForm

    def get_fields(self, request, obj=None):
        # We want gender to be the first field.
        fields = super(SheepAdmin, self).get_fields(request, obj)

        if 'gender' in fields:
            fields.remove('gender')
            fields = ['gender'] + fields

        return fields

    def get_urls(self):
        # We want to install named urls that match the subclass ones, but bounce to the relevant
        # superclass ones (since they should be able to handle rendering the correct form).
        urls = super(SheepAdmin, self).get_urls()
        existing = '{}_{}_'.format(self.model._meta.app_label, self.model._meta.model_name)
        subclass_urls = []
        for name, model in Sheep.SUBCLASS_OBJECT_CHOICES.items():
            opts = model._meta
            replace = '{}_{}_'.format(opts.app_label, opts.model_name)
            subclass_urls.extend([
                url(pattern.regex.pattern, pattern.callback, name=pattern.name.replace(existing, replace))
                for pattern in urls if pattern.name
            ])

        return urls + subclass_urls

Wow. There’s quite a lot going on there, but the summary is:

  • We create a custom filter that filters according to subclass.
  • The .select_subclasses() means that objects are downcast to their subclass when fetched.
  • There is a custom form, that, when in create mode, has a selector for the desired subclass.
  • When the subclass is changed (only on the create form), the page is reloaded. This is required in a situation where there are different fields on each of the subclass models.
  • We register the subclass admin url paths, but use the superclass admin views.

I’ve had ideas about this for some time, and have just started using something like this in development: in my situation, there will be an arbitrary number of subclasses, all of which will have several new fields. The code in this page is extracted (and changed) from those ideas, so may not be completely correct. Corrections welcome.

(Directly) Testing Django Formsets

Django Forms are excellent: they offer a really nice API for validating user input. You can quite easily pass a dict of data instead of a QueryDict, which is what the request handling mechanism provides. This makes it trivial to write tests that exercise a given Form’s validation directly. For instance:

def test_my_form(self):
    form = MyForm({
        'foo': 'bar',
        'baz': 'qux'
    })
    self.assertFalse(form.is_valid())
    self.assertTrue('foo' in form.errors)

Formsets are also really nice: they expose a neat way to update a group of homogeneous objects. It’s possible to pass a list of dicts to the formset for the initial argument, but, alas, you may not do the same for passing data. Instead, it needs to be structured as the QueryDict would be:

def test_my_formset(self):
    formset = MyFormSet({
        'formset-INITIAL_FORMS': '0',
        'formset-TOTAL_FORMS': '2',
        'formset-0-foo': 'bar1',
        'formset-0-baz': 'qux1',
        'formset-1-foo': 'spam',
        'formset-1-baz': 'eggs'
    })
    self.assertTrue(formset.is_valid())

This is fine if you only have a couple of forms in your formset, but it’s tiresome to write out all of the prefixes, and far noisier to read.

Here’s a nice little helper, that takes a FormSet class, and a list (of dicts), and instantiates the formset with the data coerced into the correct format:

def instantiate_formset(formset_class, data, instance=None, initial=None):
    prefix = formset_class().prefix
    formset_data = {}
    for i, form_data in enumerate(data):
        for name, value in form_data.items():
            if isinstance(value, list):
                for j, inner in enumerate(value):
                    formset_data['{}-{}-{}_{}'.format(prefix, i, name, j)] = inner
            else:
                formset_data['{}-{}-{}'.format(prefix, i, name)] = value
    formset_data['{}-TOTAL_FORMS'.format(prefix)] = len(data)
    formset_data['{}-INITIAL_FORMS'.format(prefix)] = 0

    if instance:
        return formset_class(formset_data, instance=instance, initial=initial)
    else:
        return formset_class(formset_data, initial=initial)

This handles a formset or a model formset. Much easier to use:

def test_my_formset(self):
    formset = instantiate_formset(MyFormSet, [
      {
        'foo': 'bar1',
        'baz': 'qux1',
      },
      {
        'foo': 'spam',
        'baz': 'eggs',
      },
    ])
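To see exactly what the helper hands to the formset, the flattening step can be pulled out on its own. This is a sketch: flatten_formset_data is a hypothetical name, and it takes the prefix as an argument instead of reading it from the formset class, so it runs without Django.

```python
def flatten_formset_data(prefix, data):
    """Turn a list of per-form dicts into the flat dict a formset expects."""
    flat = {}
    for i, form_data in enumerate(data):
        for name, value in form_data.items():
            if isinstance(value, list):
                # List values are spread over <prefix>-<i>-<name>_<j> keys.
                for j, inner in enumerate(value):
                    flat['{}-{}-{}_{}'.format(prefix, i, name, j)] = inner
            else:
                flat['{}-{}-{}'.format(prefix, i, name)] = value
    # The management form data: every form is treated as a new (extra) form.
    flat['{}-TOTAL_FORMS'.format(prefix)] = len(data)
    flat['{}-INITIAL_FORMS'.format(prefix)] = 0
    return flat
```

Calling it with the two dicts from the test above gives the same keys as the hand-written version earlier, with the counts as integers rather than strings.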

Django Trees via Closure View

After writing up a method of using a Postgres View that generates a materialised path within the context of a Django model, I came across some queries of my data that were getting rather troublesome to write. It occurred to me that having a closure table would be useful. Specifically, I needed all of the descendants of a given set of nodes.

I couldn’t find an existing Postgres extension that will manage the closure table, and didn’t feel like writing my own implementation using triggers just yet. However, it occurred to me that I could use a similar trick to the recursive materialised path view. Thus, we have a Closure View.

We will start with the Django models:

class Node(models.Model):
    node_id = models.AutoField(primary_key=True)
    parent = models.ForeignKey('tree.Node', related_name='children', null=True, blank=True)

    descendants = models.ManyToManyField('tree.Node', related_name='ancestors', through='tree.Closure')

    class Meta:
        app_label = 'tree'


class Closure(models.Model):
    path = ArrayField(base_field=models.IntegerField(), primary_key=True)
    ancestor = models.ForeignKey('tree.Node', related_name='+')
    descendant = models.ForeignKey('tree.Node', related_name='+')
    depth = models.IntegerField()

    class Meta:
        app_label = 'tree'
        managed = False

You may notice I have a path column. I’m using this for the primary key, and it may turn out to be useful later.

Let’s have a look at the View:

CREATE RECURSIVE VIEW tree_closure(path, ancestor_id, descendant_id, depth) AS

SELECT ARRAY[node_id], node_id, node_id, 0 FROM tree_node

UNION ALL

SELECT parent_id || path, parent_id, descendant_id, depth + 1
FROM tree_node INNER JOIN tree_closure ON (ancestor_id = node_id)
WHERE parent_id IS NOT NULL;

This uses a recursive query. The first part builds the self-reference relations (each node is its own ancestor, at depth 0). The second, recursive part takes each row already in the view and joins it to the parent of that row’s ancestor, adding a row one level further up the tree. This repeats for rows added in previous iterations, until a root is reached.
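If the recursion is hard to picture, the same computation can be sketched in plain Python over a dict of node_id to parent_id. This is only an illustration of what the view should produce; the closure function is hypothetical and nothing here touches the database.

```python
def closure(parents):
    """Return (path, ancestor_id, descendant_id, depth) rows, like the view.

    `parents` maps each node_id to its parent_id (None for a root).
    """
    # Non-recursive term: every node is its own ancestor at depth 0.
    rows = [((node,), node, node, 0) for node in parents]
    frontier = list(rows)
    while frontier:
        added = []
        for path, ancestor, descendant, depth in frontier:
            parent = parents[ancestor]
            if parent is not None:
                # Recursive term: step from the ancestor up to its parent.
                added.append(((parent,) + path, parent, descendant, depth + 1))
        rows.extend(added)
        frontier = added
    return rows
```

Each pass only looks at the rows added by the previous pass, which is how Postgres evaluates the recursive term as well.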

Now, because we are using the in-built Django Many to Many features, we have some nice queries ready to go:

  • node.ancestors.all() : All ancestors of a given Node instance.
  • node.descendants.all() : All descendants of a given Node instance.
  • Node.objects.filter(ancestors__in=queryset) : All descendants of all nodes in a queryset.
  • Node.objects.filter(descendants__in=queryset) : All ancestors of all nodes in a queryset.

Of particular note are the bottom two: these are rather cumbersome to write in older versions of Django.

Adjacency Lists in Django with Postgres

Today, I’m going to walk through modelling a tree in Django, using an Adjacency List, and a Postgres View that dynamically creates the materialised path of ancestors for each node.

With this, we will be able to query the tree for a range of operations using the Django ORM.

We will start with our model:

class Node(models.Model):
    node_id = models.AutoField(primary_key=True)
    parent = models.ForeignKey('tree.node', related_name='children', null=True, blank=True)

    class Meta:
        app_label = 'tree'

We will also build an unmanaged model that will be backed by our view.

from django.contrib.postgres.fields import ArrayField

class Tree(models.Model):
    root = models.ForeignKey(Node, related_name='+')
    node = models.OneToOneField(Node, related_name='tree_node', primary_key=True)
    ancestors = ArrayField(base_field=models.IntegerField())

    class Meta:
        app_label = 'tree'
        managed = False

You’ll notice I’ve included a root relation. This could be obtained by using ancestors[0] if ancestors else node_id, but that’s a bit cumbersome.

So, on to the View:

CREATE RECURSIVE VIEW tree_tree(root_id, node_id, ancestors) AS

SELECT node_id, node_id, ARRAY[]::INTEGER[]
FROM tree_node WHERE parent_id IS NULL

UNION ALL

SELECT tree.root_id, node.node_id, tree.ancestors || node.parent_id
FROM tree_node node INNER JOIN tree_tree tree ON (node.parent_id = tree.node_id)

I’ve written this view before, so I won’t go into any detail.

We can create a tree. Normally I wouldn’t specify the primary key, but since we want to talk about those values shortly, I will. It also means you can delete them, and recreate with this code, and not worry about the sequence values.

from tree.models import Node

Node.objects.bulk_create([
  Node(pk=1),
  Node(pk=2, parent_id=1),
  Node(pk=3, parent_id=1),
  Node(pk=4, parent_id=2),
  Node(pk=5, parent_id=2),
  Node(pk=6, parent_id=3),
  Node(pk=7, parent_id=3),
  Node(pk=8, parent_id=4),
  Node(pk=9, parent_id=8),
  Node(pk=10),
  Node(pk=11, parent_id=10),
  Node(pk=12, parent_id=11),
  Node(pk=13, parent_id=11),
  Node(pk=14, parent_id=12),
  Node(pk=15, parent_id=12),
  Node(pk=16, parent_id=12),
])
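For reference, the rows the view should produce for this tree can be worked out in plain Python. This is a sketch to check expectations against, not part of the project: tree_rows and PARENTS are names I’ve made up here, and PARENTS simply restates the parent ids from the bulk_create call.

```python
PARENTS = {
    1: None, 2: 1, 3: 1, 4: 2, 5: 2, 6: 3, 7: 3, 8: 4, 9: 8,
    10: None, 11: 10, 12: 11, 13: 11, 14: 12, 15: 12, 16: 12,
}


def tree_rows(parents):
    """Map node_id to (root_id, ancestors), mirroring the tree_tree view."""
    # Non-recursive term: roots have themselves as root, and no ancestors.
    rows = {
        node: (node, [])
        for node, parent in parents.items() if parent is None
    }
    pending = [node for node in parents if node not in rows]
    while pending:
        remaining = []
        for node in pending:
            parent = parents[node]
            if parent in rows:
                # Recursive term: extend the parent's row by one level.
                root, ancestors = rows[parent]
                rows[node] = (root, ancestors + [parent])
            else:
                remaining.append(node)
        if len(remaining) == len(pending):
            raise ValueError('Unreachable nodes: {}'.format(remaining))
        pending = remaining
    return rows
```

Node 15, for instance, ends up with root 10 and ancestors [10, 11, 12], which is what the queries further down rely on.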

Okay, let’s start looking at how we might perform some operations on it.

We’ve already seen how to create a node, either root or leaf nodes. No worries there.

What about inserting an intermediate node, say between 11 and its current children (12 and 13)?

node = Node.objects.create(parent_id=11)
node.parent.children.exclude(pk=node.pk).update(parent=node)

I’m not sure if it is possible to do it in a single statement.

Okay, let’s jump to some tree-based statements. We’ll start by finding a sub-tree.

Node.objects.filter(tree_node__ancestors__contains=[2])

Oh, that’s pretty nice. It’s not necessarily sorted, but it will do for now.

We can also query directly for a root:

Node.objects.filter(tree_node__root=10)

We could spell that one as tree_node__ancestors__0=10, but I think this is more explicit. Also, that one will not include the root node itself.

Deletions are also simple: if we can build a queryset, we can delete it. Thus, deleting a full tree could be done by following any queryset by a .delete()

Fetching a node’s ancestors is a little trickier: because we only have an array of node ids, it takes two queries.

Node.objects.filter(pk__in=Node.objects.get(pk=15).tree_node.ancestors)

The count of ancestors doesn’t require the second query:

len(Node.objects.get(pk=15).tree_node.ancestors)

Getting ancestors to a given depth is also simple, although it still requires two queries:

Node.objects.filter(pk__in=Node.objects.get(pk=15).tree_node.ancestors[-2:])

This is a fairly simple way to enable relatively performance-aware queries of tree data. There are still places where it’s not perfect, and in reality, you’d probably look at building up queryset or model methods for wrapping common operations.

slugify() for postgres (almost)

A recent discussion in #django suggested “what we need is a PG slugify function”.

The actual algorithm in Django for this is fairly simple, and easy to follow. Shouldn’t be too hard to write it in SQL.

Function slugify(value, allow_unicode=False).

  • Convert to ASCII if allow_unicode is false
  • Remove characters that aren’t alphanum, underscores, hyphens
  • Strip leading/trailing whitespace
  • Convert to lowercase
  • Convert spaces to hyphens
  • Remove repeated hyphens

(As an aside, the comment in the Django function is slightly misleading: if you followed the algorithm there, you’d get a different result with respect to leading/trailing whitespace. I shall submit a PR.)
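As a point of comparison, the steps listed above can be written out in Python. This is a sketch of the listed algorithm, modelled on my recollection of Django’s function rather than copied from its source, so treat the details (particularly the choice of normalisation forms) as assumptions.

```python
import re
import unicodedata


def slugify(value, allow_unicode=False):
    """Apply the steps above: normalise, strip, trim, lowercase, hyphenate."""
    if allow_unicode:
        value = unicodedata.normalize('NFKC', value)
    else:
        # Decompose accented characters, then drop anything non-ASCII.
        value = unicodedata.normalize('NFKD', value)
        value = value.encode('ascii', 'ignore').decode('ascii')
    # Remove everything except word characters, whitespace and hyphens.
    value = re.sub(r'[^\w\s-]', '', value).strip().lower()
    # Collapse runs of whitespace and hyphens into a single hyphen.
    return re.sub(r'[-\s]+', '-', value)
```

Characters with no ASCII decomposition (such as ø) are simply dropped by this sketch when allow_unicode is false.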

We can write an SQL function that uses the Postgres unaccent extension to get pretty close:

CREATE OR REPLACE FUNCTION slugify("value" TEXT, "allow_unicode" BOOLEAN)
RETURNS TEXT AS $$

  WITH "normalized" AS (
    SELECT CASE
      WHEN "allow_unicode" THEN "value"
      ELSE unaccent("value")
    END AS "value"
  ),
  "remove_chars" AS (
    SELECT regexp_replace("value", E'[^\\w\\s-]', '', 'gi') AS "value"
    FROM "normalized"
  ),
  "lowercase" AS (
    SELECT lower("value") AS "value"
    FROM "remove_chars"
  ),
  "trimmed" AS (
    SELECT trim("value") AS "value"
    FROM "lowercase"
  ),
  "hyphenated" AS (
    SELECT regexp_replace("value", E'[-\\s]+', '-', 'gi') AS "value"
    FROM "trimmed"
  )
  SELECT "value" FROM "hyphenated";

$$ LANGUAGE SQL STRICT IMMUTABLE;

I’ve used a CTE to get each step as a separate query: you can do it with just two levels if you don’t mind looking at nested function calls:

CREATE OR REPLACE FUNCTION slugify("value" TEXT, "allow_unicode" BOOLEAN)
RETURNS TEXT AS $$

  WITH "normalized" AS (
    SELECT CASE
      WHEN "allow_unicode" THEN "value"
      ELSE unaccent("value")
    END AS "value"
  )
  SELECT regexp_replace(
    trim(
      lower(
        regexp_replace(
          "value",
          E'[^\\w\\s-]',
          '',
          'gi'
        )
      )
    ),
    E'[-\\s]+', '-', 'gi'
  ) FROM "normalized";

$$ LANGUAGE SQL STRICT IMMUTABLE;

To get the default value for the second argument, we can have an overloaded version with only a single argument:

CREATE OR REPLACE FUNCTION slugify(TEXT)
RETURNS TEXT AS 'SELECT slugify($1, false)' LANGUAGE SQL IMMUTABLE STRICT;

Now for some tests. I’ve been using pgTAP lately, so here are some tests using that:

BEGIN;

SELECT plan(7);

SELECT is(slugify('Hello, World!', false), 'hello-world');
SELECT is(slugify('Héllø, Wørld!', false), 'hello-world');
SELECT is(slugify('spam & eggs', false), 'spam-eggs');
SELECT is(slugify('spam & ıçüş', true), 'spam-ıçüş');
SELECT is(slugify('foo ıç bar', true), 'foo-ıç-bar');
SELECT is(slugify('    foo ıç bar', true), 'foo-ıç-bar');
SELECT is(slugify('你好', true), '你好');

SELECT * FROM finish();

ROLLBACK;

And we get one failing test:

=# SELECT is(slugify('你好', true), '你好');

          is
──────────────────────
 not ok 7            ↵
 # Failed test 7     ↵
 #         have:     ↵
 #         want: 你好
(1 row)

Time: 2.004 ms

It seems there is no way to get the equivalent to the python re.U flag on a postgres regular expression function, so that is as close as we can get.

Row Level Security in Postgres and Django

Postgres keeps introducing new things that pique my attention. One of the latest of these is Row Level Security, which essentially hides rows that a given database user cannot view. There’s a bit more to it than that: a good writeup is at Postgres 9.5 feature highlight: Row-Level Security and Policies.

However, it’s worth noting that the way Django connects to a database uses a single database user. Indeed, if your users table is in your database, then you’ll need some way to connect to it to authenticate. I haven’t come up with a nice way to use the Postgres users for authentication within Django just yet.

I did have an idea about a workflow that may just work.

  • Single Postgres User is used for authentication (Login User).
  • Every Django user gets an associated Postgres User (Session User), that may not log in.
  • This Session User is automatically created using a Postgres trigger, whenever the Django users table is updated.
  • After authentication, a SET SESSION ROLE (or SET SESSION AUTHORIZATION) statement is used to change to the correct Session User for the remainder of the session.

Then, we can implement the Postgres Row Level Security policies as required.

Initially, I had thought that perhaps the Session Users would have the same level of access as the Login User, however tonight it occurred to me that perhaps this could replace the whole Django permissions concept.


We do have a few things we need to work out before this is a done deal.

  • The trigger function that performs the CREATE ROLE statement when the django users table is updated.
  • Some mechanism of handling GRANT and REVOKE statements.
  • Similarly, some mechanism for showing current permissions for the given user.
  • A middleware that sets the SESSION USER according to the django user.

The simplest part of this is the last one, so we will start there. We can (in the meantime) manually create the users and their permissions to see how well it all goes. No point doing work that doesn’t work.

from django.db import connection


class SetSessionAuthorization(object):
    def process_view(self, request, *args, **kwargs):
        if request.user.pk:
            connection.cursor().execute(
                'SET SESSION SESSION AUTHORIZATION "django:{}"'.format(request.user.pk)
            )

We need to add this to our project’s middleware.

You’ll see we are using roles of the form django:<id>, which need to be quoted. We use the user id rather than the username, because usernames may be changed.
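Since a role name is an identifier, it can’t be passed as a query parameter, so the statement has to be built as a string. A small sketch of doing that defensively (both helper names are hypothetical) is to coerce the id to an integer first:

```python
def session_role(user_pk):
    """Return the shadow role name for a Django user id."""
    # int() raises ValueError for anything that is not a plain integer,
    # so nothing unexpected can end up inside the quoted identifier.
    return 'django:{}'.format(int(user_pk))


def set_session_authorization_sql(user_pk):
    """Build the statement that the middleware executes."""
    return 'SET SESSION SESSION AUTHORIZATION "{}"'.format(
        session_role(user_pk)
    )
```

With an integer primary key this is belt and braces, but it keeps the quoting rules in one place.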

We’ll want to create a user for each of the existing Django users: I currently have a single user in this database, with id 1. I also have an existing SUPERUSER with the name django. We need to use a superuser if we are using SET SESSION AUTHORIZATION, which seems to be the best. I haven’t found anything which really does a good job of explaining how this and SET SESSION ROLE differ.

CREATE USER "django:1" NOLOGIN;
GRANT "django:1" TO django;
GRANT ALL ON ALL TABLES IN SCHEMA public TO public;
GRANT ALL ON ALL SEQUENCES IN SCHEMA public TO public;

Note we have just for now enabled full access to all tables and sequences. This will remain until we find a good way to handle this.

We can start up a project using this, and see if it works. Unless I’ve missed something, then it should.

Next, we will turn on row-level-security for the auth_user table, and see if it works.

ALTER TABLE auth_user ENABLE ROW LEVEL SECURITY;

Then try to view the list of users (even as a django superuser). You should see an empty list.

We’ll turn on the ability to see our own user object:

CREATE POLICY read_own_data ON auth_user FOR
SELECT USING ('django:' || id = current_user);

Phew, that was close. Now we can view our user.

However, we can’t update it. Let’s fix that:

CREATE POLICY update_own_user_data ON auth_user FOR
UPDATE USING ('django:' || id = current_user)
WITH CHECK ('django:' || id = current_user);

We should be able to do some magic there to prevent a user toggling their own superuser status.

Let’s investigate writing a trigger function that creates a new ROLE when a Django user is created.

CREATE OR REPLACE FUNCTION create_shadow_role()
RETURNS TRIGGER AS $$

BEGIN

  EXECUTE 'CREATE USER "django:' || NEW.id || '" NOLOGIN';
  EXECUTE 'GRANT "django:' || NEW.id || '" TO django';

  RETURN NULL;

END;

$$
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path =  public, pg_temp
VOLATILE;


CREATE TRIGGER create_shadow_role
  AFTER INSERT ON auth_user
  FOR EACH ROW
  EXECUTE PROCEDURE create_shadow_role();

Note we still can’t create users from the admin (due to the RLS restrictions that are there), so we need to resort to ./manage.py createsuperuser again.

Having done that, we should see that our new user gets a ROLE:

# \du
                                        List of roles
 Role name │                         Attributes                         │      Member of
───────────┼────────────────────────────────────────────────────────────┼─────────────────────
 django    │ Superuser                                                  │ {django:1,django:6}
 django:1  │ Cannot login                                               │ {}
 django:6  │ Cannot login                                               │ {}
 matt      │ Superuser, Create role, Create DB                          │ {}
 postgres  │ Superuser, Create role, Create DB, Replication, Bypass RLS │ {}

We should be able to write similar triggers for update. We can, for example, shadow the Django is_superuser attribute to the Postgres SUPERUSER attribute. I’m not sure if that’s a super idea or not.

But we can write a simple function that allows us to see if the current django user is a superuser:

CREATE FUNCTION is_superuser()
RETURNS BOOLEAN AS $$

SELECT is_superuser
FROM auth_user
WHERE 'django:' || id = current_user

$$ LANGUAGE SQL;

We can now use this to allow superuser access to all records:

CREATE POLICY superuser_user_select ON auth_user
FOR SELECT USING (is_superuser());

CREATE POLICY superuser_user_update ON auth_user
FOR UPDATE USING (is_superuser())
WITH CHECK (is_superuser());
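To see what the SELECT policies are intended to add up to, here is a rough sketch in plain Python rather than Postgres. It relies on permissive policies for the same command being combined with OR, so a row is visible if any one of them passes. The can_select helper and the sample rows are made up for illustration.

```python
def can_select(row, current_user, users):
    """Mimic the two SELECT policies on auth_user, combined with OR."""
    # The auth_user row (if any) that belongs to the current database role.
    me = next((u for u in users if 'django:%d' % u['id'] == current_user), None)
    # read_own_data: 'django:' || id = current_user
    read_own_data = 'django:%d' % row['id'] == current_user
    # superuser_user_select: the current Django user is a superuser.
    superuser_user_select = bool(me and me['is_superuser'])
    return read_own_data or superuser_user_select


# Hypothetical rows standing in for auth_user.
users = [
    {'id': 1, 'is_superuser': True},
    {'id': 6, 'is_superuser': False},
]

visible_to_1 = [u['id'] for u in users if can_select(u, 'django:1', users)]
visible_to_6 = [u['id'] for u in users if can_select(u, 'django:6', users)]
```

The superuser role sees every row, while the ordinary user still only sees their own.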

That gives us a fair bit of functionality. We still don’t have any mechanism for viewing or setting permissions. Because of the way Django’s permissions work, we can’t quite use the same trick on the auth_user_user_permissions table, because we’d also need to look at the auth_user_groups and auth_group_permissions tables.

I’m still not sure if this is a good idea or not, but it is a fun thought process.

Django second AutoField

Sometimes, your ORM just seems to be out to get you.

For instance, I’ve been investigating a technique for the most important data structure in a system to be essentially immutable.

That is, instead of updating an existing instance of the object, we always create a new instance.

This requires a handful of things to be useful (and useful for querying).

  • We probably want to have a self-relation so we can see which object supersedes another. A series of objects that supersede one another is called a lifecycle.
  • We want to have a timestamp on each object, so we can view a snapshot at a given time: that is, which phase of the lifecycle was active at that point.
  • We should have a column that is unique per-lifecycle: this makes querying all objects of a lifecycle much simpler (although we can use a recursive query for that).
  • There must be a facility to prevent multiple heads on a lifecycle: that is, at most one phase of a lifecycle may be non-superseded.
  • The lifecycle phases needn’t be in the same order, or really have any differentiating features (like status). In practice they may, but for the purposes of this, they are just “what it was like at that time”.
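To make the terminology concrete, here is a minimal in-memory sketch of these requirements in plain Python, with no Django or database involved. The Phase dataclass and the lifecycle_of and heads helpers are hypothetical names used only for illustration.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional


@dataclass
class Phase:
    phase_id: int
    lifecycle_id: int                    # shared by every phase of one lifecycle
    timestamp: datetime
    superseded_by: Optional[int] = None  # phase_id of the replacement, if any


def lifecycle_of(phases: List[Phase], lifecycle_id: int) -> List[Phase]:
    """All phases of one lifecycle, oldest first."""
    members = [p for p in phases if p.lifecycle_id == lifecycle_id]
    return sorted(members, key=lambda p: p.timestamp)


def heads(phases: List[Phase], lifecycle_id: int) -> List[Phase]:
    """Phases that nothing supersedes; a healthy lifecycle has at most one."""
    return [p for p in lifecycle_of(phases, lifecycle_id)
            if p.superseded_by is None]


# Made-up sample data: lifecycle 1 has three phases, lifecycle 2 has one.
phases = [
    Phase(1, 1, datetime(2015, 1, 1), superseded_by=2),
    Phase(2, 1, datetime(2015, 2, 1), superseded_by=3),
    Phase(3, 1, datetime(2015, 3, 1)),
    Phase(4, 2, datetime(2015, 2, 15)),
]
```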

I’m not sure these ideas will ever get into a released product, but the work behind them was fun (and all my private work).

The basic model structure might look something like:

class Phase(models.Model):
    phase_id = models.AutoField(primary_key=True)
    lifecycle_id = models.AutoField(primary_key=False, editable=False)

    superseded_by = models.OneToOneField('self',
        related_name='supersedes',
        null=True, blank=True, editable=False
    )
    timestamp = models.DateTimeField(auto_now_add=True)

    # Any other fields you might want...

    objects = PhaseQuerySet.as_manager()

So, that looks nice and simple.

Our second AutoField will have a sequence generated for it, and the database will give us a unique value from a sequence when we try to create a row in the database without providing this column in the query.

However, there is one problem: Django will not let us have a second AutoField in a model. And, even if it did, there would still be some problems. For instance, every time we attempt to create a new instance, no AutoField values are sent to the database, which breaks our ability to keep the same lifecycle_id between phases.

So, we will need a custom field. Luckily, all we really need is the SERIAL database type: that creates the sequence for us automatically.

class SerialField(models.Field):
    # Must subclass models.Field so Django treats it as a column.
    def db_type(self, connection):
        return 'serial'

So now, using that field type instead, we can write a bit more of our model:

class Phase(models.Model):
    phase_id = models.AutoField(primary_key=True)
    lifecycle_id = SerialField(editable=False)
    superseded_by = models.OneToOneField('self', ...)
    timestamp = models.DateTimeField(auto_now_add=True)

    def save(self, **kwargs):
        self.pk = None
        super(Phase, self).save(**kwargs)

This now ensures each time we save our object, a new instance is created. The lifecycle_id will stay the same.

Still not totally done though. We currently aren’t handling a newly created lifecycle (which should be handled by the associated postgres sequence), nor are we marking the previous instance as superseded.

It’s possible, using some black magic, to get the default value for a database column, and, in this case, execute a query with that default to get the next value. However, that’s pretty horrid: not to mention it also runs an extra two queries.

Similarly, we want to get the phase_id of the newly created instance, and set that as the superseded_by of the old instance. This would require yet another query, after the INSERT, but also has the sinister side-effect of making us unable to apply the not-superseded-by-per-lifecycle requirement.

As an aside, we can investigate storing the self-relation on the other end - this would enable us to just do:

    def save(self, **kwargs):
        self.supersedes = self.pk
        self.pk = None
        super(Phase, self).save(**kwargs)

However, this turns out to be less useful when querying: we are much more likely to be interested in phases that are not superseded, as they are the “current” phase of each lifecycle. Although we could still query for them, it would mean running a sub-query for each row.

Our two issues, setting the lifecycle and storing the superseding data, can both be handled by one Postgres BEFORE INSERT trigger function:

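CREATE FUNCTION lifecycle_and_supersedes()
RETURNS TRIGGER AS $$

  BEGIN
    -- Sequence names assume the defaults Postgres generates for app_phase.
    IF NEW.lifecycle_id IS NULL THEN
      -- First phase of a brand new lifecycle.
      NEW.lifecycle_id = nextval('app_phase_lifecycle_id_seq'::regclass);
    ELSE
      -- New phase of an existing lifecycle: allocate its id now, so the
      -- current head can be marked as superseded by it.
      NEW.phase_id = nextval('app_phase_phase_id_seq'::regclass);
      UPDATE app_phase
        SET superseded_by_id = NEW.phase_id
        WHERE lifecycle_id = NEW.lifecycle_id
        AND superseded_by_id IS NULL;
    END IF;
    -- A BEFORE trigger must return the row that is to be inserted.
    RETURN NEW;
  END;

$$ LANGUAGE plpgsql VOLATILE;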

CREATE TRIGGER lifecycle_and_supersedes
  BEFORE INSERT ON app_phase
  FOR EACH ROW
  EXECUTE PROCEDURE lifecycle_and_supersedes();
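If the two branches of that trigger are hard to follow, here is a rough simulation in plain Python. It is only a sketch: the list called table stands in for app_phase, the two counters stand in for the Postgres sequences, and before_insert is a hypothetical name that also performs the insert itself.

```python
from itertools import count

# Stand-ins for the two Postgres sequences.
phase_id_seq = count(1)
lifecycle_id_seq = count(1)

table = []  # each row is a dict, standing in for app_phase


def before_insert(new):
    """Mimic the BEFORE INSERT trigger, then append the row."""
    if new.get('lifecycle_id') is None:
        # Brand new lifecycle: take the next lifecycle id. The phase_id
        # comes from the column default, as it would in Postgres.
        new['lifecycle_id'] = next(lifecycle_id_seq)
        new['phase_id'] = next(phase_id_seq)
    else:
        # New phase of an existing lifecycle: allocate its id up front so
        # the current head can point at it before the new row exists.
        new['phase_id'] = next(phase_id_seq)
        for row in table:
            if (row['lifecycle_id'] == new['lifecycle_id']
                    and row['superseded_by_id'] is None):
                row['superseded_by_id'] = new['phase_id']
    new['superseded_by_id'] = None
    table.append(new)
    return new


first = before_insert({'lifecycle_id': None})
second = before_insert({'lifecycle_id': first['lifecycle_id']})
```

After the second insert, the first row points at the second, and the second is the only head of the lifecycle.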

So, now all we need to do is prevent multiple-headed lifecycles. We can do this using a UNIQUE INDEX:

CREATE UNIQUE INDEX prevent_hydra_lifecycles
ON app_phase (lifecycle_id)
WHERE superseded_by_id IS NULL;

Wow, that was simple.
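A partial unique index is easy to try out without a Postgres server, because SQLite supports the same syntax. The following is just a sketch using Python’s sqlite3 module as a stand-in; the cut-down table definition is an assumption, not the real schema.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute("""
    CREATE TABLE app_phase (
        phase_id INTEGER PRIMARY KEY,
        lifecycle_id INTEGER NOT NULL,
        superseded_by_id INTEGER
    )
""")
conn.execute("""
    CREATE UNIQUE INDEX prevent_hydra_lifecycles
    ON app_phase (lifecycle_id)
    WHERE superseded_by_id IS NULL
""")

# One superseded phase and one head in lifecycle 1: allowed.
conn.execute("INSERT INTO app_phase VALUES (1, 1, 2)")
conn.execute("INSERT INTO app_phase VALUES (2, 1, NULL)")

# A second head in the same lifecycle is rejected by the index.
try:
    conn.execute("INSERT INTO app_phase VALUES (3, 1, NULL)")
    second_head_allowed = True
except sqlite3.IntegrityError:
    second_head_allowed = False

# A head in a different lifecycle is fine.
conn.execute("INSERT INTO app_phase VALUES (4, 2, NULL)")
head_count = conn.execute(
    "SELECT COUNT(*) FROM app_phase WHERE superseded_by_id IS NULL"
).fetchone()[0]
```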

So, we have most of the db-level code written. How do we use our model? We can write some nice queryset methods to make getting the various bits easier:

class PhaseQuerySet(models.query.QuerySet):
    def current(self):
        return self.filter(superseded_by=None)

    def superseded(self):
        return self.exclude(superseded_by=None)

    def initial(self):
        return self.filter(supersedes=None)

    def snapshot_at(self, timestamp):
        return self.filter(
            timestamp__lte=timestamp
        ).order_by('lifecycle_id', '-timestamp').distinct('lifecycle_id')

The queries generated by the ORM for these should be pretty good: we could look at sticking an index on the lifecycle_id column.
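The snapshot_at method leans on distinct('lifecycle_id'), which Django only supports on Postgres (it becomes DISTINCT ON). As a sketch of what that query computes, here is the same selection in plain Python over made-up (phase_id, lifecycle_id, timestamp) tuples.

```python
from datetime import datetime

# (phase_id, lifecycle_id, timestamp) tuples standing in for rows.
rows = [
    (1, 1, datetime(2015, 1, 1)),
    (2, 1, datetime(2015, 2, 1)),
    (3, 1, datetime(2015, 3, 1)),
    (4, 2, datetime(2015, 2, 15)),
]


def snapshot_at(rows, timestamp):
    """Return the newest phase per lifecycle created at or before timestamp."""
    latest = {}
    for phase_id, lifecycle_id, created in rows:
        if created > timestamp:
            continue  # this phase did not exist yet
        current = latest.get(lifecycle_id)
        if current is None or created > current[2]:
            latest[lifecycle_id] = (phase_id, lifecycle_id, created)
    # Ordered by lifecycle_id, like the ORDER BY in the queryset.
    return [latest[key] for key in sorted(latest)]


snapshot = snapshot_at(rows, datetime(2015, 2, 10))
```

On the 10th of February only lifecycle 1 existed, and its second phase was the active one.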

There is one more thing to say on the lifecycle: we can add a model method to fetch the complete lifecycle for a given phase, too:

    def lifecycle(self):
        # On a model instance the manager lives on the class, not on self.model.
        return type(self).objects.filter(lifecycle_id=self.lifecycle_id)

(That was why I used the lifecycle_id as the column).


Whilst building this prototype, I came across a couple of things that were also interesting. The first was a mechanism to get the default value for a column:

from django.db import connection


def database_default(table, column):
    cursor = connection.cursor()
    # pg_get_expr() is used rather than pg_attrdef.adsrc, since that
    # column was removed in Postgres 12.
    QUERY = """SELECT pg_get_expr(d.adbin, d.adrelid) AS default_value
               FROM   pg_catalog.pg_attribute a
               LEFT   JOIN pg_catalog.pg_attrdef d ON (a.attrelid, a.attnum)
                                                   = (d.adrelid,  d.adnum)
               WHERE  NOT a.attisdropped   -- no dropped (dead) columns
               AND    a.attnum > 0         -- no system columns
               AND    a.attrelid = %s::regclass
               AND    a.attname = %s"""
    cursor.execute(QUERY, [table, column])
    # Danger: this executes text that came back from the database.
    cursor.execute('SELECT {}'.format(*cursor.fetchone()))
    return cursor.fetchone()[0]

You can probably see why I didn’t want to use this. Other than the aforementioned two extra queries, it’s executing a query with data that comes back from the database. It may be possible to inject a default value into a table that causes it to do Very Bad Things™. We could sanitise it, perhaps ensure it matches a regular expression:

NEXTVAL = re.compile(r"^nextval\('(?P<sequence>[a-zA-Z_0-9]+)'::regclass\)$")
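As a sketch of how that pattern might be used, the hypothetical helper below only accepts a default that is a plain nextval() call and hands back the sequence name, refusing anything else.

```python
import re

NEXTVAL = re.compile(r"^nextval\('(?P<sequence>[a-zA-Z_0-9]+)'::regclass\)$")


def sequence_from_default(default):
    """Return the sequence name for a plain nextval() default, else None."""
    match = NEXTVAL.match(default or '')
    return match.group('sequence') if match else None


safe = sequence_from_default("nextval('app_phase_lifecycle_id_seq'::regclass)")
unsafe = sequence_from_default("nextval('x'::regclass); DROP TABLE app_phase")
```

Here safe is the sequence name, and unsafe is None because the extra statement stops the pattern from matching.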

However, the trigger-based approach is nicer in every way.

The other thing I discovered, and this one is really nice, is a way to create an exclusion constraint that only applies if a column is NULL. For instance, ensure that no two classes for a given student overlap, but only if they are not superseded (or deleted).

ALTER TABLE "student_enrolments"
ADD CONSTRAINT "prevent_overlaps"
EXCLUDE USING gist(period WITH &&, student_id WITH =)
WHERE (
  superseded_by_id IS NULL
  AND
  status <> 'deleted'
);
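To spell out what this constraint checks, here is a rough equivalent in plain Python. It is a sketch only: periods are modelled as half-open (start, end) tuples, which matches the default bounds of Postgres range types, and the helper names and sample rows are made up.

```python
from datetime import date


def overlaps(a, b):
    """True if two half-open (start, end) periods share any day."""
    return a[0] < b[1] and b[0] < a[1]


def is_active(row):
    # Mirrors the WHERE clause: only live, non-superseded rows are constrained.
    return row['superseded_by_id'] is None and row['status'] != 'deleted'


def violates(existing_rows, new_row):
    """Would adding new_row break the prevent_overlaps rule?"""
    if not is_active(new_row):
        return False
    return any(
        is_active(row)
        and row['student_id'] == new_row['student_id']
        and overlaps(row['period'], new_row['period'])
        for row in existing_rows
    )


def enrolment(start, end, superseded_by_id=None, status='active'):
    return {'student_id': 1, 'period': (start, end),
            'superseded_by_id': superseded_by_id, 'status': status}


existing = [
    enrolment(date(2015, 1, 1), date(2015, 2, 1)),
    enrolment(date(2015, 3, 1), date(2015, 4, 1), superseded_by_id=7),
]

clash = violates(existing, enrolment(date(2015, 1, 15), date(2015, 2, 15)))
over_superseded = violates(existing, enrolment(date(2015, 3, 10), date(2015, 3, 20)))
adjacent = violates(existing, enrolment(date(2015, 2, 1), date(2015, 3, 1)))
```

Only the first candidate clashes: the second overlaps a superseded row, which the constraint ignores, and the third merely touches an existing period.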