Query Zen is no queries at all
Performing no queries is always going to be faster than performing a query.
Today I had two instances of the same problem: two tables, one of which essentially stores data calculated from the other (and from data in other tables, or from a process that runs in application code, and so cannot be determined purely within the database).
In one case, we have an audit logging table (which is handled purely within Postgres) and a related table that stores a string representation of what the audited object looked like, according to the application, at that point in time; this needs to be calculated after the fact, in Django.
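For context, the two models relate something like this (a minimal sketch: the AuditLog and InstanceRepr names, and the audit_log/instance_repr accessors, come from the task code below; every other field is illustrative):

from django.db import models

class AuditLog(models.Model):
    # Rows are written by the Postgres-side audit machinery;
    # the real columns are elided here.
    table_name = models.TextField()
    timestamp = models.DateTimeField(auto_now_add=True)

class InstanceRepr(models.Model):
    # One representation per audit log entry. The related_name is what
    # makes the filter(instance_repr=None) lookup below possible.
    audit_log = models.OneToOneField(
        AuditLog,
        on_delete=models.CASCADE,
        related_name='instance_repr',
    )
    representation = models.TextField()  # hypothetical field name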
The other case stores some cached values that can be calculated in the database: basically, some metadata about a shift, according to the location that the shift is at. Changes to the shift table cause this value to be updated automatically; however, we have several million existing shifts that do not currently have the value, and we need to create the annotation for all of them.
In both cases, we have a Celery task that creates a (relatively small, to prevent locks and other performance issues) number of the related objects, but only for those that don’t already have one. The tricky bit is that we need to trigger another instance of the task if there are still objects in the database that don’t yet have the related item.
@app.task
def update_missing_items(batch_size=100):
    missing_items = AuditLog.objects.filter(instance_repr=None)
    InstanceRepr.objects.bulk_create([
        InstanceRepr(
            audit_log=log,
            # ...
        ) for log in missing_items[:batch_size]
    ])
    if missing_items.exists():
        update_missing_items.apply_async(kwargs={'batch_size': batch_size}, countdown=1)
Since we have some 15 million audit logs (so far), it turns out that this missing_items.exists() was taking several seconds to run. I tried to write an optimised version, but was not able to improve the performance.
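(For the curious: the filter on the reverse one-to-one becomes a LEFT OUTER JOIN with an IS NULL check over the whole table, which you can confirm by printing the SQL Django generates; a sketch, since the exact output depends on your schema.)

# str(queryset.query) shows the SQL Django will run: roughly a LEFT
# OUTER JOIN from the audit log table to the representation table,
# keeping only rows where no representation exists.
print(str(AuditLog.objects.filter(instance_repr=None).query))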
Then it occurred to me (thanks mlt- on #postgres) that we can look at the number of items we created, and see if it was the same as the batch_size. If it was smaller than the batch size, then we know we are up to date, and don’t need to reschedule our task.
@app.task
def update_missing_items(batch_size=100):
    missing_items = AuditLog.objects.filter(instance_repr=None)
    created = InstanceRepr.objects.bulk_create([
        InstanceRepr(
            audit_log=log,
            # ...
        ) for log in missing_items[:batch_size]
    ])
    if len(created) == batch_size:
        update_missing_items.apply_async(kwargs={'batch_size': batch_size}, countdown=1)
Bingo: since we needed to execute the query to fetch the objects to begin with, we are now doing no extra work to see if we need to run our task again.
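To start the backfill, you only queue the first task; each run then reschedules itself until a short batch signals that nothing is left (one quirk: if the number of missing rows is an exact multiple of batch_size, the final run creates nothing and simply doesn’t reschedule):

update_missing_items.delay()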
The other situation can be done entirely in the database; however, a single query across several million rows will block other things from happening, so we want to run the update in batches. There is a trigger on the table so that new or updated rows will already have a value, which actually makes it the same problem, but in SQL:
WITH step_1 AS (
    SELECT shift_id, ...
    FROM shifts
    JOIN ... ON (...)
    LEFT OUTER JOIN annotations USING (shift_id)
    WHERE annotations.shift_id IS NULL
    LIMIT 1000
), step_2 AS (
    ...
),
..., step_N AS (
    ...
)
INSERT INTO annotations (shift_id, ...) SELECT * FROM step_N;
There’s actually a bunch more to this, but it’s not really important: building up the annotations hits a couple of other tables, and I’ve used a CTE because each value is based on a previous annotation.
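As an aside, the trigger mentioned above has a standard shape; here is a sketch with hypothetical names (annotate_shift, compute_value, and the value column all stand in for the real, multi-table computation):

CREATE OR REPLACE FUNCTION annotate_shift() RETURNS trigger AS $$
BEGIN
    -- compute_value() and the value column are hypothetical stand-ins
    -- for the real computed metadata; ON CONFLICT assumes a unique
    -- constraint on annotations.shift_id.
    INSERT INTO annotations (shift_id, value)
    VALUES (NEW.shift_id, compute_value(NEW))
    ON CONFLICT (shift_id) DO UPDATE SET value = EXCLUDED.value;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER shifts_annotations
AFTER INSERT OR UPDATE ON shifts
FOR EACH ROW EXECUTE PROCEDURE annotate_shift();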
We can put this query into a task too, but we need some way of determining how many inserts we did. Luckily, Postgres has the RETURNING clause on an INSERT. It would be really nice if we could do:
WITH step_1 AS (...)
INSERT INTO annotations (shift_id, ...)
SELECT * FROM step_N
RETURNING COUNT(*)
Alas, that’s not possible. However, we can just extend our CTE:
WITH step_1 AS (
    SELECT shift_id, ...
    FROM shifts
    ...
    LEFT OUTER JOIN annotations USING (shift_id)
    WHERE annotations.shift_id IS NULL
    -- NOTE: the LIMIT value is a parameter!
    LIMIT %s
),
...,
step_N AS (...),
inserts AS (
    INSERT INTO annotations (shift_id, ...)
    SELECT * FROM step_N
    RETURNING shift_id
)
SELECT COUNT(*) FROM inserts;
Then, we can write our Celery task:
from django.db import connection

@app.task
def update_missing_annotations(batch_size):
    # QUERY holds the batched INSERT ... RETURNING CTE from above;
    # the LIMIT is its only parameter.
    with connection.cursor() as cursor:
        cursor.execute(QUERY, [batch_size])
        if cursor.fetchone()[0] == batch_size:
            update_missing_annotations.apply_async(kwargs={'batch_size': batch_size}, countdown=1)
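As before, a single call starts the chain, and it stops as soon as a batch comes back short (the batch size here is arbitrary; pick whatever keeps each transaction comfortably short):

update_missing_annotations.delay(batch_size=1000)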