Celery Once

Our audit logs are generated at the database level, but we also want to be able to show the user a representation of what the object “looks like”: basically, what str(instance) is. This is shown as supplementary information, and often makes it easier to identify which object has been edited. Otherwise, the audit logs only show the things that have changed on the object in that event.

However, because building that representation happens in python/django, and not in the database, we need some mechanism for fetching audit logs that don’t have this string representation, and then updating them. This is the perfect use case for celery: we can have a periodic task that looks for audit logs missing this related object, and creates the representations for them.
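
To make the code below easier to follow, here is a rough sketch of how the two models might relate. Only the names used in the tasks further down come from the actual code; everything else here is an assumption.

from django.db import models


class AuditLog(models.Model):
    # ...whatever fields record the change itself...

    def build_instance_repr(self):
        # Rebuild what str(instance) looks like for the logged object. The real
        # implementation depends on how the audit log references the instance.
        raise NotImplementedError


class InstanceRepr(models.Model):
    # One supplementary representation per audit log entry, so that
    # AuditLog.objects.filter(instance_repr=None) finds the ones still missing.
    audit_log = models.OneToOneField(
        AuditLog,
        related_name='instance_repr',
        on_delete=models.CASCADE,
    )
    instance_repr = models.TextField()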

There are a few things that can cause problems:

  • If we run the task infrequently, then there can at times be a large number of new audit logs, which can cause this task to take a long time to run: blocking other tasks, or perhaps even timing out. We should limit the number of objects that may be handled in a given task.
  • If we run the task infrequently, there can be a big delay between an action being logged, and the representation of that action being created. This can make audit logs hard to read.
  • If our batch size is too small, then we don’t deal with the backlog, or we need to run more frequently.
  • If we run too frequently, we end up spending all of our time checking for missing objects (we have tens of millions at last count), and the tasks stack up.
  • If we have multiple instances of the task running at the same time, then creation of the objects in the second finishing task can fail, because the first task has already created them. At this point, we have done a bunch of work for nothing.

The ideal behaviour is to queue a task to run somewhat frequently: perhaps every 2-5 minutes, and do a batch of a reasonable size. Then, if there are still any more objects to process, re-queue a task immediately.
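
The periodic half of that might look something like this in the beat schedule (a sketch assuming Celery 4’s beat_schedule setting; the dotted task path is hypothetical):

from celery.schedules import crontab

app.conf.beat_schedule = {
    'fix-missing-instances': {
        # Hypothetical dotted path to the task defined below.
        'task': 'audit.tasks.fix_missing_instances',
        # Run every five minutes; the task itself re-queues when there is a backlog.
        'schedule': crontab(minute='*/5'),
    },
}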

But there is still the problem here of multiple tasks running at the same time. If a task detects more missing objects and requeues itself, and this keeps happening until the next scheduled task starts, we may well end up with the race condition described above. It’s possible that, at times, we are creating objects faster than we can process them, but in our case this only happens rarely, not all of the time.

There is a project, celery-once, that will prevent multiple instances of the same task being queued at the same time. This feels like it will do exactly what we want!
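
celery-once needs a locking backend configured on the Celery app before the QueueOnce base class can be used. As a sketch (following the layout of the celery_once 2.x README; the Redis URL and timeout here are assumptions, and older versions use different setting names):

from celery import Celery
from celery_once import QueueOnce

app = Celery('tasks', broker='redis://localhost:6379/0')

# celery_once keeps its "is this task already queued?" locks in its own backend.
app.conf.ONCE = {
    'backend': 'celery_once.backends.Redis',
    'settings': {
        'url': 'redis://localhost:6379/0',
        'default_timeout': 60 * 60,  # seconds before a stale lock expires
    },
}

With that in place, the task itself: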

@app.task(base=QueueOnce)
def fix_missing_instances(batch_size=1000):
    # Audit logs that don't yet have a related string representation.
    missing = AuditLog.objects.filter(instance_repr=None)
    # Build the representations for one batch of them.
    InstanceRepr.objects.bulk_create([
        InstanceRepr(
            audit_log=audit_log,
            instance_repr=audit_log.build_instance_repr(),
        ) for audit_log in missing[:batch_size]
    ])

    # If there is still a backlog, queue another run almost immediately.
    if missing.exists():
        fix_missing_instances.apply_async(kwargs={'batch_size': batch_size}, countdown=1)

But, alas, this won’t quite work. The task is still “in the queue”, just marked as “reserved”, so celery_once will not add a new item to the queue.

As an aside, there’s actually a better pattern for knowing if there are more objects to process: we can compare the number of objects we just created to our batch size, and if they are the same, then we probably need to process more.

But, back to the first issue. We tried delaying our task (using the countdown argument), but this was not enough.
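
If you want to see where such a delayed task sits, Celery’s standard inspection API shows it still held by the workers (the import path for the app here is an assumption):

from audit.celery import app  # hypothetical location of the Celery app

inspector = app.control.inspect()
print(inspector.scheduled())  # tasks waiting on an ETA/countdown
print(inspector.reserved())   # tasks prefetched by a worker but not yet running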

We can sidestep the problem using a second task, which just queues the first task:

@app.task(bind=True, base=QueueOnce)
def fix_missing_instances(self, batch_size=1000):
    missing = AuditLog.objects.filter(instance_repr=None)
    created = InstanceRepr.objects.bulk_create([
        InstanceRepr(
            audit_log=audit_log,
            instance_repr=audit_log.build_instance_repr(),
        ) for audit_log in missing[:batch_size]
    ])

    # A full batch means there are probably more to process: requeue via the
    # helper task, so that the QueueOnce lock on this task doesn't block it.
    if len(created) == batch_size and not self.request.called_directly:
        requeue_fix_missing_instances.apply_async(kwargs={'batch_size': batch_size}, countdown=1)


@app.task
def requeue_fix_missing_instances(batch_size):
    # Not a QueueOnce task, so it can always be queued: all it does is queue
    # the real task again.
    fix_missing_instances.delay(batch_size=batch_size)

Note that I’ve also done the “optimisation” where it no longer does a separate .exists() query after processing.

I also think there is a bug somewhere in celery-once related to handling the keys argument to the once dict, which can be used to limit the conditions that indicate a task is already running (that is, only tasks queued with the same kwargs). But I haven’t been able to isolate this and write a test case/PR yet. In our case, we don’t really care about the task args anyway.
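
For reference, the once dict is passed to the task decorator. According to the celery-once documentation, an empty keys list means no arguments are considered when checking for an already-queued task, which is the behaviour we would want here (though, as noted, we hit issues with it):

# Sketch based on the celery-once docs: with keys=[], the lock should not
# take batch_size into account at all.
@app.task(bind=True, base=QueueOnce, once={'keys': []})
def fix_missing_instances(self, batch_size=1000):
    ...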

Logging Celery Queue Sizes in New Relic

Several times during the last week, I needed to know the size of one of our celery queues. In one case, it was related to my battles with celery-once, in that I needed to see if tasks were being added. Usually, however, we need to know if we have a backlog of tasks.

Whilst looking, I wound up using the curses-based celery monitor, but this shows tasks as they are added and processed. In practice, that was actually more useful for my celery-once investigations; the other use case (how much of a backlog we currently have) is an ongoing concern.

We use NewRelic for our performance monitoring, and I’ve yet to find anything that, out of the box, will act as a plugin of some sort that pushes the queue lengths to a place in NewRelic where you can then view them. I had toyed with the idea of building our own dashboard specifically for this, but looking at values over time would then require me to (a) store the queue sizes, and (b) write visualisation tools.

NewRelic has some tools for arbitrary metrics gathering (and visualisation), under its Insights package. We can leverage these to get nice monitoring, without having to write any UI code.

So, it turns out we can send a JSON object (or more than one) to a specific endpoint. The data itself is largely arbitrary, as long as each event has an eventType, and possibly an appId. The former says what type of event this datum is, and the latter is useful if you have different NewRelic applications (we do). For more detail, see the documentation.

[
  {
    "eventType": "CeleryQueueSize",
    "queue": "celery",
    "length": 22,
    "appId": 12345678
  }
]

All we need now is some mechanism to (a) collect those metrics from our celery backend, and (b) send them through to NewRelic.

In our case, we are using redis, so we can just call client.llen(queue_name). Because I run this command on the container that runs celerybeat, which does not have very many resources, I was not able to load all of django, so I made a simpler version that is just pure python:

#!/usr/bin/env python

import os
import time

import redis
import requests

ACCOUNT_ID = 111111   # Get this from your URL in NewRelic
URL = 'https://insights-collector.newrelic.com/v1/accounts/{}/events'.format(ACCOUNT_ID)
APP_ID = 123456789    # Get this from your URL in NewRelic too.
QUEUES = ['celery', 'system']  # We have two celery queues.
API_KEY = '37a1eaba-2b8c-4f37-823d-ba4bf4391f9b'  # You will need to generate one of these.

client = redis.Redis.from_url(os.environ['CACHE_URL'])
headers = {
    'X-Insert-Key': API_KEY,
}

def send():
    data = [
        {
            'eventType': 'CeleryQueueSize',
            'queue': queue_name,
            'length': client.llen(queue_name),
            'appId': APP_ID
        } for queue_name in QUEUES
    ]
    requests.post(URL, json=data, headers=headers)


if __name__ == '__main__':
    while True:
        send()
        time.sleep(10)

Now we can have this command started automatically when our system boots (we only run it in one container, although you could probably run it in multiple containers).

You’ll probably want to configure a Dashboard in Insights, but you should be able to use the Data Explorer to view the data in an ad-hoc manner.

[Image: celery queue length shown in New Relic]