Django ComputedField()

A very common pattern, at least in code that I’ve written (and read), is to annotate on a field that uses an expression that is based on one or more other fields. This could then be used to filter the objects, or just be used in some other way.

The usual method of doing this is:

from django.db import models
from django.db.models.expressions import F, Value
from django.db.models.functions import Concat


class PersonQuerySet(models.query.QuerySet):
    def with_name(self):
        return self.annotate(
            name=Concat(F('first_name'), Value(' '), F('last_name'), output_field=models.TextField()),
        )


class Person(models.Model):
    first_name = models.TextField()
    last_name = models.TextField()

    objects = PersonQuerySet.as_manager()

Yes, I’m aware of falsehoods programmers believe about names, but this is an easy-to-follow example.

In order to be able to access the name field, we must use the with_name() queryset method. This is usually okay, but if it is something that we almost always want, it can be a little tiresome. Alternatively, you could override the get_queryset() method of a custom manager, but that makes it somewhat surprising to a reader of the code. There are also some places where a custom manager will not automatically be used, or where it will be cumbersome to include the fields from a custom manager (select_related, for instance).

It would be much nicer if we could write the field declaratively, and have it use the normal Django mechanism of defer() and only() to remove it from the query if required.

class Person(models.Model):
    first_name = models.TextField()
    last_name = models.TextField()
    name = ComputedField(Concat(F('first_name'), Value(' '), F('last_name'), output_field=models.TextField()))
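
If this all works, the computed field behaves like any other field, so we could (hypothetically) write:

Person.objects.filter(name='Bob Smith')
Person.objects.defer('name')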

I’ve spent some time digging around in the Django source code, and have a fairly reasonable understanding of how fields work, and how queries are built up. But I did wonder how close we could get to a working proof of concept of this type of field without having to change any of the Django source code. After all, I was able to backport the entire Subquery expression stuff to older versions of Django after writing it. It would be nice to repeat the process here.

There are a few things you need to do to get this to work:

  • store the expression
  • prevent the field from creating a migration
  • ensure the field knows how to interpret data from the database
  • ensure the field adds the expression to its serialised version
  • prevent the field from writing data back to the database
  • inject the expression into the query instead of the field name
class ComputedField(models.Field):
    def __init__(self, expression, *args, **kwargs):
        self.expression = expression.copy()
        kwargs.update(editable=False)
        super().__init__(*args, **kwargs)

There is already a mechanism for a field to prevent a migration operation from being generated: it can return a db_type of None.

    def db_type(self, connection):
        return None

We can delegate the responsibility of interpreting the data from the database to the output field of the expression - that’s how it works in the normal operation of expressions.

    def from_db_value(self, value, expression, connection):
        return self.expression.output_field.from_db_value(value, expression, connection)

Storing the expression in the serialised version of a field is explained in the documentation on custom fields:

    def deconstruct(self):
        name, path, args, kwargs = super().deconstruct()
        return name, path, [self.expression] + args, kwargs

Preventing the field from being included in the data we write back to the database turned out to be fairly tricky. There are a couple of mechanisms that can be used, but ultimately the only one that worked in the way I needed was something that is used by the inheritance mechanism: we have to indicate that it is a “private” field. I’m not 100% sure what the other implications of this might be, but the outcome of making this field private is that it no longer appears in the list of local fields. There is one drawback to this, which I’ll discuss below.

    def contribute_to_class(self, cls, name, private_only=False):
        # Ignore the passed-in value: this field must always be private.
        return super().contribute_to_class(cls, name, True)

So, we only have one task to complete. How do we inject the expression into the query instead of the column?

When Django evaluates a queryset, it looks at the annotations, and the expressions that are in these. It will then “resolve” these expressions (which means the expression gets told which “query” is being used to evaluate it, allowing it to do whatever it needs to do to make things work).

When a regular field is encountered, it is not resolved: instead it is turned into a Col. This happens in a few different places, but the problem is that a Col should not need to know which query it belongs to: at most it needs to know what the aliased table name is. So, we don’t have a query object we can pass to the resolve_expression method of our expression.

Instead, we’ll need to use Python’s introspection to look up the stack until we find a place that has a reference to this query.

    def get_col(self, alias, output_field=None):
        import inspect

        query = None

        for frame in inspect.stack():
            if frame.function in ['get_default_columns', 'get_order_by']:
                query = frame.frame.f_locals['self'].query
                break
            if frame.function in ['add_fields', 'build_filter']:
                query = frame.frame.f_locals['self']
                break
        else:
            # Aaargh! We don't handle this one yet!
            import pdb; pdb.set_trace()

        col = self.expression.resolve_expression(query=query)
        col.target = self
        return col

So, how does this code actually work? We go through each frame in the stack, and look for a function (or method, but they are really just functions in Python) that matches one of the ones we know about that has a reference to the query. Then we grab that query, stop iterating, and resolve our expression. We have to set the “target” of our resolved expression to the original field, which is how the Col interface works.

This moves the resolve_expression into get_col, which is where it needs to be. The (resolved) expression is used as the faked column, and it knows how to generate its own SQL, which will be put into the query in the correct location.

And this works, almost.

There is one more situation that needs to be taken into account: when we are referencing the field through a join (the x__y lookup syntax you often see in django filters).

Because F() expressions reference the local query, we need to first turn any of these that we find in our computed field’s expression (at any level) into a Col that refers to the correct model. We need to do this before the resolve_expression takes place.

    def get_col(self, alias, output_field=None):
        query = None

        for frame in inspect.stack():
            if frame.function in ['get_default_columns', 'get_order_by']:
                query = frame.frame.f_locals['self'].query
                break
            if frame.function in ['add_fields', 'build_filter']:
                query = frame.frame.f_locals['self']
                break
        else:
            # Aaargh! We don't handle this one yet!
            import pdb; pdb.set_trace()

        def resolve_f(expression):
            if hasattr(expression, 'get_source_expressions'):
                expression = expression.copy()
            expression.set_source_expressions([
                resolve_f(expr) for expr in expression.get_source_expressions()
            ])
        if isinstance(expression, models.F):
            field = self.model._meta.get_field(expression.name)
            if hasattr(field, 'expression'):
                return resolve_f(field.expression)
            # Col needs to be imported from django.db.models.expressions.
            return Col(alias, field)
            return expression

        col = resolve_f(self.expression).resolve_expression(query=query)
        col.target = self
        return col

There is a repo containing this, which has a bunch of tests showing how the different query types can use the computed field:

https://github.com/schinckel/django-computed-field


But wait, there is one more thing…

A very common requirement, especially if you are planning on using this column for filtering, would be to stick an index on there.

Unfortunately, that’s not currently possible: the mechanism for preventing the field name from being in the write queries, making it a private field, prevents using this field in an index. Anyway, function/expression indexes are not currently supported in Django.

It’s not all bad news though: Markus has a Pull Request that will enable this feature; from there, we could (if db_index is set) automatically add an expression index to Model._meta.indexes in contribute_to_class, but it would also be great to be able to use index_together.

I suspect that to get that, though, we’ll need another mechanism to prevent the field appearing in the write queries, while still being a local field.

(Thanks to FunkyBob for suggestions, including suggesting the field at all).

Set-returning and row-accepting functions in Django and Postgres

Postgres set-returning functions are an awesome thing. With them, you can do fun things like unnest an array, ending up with a new row for each item in the array. For example:

class Post(models.Model):
    author = models.ForeignKey(AUTH_USER_MODEL, related_name='posts')
    tags = ArrayField(base_field=TextField(), null=True, blank=True)
    created_at = models.DateTimeField()
    content = models.TextField()

The equivalent SQL might be something like:

CREATE TABLE blog_post (
  id SERIAL NOT NULL PRIMARY KEY,
  author_id INTEGER NOT NULL REFERENCES auth_user (id),
  tags TEXT[],
  created_at TIMESTAMPTZ NOT NULL,
  content TEXT NOT NULL
);

We can “explode” the table so that we have one tag per row:

SELECT author_id, UNNEST(tags) AS tag, created_at, content
FROM blog_post;

To do the same sort of thing in Django, we can use a Func:

from django.db.models import F, Func

Post.objects.annotate(tag=Func(F('tags'), function='UNNEST'))

In practice, just like in the Django docs, I’ll create a convenience subclass:

class Unnest(Func):
    function = 'UNNEST'

    @property
    def output_field(self):
        output_fields = [x.output_field for x in self.get_source_expressions()]
        if len(output_fields) == 1:
            return output_fields[0].base_field

        return super(Unnest, self).output_field

The opposite of this is aggregation: in the case of UNNEST, it’s almost ARRAY_AGG, although because of the handling of nested arrays, this doesn’t quite round-trip. We already know how to do aggregation in Django, so I will not discuss that here.

However, there is another related operation: what if you want to turn a whole row into something else? In my case, this was turning a row from a result into a JSON object.

SELECT id,
       to_jsonb(myapp_mymodel) - 'id' AS "json"
  FROM myapp_mymodel

This will get all of the columns except ‘id’, and put them into a new column called “json”.

But how do we get Django to output SQL that will enable us to use a Model as the argument to a function? Ultimately, we want to get to the following:

class ToJSONB(Func):
    function = 'TO_JSONB'
    output_field = JSONField()


MyModel.objects.annotate(
  json=ToJSONB(MyModel) - Value('id')
).values('id', 'json')

Our first attempt could be to use RawSQL. However, this has a couple of problems. The first is that we are writing lots of raw SQL, the second is that it won’t work so well if the table is aliased by the ORM. That is, if you use this in a join or subquery, where Django automatically assigns an alias to this table, then referring directly to the table name will not work.

MyModel.objects.annotate(json=RawSQL("to_jsonb(myapp_mymodel) - 'id'", [], output_field=JSONField()))

Instead, we need to dynamically find out what the current alias for the model is in this query, and use that. We’ll also want to figure out how to “subtract” the id key from the JSON object.

class Table(django.db.models.Expression):
    def __init__(self, model, *args, **kwargs):
        self.model = model
        self.query = None
        super(Table, self).__init__(*args, **kwargs)

    def resolve_expression(self, query, *args, **kwargs):
        clone = super(Table, self).resolve_expression(query, *args, **kwargs)
        clone.query = query
        return clone

    def as_sql(self, compiler, connection, **kwargs):
        if not self.query:
            raise ValueError('Unresolved Table expression')
        alias = self.query.table_map.get(self.model._meta.db_table, [self.model._meta.db_table])[0]
        return compiler.quote_name_unless_alias(alias), []

Okay, there’s a fair bit going on there. Let’s look through it. We’ll start with how we use it:

MyModel.objects.annotate(json=ToJSONB(Table(MyModel)))

We create a Table instance, which stores a reference to the model. Technically, all we need later on is the database table name that will be used, but we’ll keep the model for now.

When the ORM “resolves” the queryset, we grab the query object, and store a reference to that.

When the ORM asks us to generate some SQL, we look at the query object we have a reference to, and see if our model’s table name has an entry in the table_map dict: if so, we get the first entry from that, otherwise we just use the table name.

Okay, what about being able to remove the entry in the JSONB object for ‘id’?

We can’t just use the subtraction operator, because Postgres will try to convert the RHS value into JSONB first, and fail. So, we need to ensure it renders it as TEXT. We also need to wrap it in an ExpressionWrapper, so we can indicate what the output field type will be:

id_value = models.Func(models.Value('id'), template='%(expressions)s::TEXT')
MyModel.objects.annotate(
    json=ExpressionWrapper(
        ToJSONB(Table(MyModel)) - id_value, output_field=JSONField()
    )
)

I also often use a convenience Cast function, that automatically does this based on the supplied output_field, but this is a little easier to use here. Note there is a possible use for ToJSONB in a different context, where it doesn’t take a table, but some other primitive.
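
For reference, such a Cast might look something like this sketch (newer Django versions ship one as django.db.models.functions.Cast, so this is purely illustrative, and assumes a Django version where Func.as_sql accepts extra context):

from django.db import models

class Cast(models.Func):
    template = 'CAST(%(expressions)s AS %(db_type)s)'

    def as_sql(self, compiler, connection, **extra_context):
        # Derive the SQL type from the supplied output_field.
        extra_context['db_type'] = self.output_field.db_type(connection)
        return super(Cast, self).as_sql(compiler, connection, **extra_context)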


There’s one more way we can use this construct: the geo_unique_indexer function from a previous post needs a table name, but also the name of a field to omit from the index. So, we can wrap this up nicely:

class GeoMatch(models.Func):
    function = 'geo_unique_indexer'
    output_field = JSONField()

    def __init__(self, model, *args, **kwargs):
        table = Table(model)
        pk = models.Value(model._meta.pk.db_column or model._meta.pk.name)
        return super(GeoMatch, self).__init__(table, pk, *args, **kwargs)

This is really tidy: it takes the model class (or maybe an instance, I didn’t try), and builds a Table, and gets the primary key. These are just used as the arguments for the function, and then it all works.

Django multitenancy using Postgres Row Level Security

Quite some time ago, I did some experiments in using Postgres Row Level Security (RLS) from within Django.

It occurred to me that this philosophy could be used to model a multi-tenant application.

The main big problem with django-boardinghouse is that you have to apply migrations to multiple schemata. With many tenants, this can take a long time. It’s not easy to do this in a way that would be conducive to having limited downtime.

On the other hand, RLS means that the database restricts which rows of specific tables need to be shown in a given circumstance. Normally, examples of RLS show this by using a different user, but this is not necessary.

In fact, in most modern web applications, a single database user is used for all connections. This has some big benefits (in that a connection to the database can belong to a pool, and be shared by different requests). Luckily, there are other ways to have RLS applied.

One method is to use Postgres’ session variables. This is outlined quite well in Application users vs. Row Level Security. I’m going to just use simple session variables, as the facility for doing this will be encapsulated, and based on a key in the Django session - which users cannot set directly. If someone has access to this (or access to setting a Postgres session variable directly), then they have enough access to do whatever they want.

There are some caveats: specifically, the Postgres user must not be a SUPERUSER, but that’s easy to sort out. We’ll be able to continue to use PgBouncer or similar, but only if we use session pooling (not transaction pooling).


Now, mirroring the previous post, we have a few things that need to happen:

  • We will need some middleware that sets the (postgres) session variable.
  • We may want to have some mechanism for switching tenants (unless a user is tied to a single tenant).
  • We must have a Tenant model of some sort (because we’ll be using foreign keys to this to indicate a given row belongs to a given tenant).
  • We’ll want to be able to enable/force/disable RLS for a given table.
  • We should be able to automatically determine the USING clause (and WITH CHECK clause) for a given table.
  • We must allow the user to overwrite the USING/WITH CHECK clauses for a given table.

It turns out this is much simpler than all of the things that django-boardinghouse needs to do.

It also turns out that we can cascade the USING/WITH CHECK clauses for dependent tables, but we’ll get to that. I’m not sure how well that will perform, but it might be reasonable.


Since all good projects need a clever name, I’ve chosen django-occupation for this one (as a play on multi-tenancy). Thus, you may see the name occupation used in a few places. Also, this will be a strictly Django 2.0+ (and therefore Python3) app!

Let’s start with the easy bits:

# occupation/middleware.py
from django.db import connection

def ActivateTenant(get_response):
    def middleware(request):
        connection.cursor().execute(
            'SET occupation.active_tenant = %s',
            [request.session.get('active_tenant', '')]
        )
        return get_response(request)
    return middleware

This middleware will set a session variable. Importantly, it always sets this variable, because the rules we will be creating later rely on this being present: exceptions will result from a missing current_setting. Setting it to an empty string will mean that no rows will be returned when no tenant is selected, which is acceptable.

The code for switching tenants is a bit more complicated, and it probably needs to be. It will need some method of detecting if the given user is indeed permitted to switch to the target tenant, which could be dependent on a range of other things. For instance, in our multi-tenant application, an employee needs to be currently (or in the future) employed in order to get access, but some users may get access for other reasons (ie, a Payroll company).

We can use a view that specifically handles this, but with django-boardinghouse I also came up with a middleware that can handle this. There are, in that project, three mechanisms for switching tenants: a query parameter, an HTTP header, and a raw view. The rationale for this was that a URL (containing a query parameter) could be used as a permanent link to an object (which works across tenants). The drawback is that it does leak some information (about the tenant id). In practice, having this as a UUID may be nice.

Having a view that switches tenant makes doing a switch (and getting a success code if it works) easy, and having a header might make it easier for an API to switch.
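
A minimal sketch of such a view (can_access_tenant is a hypothetical permission check):

from django.http import HttpResponse, HttpResponseForbidden

def switch_tenant(request, tenant_id):
    if not can_access_tenant(request.user, tenant_id):
        return HttpResponseForbidden('Not permitted for this tenant')
    request.session['active_tenant'] = tenant_id
    return HttpResponse('OK')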

Anyway, we can ignore this requirement for now.

I’ve used the same “swappable” concept in django-boardinghouse that django.contrib.auth uses for swappable user models. This has some nice side effects, but an understanding of how this works is not necessary for understanding what is about to happen next. Instead, let’s look at the definition of some models. Please keep in mind that this is a simplified example, and some parts have been omitted for clarity.

class School(models.Model):
    "This is our Tenant model."
    name = models.CharField(unique=True, max_length=128)

    def __str__(self):
        return self.name


class Student(models.Model):
    name = models.CharField(max_length=128)
    student_number = models.CharField(max_length=16)
    school = models.ForeignKey('School', related_name='students', on_delete=models.CASCADE)

    class Meta:
        unique_together = (
            ('school', 'student_number'),
        )

    def __str__(self):
        return self.name


class Subject(models.Model):
    name = models.CharField(unique=True, max_length=64)

    def __str__(self):
        return self.name

GRADES = [
  # ...
]


class Enrolment(models.Model):
    student = models.ForeignKey(Student, related_name='enrolments', on_delete=models.CASCADE)
    subject = models.ForeignKey(Subject, related_name='enrolments', on_delete=models.CASCADE)
    grade = models.CharField(choices=GRADES, max_length=3, null=True, blank=True)

    def __str__(self):
        if self.grade:
            return '{student} studied {subject}. Grade was {grade}.'.format(
                student=self.student.name,
                subject=self.subject.name,
                grade=self.get_grade_display(),
            )
        return '{student} is enrolled in {subject}.'.format(
            student=self.student.name,
            subject=self.subject.name,
        )

Okay, wall of code done. There are a few things to note about these models:

  • School is the tenant model.
  • Student has a direct relationship to the tenant model. This is a candidate for RLS.
  • Subject has no relationship to the tenant model. This is a non-tenant (ie, global) model. All instances will be visible to all users.
  • Enrolment has a chained relationship to the tenant model. Because of this, it’s likely that this will also be an RLS model (if the prior models in the chain have RLS restrictions).

Now a digression into some mechanics of RLS.

Enabling RLS for a given table is quite simple. We’ll also do a FORCE, because we are probably the table owner, and without FORCE, table owners may view all rows.

ALTER TABLE school_student ENABLE ROW LEVEL SECURITY;
ALTER TABLE school_student FORCE ROW LEVEL SECURITY;

In the case of a student, the user should only be able to view them if they are currently viewing the school the student belongs to:

CREATE POLICY access_tenant_data ON school_student
USING (school_id::TEXT = current_setting('occupation.active_tenant'))
WITH CHECK (school_id::TEXT = current_setting('occupation.active_tenant'))

Notice that we used the current_setting('occupation.active_tenant') that we configured before. As I mentioned, this policy will throw an exception if the setting is not set, so our middleware sets it to an empty string - which should not match any rows.

The other thing that may look out of place is that we are coercing the school_id to a TEXT. This is because current_setting() returns a text value, even if it was set using a number.

So, what does this actually do?

It restricts the query to only rows that match the USING clause (in the case of a SELECT, UPDATE or DELETE), and then ensures that any rows that are being written (in the case of UPDATE or INSERT) meet the same restriction. This prevents a user accidentally (or on purpose) writing a row that they could not currently view.
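
To make that concrete, with the policy above in place (and assuming a school with id 1 exists):

SET occupation.active_tenant = '1';

SELECT * FROM school_student;    -- only rows where school_id = 1

INSERT INTO school_student (name, student_number, school_id)
VALUES ('Dave', '1234', 2);      -- rejected: fails the WITH CHECK clause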

So, that’s the SQL. Can we generate this in a nice way from our Django models?

CREATE_POLICY = '''
CREATE POLICY access_tenant_data ON {table}
USING ({fk}::TEXT = current_setting('occupation.active_tenant'))
WITH CHECK ({fk}::TEXT = current_setting('occupation.active_tenant'))'''

def build_policy_clause(model):
    for field in model._meta.fields:
        if field.related_model is School:
            return CREATE_POLICY.format(fk=field.db_column, table=model._meta.db_table)

Again, this is simplified. It only works for a direct link, and naïvely assumes the db_column exists. In practice there’s more to it than that. But that will do for now.

So, given our knowledge of our models, we don’t need to enable RLS for our Subject model, but we want to enable it for our Enrolment model. In fact, we will need to - otherwise a user would be able to load up an Enrolment object, but not be able to see the related Student.

In fact, we use this relation (and the fact that the restriction is already applied to all queries) to make our policy for that table simpler:

CREATE POLICY access_tenant_data ON school_enrolment
USING (student_id IN (SELECT id FROM school_student))
WITH CHECK (student_id IN (SELECT id FROM school_student))

Notably, this sort of CHECK happens every time Postgres writes a FOREIGN KEY reference: we need to repeat it because FK references are not subject to RLS, but we basically want to make it so they are.

Interestingly, because of the cascading nature of this configuration, we don’t need to include the current_setting call at all, because that happens in the inner query.

However, it does concern me that this will result in more work in the database. I’ll have to run some tests on larger data sets to see how this performs.

Building up the SQL to use there is slightly more complicated: we need to look at every foreign key on the model and see which of them can trace a chain up to the tenant model. Then we’d need a clause in the USING/WITH CHECK for each of those foreign keys.

I do have some code that does this, but it’s not very pretty.
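
A naïve sketch of that discovery step (hypothetical code, guarding against self-referencing loops, but ignoring db_column details and diamond relationships):

def tenant_chains(model, tenant_model, prefix=(), seen=frozenset()):
    "Yield each chain of foreign keys leading from model to the tenant model."
    for field in model._meta.fields:
        if not field.is_relation or field.related_model in seen:
            continue
        if field.related_model is tenant_model:
            yield prefix + (field,)
        else:
            yield from tenant_chains(
                field.related_model, tenant_model,
                prefix + (field,), seen | {model},
            )

Each chain could then be turned into a nested IN subquery for the USING/WITH CHECK clauses.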

Also, I’d like to be able to come up with a way that generates this SQL using more of the ORM, but I’m not sure it’s really necessary, since the resulting code is quite simple.

As for applying these changes - the two solutions are to create a RunSQL call for each required statement, writing this directly to the migration file, or to have a migration operation that executes the SQL. I’m not sure which way I’ll go with that just yet.


I do have a proof of concept for this up and running (code is available at django-occupation). There are still some things I want to figure out.

  • Cross-tenant queries are a thing in my domain - what is the best mechanism for doing this? Should there be a Postgres session variable that bypasses the restriction, or could we enumerate permitted tenants? That would allow restricted cross-tenant queries.
  • Just how well does this perform at scale?
  • How much of this stuff is not really related to multi-tenancy, and could be extracted out into a more generic RLS package?

Django bulk_update without upsert

Postgres 9.5 brings a fantastic feature that I’ve really been looking forward to: INSERT ... ON CONFLICT, otherwise known as upsert. However, I’m not on 9.5 in production yet, and I had a situation that would really have benefitted from being able to use it.

I have to insert lots of objects, but if there is already an object in a given “slot”, then I need to instead update that existing object.

Doing this using the Django ORM can be done on a “one by one” basis, by iterating through the objects, finding which one (if any) matches the criteria, updating that, or creating a new one if there wasn’t a match.

However, this is really slow, as it does two queries for each object.

Instead, it would be great to:

  • fetch all of the instances that could possibly overlap (keyed by the matching criteria)
  • iterate through the new data, looking for a match
    • modify the instance if an existing match is made, and stash into pile “update”
    • create a new instance if no match is found, and stash into the pile “create”
  • bulk_update all of the “update” objects
  • bulk_create all of the “create” objects

Those familiar with Django may recognise that there is only one step here that cannot be done as of “now”.

So, how can we do a bulk update?

There are two ways I can think of doing it (at least with Postgres):

  • create a temporary table (cloning the structure of the table)
  • insert all of the data into this table
  • update the rows in the original table from the temporary table, based on pk column

and:

  • come up with some mechanism of using the UPDATE the_table SET ... FROM () sq WHERE sq.pk = the_table.pk syntax

It’s possible to use some of the really nice features of Postgres to create a temporary table, that clones an existing table, and will automatically be dropped at the end of the transaction:

BEGIN;

CREATE TEMPORARY TABLE upsert_source (LIKE my_table INCLUDING ALL) ON COMMIT DROP;

-- Bulk insert into upsert_source

UPDATE my_table
   SET foo = upsert_source.foo,
       bar = upsert_source.bar
  FROM upsert_source
 WHERE my_table.id = upsert_source.id;

The drawbacks of this are that it does two extra queries, but it is possible to implement fairly simply:

from django.db import transaction, connection

@transaction.atomic
def bulk_update(model, instances, *fields):
    cursor = connection.cursor()
    db_table = model._meta.db_table

    try:
        cursor.execute(
            'CREATE TEMPORARY TABLE update_{0} (LIKE {0} INCLUDING ALL) ON COMMIT DROP'.format(db_table)
        )

        model._meta.db_table = 'update_{}'.format(db_table)
        model.objects.bulk_create(instances)

        query = ' '.join([
            'UPDATE {table} SET ',
            ', '.join(
                ('%(field)s=update_{table}.%(field)s' % {'field': field})
                for field in fields
            ),
            'FROM update_{table}',
            'WHERE {table}.{pk}=update_{table}.{pk}'
        ]).format(
            table=db_table,
            pk=model._meta.pk.get_attname_column()[1]
        )
        cursor.execute(query)
    finally:
        model._meta.db_table = db_table

The advantage of this is that it mostly just uses the ORM. There’s limited scope for SQL injection (although you’d probably want to validate the field names).

It’s also possible to do the update directly from a subquery, but without the nice column names:

UPDATE my_table
   SET foo = upsert_source.column2,
       column2 = upsert_source.column3
  FROM (
    VALUES (...), (...)
  ) AS upsert_source
 WHERE upsert_source.column1 = my_table.id;

Note that you must make sure your values are in the correct order (with the primary key first).

Attempting to prevent some likely SQL injection vectors, we want to build up the fixed parts of the query (and the parts that are controlled by the django model, like the table and field names), and then pass the values in as query parameters.

from django.db import connection

def bulk_update(model, instances, *fields):
    set_fields = ', '.join(
        ('%(field)s=update_{table}.column%(i)s' % {'field': field, 'i': i + 2})
        for i, field in enumerate(fields)
    )
    value_placeholder = '({})'.format(', '.join(['%s'] * (len(fields) + 1)))
    values = ','.join([value_placeholder] * len(instances))
    query = ' '.join([
        'UPDATE {table} SET ',
        set_fields,
        'FROM (VALUES ', values, ') update_{table}',
        'WHERE {table}.{pk} = update_{table}.column1'
    ]).format(table=model._meta.db_table, pk=model._meta.pk.get_attname_column()[1])
    params = []
    for instance in instances:
        params.append(instance.pk)
        for field in fields:
            params.append(getattr(instance, field))

    connection.cursor().execute(query, params)

This feels like a reasonable first draft, however I’d probably want to go look at how the query for bulk_create is created, and modify that. There’s a fair bit going on there that I haven’t followed as yet though. Note that this does not need the @transaction.atomic decorator, as it is only a single statement.

From here, we can build an upsert that assumes all objects with a PK need to be updated, and those without need to be inserted:

from django.utils.functional import partition
from django.db import transaction

@transaction.atomic
def bulk_upsert(model, instances, *fields):
    update, create = partition(lambda obj: obj.pk is None, instances)
    if update:
        bulk_update(model, update, *fields)
    if create:
        model.objects.bulk_create(create)
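
Usage is then straightforward, with the field names being the columns to update on existing rows:

bulk_upsert(MyModel, instances, 'foo', 'bar')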

Versioning complex database migrations

Recently, I’ve been writing lots of raw SQL code that is either a complex VIEW, or a FUNCTION. Much of the time these will be used as the “source” for a Django model, but not always. Sometimes, there are complex functions that need to be run as a trigger in Postgres, or even a rule to execute when attempting a write operation on a view.

Anyway, these definitions are all code, and should be stored within the project they belong to. Using Django’s migrations you can apply them at the appropriate time, using a RunSQL statement.

However, you don’t really want to have the raw SQL in the migration file. Depending upon the text editor, it may not syntax highlight correctly, and finding the correct definition can be difficult.

Similarly, you don’t want to just have a single file, because to recreate the database migration sequence, it needs to apply the correct version at the correct time (otherwise, other migrations may fail to apply).

Some time ago, I adopted a policy of manually versioning these files. I have a naming pattern that seemed to be working well:

special_app/
  migrations/
    __init__.py
    0001_initial.py
    0002_update_functions.py
  sql/
    foo.function.0001.sql
    foo.function.0002.sql
    foo.trigger.0001.sql
    bar.view.0001.sql

The contents of the SQL files are irrelevant, and the migrations mostly so. There is a custom migration operation I wrote that loads the content from a file:

    LoadSQLScript('special_app', 'foo.function', 1)

The mechanics of how it works are not important.
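
That said, a minimal version might look something like this (file layout as above; the real implementation does more):

import os

from django.db import migrations

def LoadSQLScript(app, name, version):
    # LoadSQLScript('special_app', 'foo.function', 1) reads
    # special_app/sql/foo.function.0001.sql
    path = os.path.join(app, 'sql', '{}.{:04d}.sql'.format(name, version))
    with open(path) as script:
        return migrations.RunSQL(script.read())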

So, this had been working well for several months, but I had a nagging feeling that the workflow was not ideal. This came to a head in my mind when I recognised that doing code review on database function/view changes was next to impossible.

See, the problem is that there is a completely new file each time you create a new version of a definition.

Instead, I’ve come up with a similar, but different solution. You still need to have versioned scripts for the purpose of historical migrations, but the key difference is that you don’t actually write these. Instead, you have something that notices that the “current” version of a definition is different to the latest version snapshot. You then also have a tool that copies the current version to a new snapshot, and creates a migration.
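
The “notices” part can be as simple as comparing file contents (a sketch, assuming the current definition lives in name.sql, with snapshots in name.0001.sql, name.0002.sql, and so on):

import glob
import os

def snapshot_out_of_date(sql_dir, name):
    # name is e.g. 'iso8601.function'
    with open(os.path.join(sql_dir, name + '.sql')) as current:
        current_sql = current.read()
    snapshots = sorted(glob.glob(os.path.join(sql_dir, name + '.[0-9]*.sql')))
    if not snapshots:
        return True
    with open(snapshots[-1]) as snapshot:
        return snapshot.read() != current_sql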

You can still modify a snapshot (for instance, if you’ve already created one, but it’s only in your current development tree), but mostly you don’t need to.

$ ./manage.py check
System check identified some issues:

WARNINGS:
?: (sql_helpers.W002) The versioned migration file for core: iso8601.function.sql is out of date,
and needs to be updated or a new version created.

Oh, thanks for that. Checking the file, I see that it does indeed need a new version:

$ ./manage.py make_sql_migrations core
...
Copied <project>/core/sql/iso8601.function.sql to version 0002

You still need to ensure that any dependencies between SQL files are dealt with appropriately (for instance, a function that relies on a newly added column to a view needs to have that view’s definition updated before the function can be updated). But this is a much smaller problem, and something that your database should complain about when you try to apply the migrations.

I haven’t packaged this up yet, it’s currently only an internal app, as I want to use it a bit for the next little while and see what else shakes out. I was pretty sure the way I was doing it before was “the best way” just after I thought that up, so we’ll see what happens.

JavaScript Array Widget

I’ve been making more and more use of the django.contrib.postgres classes, and will often store data in an ArrayField where appropriate.

There are two form fields that are supplied with Django for handling these types: one of which has the array values in a single text input (comma separated), and the other has a different text input element for each value.

However, the latter does not really work that well with a dynamic length array (it could work with up to N items, but in my case, I really don’t often have an N).

It could be possible to build similar functionality that you see with the Django Admin’s formset handling like here, however this turns out to be lots of mucking around.

It might be simpler to just have the simple array field rendered, and have JS bolted on that builds the dynamic list of text inputs based on this.

In this instance, I am actually storing the state in the widgets themselves: this means it’s relatively easy to add in the ability to re-order. I’ve done this with the Sortable library.

Django Dynamic Formsets

Django forms are one of the most important parts of the stack: they enable us to write declarative code that will validate user input, and ensure we protect ourselves from malicious input.

Formsets are an extension of this: they deal with a set of homogeneous forms, and will ensure that all of the forms are valid independently (and possibly do some inter-form validation, but that’s a topic for a later day).

The Django Admin contains an implementation of a dynamic formset: that is, it handles adding and removing forms from a formset, and maintains the management form accordingly. This post details an alternative implementation.


A Formset contains a Form (and has zero or more instances of that Form). It also contains a “Management Form”, which has metadata about the formset: the number of instances of the form that were provided initially, the number that were submitted by the user, and the maximum number of forms that should be accepted.

A Formset has a “prefix”, which is prepended to each element within the management form:

<input type="hidden" name="prefix-INITIAL_FORMS" value="...">
<input type="hidden" name="prefix-TOTAL_FORMS" value="...">
<input type="hidden" name="prefix-MIN_NUM_FORMS" value="...">
<input type="hidden" name="prefix-MAX_NUM_FORMS" value="...">

Each Form within the Formset uses the prefix, plus its index within the list of forms. For instance, if we have a Formset that contains three forms, each containing a single “name” field, we would have something similar to:

<input type="text" name="prefix-0-name" value="Alice">
<input type="text" name="prefix-1-name" value="Bob">
<input type="text" name="prefix-2-name" value="Carol">

Note that the form’s prefix is <formset_prefix>-<form_index>.

To make a Formset dynamic, we just need to be able to add (and possibly remove, but there’s a little more complexity there) extra forms. The management form needs to be updated to reflect this, and we need to ensure that the new form’s fields are named appropriately.

A Formset also contains an empty_form. This is an unbound form, where the form’s “index” is set to __prefix__. Thus, the empty form for the above formset might look somewhat like:

<input type="text" name="prefix-__prefix__-name" value="">

We can leverage this to allow us to have simpler code: instead of having to duplicate elements and remove the value, we can just duplicate the empty form, and replace the string __prefix__ with whatever the index of the newly created form should be.
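
To illustrate the mechanics in Python (using a hypothetical PersonFormSet; in practice the substitution happens in JavaScript):

formset = PersonFormSet(prefix='prefix')

empty = str(formset.empty_form)              # inputs named prefix-__prefix__-name
new_form = empty.replace('__prefix__', '3')  # now named prefix-3-name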

An implementation that has no dependencies (but does make some assumptions) needs only a small amount of JavaScript: clone the empty form’s markup, substitute __prefix__ with the next index, insert it into the document, and increment the TOTAL_FORMS value in the management form.

Multi-table Inheritance and the Django Admin

Django’s admin interface is a great way to be able to interact with your models without having to write any view code, and, within limits, it’s useful in production too. However, it can quickly get very crowded when you register lots of models.

Consider the situation where you are using Django’s multi-table inheritance:

from django.db import models

from model_utils.managers import InheritanceManager

class Sheep(models.Model):
    sheep_id = models.AutoField(primary_key=True)
    tag_id = models.CharField(max_length=32)
    date_of_birth = models.DateField()
    sire = models.ForeignKey('sheep.Ram', blank=True, null=True, related_name='progeny')
    dam = models.ForeignKey('sheep.Ewe', blank=True, null=True, related_name='progeny')

    objects = InheritanceManager()

    class Meta:
        verbose_name_plural = 'sheep'

    def __str__(self):
        return '{}: {}'.format(self._meta.verbose_name, self.tag_id)


class Ram(Sheep):
    sheep = models.OneToOneField('sheep.Sheep', parent_link=True, primary_key=True)

    class Meta:
        verbose_name = 'ram'
        verbose_name_plural = 'rams'


class Ewe(Sheep):
    sheep = models.OneToOneField('sheep.Sheep', parent_link=True, primary_key=True)

    class Meta:
        verbose_name = 'ewe'
        verbose_name_plural = 'ewes'

Ignore the fact there is no specialisation on those child models: in practice you’d normally have some.

Also note that I’ve manually included the primary key, and the parent link fields. This has been done so that the actual columns in the database match, and in this case will all be sheep_id. This will make writing joins slightly simpler, and avoids the (not specific to Django) ORM anti-pattern of “always have a column named id”.

We can use the models like this, but it might be nice to have all sheep in the one admin changelist, and just allow filtering by subclass model.

First, we’ll put some extra stuff onto the parent model, to make obtaining the subclasses simpler. Some of these will use a new decorator, which creates a class version of the @property decorator.

class classproperty(property):
    def __get__(self, cls, owner):
        return self.fget.__get__(None, owner)()


class Sheep(models.Model):
    # Fields, etc. defined as above.

    @classproperty
    @classmethod
    def SUBCLASS_OBJECT_CHOICES(cls):
        "All known subclasses, keyed by a unique name per class."
        return {
            rel.name: rel.related_model
            for rel in cls._meta.related_objects
            if rel.parent_link
        }

    @classproperty
    @classmethod
    def SUBCLASS_CHOICES(cls):
        "Available subclass choices, with nice names."
        return [
            (name, model._meta.verbose_name)
            for name, model in cls.SUBCLASS_OBJECT_CHOICES.items()
        ]

    @classmethod
    def SUBCLASS(cls, name):
        "Given a subclass name, return the subclass."
        return cls.SUBCLASS_OBJECT_CHOICES.get(name, cls)

Note that we don’t need to enumerate the subclasses: adding a new subclass later in development will automatically add it to these properties, even though in this case it would be unlikely to happen.
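
For the models above, these evaluate to (roughly):

>>> Sheep.SUBCLASS_OBJECT_CHOICES
{'ram': <class 'sheep.models.Ram'>, 'ewe': <class 'sheep.models.Ewe'>}
>>> Sheep.SUBCLASS_CHOICES
[('ram', 'ram'), ('ewe', 'ewe')]
>>> Sheep.SUBCLASS('ram')
<class 'sheep.models.Ram'>
>>> Sheep.SUBCLASS('goat')  # unknown names fall back to the base class
<class 'sheep.models.Sheep'>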

From these, we can write some nice neat stuff to enable using these in the admin.

from django import forms
from django.conf.urls import url
from django.contrib import admin
from django.utils.translation import ugettext as _

from .models import Sheep


class SubclassFilter(admin.SimpleListFilter):
    title = _('gender')
    parameter_name = 'gender'

    def lookups(self, request, model_admin):
        return Sheep.SUBCLASS_CHOICES

    def queryset(self, request, queryset):
        if self.value():
            return queryset.exclude(**{self.value(): None})
        return queryset


@admin.register(Sheep)
class SheepAdmin(admin.ModelAdmin):
    list_display = [
        'tag_id',
        'date_of_birth',
        'gender'
    ]
    list_filter = [SubclassFilter]

    def get_queryset(self, request):
        return super(SheepAdmin, self).get_queryset(request).select_subclasses()

    def gender(self, obj):
        return obj._meta.verbose_name

    def get_form(self, request, obj=None, **kwargs):
        if obj is None:
            Model = Sheep.SUBCLASS(request.GET.get('gender'))
        else:
            Model = obj.__class__

        # When we change the selected gender in the create form, we want to reload the page.
        RELOAD_PAGE = "window.location.search='?gender=' + this.value"
        # We should also grab all existing field values, and pass them as query string values.

        class ModelForm(forms.ModelForm):
            if not obj:
                gender = forms.ChoiceField(
                    choices=[('', _('Please select...'))] + Sheep.SUBCLASS_CHOICES,
                    widget=forms.Select(attrs={'onchange': RELOAD_PAGE})
                )

            class Meta:
                model = Model
                exclude = ()

        return ModelForm

    def get_fields(self, request, obj=None):
        # We want gender to be the first field.
        fields = super(SheepAdmin, self).get_fields(request, obj)

        if 'gender' in fields:
            fields.remove('gender')
            fields = ['gender'] + fields

        return fields

    def get_urls(self):
        # We want to install named urls that match the subclass ones, but bounce to the relevant
        # superclass ones (since they should be able to handle rendering the correct form).
        urls = super(SheepAdmin, self).get_urls()
        existing = '{}_{}_'.format(self.model._meta.app_label, self.model._meta.model_name)
        subclass_urls = []
        for name, model in Sheep.SUBCLASS_OBJECT_CHOICES.items():
            opts = model._meta
            replace = '{}_{}_'.format(opts.app_label, opts.model_name)
            subclass_urls.extend([
                url(pattern.regex.pattern, pattern.callback, name=pattern.name.replace(existing, replace))
                for pattern in urls if pattern.name
            ])

        return urls + subclass_urls

Wow. There’s quite a lot going on there, but the summary is:

  • We create a custom filter that filters according to subclass.
  • The .select_subclasses() means that objects are downcast to their subclass when fetched.
  • There is a custom form, that, when in create mode, has a selector for the desired subclass.
  • When the subclass is changed (only on the create form), the page is reloaded. This is required in a situation where there are different fields on each of the subclass models.
  • We register the subclass admin url paths, but use the superclass admin views.

I’ve had ideas about this for some time, and have just started using something like this in development: in my situation, there will be an arbitrary number of subclasses, all of which will have several new fields. The code in this page is extracted (and changed) from those ideas, so may not be completely correct. Corrections welcome.

(Directly) Testing Django Formsets

Django Forms are excellent: they offer a really nice API for validating user input. You can quite easily pass a dict of data instead of a QueryDict, which is what the request handling mechanism provides. This makes it trivial to write tests that exercise a given Form’s validation directly. For instance:

def test_my_form(self):
    form = MyForm({
        'foo': 'bar',
        'baz': 'qux'
    })
    self.assertFalse(form.is_valid())
    self.assertTrue('foo' in form.errors)

Formsets are also really nice: they expose a neat way to update a group of homogeneous objects. It’s possible to pass a list of dicts to the formset for the initial argument, but, alas, you may not do the same for passing data. Instead, it needs to be structured as the QueryDict would be:

def test_my_formset(self):
    formset = MyFormSet({
        'formset-INITIAL_FORMS': '0',
        'formset-TOTAL_FORMS': '2',
        'formset-0-foo': 'bar1',
        'formset-0-baz': 'qux1',
        'formset-1-foo': 'spam',
        'formset-1-baz': 'eggs'
    })
    self.assertTrue(formset.is_valid())

This is fine if you only have a couple of forms in your formset, but it’s a bit tiresome to have to put all of the prefixes, and is far noisier.

Here’s a nice little helper, that takes a FormSet class, and a list (of dicts), and instantiates the formset with the data coerced into the correct format:

def instantiate_formset(formset_class, data, instance=None, initial=None):
    prefix = formset_class().prefix
    formset_data = {}
    for i, form_data in enumerate(data):
        for name, value in form_data.items():
            if isinstance(value, list):
                # A list value is assumed to belong to a MultiWidget field
                # (e.g. SplitDateTimeWidget), which submits one key per
                # sub-widget: name_0, name_1, ...
                for j, inner in enumerate(value):
                    formset_data['{}-{}-{}_{}'.format(prefix, i, name, j)] = inner
            else:
                formset_data['{}-{}-{}'.format(prefix, i, name)] = value
    formset_data['{}-TOTAL_FORMS'.format(prefix)] = len(data)
    formset_data['{}-INITIAL_FORMS'.format(prefix)] = 0

    if instance:
        return formset_class(formset_data, instance=instance, initial=initial)
    else:
        return formset_class(formset_data, initial=initial)

This handles a formset or a model formset. Much easier to use:

def test_my_formset(self):
    formset = instantiate_formset(MyFormSet, [
      {
        'foo': 'bar1',
        'baz': 'qux1',
      },
      {
        'foo': 'spam',
        'baz': 'eggs',
      },
    ])

Django Trees via Closure View

After writing up a method of using a Postgres View that generates a materialised path within the context of a Django model, I came across some queries of my data that were getting rather troublesome to write. It occurred to me that having a closure table would be useful. Specifically, I needed all of the descendants of a given set of nodes.

I couldn’t find an existing Postgres extension that will manage the closure table, and didn’t feel like writing my own implementation using triggers just yet. However, it occurred to me that I could use a similar trick to the recursive materialised path view. Thus, we have a Closure View.

We will start with the Django models:

class Node(models.Model):
    node_id = models.AutoField(primary_key=True)
    parent = models.ForeignKey('tree.Node', related_name='children', null=True, blank=True)

    descendants = models.ManyToManyField('tree.Node', related_name='ancestors', through='tree.Closure')

    class Meta:
        app_label = 'tree'


class Closure(models.Model):
    path = ArrayField(base_field=models.IntegerField(), primary_key=True)
    ancestor = models.ForeignKey('tree.Node', related_name='+')
    descendant = models.ForeignKey('tree.Node', related_name='+')
    depth = models.IntegerField()

    class Meta:
        app_label = 'tree'
        managed = False

You may notice I have a path column. I’m using this for the primary key, and it may turn out to be useful later.

Let’s have a look at the View:

CREATE RECURSIVE VIEW tree_closure(path, ancestor_id, descendant_id, depth) AS

SELECT ARRAY[node_id], node_id, node_id, 0 FROM tree_node

UNION ALL

SELECT parent_id || path, parent_id, descendant_id, depth + 1
FROM tree_node INNER JOIN tree_closure ON (ancestor_id = node_id)
WHERE parent_id IS NOT NULL;

This uses a recursive query. The first part builds the self-reference relations (each node is both ancestor and descendant, at depth 0), and the second part is the recursive term: it collects the child nodes for each row already in the view (or added in previous iterations of this part of the view).

Now, because we are using the in-built Django Many to Many features, we have some nice queries ready to go:

  • node.ancestors.all() : All ancestors of a given Node instance.
  • node.descendants.all() : All descendants of a given Node instance.
  • Node.objects.filter(ancestors=queryset) : All descendants of all nodes in a queryset.
  • Node.objects.filter(descendants=queryset) : All ancestors of all nodes in a queryset.

Of particular note are the bottom two: these are rather cumbersome to write in older versions of Django.
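
So the query that motivated all of this - all of the descendants of a given set of nodes - is now a one-liner:

descendants = Node.objects.filter(ancestors=parents)  # parents: a queryset of Node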