Scheduling Celery Tasks in the (far) future

We used to rely on the fact that a Celery task can be scheduled for some time in the future to auto-punch-out staff members who had failed to punch out 24 hours after their shift started. This was as simple as scheduling a task with countdown=86400 (or the equivalent eta datetime).

However, as Adam points out here (see number 5), this is not a great idea. For one, it will bog down your Celery workers (and make them use a lot more memory). Secondly, and this is something Adam doesn’t mention, if your queue is corrupted, none of those future tasks will be executed.

Discussing this in IRC today, I thought of a simple mechanism for scheduling these tasks and processing them at some point after they are due.

We will start with a model to store our task:

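import importlib

from django.db import models


class ScheduledTask(models.Model):
    task_name = models.TextField()
    task_args = models.JSONField(default=list)
    task_kwargs = models.JSONField(default=dict)
    due_date = models.DateTimeField()

    # ScheduledTaskQuerySet must be defined above this class in the module.
    objects = ScheduledTaskQuerySet.as_manager()

    @property
    def task(self):
        # Resolve a dotted path such as 'foo.tasks.bar' to the task object.
        module, task = self.task_name.rsplit('.', 1)
        return getattr(importlib.import_module(module), task)

    def execute(self):
        # Hand the stored call over to Celery for immediate execution.
        self.task.apply_async(args=self.task_args, kwargs=self.task_kwargs)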
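
The task property relies on turning a dotted path back into a Python object. A minimal sketch of that lookup on its own, using a standard-library function in place of a Celery task (resolve_dotted_path is a hypothetical helper name, not part of Celery or Django):

```python
import importlib


def resolve_dotted_path(path):
    """Import the module part of 'pkg.module.attr' and return the attribute."""
    module_name, attr_name = path.rsplit('.', 1)
    module = importlib.import_module(module_name)
    return getattr(module, attr_name)


# A standard-library function stands in for a Celery task here.
join = resolve_dotted_path('os.path.join')
print(join('a', 'b'))
```

This only works if the stored name really is an importable module path followed by an attribute name, which holds for Celery's default task names but not necessarily for tasks registered under a custom name.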

We have a custom queryset, which lets us find the tasks that are due using a nice queryset method:

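import datetime


class ScheduledTaskQuerySet(models.QuerySet):
    def due(self):
        # Filter on the column; the queryset itself has no due_date attribute.
        return self.filter(due_date__lte=datetime.datetime.utcnow())

    def schedule(self, task_name, when, *args, **kwargs):
        # Store args as a list so they round-trip through the JSONField.
        return self.create(task_name=task_name, task_args=list(args), task_kwargs=kwargs, due_date=when)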

Finally, we need a task that will enqueue the due tasks:

@app.task
def enqueue_due_tasks():
    for task in ScheduledTask.objects.due():
        task.execute()
        task.delete()
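
To see the intended behaviour of that sweep without a database or a broker, here is a rough in-memory sketch. Row, sweep and the dispatch callable are all hypothetical stand-ins for the model, the task above and apply_async; the point is only that rows due now are dispatched and removed, while future rows stay put until a later run.

```python
import datetime
from dataclasses import dataclass, field


@dataclass
class Row:
    """Hypothetical in-memory stand-in for a ScheduledTask row."""
    name: str
    due_date: datetime.datetime
    args: list = field(default_factory=list)


def sweep(table, dispatch, now):
    """Dispatch every row due at `now`, removing each row only after dispatch."""
    sent = []
    for row in [r for r in table if r.due_date <= now]:
        dispatch(row)      # if this raises, the row stays for the next sweep
        table.remove(row)
        sent.append(row.name)
    return sent


now = datetime.datetime(2024, 1, 1, 12, 0)
table = [
    Row('overdue', now - datetime.timedelta(hours=1)),
    Row('future', now + datetime.timedelta(days=365)),
]
dispatched = sweep(table, dispatch=lambda row: None, now=now)
```

Because the row is removed only after it has been handed off, a failure while dispatching leaves it in the table to be retried on the next sweep.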

As it stands, with this code, to schedule a task, you need to create a model instance:

ScheduledTask.objects.schedule(
    'foo.tasks.bar',
    datetime.datetime(2525, 11, 11, 9, 30),
    'x',
    y='z'
)

But, it would be nice if we could use the task to schedule itself:

foo.tasks.bar.schedule(args=[], kwargs={}, eta=X, countdown=Y)

Or, even better:

foo.tasks.bar.s(arg, kwarg=value).schedule(datetime.timedelta(days=365))

The first one we should be able to do by using custom tasks (and implementing a schedule method):

import datetime

import celery


class Scheduled(celery.Task):
    def schedule(self, *, args=None, kwargs=None, eta=None, countdown=None):
        if eta is None and countdown is None:
            raise ValueError('One of eta and countdown must be supplied')
        if eta is not None and countdown is not None:
            raise ValueError('Only one of eta and countdown may be supplied')
        if eta is None:
            # countdown is in seconds, matching Celery's apply_async.
            eta = datetime.datetime.utcnow() + datetime.timedelta(seconds=countdown)
        # args and kwargs default to None, so fall back to empty containers.
        ScheduledTask.objects.schedule(self.name, eta, *(args or ()), **(kwargs or {}))
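
The eta/countdown handling can be tried in isolation. This is a small sketch, assuming countdown is a number of seconds (as it is for Celery's apply_async); resolve_due_date and its now parameter are hypothetical names added here to make the logic easy to test.

```python
import datetime


def resolve_due_date(eta=None, countdown=None, now=None):
    """Return an absolute due date from exactly one of eta or countdown (seconds)."""
    if (eta is None) == (countdown is None):
        raise ValueError('Exactly one of eta and countdown must be supplied')
    if eta is not None:
        return eta
    now = now or datetime.datetime.utcnow()
    return now + datetime.timedelta(seconds=countdown)


start = datetime.datetime(2024, 1, 1, 9, 0)
due = resolve_due_date(countdown=86400, now=start)  # 24 hours after `start`
```

Comparing against None rather than relying on truthiness means a countdown of 0 is accepted as "due immediately" instead of being treated as missing.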

Then, as long as a task is defined as using the base class, we can schedule it:

@app.task(base=Scheduled)
def task_name(x, y=None):
    pass

But what about mucking with the .s() or .signature() calls? Now we are getting a bit experimental, but it still might be fun:

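import datetime

from celery.canvas import Signature


def schedule(self, when=None):
    if when is None:
        # Fall back to the signature's own options, which is a plain dict.
        if self.options.get('countdown') is not None:
            when = datetime.timedelta(seconds=self.options['countdown'])
        else:
            when = self.options.get('eta')
    if isinstance(when, datetime.timedelta):
        when = datetime.datetime.utcnow() + when
    if when is None:
        raise ValueError('No due date given and no eta/countdown set on the signature')

    ScheduledTask.objects.create(
        task_name=self.task,
        task_args=list(self.args),
        task_kwargs=self.kwargs,
        due_date=when,
    )


# Monkey-patch every Signature so that .s(...).schedule(...) works.
Signature.schedule = schedule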

This is totally mostly untested, and I’m not sure it’s a really good idea.

A better idea might be to have a backend that doesn’t even tell the workers about the tasks until after they are due…that way you would not have to duck-punch celery at all.