Django bulk_update without upsert

Postgres 9.5 brings a fantastic feature, that I’ve really been looking forward to. However, I’m not on 9.5 in production yet, and I had a situation that would really have benefitted from being able to use it.

I have to insert lots of objects, but if there is already an object in a given “slot”, then I need to instead update that existing object.

Doing this using the Django ORM can be done one a “one by one” basis, by iterating through the objects, finding which one (if any) matches the criteria, updating that, or creating a new one if there wasn’t a match.

However, this is really slow, as it does two queries for each object.

Instead, it would be great to:

  • fetch all of the instances that could possibly overlap (keyed by the matching criteria)
  • iterate through the new data, looking for a match
    • modify the instance if an existing match is made, and stash into pile “update”
    • create a new instance if no match is found, and stash into the pile “create”
  • bulk_update all of the “update” objects
  • bulk_create all of the “create” objects

Those familiar with Django may recognise that there is only one step here that cannot be done as of “now”.

So, how can we do a bulk update?

There are two ways I can think of doing it (at least with Postgres):

  • create a temporary table (cloning the structure of the table)
  • insert all of the data into this table
  • update the rows in the original table from the temporary table, based on pk column

and:

  • come up with some mechanism of using the UPDATE the_table SET ... FROM () sq WHERE sq.pk = the_table.pk syntax

It’s possible to use some of the really nice features of Postgres to create a temporary table, that clones an existing table, and will automatically be dropped at the end of the transaction:

BEGIN;

CREATE TEMPORARY TABLE upsert_source (LIKE my_table INCLUDING ALL) ON COMMIT DROP;

-- Bulk insert into upsert_source

UPDATE my_table
   SET foo = upsert_source.foo,
       bar = upsert_source.bar
  FROM upsert_source
 WHERE my_table.id = upsert_source.id;

The drawbacks of this are that it does two extra queries, but it is possible to implement fairly simply:

from django.db import transaction, connection

@transaction.atomic
def bulk_update(model, instances, *fields):
    cursor = connection.cursor()
    db_table = model._meta.db_table

    try:
        cursor.execute(
            'CREATE TEMPORARY TABLE update_{0} (LIKE {0} INCLUDING ALL) ON COMMIT DROP'.format(db_table)
        )

        model._meta.db_table = 'update_{}'.format(db_table)
        model.objects.bulk_create(instances)

        query = ' '.join([
            'UPDATE {table} SET ',
            ', '.join(
                ('%(field)s=update_{table}.%(field)s' % {'field': field})
                for field in fields
            ),
            'FROM update_{table}',
            'WHERE {table}.{pk}=update_{table}.{pk}'
        ]).format(
            table=db_table,
            pk=model._meta.pk.get_attname_column()[1]
        )
        cursor.execute(query)
    finally:
        model._meta.db_table = db_table

The avantage of this is that it mostly just uses the ORM. There’s limited scope for SQL injection (although you’d probably want to validate the field names).

It’s also possible to do the update directly from a subquery, but without the nice column names:

UPDATE my_table
   SET foo = upsert_source.column2,
       column2 = upsert_source.column3
  FROM (
    VALUES (...), (...)
  ) AS upsert_source
 WHERE upsert_source.column1 = my_table.id;

Note that you must make sure your values are in the correct order (with the primary key first).

Attempting to prevent some likely SQL injection vectors, we want to build up the fixed parts of the query (and the parts that are controlled by the django model, like the table and field names), and then pass the values in as query parameters.

from django.db import connection

