NeMeSiS

A long time ago, I was lucky enough to be selected to attend the National Mathematics Summer School, an annual camp run by the Australian National University and the Australian Association of Mathematics Teachers. Around 60 promising students who have just completed Year 11 are invited to attend a 12-day retreat at ANU in Canberra.

In the time since, I’ve been getting the bi-annual (maybe, everything blurs together) newsletter, and reading it with some interest, coupled at times with a feeling of inadequacy as I thought about how little further mathematics study I actually did.

It turns out that I attended NeMeSiS (as the students refer to it) back in 1992, and that was 25 years ago.

In the time since then, I’ve come across exactly two other alumni: one was a student with me at The Levels in 1993 (we had attended NeMeSiS at the same time, I think we met there, and we were friends for a year or two, but I can’t remember his name), and the other I met several years later through Touch Football, and several years after that discovered that we had this shared-although-offset-by-a-year history. She’s a surgeon, and we still bump into one another occasionally.

NeMeSiS was a real eye-opener for me. I went from being (I felt at the time) the smartest kid in the room at all times, to just being some kid. In some ways, I probably didn’t deal with it quite the right way: although I knew I was nowhere near as smart as some of the other students (Ben Burton, for instance), I came out of it still feeling superior to everyone I studied with at school and university after that point. That probably explains how someone with “lots of potential” ended up failing first-year Engineering Mathematics.

I remember catching the plane from Adelaide, and I think I was seated with a group of other NeMeSiS students. They all knew one another, and I was somewhat of an outsider, as I was actually part of the Victorian quota (having been at school in Hamilton, in western Victoria). Looking back, I have a feeling there was a degree of segregation and aloofness among some of the students, but I did find a home within a small group. Perhaps we were the outsiders, but I didn’t feel at the time that I was being ostracised.

After dropping out of my Engineering degree, I then went and completed an Education degree (which was much less work, I must say). I taught for nearly 10 years, and then did a graduate entry Computer Science degree. I’d taught myself almost everything that was covered in the course work of that degree, so sailed through it with mostly High Distinctions.

I hear lots of people talk about imposter syndrome, and it’s really interesting (to me, at least) that I don’t often feel that in my second career. I think maybe I did have it when I was a teacher, and I feel so much more confident about what I am doing within the scope of my work now that it doesn’t affect me so much. Maybe that’s Dunning-Kruger, but I hope not. I think it’s more about having felt, not exactly out of my depth, but like I was doing something that I was never really supposed to be doing.

Anyway, these thoughts were brought on by the arrival today of the latest newsletter, with mention of how the next one will be the 50th. I’m yet to attend one of my school reunions (both 10 and 20-year versions have passed me by), but maybe I’ll think about going to one for NeMeSiS.

Update: I found a list of “lost alumni”, and it seems that I can remember way too many names: Michael Plavins was (I think) the friend from Uni, and I know that Robert Dunbabin (Bobbit), Kathryn Vaughan and I were quite good friends. Effie Hatzigiannis, Irina Shainsky and Zoran Vukojevic are all names that I had forgotten I knew.

Tree data as a nested list redux

Some time ago, I wrote about using Python to aggregate data that is stored with a Materialized Path into a Nested List structure.

But we should be able to do that same aggregation using Postgres, and from an Adjacency List structure.

Let’s start with a table definition:

CREATE TABLE location (
  node_id SERIAL PRIMARY KEY,
  name TEXT,
  parent_id INTEGER REFERENCES location(node_id)
);

And some data:

INSERT INTO location (node_id, name, parent_id) VALUES
  (1, 'Australia', NULL),
  (2, 'South Australia', 1),
  (3, 'Victoria', 1),
  (4, 'South-East', 2),
  (5, 'Western Districts', 3),
  (6, 'New Zealand', NULL),
  (7, 'Barossa Valley', 2),
  (8, 'Riverland', 2),
  (9, 'South Island', 6),
  (10, 'North Island', 6),
  (11, 'Eastern Bay of Plenty', 10);

To begin with, we need to get all of the items, and their depth:

WITH RECURSIVE location_with_level AS (
  SELECT *,
         0 AS lvl
    FROM location
   WHERE parent_id IS NULL

  UNION ALL

  SELECT child.*,
         parent.lvl + 1
    FROM location child
    JOIN location_with_level parent ON parent.node_id = child.parent_id
)
SELECT * FROM location_with_level;
 node_id │         name          │ parent_id │ lvl
─────────┼───────────────────────┼───────────┼─────
       1 │ Australia             │    <NULL> │   0
       6 │ New Zealand           │    <NULL> │   0
       2 │ South Australia       │         1 │   1
       3 │ Victoria              │         1 │   1
       9 │ South Island          │         6 │   1
      10 │ North Island          │         6 │   1
       4 │ South-East            │         2 │   2
       5 │ Western Districts     │         3 │   2
       7 │ Barossa Valley        │         2 │   2
       8 │ Riverland             │         2 │   2
      11 │ Eastern Bay of Plenty │        10 │   2
(11 rows)

Because of the way recursive queries work, we need to find the deepest node(s), and start there:

WITH RECURSIVE location_with_level AS (
  SELECT *,
         0 AS lvl
    FROM location
   WHERE parent_id IS NULL

  UNION ALL

  SELECT child.*,
         parent.lvl + 1
    FROM location child
    JOIN location_with_level parent ON parent.node_id = child.parent_id
),
maxlvl AS (
  SELECT max(lvl) maxlvl FROM location_with_level
)

SELECT * FROM maxlvl;

We then need to build up the tree (this clause is the next one in our CTE chain, I’ve omitted the first two for clarity):

c_tree AS (
  SELECT location_with_level.*,
         NULL::JSONB children
    FROM location_with_level, maxlvl
   WHERE lvl = maxlvl

   UNION

   (
     SELECT (branch_parent).*,
            jsonb_agg(branch_child)
       FROM (
         SELECT branch_parent,
                to_jsonb(branch_child) - 'lvl' - 'parent_id' - 'node_id' AS branch_child
           FROM location_with_level branch_parent
           JOIN c_tree branch_child ON branch_child.parent_id = branch_parent.node_id
       ) branch
       GROUP BY branch.branch_parent

       UNION

       SELECT c.*,
              NULL::JSONB
       FROM location_with_level c
       WHERE NOT EXISTS (SELECT 1
                           FROM location_with_level hypothetical_child
                          WHERE hypothetical_child.parent_id = c.node_id)
   )
)

The first part of this query gets all of the deepest leaf nodes.

This is then combined with another recursive subquery that creates the branches. It relies on the fact that it’s possible to use a table’s row “type”, and have whole records as columns in a query. The second part of this subquery finds all remaining leaf nodes, and combines them in. This subquery will keep executing until it finds no new rows, which will happen once all root nodes have been processed.

We can see from the results of this last clause that we just need to limit this to root nodes:

 node_id │         name          │ parent_id │ lvl │                                                   children
─────────┼───────────────────────┼───────────┼─────┼──────────────────────────────────────────────────────────────────────────────────────────────────────────────
       4 │ South-East            │         2 │   2 │ <NULL>
       5 │ Western Districts     │         3 │   2 │ <NULL>
       7 │ Barossa Valley        │         2 │   2 │ <NULL>
       8 │ Riverland             │         2 │   2 │ <NULL>
      11 │ Eastern Bay of Plenty │        10 │   2 │ <NULL>
       3 │ Victoria              │         1 │   1 │ [{"name": "Western Districts", "children": null}]
      10 │ North Island          │         6 │   1 │ [{"name": "Eastern Bay of Plenty", "children": null}]
       9 │ South Island          │         6 │   1 │ <NULL>
       2 │ South Australia       │         1 │   1 │ [{"name": "Riverland", "children": null}, {"name": "Barossa Valley", "children": null}, {"name": "South-East…
         │                       │           │     │…", "children": null}]
       6 │ New Zealand           │    <NULL> │   0 │ [{"name": "South Island", "children": null}, {"name": "North Island", "children": [{"name": "Eastern Bay of …
         │                       │           │     │…Plenty", "children": null}]}]
       1 │ Australia             │    <NULL> │   0 │ [{"name": "South Australia", "children": [{"name": "Riverland", "children": null}, {"name": "Barossa Valley"…
         │                       │           │     │…, "children": null}, {"name": "South-East", "children": null}]}, {"name": "Victoria", "children": [{"name": …
         │                       │           │     │…"Western Districts", "children": null}]}]
(11 rows)

So our final query, using the new jsonb_pretty function:

WITH RECURSIVE location_with_level AS (
  SELECT *,
         0 AS lvl
    FROM location
   WHERE parent_id IS NULL

  UNION ALL

  SELECT child.*,
         parent.lvl + 1
    FROM location child
    JOIN location_with_level parent ON parent.node_id = child.parent_id
),
maxlvl AS (
  SELECT max(lvl) maxlvl FROM location_with_level
),
c_tree AS (
  SELECT location_with_level.*,
         NULL::JSONB children
    FROM location_with_level, maxlvl
   WHERE lvl = maxlvl

   UNION

   (
     SELECT (branch_parent).*,
            jsonb_agg(branch_child)
       FROM (
         SELECT branch_parent,
                to_jsonb(branch_child) - 'lvl' - 'parent_id' - 'node_id' AS branch_child
           FROM location_with_level branch_parent
           JOIN c_tree branch_child ON branch_child.parent_id = branch_parent.node_id
       ) branch
       GROUP BY branch.branch_parent

       UNION

       SELECT c.*,
              NULL::JSONB
       FROM location_with_level c
       WHERE NOT EXISTS (SELECT 1
                           FROM location_with_level hypothetical_child
                          WHERE hypothetical_child.parent_id = c.node_id)
   )
)

SELECT jsonb_pretty(
         array_to_json(
           array_agg(
             row_to_json(c_tree)::JSONB - 'lvl' - 'parent_id' - 'node_id'
           )
         )::JSONB
       ) AS tree
  FROM c_tree
  WHERE lvl=0;

And our results:

                           tree
 ──────────────────────────────────────────────────────────
  [
      {
          "name": "New Zealand",
          "children": [
              {
                  "name": "South Island",
                  "children": null
              },
              {
                  "name": "North Island",
                  "children": [
                      {
                          "name": "Eastern Bay of Plenty",
                          "children": null
                      }
                  ]
              }
          ]
      },
      {
          "name": "Australia",
          "children": [
              {
                  "name": "South Australia",
                  "children": [
                      {
                          "name": "Riverland",
                          "children": null
                      },
                      {
                          "name": "Barossa Valley",
                          "children": null
                      },
                      {
                          "name": "South-East",
                          "children": null
                      }
                  ]
              },
              {
                  "name": "Victoria",
                  "children": [
                      {
                          "name": "Western Districts",
                          "children": null
                      }
                  ]
              }
          ]
      }
  ]
 (1 row)

Oh, that is rather neat.

This query is mostly cribbed from a fantastic Stack Overflow answer by David Guillot.
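
If you want to consume this from Python (to feed an API endpoint, say), it’s a single round trip. Here’s a minimal sketch using Django’s connection, with TREE_QUERY holding the full query from above:

import json

from django.db import connection

# The full "WITH RECURSIVE ... SELECT jsonb_pretty(...)" query shown above.
TREE_QUERY = """..."""


def location_tree():
    # One row, one column: the pretty-printed JSON document.
    with connection.cursor() as cursor:
        cursor.execute(TREE_QUERY)
        (tree,) = cursor.fetchone()
    return json.loads(tree)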

Django bulk_update without upsert

Postgres 9.5 brings a fantastic feature that I’ve really been looking forward to: native upsert (INSERT … ON CONFLICT). However, I’m not on 9.5 in production yet, and I had a situation that would really have benefitted from being able to use it.

I have to insert lots of objects, but if there is already an object in a given “slot”, then I need to instead update that existing object.

Using the Django ORM, this can be done on a one-by-one basis: iterate through the new data, find the existing object (if any) that matches the criteria, update it, or create a new one if there was no match.

However, this is really slow, as it does two queries for each object.
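
For illustration, the one-at-a-time version looks something like this (the model and field names here are hypothetical):

# Two queries per object: a SELECT to find a match, then an INSERT or UPDATE.
for row in incoming_rows:
    MyModel.objects.update_or_create(
        slot=row['slot'],                 # the matching criteria
        defaults={'value': row['value']},
    )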

Instead, it would be great to:

  • fetch all of the instances that could possibly overlap (keyed by the matching criteria)
  • iterate through the new data, looking for a match
    • modify the instance if an existing match is made, and stash into pile “update”
    • create a new instance if no match is found, and stash into the pile “create”
  • bulk_update all of the “update” objects
  • bulk_create all of the “create” objects

Those familiar with Django may recognise that there is only one step here that cannot be done as of “now”: the bulk_update.
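
In code, that outline might look something like the following sketch (the model and field names are hypothetical, and bulk_update is the function developed below):

def bulk_upsert_by_slot(model, incoming_rows):
    # Fetch every instance that could possibly overlap, keyed by the matching criteria.
    existing = {
        obj.slot: obj
        for obj in model.objects.filter(slot__in=[row['slot'] for row in incoming_rows])
    }

    to_update, to_create = [], []
    for row in incoming_rows:
        obj = existing.get(row['slot'])
        if obj:
            obj.value = row['value']
            to_update.append(obj)
        else:
            to_create.append(model(slot=row['slot'], value=row['value']))

    if to_update:
        bulk_update(model, to_update, 'value')
    if to_create:
        model.objects.bulk_create(to_create)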

So, how can we do a bulk update?

There are two ways I can think of doing it (at least with Postgres):

  • create a temporary table (cloning the structure of the table)
  • insert all of the data into this table
  • update the rows in the original table from the temporary table, based on pk column

and:

  • come up with some mechanism of using the UPDATE the_table SET ... FROM () sq WHERE sq.pk = the_table.pk syntax

It’s possible to use some of the really nice features of Postgres to create a temporary table, that clones an existing table, and will automatically be dropped at the end of the transaction:

BEGIN;

CREATE TEMPORARY TABLE upsert_source (LIKE my_table INCLUDING ALL) ON COMMIT DROP;

-- Bulk insert into upsert_source

UPDATE my_table
   SET foo = upsert_source.foo,
       bar = upsert_source.bar
  FROM upsert_source
 WHERE my_table.id = upsert_source.id;

The drawback of this approach is that it requires two extra queries, but it is fairly simple to implement:

from django.db import transaction, connection

@transaction.atomic
def bulk_update(model, instances, *fields):
    cursor = connection.cursor()
    db_table = model._meta.db_table

    try:
        cursor.execute(
            'CREATE TEMPORARY TABLE update_{0} (LIKE {0} INCLUDING ALL) ON COMMIT DROP'.format(db_table)
        )

        model._meta.db_table = 'update_{}'.format(db_table)
        model.objects.bulk_create(instances)

        query = ' '.join([
            'UPDATE {table} SET ',
            ', '.join(
                ('%(field)s=update_{table}.%(field)s' % {'field': field})
                for field in fields
            ),
            'FROM update_{table}',
            'WHERE {table}.{pk}=update_{table}.{pk}'
        ]).format(
            table=db_table,
            pk=model._meta.pk.get_attname_column()[1]
        )
        cursor.execute(query)
    finally:
        model._meta.db_table = db_table

The advantage of this is that it mostly just uses the ORM. There’s limited scope for SQL injection (although you’d probably want to validate the field names).

It’s also possible to do the update directly from a subquery, but without the nice column names:

UPDATE my_table
   SET foo = upsert_source.column2,
       bar = upsert_source.column3
  FROM (
    VALUES (...), (...)
  ) AS upsert_source
 WHERE upsert_source.column1 = my_table.id;

Note that you must make sure your values are in the correct order (with the primary key first).

Attempting to prevent some likely SQL injection vectors, we want to build up the fixed parts of the query (and the parts that are controlled by the django model, like the table and field names), and then pass the values in as query parameters.

from django.db import connection

def bulk_update(model, instances, *fields):
    set_fields = ', '.join(
        ('%(field)s=update_{table}.column%(i)s' % {'field': field, 'i': i + 2})
        for i, field in enumerate(fields)
    )
    value_placeholder = '({})'.format(', '.join(['%s'] * (len(fields) + 1)))
    values = ','.join([value_placeholder] * len(instances))
    query = ' '.join([
        'UPDATE {table} SET ',
        set_fields,
        'FROM (VALUES ', values, ') update_{table}',
        'WHERE {table}.{pk} = update_{table}.column1'
    ]).format(table=model._meta.db_table, pk=model._meta.pk.get_attname_column()[1])
    params = []
    for instance in instances:
        params.append(instance.pk)
        for field in fields:
            params.append(getattr(instance, field))

    connection.cursor().execute(query, params)

This feels like a reasonable first draft; however, I’d probably want to go and look at how the query for bulk_create is built, and modify that. There’s a fair bit going on there that I haven’t followed yet, though. Note that this does not need the @transaction.atomic decorator, as it is only a single statement.

From here, we can build an upsert that assumes all objects with a PK need to be updated, and those without need to be inserted:

from django.utils.functional import partition
from django.db import transaction

@transaction.atomic
def bulk_upsert(model, instances, *fields):
    update, create = partition(lambda obj: obj.pk is None, instances)
    if update:
        bulk_update(model, update, *fields)
    if create:
        model.objects.bulk_create(create)
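
Usage is then straightforward (Reading here is a hypothetical model with a value field):

instances = [
    Reading(pk=1, value=10.5),   # has a pk, so it will be bulk-updated
    Reading(pk=2, value=11.2),
    Reading(value=9.8),          # no pk, so it will be bulk-created
]
bulk_upsert(Reading, instances, 'value')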

Versioning complex database migrations

Recently, I’ve been writing lots of raw SQL code that is either a complex VIEW, or a FUNCTION. Much of the time these will be used as the “source” for a Django model, but not always. Sometimes, there are complex functions that need to be run as a trigger in Postgres, or even a rule to execute when attempting a write operation on a view.

Anyway, these definitions are all code, and should be stored within the project they belong to. Using Django’s migrations you can apply them at the appropriate time, using a RunSQL statement.
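
For instance, a migration that installs a (trivial, purely illustrative) view using RunSQL might look like:

from django.db import migrations


class Migration(migrations.Migration):

    dependencies = [('special_app', '0001_initial')]

    operations = [
        migrations.RunSQL(
            sql="CREATE OR REPLACE VIEW bar AS SELECT 1 AS one;",
            reverse_sql="DROP VIEW bar;",
        ),
    ]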

However, you don’t really want to have the raw SQL in the migration file. Depending upon the text editor, it may not syntax highlight correctly, and finding the correct definition can be difficult.

Similarly, you don’t want to just have a single file, because to recreate the database migration sequence, it needs to apply the correct version at the correct time (otherwise, other migrations may fail to apply).

Some time ago, I adopted a policy of manually versioning these files. I have a naming pattern that seemed to be working well:

special_app/
  migrations/
    __init__.py
    0001_initial.py
    0002_update_functions.py
  sql/
    foo.function.0001.sql
    foo.function.0002.sql
    foo.trigger.0001.sql
    bar.view.0001.sql

The contents of the SQL files are irrelevant, and the migrations mostly so. There is a custom migration operation I wrote that loads the content from a file:

    LoadSQLScript('special_app', 'foo.function', 1)

The mechanics of how it works are not important.

So, this had been working well for several months, but I had a nagging feeling that the workflow was not ideal. This came to a head in my mind when I recognised that doing code review on database function/view changes was next to impossible.

See, the problem is that there is a completely new file each time you create a new version of a definition.

Instead, I’ve come up with a similar, but different solution. You still need to have versioned scripts for the purpose of historical migrations, but the key difference is that you don’t actually write these. Instead, you have something that notices that the “current” version of a definition is different to the latest version snapshot. You then also have a tool that copies the current version to a new snapshot, and creates a migration.

You can still modify a snapshot (for instance, if you’ve already created one, but it’s only in your current development tree), but mostly you don’t need to.
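
The check itself can be as simple as comparing file contents. Here’s a rough sketch, assuming the “current” definition lives alongside its snapshots as <name>.sql (the real app differs in the details):

import os


def latest_snapshot(sql_dir, name):
    # Versioned snapshots look like '<name>.0001.sql', '<name>.0002.sql', ...
    versions = sorted(
        filename for filename in os.listdir(sql_dir)
        if filename.startswith(name + '.') and filename != name + '.sql'
    )
    return versions[-1] if versions else None


def snapshot_is_stale(sql_dir, name):
    # The current definition ('<name>.sql') is stale if it differs from the
    # newest snapshot, or if no snapshot has been taken yet.
    snapshot = latest_snapshot(sql_dir, name)
    if snapshot is None:
        return True
    with open(os.path.join(sql_dir, name + '.sql')) as current:
        with open(os.path.join(sql_dir, snapshot)) as latest:
            return current.read() != latest.read()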

$ ./manage.py check
System check identified some issues:

WARNINGS:
?: (sql_helpers.W002) The versioned migration file for core: iso8601.function.sql is out of date,
and needs to be updated or a new version created.

Oh, thanks for that. Checking the file, I see that it does indeed need a new version:

$ ./manage.py make_sql_migrations core
...
Copied <project>/core/sql/iso8601.function.sql to version 0002

You still need to ensure that any dependencies between SQL files are dealt with appropriately (for instance, a function that relies on a newly added column to a view needs to have that view’s definition updated before the function can be updated). But this is a much smaller problem, and something that your database should complain about when you try to apply the migrations.

I haven’t packaged this up yet, it’s currently only an internal app, as I want to use it a bit for the next little while and see what else shakes out. I was pretty sure the way I was doing it before was “the best way” just after I thought that up, so we’ll see what happens.

JavaScript Array Widget

I’ve been making more and more use of the django.contrib.postgres classes, and will often store data in an ArrayField where appropriate.

There are two form fields supplied with Django for handling these types: one has the array values in a single text input (comma separated), and the other has a separate text input element for each value.
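
For reference, those two fields are SimpleArrayField and SplitArrayField; using them looks something like this (the form and field names are just for illustration):

from django import forms
from django.contrib.postgres.forms import SimpleArrayField, SplitArrayField


class TagForm(forms.Form):
    # A single comma-separated text input.
    tags = SimpleArrayField(forms.CharField(max_length=50))


class ScoreForm(forms.Form):
    # One text input per value, but fixed at exactly five inputs.
    scores = SplitArrayField(forms.IntegerField(), size=5)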

However, the latter does not really work that well with a dynamic-length array (it could work with up to N items, but in my case, there often isn’t a fixed N).

It would be possible to build functionality similar to the Django Admin’s formset handling (like here), however this turns out to be lots of mucking around.

It might be simpler to just have the simple array field rendered, and have JS bolted on that builds the dynamic list of text inputs based on this.

In this instance, I am actually storing the state in the widgets themselves: this means it’s relatively easy to add in the ability to re-order. I’ve done this with the Sortable library.

Django Dynamic Formsets

Django forms are one of the most important parts of the stack: they enable us to write declarative code that will validate user input, and ensure we protect ourselves from malicious input.

Formsets are an extension of this: they deal with a set of homogeneous forms, and will ensure that all of the forms are valid independently (and possibly do some inter-form validation, but that’s a topic for a later day).

The Django Admin contains an implementation of a dynamic formset: that is, it handles adding and removing forms from a formset, and maintains the management form accordingly. This post details an alternative implementation.


A Formset contains a Form (and has zero or more instances of that Form). It also contains a “Management Form”, which has metadata about the formset: the number of instances of the form that were provided initially, the number that were submitted by the user, and the maximum number of forms that should be accepted.

A Formset has a “prefix”, which is prepended to each element within the management form:

<input type="hidden" name="prefix-INITIAL_FORMS" value="...">
<input type="hidden" name="prefix-TOTAL_FORMS" value="...">
<input type="hidden" name="prefix-MIN_NUM_FORMS" value="...">
<input type="hidden" name="prefix-MAX_NUM_FORMS" value="...">

Each Form within the Formset uses the prefix, plus its index within the list of forms. For instance, if we have a Formset that contains three forms, each containing a single “name” field, we would have something similar to:

<input type="text" name="prefix-0-name" value="Alice">
<input type="text" name="prefix-1-name" value="Bob">
<input type="text" name="prefix-2-name" value="Carol">

Note that the form’s prefix is <formset_prefix>-<form_index>.

To make a Formset dynamic, we just need to be able to add (and possibly remove, but there’s a little more complexity there) extra forms. The management form needs to be updated to reflect this, and we need to ensure that the new form’s fields are named appropriately.

A Formset also contains an empty_form. This is an unbound form, where the form’s “index” is set to __prefix__. Thus, the empty form for the above formset might look somewhat like:

<input type="text" name="prefix-__prefix__-name" value="">

We can leverage this to allow us to have simpler code: instead of having to duplicate elements and remove the value, we can just duplicate the empty form, and replace the string __prefix__ with whatever the index of the newly created form should be.
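
We can see all of this from the Python side with a trivial form (a quick sketch):

from django import forms
from django.forms import formset_factory


class NameForm(forms.Form):
    name = forms.CharField()


NameFormSet = formset_factory(NameForm, extra=3)
formset = NameFormSet(prefix='prefix')

print(formset.forms[0]['name'].html_name)    # 'prefix-0-name'
print(formset.empty_form['name'].html_name)  # 'prefix-__prefix__-name'
print(formset.management_form)               # the prefix-TOTAL_FORMS, etc. hidden inputs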

Here’s an implementation that has no dependencies, but does make some assumptions:

On Fences and Functions

I grew up on a farm.

We had fences on the farm.

Whilst the jobs associated with fences and fencing are less than fun, the fences themselves are extremely important. They keep the livestock in the correct location. A damaged or incomplete fence, even if it is only damaged in a small way, can cost significant amounts of money, or even human lives. The consequences range from Rams getting into a flock of Ewes you don’t want them to mate with (because you need to know which Ram mated with which Ewes in order to track progeny), to livestock escaping onto a public road and causing accidents.

Fences are a good thing.


My first career was as a Design and Technology Teacher.

We use fences in woodwork. They are attachments to fixed power tools, such as drill presses and circular saws. They allow us to work safely and to get accurate, easily repeatable results. For instance, we can use a fence to cut sheets of MDF to exactly the same width, ensuring the bookcase we are making is square. Without a fence, it can still be done, but it will certainly be much harder.

Fences are a good thing.


I’d heard people describe Postgres’s CTEs (Common Table Expressions) as an “optimisation fence”. Given my previous uses of the word “fence”, I assumed that this was widely regarded as a good thing.

However, after spending some time writing really complex queries (that are most easily described using a CTE), I happened to read PostgreSQL’s CTEs are optimisation fences. It had become plain to me, through my work with Postgres, that each term in a CTE is materialised in full (if it is referenced at all), before any filtering from the outer query can be applied. Postgres is pretty good about pushing conditions down into a sub-query, but it will not push them into a CTE term, which can mean a CTE performs worse, as it might have to do more work. This article points that out in some detail, and it occurred to me that perhaps some people see fences (in general) as an obstacle. Perhaps fencing something in has negative connotations?

I’m not sure that that’s exactly what the author meant (I wonder if it was sarcasm, perhaps), but it did get me thinking about how different backgrounds could result in opposite interpretations of the same terms.


I do want to veer back into more technical territory, and discuss how I have been working around the fact that, with a CTE, it’s not possible to push filtering from the outer query down into the individual terms.

Largely, the issue exists in my code because I have lots of complex queries (as I just mentioned) that are far easier to write, reason about and debug when written using a CTE. I would like to write them as a VIEW, stick Django models in front of them, and then query them using the ORM, with the view as the db_table of the model. It would be really nice.

But this doesn’t work, because some of the queries require aggregating data across models that have millions of rows, and some of the database tables are less than optimal. For instance, I have several tables that store an effective_from field, where a newer row supersedes older rows that share the same set of other fields (person, for instance), which tells us which one applies on a given date. However, querying this means writing a more complex query (instead of being able to do a date <@ daterange query, which would be possible if the valid period were stored in the table). I’ve learned from this in newer models, but some stuff is too deeply ingrained to be changed just yet.

So, I have a VIEW that turns this into data that actually contains dateranges, and I can query against that. But if I use this in a CTE, then it can materialise the whole lot, which can be slow. So, I needed to come up with a way to filter the data earlier.

Functions.

I’ve been writing SQL functions that take parameters, and then filter as early as possible. This then means that it’s a real possibility that we can get <100ms queries for stuff that is really, really complicated (and joins a couple of dozen or more tables in really funky ways). It does mean I can’t query using the Django ORM, but that’s okay: the data I’m getting back doesn’t necessarily map onto a model anyway, and we need to use it as a dict.
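
Calling one of these functions from Python is simple enough; here’s a sketch, using a hypothetical person_summary function, that returns each row as a dict:

from django.db import connection


def person_summary(person_id, as_at):
    # The function does all of its filtering internally, so only the relevant
    # rows are ever materialised.
    with connection.cursor() as cursor:
        cursor.execute('SELECT * FROM person_summary(%s, %s)', [person_id, as_at])
        columns = [col[0] for col in cursor.description]
        return [dict(zip(columns, row)) for row in cursor.fetchall()]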

More recently, I’ve extended this so that the function (with the relevant parameters, extracted out of the queryset WHERE clauses) can be used as the db_table for a Model. It’s still somewhat hacky, but is very interesting, nonetheless.

Weekday Multi-Select Widget

I knocked together a little widget that allows for selecting multiple days of the week.

Multi-table Inheritance and the Django Admin

Django’s admin interface is a great way to be able to interact with your models without having to write any view code, and, within limits, it’s useful in production too. However, it can quickly get very crowded when you register lots of models.

Consider the situation where you are using Django’s multi-table inheritance:

from django.db import models

from model_utils.managers import InheritanceManager

class Sheep(models.Model):
    sheep_id = models.AutoField(primary_key=True)
    tag_id = models.CharField(max_length=32)
    date_of_birth = models.DateField()
    sire = models.ForeignKey('sheep.Ram', blank=True, null=True, related_name='progeny')
    dam = models.ForeignKey('sheep.Ewe', blank=True, null=True, related_name='progeny')

    objects = InheritanceManager()

    class Meta:
        verbose_name_plural = 'sheep'

    def __str__(self):
        return '{}: {}'.format(self._meta.verbose_name, self.tag_id)


class Ram(Sheep):
    sheep = models.OneToOneField(Sheep, parent_link=True, primary_key=True)

    class Meta:
        verbose_name = 'ram'
        verbose_name_plural = 'rams'


class Ewe(Sheep):
    sheep = models.OneToOneField(Sheep, parent_link=True, primary_key=True)

    class Meta:
        verbose_name = 'ewe'
        verbose_name_plural = 'ewes'

Ignore the fact there is no specialisation on those child models: in practice you’d normally have some.

Also note that I’ve manually included the primary key, and the parent link fields. This has been done so that the actual columns in the database match, and in this case will all be sheep_id. This will make writing joins slightly simpler, and avoids the (not specific to Django) ORM anti-pattern of “always have a column named id”.

We can use the models like this, but it might be nice to have all sheep in the one admin changelist, and just allow filtering by subclass model.

First, we’ll put some extra stuff onto the parent model, to make obtaining the subclasses simpler. Some of these will use a new decorator, which creates a class version of the @property decorator.

class classproperty(property):
    def __get__(self, cls, owner):
        return self.fget.__get__(None, owner)()


class Sheep(models.Model):
    # Fields, etc. defined as above.

    @classproperty
    @classmethod
    def SUBCLASS_OBJECT_CHOICES(cls):
        "All known subclasses, keyed by a unique name per class."
        return {
          rel.name: rel.related_model
          for rel in cls._meta.related_objects
          if rel.parent_link
        }

    @classproperty
    @classmethod
    def SUBCLASS_CHOICES(cls):
        "Available subclass choices, with nice names."
        return [
            (name, model._meta.verbose_name)
            for name, model in cls.SUBCLASS_OBJECT_CHOICES.items()
        ]

    @classmethod
    def SUBCLASS(cls, name):
        "Given a subclass name, return the subclass."
        return cls.SUBCLASS_OBJECT_CHOICES.get(name, cls)

Note that we don’t need to enumerate the subclasses: adding a new subclass later in development will automatically add it to these properties, even though in this case it would be unlikely to happen.
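
With the models above, these resolve to something like the following (the exact repr depends on your app layout, and dict ordering may vary):

>>> Sheep.SUBCLASS_OBJECT_CHOICES
{'ram': <class 'sheep.models.Ram'>, 'ewe': <class 'sheep.models.Ewe'>}
>>> Sheep.SUBCLASS_CHOICES
[('ram', 'ram'), ('ewe', 'ewe')]
>>> Sheep.SUBCLASS('ewe')
<class 'sheep.models.Ewe'>
>>> Sheep.SUBCLASS('wether')  # unknown names fall back to the parent class
<class 'sheep.models.Sheep'>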

From these, we can write some nice neat stuff to enable using these in the admin.

from django import forms
from django.conf.urls import url
from django.contrib import admin
from django.utils.translation import ugettext as _

from .models import Sheep


class SubclassFilter(admin.SimpleListFilter):
    title = _('gender')
    parameter_name = 'gender'

    def lookups(self, request, model_admin):
      return Sheep.SUBCLASS_CHOICES

    def queryset(self, request, queryset):
      if self.value():
        return queryset.exclude(**{self.value(): None})
      return queryset


@admin.register(Sheep)
class SheepAdmin(admin.ModelAdmin):
    list_display = [
        'tag_id',
        'date_of_birth',
        'gender'
    ]
    list_filter = [SubclassFilter]

    def get_queryset(self, request):
      return super(SheepAdmin, self).get_queryset(request).select_subclasses()

    def gender(self, obj):
        return obj._meta.verbose_name

    def get_form(self, request, obj=None, **kwargs):
        if obj is None:
            Model = Sheep.SUBCLASS(request.GET.get('gender'))
        else:
            Model = obj.__class__

        # When we change the selected gender in the create form, we want to reload the page.
        RELOAD_PAGE = "window.location.search='?gender=' + this.value"
        # We should also grab all existing field values, and pass them as query string values.

        class ModelForm(forms.ModelForm):
            if not obj:
                gender = forms.ChoiceField(
                    choices=[('', _('Please select...'))] + Sheep.SUBCLASS_CHOICES,
                    widget=forms.Select(attrs={'onchange': RELOAD_PAGE})
                )

            class Meta:
                model = Model
                exclude = ()

        return ModelForm

    def get_fields(self, request, obj=None):
        # We want gender to be the first field.
        fields = super(SheepAdmin, self).get_fields(request, obj)

        if 'gender' in fields:
            fields.remove('gender')
            fields = ['gender'] + fields

        return fields

    def get_urls(self):
        # We want to install named urls that match the subclass ones, but bounce to the relevant
        # superclass ones (since they should be able to handle rendering the correct form).
        urls = super(SheepAdmin, self).get_urls()
        existing = '{}_{}_'.format(self.model._meta.app_label, self.model._meta.model_name)
        subclass_urls = []
        for name, model in Sheep.SUBCLASS_OBJECT_CHOICES.items():
            opts = model._meta
            replace = '{}_{}_'.format(opts.app_label, opts.model_name)
            subclass_urls.extend([
                url(pattern.regex.pattern, pattern.callback, name=pattern.name.replace(existing, replace))
                for pattern in urls if pattern.name
            ])

        return urls + subclass_urls

Wow. There’s quite a lot going on there, but the summary is:

  • We create a custom filter that filters according to subclass.
  • The .select_subclasses() means that objects are downcast to their subclass when fetched.
  • There is a custom form, that, when in create mode, has a selector for the desired subclass.
  • When the subclass is changed (only on the create form), the page is reloaded. This is required in a situation where there are different fields on each of the subclass models.
  • We register the subclass admin url paths, but use the superclass admin views.

I’ve had ideas about this for some time, and have just started using something like this in development: in my situation, there will be an arbitrary number of subclasses, all of which will have several new fields. The code in this page is extracted (and changed) from those ideas, so may not be completely correct. Corrections welcome.

(Directly) Testing Django Formsets

Django Forms are excellent: they offer a really nice API for validating user input. You can quite easily pass a dict of data instead of a QueryDict, which is what the request handling mechanism provides. This makes it trivial to write tests that exercise a given Form’s validation directly. For instance:

def test_my_form(self):
    form = MyForm({
        'foo': 'bar',
        'baz': 'qux'
    })
    self.assertFalse(form.is_valid())
    self.assertTrue('foo' in form.errors)

Formsets are also really nice: they expose a neat way to update a group of homogeneous objects. It’s possible to pass a list of dicts to the formset for the initial argument, but, alas, you may not do the same for passing data. Instead, it needs to be structured as the QueryDict would be:

def test_my_formset(self):
    formset = MyFormSet({
        'formset-INITIAL_FORMS': '0',
        'formset-TOTAL_FORMS': '2',
        'formset-0-foo': 'bar1',
        'formset-0-baz': 'qux1',
        'formset-1-foo': 'spam',
        'formset-1-baz': 'eggs'
    })
    self.assertTrue(formset.is_valid())

This is fine if you only have a couple of forms in your formset, but it’s a bit tiresome to have to write out all of the prefixes, and it is far noisier.

Here’s a nice little helper, that takes a FormSet class, and a list (of dicts), and instantiates the formset with the data coerced into the correct format:

def instantiate_formset(formset_class, data, instance=None, initial=None):
    prefix = formset_class().prefix
    formset_data = {}
    for i, form_data in enumerate(data):
        for name, value in form_data.items():
            if isinstance(value, list):
                for j, inner in enumerate(value):
                    formset_data['{}-{}-{}_{}'.format(prefix, i, name, j)] = inner
            else:
                formset_data['{}-{}-{}'.format(prefix, i, name)] = value
    formset_data['{}-TOTAL_FORMS'.format(prefix)] = len(data)
    formset_data['{}-INITIAL_FORMS'.format(prefix)] = 0

    if instance:
        return formset_class(formset_data, instance=instance, initial=initial)
    else:
        return formset_class(formset_data, initial=initial)

This handles a formset or a model formset. Much easier to use:

def test_my_formset(self):
    formset = instantiate_formset(MyFormSet, [
      {
        'foo': 'bar1',
        'baz': 'qux1',
      },
      {
        'foo': 'spam',
        'baz': 'eggs',
      },
    ])