Scheduling Celery Tasks in the (far) future
We used to make use of the fact that a Celery task can be scheduled at some time in the future to auto-punch-out staff members who failed to punch out 24 hours after their shift started. This was as simple as scheduling the task with countdown=86400 (or an equivalent eta datetime).
However, as Adam points out here (see number 5), this is not a great idea. For one, it will bog down your Celery workers (and make them use a bunch more memory). Secondly, and this is something Adam doesn’t mention, if your queue is corrupted, then none of those future tasks will be executed.
Discussing this in IRC today, I thought of a simple mechanism for scheduling these tasks and processing them at some point after they are due.
We will start with a model to store our task:
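import importlib

from django.db import models


class ScheduledTask(models.Model):
    task_name = models.TextField()  # Dotted path, e.g. 'foo.tasks.bar'
    task_args = models.JSONField(default=list)
    task_kwargs = models.JSONField(default=dict)
    due_date = models.DateTimeField()

    # ScheduledTaskQuerySet must be defined above this class in the module.
    objects = ScheduledTaskQuerySet.as_manager()

    @property
    def task(self):
        # Import the module, then look up the task object by name.
        module, task = self.task_name.rsplit('.', 1)
        return getattr(importlib.import_module(module), task)

    def execute(self):
        self.task.apply_async(args=self.task_args, kwargs=self.task_kwargs)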
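The task property turns a dotted path back into a callable. As a minimal standalone sketch of that lookup, here it is with the stdlib function math.sqrt standing in for a Celery task (the helper name resolve is hypothetical, not part of the model):

```python
import importlib


def resolve(dotted_path):
    """Return the object named by a dotted path such as 'package.module.attr'."""
    module_name, attr = dotted_path.rsplit('.', 1)
    return getattr(importlib.import_module(module_name), attr)


# 'math.sqrt' stands in for a task path such as 'foo.tasks.bar'.
sqrt = resolve('math.sqrt')
result = sqrt(16)
```

Because only the last dot is split off, everything before it is treated as the module to import, so the path must point at a module-level name.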
We have a custom queryset that lets us see which tasks are due using a nice queryset method, and schedule new ones:
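from django.db import models
from django.utils import timezone

class ScheduledTaskQuerySet(models.QuerySet):
    def due(self):
        return self.filter(due_date__lt=timezone.now())  # aware datetime when USE_TZ is on

    def schedule(self, task, when, *args, **kwargs):
        # `task` is the dotted path to the task, e.g. 'foo.tasks.bar'.
        return self.create(task_name=task, task_args=list(args), task_kwargs=kwargs, due_date=when)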
Finally, we need a task that will enqueue the due tasks:
@app.task
def enqueue_due_tasks():
    for task in ScheduledTask.objects.due():
        task.execute()
        task.delete()
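To see the whole flow without Django or Celery, here is a small in-memory sketch of the same idea: store the work with a due date, and have a periodic pass run and remove whatever is due. The class InMemorySchedule, the function punch_out, and the explicit now argument are all made up for illustration; calling the function directly stands in for apply_async().

```python
import datetime


class InMemorySchedule:
    """Hypothetical stand-in for the ScheduledTask table."""

    def __init__(self):
        self.rows = []  # each row: (due_date, func, args, kwargs)

    def schedule(self, func, when, *args, **kwargs):
        self.rows.append((when, func, args, kwargs))

    def due(self, now):
        # Same test as the queryset: due date strictly before "now".
        return [row for row in self.rows if row[0] < now]

    def enqueue_due(self, now):
        """Run every due entry once, then forget it."""
        results = []
        for row in self.due(now):
            _, func, args, kwargs = row
            results.append(func(*args, **kwargs))  # stands in for apply_async()
            self.rows.remove(row)
        return results


def punch_out(staff_id, reason='auto'):
    return f'{staff_id} punched out ({reason})'


store = InMemorySchedule()
start = datetime.datetime(2024, 1, 1, 9, 0)
store.schedule(punch_out, start + datetime.timedelta(hours=24), 42)
store.schedule(punch_out, start + datetime.timedelta(days=365), 7)

# One minute after the first due date: only the first entry runs.
ran = store.enqueue_due(start + datetime.timedelta(hours=24, minutes=1))
```

In a real deployment something still has to call enqueue_due_tasks periodically, for example a Celery beat entry. How often it runs bounds how late a task can start.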
As it stands, with this code, to schedule a task, you need to create a model instance:
ScheduledTask.objects.schedule(
    'foo.tasks.bar',
    datetime.datetime(2525, 11, 11, 9, 30),
    'x',
    y='z'
)
But, it would be nice if we could use the task to schedule itself:
foo.tasks.bar.schedule(args=[], kwargs={}, eta=X, countdown=Y)
Or, even better:
foo.tasks.bar.s(arg, kwarg=value).schedule(datetime.timedelta(days=365))
The first one we should be able to do by using custom tasks (and implementing a schedule method):
import datetime

import celery
from django.utils import timezone


class Scheduled(celery.Task):
    def schedule(self, *, args=None, kwargs=None, eta=None, countdown=None):
        if eta is None and countdown is None:
            raise ValueError('One of eta and countdown must be supplied')
        if eta is not None and countdown is not None:
            raise ValueError('Only one of eta and countdown may be supplied')
        if eta is None:
            # countdown is a number of seconds, as in apply_async().
            eta = timezone.now() + datetime.timedelta(seconds=countdown)
        # args/kwargs default to None, so fall back to empty containers.
        ScheduledTask.objects.schedule(self.name, eta, *(args or ()), **(kwargs or {}))
Then, as long as a task is defined as using the base class, we can schedule it:
@app.task(base=Scheduled)
def task_name(x, y=None):
    pass
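The eta/countdown handling in the schedule method can be tried on its own, away from Celery. This is only a sketch: the function name resolve_due_date and its now parameter (there to make the result predictable) are hypothetical, and countdown is assumed to be in seconds, as it is for apply_async().

```python
import datetime


def resolve_due_date(eta=None, countdown=None, now=None):
    """Turn exactly one of eta (a datetime) or countdown (seconds) into a due date."""
    if eta is None and countdown is None:
        raise ValueError('One of eta and countdown must be supplied')
    if eta is not None and countdown is not None:
        raise ValueError('Only one of eta and countdown may be supplied')
    if eta is not None:
        return eta
    if now is None:
        now = datetime.datetime.now(datetime.timezone.utc)
    return now + datetime.timedelta(seconds=countdown)


now = datetime.datetime(2024, 1, 1, 9, 0, tzinfo=datetime.timezone.utc)
due = resolve_due_date(countdown=86400, now=now)  # 24 hours later
```

Comparing against None rather than relying on truthiness means a countdown of 0 is accepted as "due immediately" instead of being treated as missing.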
But what about mucking with the .s() or .signature() calls? Now we are getting a bit experimental, but it still might be fun:
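import datetime

from celery.canvas import Signature
from django.utils import timezone


def schedule(self, when=None):
    if when is not None:
        if isinstance(when, datetime.timedelta):
            when = timezone.now() + when
    elif self.options.get('countdown') is not None:
        # Signature.options is a plain dict, so use key lookups.
        when = timezone.now() + datetime.timedelta(seconds=self.options['countdown'])
    elif self.options.get('eta') is not None:
        when = self.options['eta']
    else:
        raise ValueError('No due date: pass when, or set countdown or eta')
    ScheduledTask.objects.create(
        task_name=self.task,  # On a Signature, .task is the task's name
        task_args=list(self.args),
        task_kwargs=self.kwargs,
        due_date=when,
    )


Signature.schedule = schedule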
This is totally mostly untested, and I’m not sure it’s a really good idea.
A better idea might be to have a backend that doesn’t even tell the workers about the tasks until after they are due…that way you would not have to duck-punch celery at all.
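To make that last idea a little more concrete, here is a rough sketch of such a holding area using only the standard library. It is not a Celery backend or broker, just the "hide it until it is due" behaviour in isolation. DelayedQueue and its methods are made-up names, and due times are plain numbers (seconds) to keep things simple.

```python
import heapq
import itertools


class DelayedQueue:
    """Hypothetical holding area: entries stay hidden until they are due."""

    def __init__(self):
        self._heap = []
        self._counter = itertools.count()  # tie-breaker for equal due times

    def put(self, due, message):
        heapq.heappush(self._heap, (due, next(self._counter), message))

    def pop_due(self, now):
        """Return messages whose due time has arrived, earliest first."""
        ready = []
        while self._heap and self._heap[0][0] <= now:
            _, _, message = heapq.heappop(self._heap)
            ready.append(message)
        return ready


queue = DelayedQueue()
queue.put(300, 'punch out staff 7')
queue.put(60, 'punch out staff 42')

first = queue.pop_due(now=100)    # only the entry due at 60 is released
later = queue.pop_due(now=1000)   # the entry due at 300 follows
```

Workers would only ever see what pop_due releases, so far-future entries cost them nothing while they wait.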