def bulk_update(model, instances, *fields):
    set_fields = ', '.join(
        ('%(field)s=update_{table}.column%(i)s' % {'field': field, 'i': i + 2})
        for i, field in enumerate(fields)
    )
    value_placeholder = '({})'.format(', '.join(['%s'] * (len(fields) + 1)))
    values = ','.join([value_placeholder] * len(instances))
    query = ' '.join([
        'UPDATE {table} SET ',
        set_fields,
        'FROM (VALUES ', values, ') update_{table}',
        'WHERE {table}.{pk} = update_{table}.column1'
    ]).format(table=model._meta.db_table, pk=model._meta.pk.get_attname_column()[1])
    params = []
    for instance in instances:
        data.append(instance.pk)
        for field in fields:
            params.append(getattr(instance, field))

    connection.cursor().execute(query, params)

This feels like a reasonable first draft, however I’d probably want to go look at how the query for bulk_create is created, and modify that. There’s a fair bit going on there that I haven’t followed as yet though. Note that this does not need the @transaction.atomic decorator, as it is only a single statement.

From here, we can build an upsert that assumes all objects with a PK need to be updated, and those without need to be inserted:

from django.utils.functional import partition
from django.db import transaction

@transaction.atomic
def bulk_upsert(model, instances, *fields):
    update, create = partition(lambda obj: obj.pk is None, instances)
    if update:
        bulk_update(model, update, *fields)
    if create:
        model.objects.bulk_create(create)

Versioning complex database migrations

Recently, I’ve been writing lots of raw SQL code that is either a complex VIEW, or a FUNCTION. Much of the time these will be used as the “source” for a Django model, but not always. Sometimes, there are complex functions that need to be run as a trigger in Postgres, or even a rule to execute when attempting a write operation on a view.

Anyway, these definitions are all code, and should be stored within the project they belong to. Using Django’s migrations you can apply them at the appropriate time, using a RunSQL statement.

Hovewer, you don’t really want to have the raw SQL in the migration file. Depending upon the text editor, it may not syntax highlight correctly, and finding the correct definition can be difficult.

Similarly, you don’t want to just have a single file, because to recreate the database migration sequence, it needs to apply the correct version at the correct time (otherwise, other migrations may fail to apply).

Some time ago, I adopted a policy of manually versioning these files. I have a pattern of naming, that seemed to be working well:

special_app/
  migrations/
    __init__.py
    0001_initial.py
    0002_update_functions.py
  sql/
    foo.function.0001.sql
    foo.function.0002.sql
    foo.trigger.0001.sql
    bar.view.0001.sql

The contents of the SQL files are irrelevant, and the migrations mostly so. There is a custom migration operation I wrote that loads the content from a file:

    LoadSQLScript('special_app', 'foo.function', 1)

The mechanics of how it works are not important.

So, this had been working well for several months, but I had a nagging feeling that the workflow was not ideal. This came to a head in my mind when I recognised that doing code review on database function/view changes was next to impossible.

See, the problem is that there is a completely new file each time you create a new version of a definition.

Instead, I’ve come up with a similar, but different solution. You still need to have versioned scripts for the purpose of historical migrations, but the key difference is that you don’t actually write these. Instead, you have something that notices that the “current” version of a definition is different to the latest version snapshot. You then also have a tool that copies the current version to a new snapshot, and creates a migration.

You can still modify a snapshot (for instance, if you’ve already created one, but it’s only in your current development tree), but mostly you don’t need to.

$ ./manage.py check
System check identified some issues:

WARNINGS:
?: (sql_helpers.W002) The versioned migration file for core: iso8601.function.sql is out of date,
and needs to be updated or a new version created.

Oh, thanks for that. Checking the file, I see that it does indeed need a new version:

$ ./manage.py make_sql_migrations core
...
Copied <project>/core/sql/iso8601.function.sql to version 0002

You still need to ensure that any dependencies between SQL files are dealt with appropriately (for instance, a function that relies on a newly added column to a view needs to have that view’s definition updated before the function can be updated). But this is a much smaller problem, and something that your database should complain about when you try to apply the migrations.

I haven’t packaged this up yet, it’s currently only an internal app, as I want to use it a bit for the next little while and see what else shakes out. I was pretty sure the way I was doing it before was “the best way” just after I thought that up, so we’ll see what happens.