asyncpg and upserting bulk data

I’ve been doing some work on analysing the output of my PV system, and needed a mechanism for inserting large numbers of rows of data pulled from the Fronius API. Since I mostly work in Python, I decided to use a Python script to get the data from the inverter, but I needed a clean way to insert potentially lots of rows (every 5-minute period from every day, batched in 16-day blocks).

I’d normally just use psycopg2, but since I was using Python 3, I thought I’d try out asyncpg.

The API is quite nice:

import asyncio
import asyncpg

async def write():
    conn = await asyncpg.connect('postgres://:@:/database')
    await conn.execute('query', *params)
    await conn.close()

asyncio.get_event_loop().run_until_complete(write())
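
For an actual query, asyncpg uses numbered placeholders ($1, $2, …), and the arguments are passed positionally rather than as a sequence. A concrete call (against a made-up readings table) would look like this, inside the same coroutine:

    await conn.execute(
        'INSERT INTO readings(timestamp, value) VALUES ($1, $2)',
        timestamp, value,
    )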

So, that part was fine. There are more hoops to jump through because async, but meh.

But I had an interesting case: I wanted to do a bulk insert, and I didn’t know how many records I was going to be inserting. It seemed like it was going to be a bunch of work to build up placeholder strings based on the number of rows, and the number of columns in each row.

But no, there is a function that works perfectly (and turned out to be even better, because it takes the table name as an argument):

    conn = await asyncpg.connect(...)
    await conn.copy_records_to_table(table, records=values)
    await conn.close()

That was really easy. It’s a requirement that each row of values is in the same order as the table definition, but my tables in this case were simple.
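
For illustration, building up the records is just a matter of making a list of tuples (the readings table and the data here are made up; datetime and Decimal come from the standard library). The columns argument that copy_records_to_table accepts makes the intended order explicit, rather than relying on the order of the table definition:

    # Hypothetical data: one tuple per row, in column order.
    values = [
        (datetime(2018, 1, 1, 0, 0), Decimal('0')),
        (datetime(2018, 1, 1, 0, 5), Decimal('12.5')),
    ]
    await conn.copy_records_to_table(
        'readings', records=values, columns=['timestamp', 'value'],
    )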

But there’s one catch: this works great for inserting data, but what about when some of that data is already present? We want to insert some rows, but update others: an upsert. Postgres supports this using the ON CONFLICT ... DO ... syntax. So, how can this work with the bulk insert (which uses the Postgres COPY command under the hood)? The trick is to COPY into a temporary table, and then upsert from that into the real table:

    # Our table just contains two columns: timestamp and value.
    await conn.execute('''CREATE TEMPORARY TABLE _data(
        timestamp TIMESTAMP, value NUMERIC
    )''')
    await conn.copy_records_to_table('_data', records=values)
    await conn.execute('''
        INSERT INTO {table}(timestamp, value)
        SELECT * FROM _data
        ON CONFLICT (timestamp)
        DO UPDATE SET value=EXCLUDED.value
    '''.format(table=table))

Sweet.

We can use the string formatting tools here because the table name is controlled by us, so there is no SQL injection vector to worry about. You should not do this if that value could come from a user, though.

But there is one problem: we are updating rows even if the value has not changed. If you don’t think too hard, that’s probably fine. But the way Postgres works, it rewrites every row that was “updated”, even if nothing actually changed: it marks the old row as superseded, and that space will not be reclaimed until the table is vacuumed.
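
You can watch this happening through the statistics views: the dead tuple count for the table climbs after each batch of no-op updates, and only drops again when (auto)vacuum runs. A rough check (just for poking around, not part of the upsert itself):

    dead = await conn.fetchval(
        'SELECT n_dead_tup FROM pg_stat_user_tables WHERE relname = $1',
        table,
    )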

Instead, we should be able to just ignore those rows that have the same value (which, in my case, should be all rows except the last one, as that is the only one that should have different data).

    await conn.execute('''
        INSERT INTO {table}(timestamp, value)
        SELECT * FROM _data
        ON CONFLICT (timestamp)
        DO UPDATE SET value=EXCLUDED.value
        WHERE {table}.value <> EXCLUDED.value
    '''.format(table=table))

Oh nice: Postgres also supports a WHERE clause on the DO UPDATE clause.
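
Putting the pieces together, the whole flow ends up as a single coroutine. This is just a sketch (the function name is mine): it assumes the target table has a unique constraint on timestamp, and that the table name is trusted, as above:

async def upsert_readings(conn, table, values):
    async with conn.transaction():
        # Stage the incoming rows; ON COMMIT DROP cleans up the temp table for us.
        await conn.execute('''CREATE TEMPORARY TABLE _data(
            timestamp TIMESTAMP, value NUMERIC
        ) ON COMMIT DROP''')
        await conn.copy_records_to_table('_data', records=values)
        # Upsert from the staging table, skipping rows whose value is unchanged.
        await conn.execute('''
            INSERT INTO {table}(timestamp, value)
            SELECT * FROM _data
            ON CONFLICT (timestamp)
            DO UPDATE SET value=EXCLUDED.value
            WHERE {table}.value <> EXCLUDED.value
        '''.format(table=table))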

Django bulk_update without upsert

Postgres 9.5 brings a fantastic feature that I’ve really been looking forward to: ON CONFLICT, the native upsert. However, I’m not on 9.5 in production yet, and I had a situation that would really have benefitted from being able to use it.

I have to insert lots of objects, but if there is already an object in a given “slot”, then I need to instead update that existing object.

Doing this using the Django ORM can be done on a “one by one” basis, by iterating through the objects, finding which one (if any) matches the criteria, updating it, or creating a new one if there wasn’t a match.

However, this is really slow, as it does two queries for each object.

Instead, it would be great to:

  • fetch all of the instances that could possibly overlap (keyed by the matching criteria)
  • iterate through the new data, looking for a match
    • modify the instance if an existing match is found, and stash it into the “update” pile
    • create a new instance if no match is found, and stash it into the “create” pile
  • bulk_update all of the “update” objects
  • bulk_create all of the “create” objects

Those familiar with Django may recognise that there is only one step here that cannot be done with the ORM as of “now”: the bulk_update.
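
As a sketch, with a hypothetical Reading-style model keyed by timestamp, and the bulk_update helper we’re about to write, that flow looks something like:

from django.db import transaction

@transaction.atomic
def sync_readings(model, incoming):
    # incoming: a dict mapping timestamp -> value (made-up data shape).
    existing = {
        obj.timestamp: obj
        for obj in model.objects.filter(timestamp__in=incoming)
    }
    to_update, to_create = [], []
    for timestamp, value in incoming.items():
        obj = existing.get(timestamp)
        if obj is not None:
            obj.value = value
            to_update.append(obj)
        else:
            to_create.append(model(timestamp=timestamp, value=value))
    if to_update:
        bulk_update(model, to_update, 'value')  # the missing piece
    if to_create:
        model.objects.bulk_create(to_create)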

So, how can we do a bulk update?

There are two ways I can think of doing it (at least with Postgres):

  • create a temporary table (cloning the structure of the table)
  • insert all of the data into this table
  • update the rows in the original table from the temporary table, based on pk column

and:

  • come up with some mechanism of using the UPDATE the_table SET ... FROM () sq WHERE sq.pk = the_table.pk syntax

It’s possible to use some of the really nice features of Postgres to create a temporary table that clones an existing table, and is automatically dropped at the end of the transaction:

BEGIN;

CREATE TEMPORARY TABLE upsert_source (LIKE my_table INCLUDING ALL) ON COMMIT DROP;

-- Bulk insert into upsert_source

UPDATE my_table
   SET foo = upsert_source.foo,
       bar = upsert_source.bar
  FROM upsert_source
 WHERE my_table.id = upsert_source.id;

The drawback of this is that it does two extra queries, but it is possible to implement fairly simply:

from django.db import transaction, connection

@transaction.atomic
def bulk_update(model, instances, *fields):
    cursor = connection.cursor()
    db_table = model._meta.db_table

    try:
        cursor.execute(
            'CREATE TEMPORARY TABLE update_{0} (LIKE {0} INCLUDING ALL) ON COMMIT DROP'.format(db_table)
        )

        # Point the model at the temporary table, so bulk_create inserts into it.
        model._meta.db_table = 'update_{}'.format(db_table)
        model.objects.bulk_create(instances)

        query = ' '.join([
            'UPDATE {table} SET ',
            ', '.join(
                ('%(field)s=update_{table}.%(field)s' % {'field': field})
                for field in fields
            ),
            'FROM update_{table}',
            'WHERE {table}.{pk}=update_{table}.{pk}'
        ]).format(
            table=db_table,
            pk=model._meta.pk.get_attname_column()[1]
        )
        cursor.execute(query)
    finally:
        # Restore the real table name, even if the update failed.
        model._meta.db_table = db_table

The advantage of this is that it mostly just uses the ORM. There’s limited scope for SQL injection (although you’d probably want to validate the field names).
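
A rough guard for the field names (my addition) can just lean on the model’s own metadata:

def validate_fields(model, fields):
    # Raises FieldDoesNotExist for anything that is not actually a field on the
    # model, so nothing unexpected gets interpolated into the SET clause.
    for field in fields:
        model._meta.get_field(field)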

It’s also possible to do the update directly from a subquery, but without the nice column names:

UPDATE my_table
   SET foo = upsert_source.column2,
       bar = upsert_source.column3
  FROM (
    VALUES (...), (...)
  ) AS upsert_source
 WHERE upsert_source.column1 = my_table.id;

Note that you must make sure your values are in the correct order (with the primary key first).

To try to prevent some likely SQL injection vectors, we want to build up the fixed parts of the query (and the parts that are controlled by the Django model, like the table and field names), and then pass the values in as query parameters.

from django.db import connection

def bulk_update(model, instances, *fields):
    set_fields = ', '.join(
        ('%(field)s=update_{table}.column%(i)s' % {'field': field, 'i': i + 2})
        for i, field in enumerate(fields)
    )
    value_placeholder = '({})'.format(', '.join(['%s'] * (len(fields) + 1)))
    values = ','.join([value_placeholder] * len(instances))
    query = ' '.join([
        'UPDATE {table} SET ',
        set_fields,
        'FROM (VALUES ', values, ') update_{table}',
        'WHERE {table}.{pk} = update_{table}.column1'
    ]).format(table=model._meta.db_table, pk=model._meta.pk.get_attname_column()[1])
    params = []
    for instance in instances:
        # Primary key first, then the fields: matching column1, column2, ... above.
        params.append(instance.pk)
        for field in fields:
            params.append(getattr(instance, field))

    connection.cursor().execute(query, params)

This feels like a reasonable first draft; however, I’d probably want to go and look at how the query for bulk_create is constructed, and modify that. There’s a fair bit going on in there that I haven’t followed as yet, though. Note that this does not need the @transaction.atomic decorator, as it is only a single statement.
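
For what it’s worth, usage would look something like this (Reading and its value field are hypothetical):

# Fix up a batch of existing objects in two queries: one SELECT, one UPDATE.
readings = list(Reading.objects.filter(value__isnull=True))
for reading in readings:
    reading.value = 0
bulk_update(Reading, readings, 'value')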

From here, we can build an upsert that assumes all objects with a PK need to be updated, and those without need to be inserted:

from django.utils.functional import partition
from django.db import transaction

@transaction.atomic
def bulk_upsert(model, instances, *fields):
    # partition puts the falsy-predicate items first: objects with a pk land in
    # update, objects without one land in create.
    update, create = partition(lambda obj: obj.pk is None, instances)
    if update:
        bulk_update(model, update, *fields)
    if create:
        model.objects.bulk_create(create)
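
Using it is then just a matter of handing over a mixed pile of saved and unsaved instances (existing and new_data are placeholders, and Reading is hypothetical again):

# Instances with a pk get bulk_updated; fresh ones (no pk yet) get bulk_created.
readings = existing + [Reading(timestamp=ts, value=v) for ts, v in new_data]
bulk_upsert(Reading, readings, 'value')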