Celery Once

Within our audit logs, which are generated at the database level, we also want to be able to show the user a representation of what the object “looks like”: basically, what str(instance) returns. This is shown as supplementary information, and often makes it easier to identify which object has been edited. Otherwise, the audit logs only show the fields that changed on the object in that event.

However, because this happens in Python/Django, and not in the database, we need some mechanism for fetching the audit logs that don’t yet have this string representation, and updating them. This is a perfect use case for Celery: we can have a periodic task that looks for audit logs missing this related object, and creates the representations for them.
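For context, the models involved look roughly like this. This is a hypothetical sketch: the real AuditLog has more fields, and build_instance_repr is assumed to reconstruct the str() of the logged object:

from django.db import models


class AuditLog(models.Model):
    # ...audit fields elided...

    def build_instance_repr(self):
        # Hypothetical: rebuild str(instance) for the object this log
        # entry refers to.
        ...


class InstanceRepr(models.Model):
    # One representation per audit log; logs without one can be found
    # with AuditLog.objects.filter(instance_repr=None).
    audit_log = models.OneToOneField(
        AuditLog,
        on_delete=models.CASCADE,
        related_name='instance_repr',
    )
    instance_repr = models.TextField()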

There are a few things that can cause problems:

  • If we run the task infrequently, then there can at times be a large number of new audit logs, which can cause this task to take a long time to run: blocking other tasks, or perhaps even timing out. We should limit the number of objects that may be handled in a given task.
  • If we run the task infrequently, there can be a big delay between an action being logged, and the representation of that action being created. This can make audit logs hard to read.
  • If our batch size is too small, then we never clear the backlog, or we need to run the task more frequently.
  • If we run too frequently, we end up spending all of our time checking for missing objects (we have tens of millions of audit logs at last count), and the tasks stack up.
  • If we have multiple instances of the task running at the same time, then the task that finishes second can fail when creating its objects, because the first task has already created them. At that point, we have done a bunch of work for nothing.

The ideal behaviour is to schedule the task to run somewhat frequently (perhaps every 2-5 minutes), and process a batch of a reasonable size each run. Then, if there are still more objects to process, re-queue a task immediately.
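Scheduling the periodic part is straightforward with Celery beat. A sketch, assuming the task lives in a tasks module; the five-minute interval is illustrative:

from celery.schedules import crontab

app.conf.beat_schedule = {
    'fix-missing-instances': {
        # Run every five minutes, with the task's default batch size.
        'task': 'tasks.fix_missing_instances',
        'schedule': crontab(minute='*/5'),
    },
}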

But there is still the problem of multiple tasks running at the same time. If a task has detected more missing objects and requeued itself, and this is still happening when the next scheduled task starts, we may well end up with the race condition described above: the scheduled task and the re-queued one running concurrently. It’s possible that at that point we are creating objects faster than we can process them, but in our case this happens only rarely, not all of the time.

There is a project, celery-once, that will prevent multiple instances of the same task being queued at the same time. This feels like it will do exactly what we want!
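It needs a small amount of configuration first; a sketch based on the celery-once README, with an assumed local Redis URL:

from celery import Celery
from celery_once import QueueOnce

app = Celery('tasks', broker='redis://localhost:6379/0')

# celery-once keeps its locks in a backend: Redis is the usual choice.
app.conf.ONCE = {
    'backend': 'celery_once.backends.Redis',
    'settings': {
        'url': 'redis://localhost:6379/0',
        'default_timeout': 60 * 60,  # seconds before a stale lock expires
    },
}

With that in place, our first attempt looked like this: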

@app.task(base=QueueOnce)
def fix_missing_instances(batch_size=1000):
    missing = AuditLog.objects.filter(instance_repr=None)
    # Build representations for (at most) one batch of audit logs.
    InstanceRepr.objects.bulk_create([
        InstanceRepr(
            audit_log=audit_log,
            instance_repr=audit_log.build_instance_repr(),
        ) for audit_log in missing[:batch_size]
    ])

    # If there is still a backlog, re-queue to handle the next batch.
    if missing.exists():
        fix_missing_instances.apply_async(kwargs={'batch_size': batch_size}, countdown=1)

But, alas, this won’t quite work. The task is still “in the queue”, just marked as “reserved”, so celery_once will not add a new item to the queue.
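You can see this state for yourself using Celery’s standard inspection API (assuming a running worker):

inspector = app.control.inspect()

# Tasks a worker has prefetched but not started show up as "reserved";
# tasks being held back for an ETA/countdown show up as "scheduled".
print(inspector.reserved())
print(inspector.scheduled())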

As an aside, there’s actually a better pattern for knowing if there are more objects to process: we can compare the number of objects we just created to our batch size, and if they are the same, then we probably need to process more.

But, back to the first issue. We tried delaying our task (using the countdown argument), but this was not enough.

We can sidestep the problem using a second task, which just queues the first task:

@app.task(bind=True, base=QueueOnce)
def fix_missing_instances(self, batch_size=1000):
    missing = AuditLog.objects.filter(instance_repr=None)
    created = InstanceRepr.objects.bulk_create([
        InstanceRepr(
            audit_log=audit_log,
            instance_repr=audit_log.build_instance_repr(),
        ) for audit_log in missing[:batch_size]
    ])

    # A full batch suggests there are more to process; re-queue via the
    # second task, unless we were called directly as a regular function.
    if len(created) == batch_size and not self.request.called_directly:
        requeue_fix_missing_instances.apply_async(kwargs={'batch_size': batch_size}, countdown=1)


@app.task
def requeue_fix_missing_instances(batch_size):
    # Not QueueOnce-based, so this can always be queued; it simply queues
    # a fresh instance of the real task.
    fix_missing_instances.delay(batch_size=batch_size)

Note that I’ve also done the “optimisation” where it no longer does a separate .exists() query after processing.

I also think there is a bug somewhere in celery-once related to handling the keys argument to the once dict, which can be used to limit which kwargs indicate that a task is already running. But I haven’t been able to isolate this and write a test case/PR yet. In our case, we don’t really care about the task arguments anyway.
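For reference, the keys option is used like this; the example is adapted from the celery-once README, not from our codebase:

@app.task(base=QueueOnce, once={'keys': ['a']})
def slow_add(a, b):
    # Only `a` is considered when deciding whether an identical task is
    # already queued; `b` is ignored.
    return a + b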