asyncpg and upserting bulk data

I’ve been doing some work on analysing the output of my PV system, and needed a mechanism for inserting large numbers of rows of data pulled from the Fronius API. Since I mostly work in Python, I decided to use a Python script to get the data from the inverter, but I needed a clean way to insert potentially lots of rows (one for every 5-minute period of every day, batched in 16-day blocks).

I’d normally just use psycopg2, but since I was using Python 3, I thought I’d try out asyncpg.

The API is quite nice:

import asyncio
import asyncpg

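async def write():
    conn = await asyncpg.connect('postgres://:@:/database')
    # Query arguments are passed positionally, not as a single list.
    await conn.execute('query', *params)
    await conn.close()

# asyncio.run() (Python 3.7+) creates the event loop and closes it afterwards.
asyncio.run(write())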

So, that part was fine. There are more hoops to jump through because async, but meh.

But I had an interesting case: I wanted to do a bulk insert, and I didn’t know how many records I was going to be inserting. It seemed like it was going to be a bunch of work to build up placeholder strings based on the number of rows, and the number of columns in each row.
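To give an idea of the work I was hoping to avoid, here is a rough sketch of building those placeholders by hand. The helper name `build_insert_sql` and the `readings` table are made up for illustration; the only thing it relies on is that asyncpg uses numbered `$1`, `$2`, ... placeholders.

```python
def build_insert_sql(table, columns, row_count):
    """Build a multi-row INSERT with numbered placeholders ($1, $2, ...)."""
    width = len(columns)
    groups = []
    for row in range(row_count):
        # Each row gets the next `width` placeholder numbers.
        start = row * width
        placeholders = ', '.join(
            '${}'.format(start + col + 1) for col in range(width)
        )
        groups.append('({})'.format(placeholders))
    return 'INSERT INTO {}({}) VALUES {}'.format(
        table, ', '.join(columns), ', '.join(groups)
    )


# 'readings' is a hypothetical table name.
sql = build_insert_sql('readings', ['timestamp', 'value'], 3)
print(sql)
# INSERT INTO readings(timestamp, value) VALUES ($1, $2), ($3, $4), ($5, $6)
```

The rows would then also need to be flattened into one long argument list to match, which is exactly the sort of bookkeeping I didn’t want to write.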

But no, there is a function that does exactly this (and it turned out to be even better, because it takes the name of the table as an argument):

    conn = await asyncpg.connect(...)
    await conn.copy_records_to_table(table, records=values)
    await conn.close()

That was really easy. It’s a requirement that each row of values is in the same order as the table definition, but my tables in this case were simple.
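If the data arrives as dicts rather than tuples, getting the ordering right is a one-liner. This is only a sketch: the `to_records` helper, the column names and the sample readings are all invented for the example, not taken from the real inverter output.

```python
from datetime import datetime
from decimal import Decimal

# Hypothetical column order; it has to match the table definition.
COLUMNS = ('timestamp', 'value')


def to_records(rows, columns=COLUMNS):
    """Turn dict rows into tuples ordered to match the table columns."""
    return [tuple(row[name] for name in columns) for row in rows]


# Made-up sample data, keys deliberately in the "wrong" order.
rows = [
    {'value': Decimal('0.0'), 'timestamp': datetime(2018, 1, 1, 6, 0)},
    {'value': Decimal('41.5'), 'timestamp': datetime(2018, 1, 1, 6, 5)},
]
values = to_records(rows)
# values can now be passed as records=values to copy_records_to_table().
```

As far as I can tell, `copy_records_to_table` also accepts a `columns=` keyword argument, which should help when the table has more columns than you are supplying.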

But there’s one catch: this works great for inserting data, but what about when some of that data is already present? We want to insert some rows, but update others: an upsert. Postgres supports this using the ON CONFLICT ... DO ... syntax. So, how can this work with the bulk insert (which uses the Postgres COPY command under the hood)? COPY itself has no ON CONFLICT handling, so the trick is to copy into a temporary table first, and then upsert from there:

    # Our table just contains two columns: timestamp and value.
    await conn.execute('''CREATE TEMPORARY TABLE _data(
        timestamp TIMESTAMP, value NUMERIC
    )''')
    await conn.copy_records_to_table('_data', records=values)
    await conn.execute('''
        INSERT INTO {table}(timestamp, value)
        SELECT * FROM _data
        ON CONFLICT (timestamp)
        DO UPDATE SET value=EXCLUDED.value
    '''.format(table=table))

Sweet.

We can use the string formatting tools here because the table name is controlled by us, so there is no SQL injection vector to worry about. You should not do this if that value could be coming from a user though.
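If the table name ever did come from somewhere less trustworthy, one option would be to check it before formatting it into the query. This is a minimal sketch with a deliberately conservative pattern; `safe_identifier` is a hypothetical helper, not something asyncpg provides.

```python
import re

# A letter or underscore, followed by letters, digits or underscores.
_IDENTIFIER = re.compile(r'[A-Za-z_][A-Za-z0-9_]*')


def safe_identifier(name):
    """Return name unchanged if it looks like a plain SQL identifier."""
    # fullmatch() means the whole string must match, trailing newline included.
    if not _IDENTIFIER.fullmatch(name):
        raise ValueError('unsafe SQL identifier: {!r}'.format(name))
    return name
```

Then `.format(table=safe_identifier(table))` would raise instead of building a dodgy query. Checking against a fixed set of known table names would be stricter still.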

But there is one problem. We are updating rows even if the value has not changed. If you don’t think too hard, that’s probably fine. But Postgres writes a new version of every row that is “updated”, even if nothing in it has really changed. The old version is marked as dead, and that space will not be reclaimed until the table is vacuumed.

Instead, we should be able to just ignore those rows that have the same value (which, in my case, should be all rows except the last one, as that is the only one that should have different data).

    await conn.execute('''
        INSERT INTO {table}(timestamp, value)
        SELECT * FROM _data
        ON CONFLICT (timestamp)
        DO UPDATE SET value=EXCLUDED.value
        WHERE {table}.value <> EXCLUDED.value
    '''.format(table=table))

Oh nice: Postgres also supports a WHERE clause on the DO UPDATE clause.