Scheduling Celery Tasks in the (far) future

We used to make use of the fact that a celery task can be scheduled at some time in the future to auto-punch-out staff members who failed to punch out 24 hours after their shift started. This was as simple as scheduling the task with countdown=86400 (or with an eta 24 hours in the future).

However, as Adam points out here (see number 5), this is not a great idea. For one, it will bog down your celery workers (and make them use a bunch more memory). Secondly, and this is something Adam doesn’t mention, if your queue is corrupted, then none of those future tasks will be executed.

Discussing this in IRC today, I thought of a simple mechanism for scheduling these tasks and processing them at some point after they are due.

We will start with a model to store our task:

import importlib

from django.db import models


class ScheduledTask(models.Model):
    task_name = models.TextField()
    task_args = models.JSONField(default=list)
    task_kwargs = models.JSONField(default=dict)
    due_date = models.DateTimeField()

    objects = ScheduledTaskQuerySet.as_manager()

    @property
    def task(self):
        # task_name is a dotted path, such as 'foo.tasks.bar'.
        module, task = self.task_name.rsplit('.', 1)
        return getattr(importlib.import_module(module), task)

    def execute(self):
        self.task.apply_async(args=self.task_args, kwargs=self.task_kwargs)
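
The task property relies on splitting a dotted path into a module and an attribute name. A minimal sketch of just that lookup, using a standard-library function in place of a Celery task (the helper name resolve_dotted_path is made up for illustration):

```python
import importlib


def resolve_dotted_path(path):
    """Import the module part of 'pkg.module.attr' and return the attribute."""
    module_name, attr_name = path.rsplit('.', 1)
    return getattr(importlib.import_module(module_name), attr_name)


# 'json.dumps' stands in for something like 'foo.tasks.bar'.
dumps = resolve_dotted_path('json.dumps')
print(dumps({'y': 'z'}))
```

A path with a typo in it will raise ImportError or AttributeError here, which in the real model would only surface when the task falls due, so it may be worth validating the name when the row is created.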

We have a custom queryset defined, that allows us to see which tasks are due using a nice queryset method:

import datetime

from django.db import models


class ScheduledTaskQuerySet(models.query.QuerySet):
    def due(self):
        # Everything whose due date has already passed.
        return self.filter(due_date__lt=datetime.datetime.utcnow())

    def schedule(self, task_name, when, *args, **kwargs):
        return self.create(task_name=task_name, task_args=list(args), task_kwargs=kwargs, due_date=when)
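
To make the intent of due() concrete without a database, here is a plain-Python sketch of the same comparison over an in-memory list. The Row tuple and the sample dates are hypothetical; the real method would do this filtering in SQL via the queryset.

```python
import datetime
from collections import namedtuple

Row = namedtuple('Row', 'task_name due_date')


def due(rows, now):
    """Return the rows whose due date is strictly before `now`."""
    return [row for row in rows if row.due_date < now]


now = datetime.datetime(2024, 1, 1, 12, 0)
rows = [
    Row('foo.tasks.bar', now - datetime.timedelta(hours=1)),   # overdue
    Row('foo.tasks.baz', now + datetime.timedelta(days=365)),  # far future
]
print([row.task_name for row in due(rows, now)])
```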

Finally, we need a task that will enqueue the due tasks:

@app.task
def enqueue_due_tasks():
    for task in ScheduledTask.objects.due():
        task.execute()
        task.delete()
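
Something still has to run enqueue_due_tasks regularly. One option is celery beat; this is a sketch of the schedule entry, assuming the task lives at the hypothetical path foo.tasks.enqueue_due_tasks and that a once-a-minute check is frequent enough:

```python
# Assign this dict to app.conf.beat_schedule in your Celery configuration.
# The entry name and the task path are placeholders for illustration.
beat_schedule = {
    'enqueue-due-tasks': {
        'task': 'foo.tasks.enqueue_due_tasks',
        'schedule': 60.0,  # seconds between runs
    },
}
```

The interval bounds how late a task can run: with this setting, a task is enqueued at most about a minute after it falls due.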

As it stands, with this code, to schedule a task, you need to create a model instance:

ScheduledTask.objects.schedule(
    'foo.tasks.bar',
    datetime.datetime(2525, 11, 11, 9, 30),
    'x',
    y='z'
)

But, it would be nice if we could use the task to schedule itself:

foo.tasks.bar.schedule(args=[], kwargs={}, eta=X, countdown=Y)

Or, even better:

foo.tasks.bar.s(arg, kwarg=value).schedule(datetime.timedelta(days=365))

The first one we should be able to do by using custom tasks (and implementing a schedule method):

import datetime

import celery


class Scheduled(celery.Task):
    def schedule(self, *, args=None, kwargs=None, eta=None, countdown=None):
        if eta is None and countdown is None:
            raise ValueError('One of eta and countdown must be supplied')
        if eta is not None and countdown is not None:
            raise ValueError('Only one of eta and countdown may be supplied')
        if eta is None:
            # countdown is in seconds, like the apply_async() argument.
            eta = datetime.datetime.utcnow() + datetime.timedelta(seconds=countdown)
        ScheduledTask.objects.schedule(self.name, eta, *(args or ()), **(kwargs or {}))
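
The eta/countdown handling can be checked in isolation. This sketch pulls it into a standalone function (resolve_due_date is a hypothetical name, not part of Celery) and treats countdown as a number of seconds, as apply_async does:

```python
import datetime


def resolve_due_date(eta=None, countdown=None, now=None):
    """Turn exactly one of eta (datetime) or countdown (seconds) into a datetime."""
    if eta is None and countdown is None:
        raise ValueError('One of eta and countdown must be supplied')
    if eta is not None and countdown is not None:
        raise ValueError('Only one of eta and countdown may be supplied')
    if eta is not None:
        return eta
    now = now or datetime.datetime.utcnow()
    return now + datetime.timedelta(seconds=countdown)


# A shift that started at 09:30 is auto-punched-out 24 hours later.
start = datetime.datetime(2024, 1, 1, 9, 30)
print(resolve_due_date(countdown=86400, now=start))
```

Passing now explicitly is only there to make the function easy to test; in normal use it falls back to the current UTC time.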

Then, as long as a task is defined as using the base class, we can schedule it:

@app.task(base=Scheduled)
def task_name(x, y=None):
    pass

But what about mucking with the .s() or .signature() calls? Now we are getting a bit experimental, but it still might be fun:

import datetime

from celery.canvas import Signature


def schedule(self, when=None):
    if when:
        if isinstance(when, datetime.timedelta):
            when = datetime.datetime.utcnow() + when
    else:
        # Signature.options is a plain dict, so look the keys up with .get().
        if self.options.get('countdown'):
            when = datetime.datetime.utcnow() + datetime.timedelta(seconds=self.options['countdown'])
        elif self.options.get('eta'):
            when = self.options['eta']
        else:
            raise ValueError('No due date supplied')

    ScheduledTask.objects.create(
        task_name=self.task,
        task_args=self.args,
        task_kwargs=self.kwargs,
        due_date=when,
    )


Signature.schedule = schedule

This is totally mostly untested, and I’m not sure it’s a really good idea.

A better idea might be to have a backend that doesn’t even tell the workers about the tasks until after they are due…that way you would not have to duck-punch celery at all.

Logging WiFi client Signal Strength

One of the cool things about the old macOS version of Airport Utility is that it would show you the WiFi signal of the various connected clients. At some point, they removed this feature, bringing it down to where the iOS app is in terms of functionality.

Whilst I still have a Mac that can (only) run that version next to me on the desk, mostly I don’t use that, and having to switch computers and open it up is a pain. It would be really nice if I could get that data in a more configurable format (possibly with the client data from multiple base stations).

It turns out that this data is available over SNMP, but you need the old version of Airport Utility to enable it. And, as we shall see shortly, there is another caveat too.

Before we get to that though, my main router is actually an OpenWRT device, so it might be nice to grab the handful of clients that connect to that (it being in the garage and all).

We can get a list of the WiFi interfaces using iwinfo | awk '/ESSID/ {print $1}'. In my case, that gives me wlan0 and wlan1, but the values are not relevant, other than the fact we will need them later.

Now, for each WiFi interface, we want to get the MAC address of its access point, which we can get similarly: iwinfo $iw info | awk '/Access Point:/ {print $3}'.

It might be nice to also get the “connection speed”, although in some cases this is not known. In those cases, we want to use null, since we’ll have a numeric value otherwise.

  iwinfo $iw assoclist | awk '/expected/ {if ($3 == "unknown") print "null"
  else
   print $3
  }'

Okay, now we are ready to get the signal/noise/MAC data:

  iwinfo $iw assoclist \
    | awk -v rate=$RATE -v mac=$MAC '/dBm/ { \
      print "mosquitto_pub -t \"wifi-signal/" substr($1, 1) "\"", \
      " -m \x27{ \
        \"signal\":" $2 ", \
        \"noise\":" $5 ", \
        \"rate\":" rate ", \
        \"station\":\"" mac "\" \
      }\x27 -h mqtt.lan"}' \
    | bash

Oh, there’s a fair bit in there, so let’s unpack it:

We get the list of associated clients, and then pipe that through awk. To make it easier, we want to stick our previous environment variable captures of our rate and MAC into awk variables, and then we grab the relevant columns, and build it all up into a shell command that will look something like:

mosquitto_pub -t "wifi-signal/98:AF:65:E3:16:F1"  -m '{
        "signal":-70,
        "noise":-102,
        "rate":null,
        "station":"B0:95:75:E8:C8:32"
      }' -h mqtt.lan

Because the awk program is itself wrapped in single quotes, it can’t contain a literal single quote, so we need to use \x27 to get awk to emit one.

Finally, this is passed to bash to execute the command(s).
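
If Python happens to be available, the same extraction can be sketched without any quote escaping. The sample text below is an assumption: it is shaped the way the awk scripts above expect (MAC in field 1, signal in field 2 and noise in field 5 of the dBm line, rate in field 3 of the expected line), and the second client is made up. parse_assoclist is a hypothetical helper, not part of iwinfo.

```python
import json

# Assumed layout, inferred from the field positions used by the awk scripts.
SAMPLE = """\
98:AF:65:E3:16:F1  -70 dBm / -102 dBm (SNR 32)  0 ms ago
    expected throughput: unknown
AA:BB:CC:DD:EE:FF  -55 dBm / -102 dBm (SNR 47)  10 ms ago
    expected throughput: 46.8 MBit/s
"""


def parse_assoclist(text, station):
    """Yield (topic, payload) pairs, one per associated client."""
    clients = []
    for line in text.splitlines():
        fields = line.split()
        if 'dBm' in line:
            clients.append({'mac': fields[0], 'signal': int(fields[1]),
                            'noise': int(fields[4]), 'rate': None})
        elif 'expected' in line and clients and fields[2] != 'unknown':
            # Attach the rate to the client it was listed under.
            clients[-1]['rate'] = float(fields[2])
    for client in clients:
        payload = {'signal': client['signal'], 'noise': client['noise'],
                   'rate': client['rate'], 'station': station}
        yield 'wifi-signal/' + client['mac'], json.dumps(payload)


for topic, payload in parse_assoclist(SAMPLE, 'B0:95:75:E8:C8:32'):
    print(topic, payload)
```

Serialising with json.dumps turns a missing rate into null automatically, and parsing in one pass keeps each client paired with its own rate. The resulting topic and payload could then be handed to mosquitto_pub or an MQTT client library.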

And, putting this all together:

#! /bin/sh

for iw in $(iwinfo | awk '/ESSID/ {print $1}')
do
  MAC=$(iwinfo $iw info | awk '/Access Point:/ {print $3}')
  RATE=$(iwinfo $iw assoclist | awk '/expected/ {if ($3 == "unknown") print "null"
else
   print $3
 }')
  iwinfo $iw assoclist \
    | awk -v rate=$RATE -v mac=$MAC '/dBm/ { \
      print "mosquitto_pub -t \"wifi-signal/" substr($1, 1) "\"", \
      " -m \x27{ \
        \"signal\":" $2 ", \
        \"noise\":" $5 ", \
        \"rate\":" rate ", \
        \"station\":\"" mac "\" \
      }\x27 -h mqtt.lan"}' \
    | bash
done

Okay, so that’s the easy one. What about getting the data from SNMP?

Since this code is not running on a Mac, I can’t rely on the MIB data being available. Instead, we’ll use the raw numeric codes. It turns out that we can get all of the data we need (and then some!) using one command:

snmpwalk -v 2c -c public hostname.local iso.3.6.1.4.1.63.501.3.2.2.1

This outputs a whole bunch of data - the ones we care about have the values 1, 6, 7 and 8 in position 13, so we can extract those values. But, we want to retain a bit more context, so we can sort the data and ensure we have a consistent ordering to our data: MAC address, signal, noise and then rate for each client. To ensure we have a stable sorting, we need to get the last two parts of the MIB (which seemed to be enough in my case to generate a unique value), and then the 13th column. Then we want to get the “value” that was returned for this row. If it was a STRING value, then it will already have quotes around it.

  snmpwalk -v 2c -c public $station $WIFI_DATA \
    | awk -F. 'match($31, /: /) {print $30, substr($31, 0, 3), $13, substr($31, RSTART + 2)}' \
    | sort \
    | grep '^.. .. [1678] '

This will build up the data in the correct format. From here, we need to put each client’s data on a single line:

  snmpwalk ...
    | <as-above>
    | awk '/^.. .. 1/{ if (NR!=1) print "";}{printf $4 " "}'
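
The same regrouping, sketched in Python for clarity. The input rows are hypothetical (key, column, value) triples standing in for the sorted and filtered output of the previous step, with columns 1, 6, 7 and 8 holding MAC address, signal, noise and rate as described above; group_clients is a made-up helper name.

```python
# Column number -> field name, following the ordering described in the text.
COLUMNS = {1: 'mac', 6: 'signal', 7: 'noise', 8: 'rate'}


def group_clients(rows):
    """Collapse (key, column, value) rows into one dict per client key."""
    clients = {}
    for key, column, value in rows:
        if column in COLUMNS:
            clients.setdefault(key, {})[COLUMNS[column]] = value
    return list(clients.values())


# Made-up sample rows; real values come from snmpwalk.
rows = [
    ('49 54', 1, '98:AF:65:E3:16:F1'),
    ('49 54', 6, -70),
    ('49 54', 7, -102),
    ('49 54', 8, 54),
    ('50 51', 1, 'AA:BB:CC:DD:EE:FF'),
    ('50 51', 6, -61),
]
for client in group_clients(rows):
    print(client)
```

Grouping by key rather than by position means a client with a missing column simply ends up with a missing field, instead of shifting the values that follow it.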

And then turn it into a string that we’ll be able to execute with bash (like before). Putting it all together:

  MAC=$(/usr/sbin/arp -a $station | awk '{print $4}')

  snmpwalk -v 2c -c public $station iso.3.6.1.4.1.63.501.3.2.2.1 \
    | awk -F. 'match($31, /: /) {print $30, substr($31, 0, 3), $13, substr($31, RSTART + 2)}' \
    | sort \
    | grep '^.. .. [1678] ' \
    | awk '/^.. .. 1/ {if (NR!=1) print "";}{printf $4 " "}' \
    | awk -v mac=$MAC '{if ($2 != "") \
      print "mosquitto_pub -t \"wifi-signal/" substr($1, 2), \
        "-m \x27{\
          \"signal\":" $2 ", \
          \"noise\":" $3 ", \
          \"rate\":" $4 ", \
          \"station\": \"" mac "\" \
    }\x27 -h mqtt.lan" \
    }' | bash

Note that I also needed to check that there was a signal value - one of the stations was reporting this as empty, so we skip over any that don’t have a signal value reported.

But… this doesn’t quite work. I mean, it works, but for some reason (after having left it running every minute overnight), it never updates the signal/noise/rate data. It seems that the Airport Express/Extreme SNMP data is not refreshed.

You can, however, force it to update by asking for the number of connected clients!

snmpget -v 2c -c public $station iso.3.6.1.4.1.63.501.3.2.1.0

Thus, our full script can look like:

#! /bin/bash

stations="study.lan
bedroom.lan
dining-room.lan"

CLIENT_COUNT=iso.3.6.1.4.1.63.501.3.2.1.0
WIFI_DATA=iso.3.6.1.4.1.63.501.3.2.2.1

for station in ${stations}
do
  MAC=$(/usr/sbin/arp -a $station | awk '{print $4}')

  snmpget -v 2c -c public $station $CLIENT_COUNT > /dev/null

  snmpwalk -v 2c -c public $station $WIFI_DATA \
    | awk -F. 'match($31, /: /) {print $30, substr($31, 0, 3), $13, substr($31, RSTART + 2)}' \
    | sort \
    | grep '^.. .. [1678] ' \
    | awk '/^.. .. 1/ {if (NR!=1) print "";}{printf $4 " "}' \
    | awk -v mac=$MAC '{if ($2 != "") \
      print "mosquitto_pub -t \"wifi-signal/" substr($1, 2), \
        "-m \x27{\
          \"signal\":" $2 ", \
          \"noise\":" $3 ", \
          \"rate\":" $4 ", \
          \"station\": \"" mac "\" \
    }\x27 -h mqtt.lan" \
    }' | bash
done

These scripts could be more configurable - I’ve hard-coded the MQTT broker name, for instance, but that will be fine for my purposes. I’m also not actually doing anything with this data yet - I will eventually look at pulling this into Home Assistant or somewhere else, and graphing the data.