Cleaning up a required process when you quit

Some time ago, I moved a bunch of our devops tools into a Makefile. I’m not sure exactly what the benefit was then, but there have been some challenges.

One of these relates to tasks that require multiple processes to be running, and cleaning up those dependent processes when the task finishes. Specifically, we run a websocket server in production that uses Redis for PubSub. We can safely assume the developer is running Redis already, since the rest of the platform also depends upon it. However, to collect the changes from Postgres and publish them into Redis, we use a custom Django management command. All this does is listen for Postgres notifications and forward them to Redis.

$ ./manage.py relay

Mostly we don’t keep this running in development, unless we need to be running the websocket code locally for testing. It is a hassle to remember to start a process and then kill it when you are done, so it would be convenient to start the relay command, then start our websocket worker, and have both processes cleaned up when we quit the worker.

This can be done in a Makefile command:

websocket:
    trap 'kill 0' SIGINT; \
        ./manage.py relay & \
        gunicorn \
          -k geventwebsocket.gunicorn.workers.GeventWebsocketWorker \
          --workers=1 \
          --reload \
          websocket:application

The trap command performs the magic: when the gunicorn worker is interrupted with Ctrl-C, kill 0 signals every process in the current process group, taking the backgrounded relay command down along with it.

I have some other commands which start the background process, and then use kill -0 $(shell ps ax | grep ... | head -n 1 | cut -d ' ' -f 1) to ensure they kill the process on quit, but this sometimes does not work: I should get around to changing them over to this…
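If the process-group trick doesn’t fit, another approach is to capture the background PID with $! and clean it up from an EXIT trap. A minimal sketch, untested, assuming a POSIX shell and the same commands as above:

#!/bin/sh

# Start the relay in the background and remember its PID.
./manage.py relay &
RELAY_PID=$!

# Kill the relay whenever this script exits, however it exits.
trap 'kill "$RELAY_PID" 2>/dev/null' EXIT INT TERM

gunicorn \
  -k geventwebsocket.gunicorn.workers.GeventWebsocketWorker \
  --workers=1 \
  --reload \
  websocket:application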

ESPHome variables from Home Assistant

One of my key tenets of Home Automation is that as much as possible should be done locally. Whilst with Home Assistant that means “in the local network”, I feel that “in the device” is even better. However, some values should still be configurable (from within Home Assistant), while the device should keep working even if there is not (and has never been, since boot) a connection to Home Assistant.

For instance, I have a sensor light. The device contains a PIR sensor, with configurable delay, sensitivity and lux settings. This in turn drives a relay connected to a pair of PAR bulbs, and additionally has a green LED. Usually, the ESP8266 in this device only controls whether the thing should ever turn on at all, but I hacked it so that it exposes the PIR sensor, and allows triggering the relay directly.

But the delay between the motion no longer being detected and the light turning off should be configurable. I don’t want to have to re-flash the device if I decide to change this value, so I wanted the value to be fetched from Home Assistant (if it is set there).

This turned out to work really well. There are a few parts that need to be set up in the YAML (this is not the complete file):

esphome:
  name: ${device_name}
  platform: ESP8266
  board: esp01_1m
  on_boot:
    - logger.log:
        level: DEBUG
        format: 'Light ON time is set to %d'
        args: ['id(light_on_time)']

globals:
  - id: light_on_time
    type: int
    restore_value: true
    initial_value: '30'

sensor:
  - platform: homeassistant
    id: on_time
    entity_id: input_number.${device_name}
    on_value:
      then:
        - globals.set:
            id: light_on_time
            value: !lambda 'return int(x);'

It really is that simple. On first boot, the light_on_time variable is set to 30. Then, when the device connects to Home Assistant, it looks for an input_number entity whose name matches the device name. If it finds one (or is ever told about that value changing), it commits the new value to flash, and this will be restored after a reboot.
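I haven’t shown the part of the config that consumes the global; as a rough sketch (the pin, pir and relay ids here are hypothetical), the off-delay might look like this:

binary_sensor:
  - platform: gpio
    id: pir                 # hypothetical PIR input
    pin: GPIO4              # hypothetical pin
    on_press:
      then:
        - switch.turn_on: relay
    on_release:
      then:
        # a templated delay returns milliseconds
        - delay: !lambda 'return id(light_on_time) * 1000;'
        - switch.turn_off: relay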

There is one other thing we could do here: avoid writing the value to flash if it has not changed, preventing unnecessary wear (flash is only rated for a limited number of writes):

sensor:
  - platform: homeassistant
    id: on_time
    entity_id: input_number.${device_name}
    on_value:
      then:
        - if:
            condition:
              lambda: 'return id(light_on_time) != int(x);'
            then:
              - logger.log:
                  level: DEBUG
                  format: 'Light ON time changed from %d to %d seconds'
                  args: ['id(light_on_time)', 'int(x)']
              - globals.set:
                  id: light_on_time
                  value: !lambda 'return int(x);'

On a similar problem: detecting whether it is dark enough to turn the light on should be something like “a bit before sunset to a bit after sunrise”. I could set the lux threshold on the device, but it would be nice to have motion detection work during the day too.

Here, we can have a global variable sun_is_down that defaults to being true, and is only set to false when the sun comes up. However, we would also want this to trigger when first connecting to Home Assistant and the sun is already up.

We can use a wait_until in our on_boot handler to trigger this:

globals:
  - id: sun_is_down
    type: bool
    restore_value: false
    initial_value: 'true'

esphome:
  ...
  on_boot:
    - wait_until:
        api.connected:
    - if:
        condition:
          sun.is_above_horizon:
        then:
          - logger.log:
              level: DEBUG
              format: 'Sun is up, light will not turn on.'
          - globals.set:
              id: sun_is_down
              value: 'false'
        else:
          - logger.log:
              level: DEBUG
              format: 'Sun is down, light will turn on.'

sun:
  on_sunrise:
    - elevation: 10°
      then:
        - logger.log:
            format: 'The sun is now up, light will not turn on.'
        - globals.set:
            id: sun_is_down
            value: 'false'
  on_sunset:
    - elevation: 10°
      then:
        - logger.log:
            format: 'The sun is now down, light will turn on.'
        - globals.set:
            id: sun_is_down
            value: 'true'

I’ve used a 10° elevation so that the light will start turning on a bit before sunset, and will still turn on a bit after sunrise, when motion is detected. I haven’t figured out a way to apply the same elevation offset to the sun.is_above_horizon: check.
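One thing I haven’t tried (so treat it as an untested sketch): give the sun component an id, and compare the elevation directly in a lambda condition instead of using sun.is_above_horizon:

sun:
  id: my_sun    # hypothetical id, so the lambda can reference the component
  ...

# then, wherever the condition is needed:
- if:
    condition:
      # true while the sun is more than 10° above the horizon
      lambda: 'return id(my_sun).elevation() > 10.0;'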

Mute the rest of this track

Working from home means that I don’t use my AirPods as much as I used to: I’m happy to mostly listen to the music my partner (who has worked from home for over a decade) wants to listen to. Lately, that has been NonStop80s.

We play this through the IKEA Symfonisk stereo pair in the dining room, several metres away from the office. The sound is good, and it’s nice not having it come out of the computer speaker.

Because I’m a nut, I have all of this hooked up to Home Assistant, and have one nice automation that will turn off the music that is playing on that speaker when the TV in that room is turned on.

Sometimes the tracks that are played on NonStop80s are really bad. For instance, Snooker Loopy, by Chas and Dave is an absolute piece of shit song. If it weren’t a streaming station, I’d just hit skip.

What would be nice is to be able to have the speakers stop playing, and start playing again when the next track comes on.

Enter “Mute the rest of this track”.

alias: Mute the rest of this track
sequence:
  - condition: state
    entity_id: media_player.dining_room_sonos
    state: playing
  - condition: template
    value_template: "{{ not state_attr('media_player.dining_room_sonos', 'is_volume_muted') }}"
  - service: media_player.volume_mute
    data:
      is_volume_muted: true
    entity_id: media_player.dining_room_sonos
  - delay: '1'
  - wait_for_trigger:
      - platform: state
        entity_id: media_player.dining_room_sonos
    timeout: '{{ timeout | default(300) }}'
  - service: media_player.volume_mute
    data:
      is_volume_muted: false
    entity_id: media_player.dining_room_sonos
mode: single
icon: 'mdi:volume-mute'
description: Mute the media player until the media_title attribute changes.
fields:
  timeout:
    description: Amount of time to stay muted if the track does not change
    example: 300

This is a script that you can use in Home Assistant to do just that. It will (iff the dining_room_sonos is unmuted and currently playing) mute that speaker until something about the speaker’s state changes.

This could be a track change, or a volume change, or anything else. It’s deliberately loose, so that if it was muted, and someone changes the volume, it unmutes immediately.

After a configurable (default 300 seconds) timeout, it will unmute.
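As a usage sketch, assuming the script ends up with the entity id script.mute_the_rest_of_this_track (that id is my guess based on the alias), calling it with a custom timeout would look like:

service: script.mute_the_rest_of_this_track
data:
  timeout: 600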

I was trying to make it so that you could provide the named media_player entity, but there still seem to be places where you can’t use templates (and need to provide the string directly). I was able to get the conditions to use the variable, but was not able to figure out how to use it in the service calls… and to be honest, I don’t really need it to apply to other players.

Scheduling Celery Tasks in the (far) future

We used to make use of the fact that a celery task can be scheduled at some time in the future to auto-punch-out staff members who failed to punch out 24 hours after their shift started. This was as simple as scheduling the task with countdown=86400 (Celery’s eta takes an absolute datetime; countdown takes seconds).

However, as Adam points out here (see number 5), this is not a great idea. For one, it will bog down your celery workers (and make them use a bunch more memory). Secondly, something Adam doesn’t mention: if your queue is corrupted, then none of those future tasks will ever be executed.

Discussing this in IRC today, I thought of a simple mechanism for scheduling these tasks and processing them at some point after they are due.

We will start with a model to store our task:

import importlib

from django.db import models


class ScheduledTask(models.Model):
    task_name = models.TextField()
    task_args = models.JSONField(default=list)
    task_kwargs = models.JSONField(default=dict)
    due_date = models.DateTimeField()

    objects = ScheduledTaskQuerySet.as_manager()  # defined below

    @property
    def task(self):
        # Resolve the dotted path back to the task object.
        module, task = self.task_name.rsplit('.', 1)
        return getattr(importlib.import_module(module), task)

    def execute(self):
        self.task.apply_async(args=self.task_args, kwargs=self.task_kwargs)

We have a custom queryset that gives us a nice method for finding which tasks are due (and another for scheduling new ones):

import datetime

from django.db import models


class ScheduledTaskQuerySet(models.query.QuerySet):
    def due(self):
        return self.filter(due_date__lt=datetime.datetime.utcnow())

    def schedule(self, task_name, when, *args, **kwargs):
        return self.create(
            task_name=task_name,
            task_args=args,
            task_kwargs=kwargs,
            due_date=when,
        )

Finally, we need a task that will enqueue the due tasks:

@app.task
def enqueue_due_tasks():
    # Run periodically: push any due tasks onto the real queue,
    # and remove the stored records.
    for task in ScheduledTask.objects.due():
        task.execute()
        task.delete()
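The enqueue task itself needs to run regularly. A minimal sketch of a celery beat entry that would do it (the module path myproject.tasks is an assumption):

app.conf.beat_schedule = {
    'enqueue-due-tasks': {
        'task': 'myproject.tasks.enqueue_due_tasks',
        'schedule': 60.0,  # check for due tasks every minute
    },
}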

As it stands, with this code, to schedule a task, you need to create a model instance:

ScheduledTask.objects.schedule(
    'foo.tasks.bar',
    datetime.datetime(2525, 11, 11, 9, 30),
    'x',
    y='z'
)

But, it would be nice if we could use the task to schedule itself:

foo.tasks.bar.schedule(args=[], kwargs={}, eta=X, countdown=Y)

Or, even better:

foo.tasks.bar.s(arg, kwarg=value).schedule(datetime.timedelta(days=365))

The first one we should be able to do with a custom Task base class (implementing a schedule method):

import datetime

import celery


class Scheduled(celery.Task):
    def schedule(self, *, args=None, kwargs=None, eta=None, countdown=None):
        if not eta and not countdown:
            raise ValueError('One of eta and countdown must be supplied')
        if eta and countdown:
            raise ValueError('Only one of eta and countdown may be supplied')

        args = args or ()
        kwargs = kwargs or {}

        if eta:
            ScheduledTask.objects.schedule(self.name, eta, *args, **kwargs)
        else:
            ScheduledTask.objects.schedule(
                self.name,
                datetime.datetime.utcnow() + datetime.timedelta(seconds=countdown),
                *args, **kwargs
            )

Then, as long as a task is defined as using the base class, we can schedule it:

@app.task(base=Scheduled)
def task_name(x, y=None):
    pass
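Then we can schedule it, matching the first proposed API above:

task_name.schedule(args=['x'], kwargs={'y': 'z'}, countdown=86400)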

But what about mucking with the .s() or .signature() calls? Now we are getting a bit experimental, but it still might be fun:

import datetime

from celery.canvas import Signature


def schedule(self, when=None):
    if when:
        if isinstance(when, datetime.timedelta):
            when = datetime.datetime.utcnow() + when
    elif self.options.get('countdown'):
        when = datetime.datetime.utcnow() + datetime.timedelta(seconds=self.options['countdown'])
    elif self.options.get('eta'):
        when = self.options['eta']
    else:
        raise ValueError('Unable to determine a due date')

    ScheduledTask.objects.create(
        task_name=self.task,
        task_args=self.args,
        task_kwargs=self.kwargs,
        due_date=when,
    )


Signature.schedule = schedule

This is totally mostly untested, and I’m not sure it’s a really good idea.

A better idea might be to have a backend that doesn’t even tell the workers about the tasks until after they are due…that way you would not have to duck-punch celery at all.

Logging WiFi client Signal Strength

One of the cool things about the old macOS version of AirPort Utility is that it would show you the WiFi signal of the various connected clients. At some point, they removed this feature, bringing it down to the level of functionality of the iOS app.

Whilst I still have a Mac on the desk next to me that can (only) run that version, I mostly don’t use it, and having to switch computers and open the app is a pain. It would be really nice if I could get that data in a more usable format (possibly combining the client data from multiple base stations).

It turns out that this data is available over SNMP, but you need the old version of AirPort Utility to enable it. And, as we shall see shortly, there is another caveat too.

Before we get to that though, my main router is actually an OpenWrt device, so it might be nice to grab the handful of clients that connect to that (it being in the garage and all).

We can get a list of the WiFi interfaces using iwinfo | awk '/ESSID/ {print $1}'. In my case, that gives me wlan0 and wlan1; the specific names are not important, other than that we will need them later.

Now, for each WiFi interface, we want to get the MAC address of the access point itself, which we can get similarly: iwinfo $iw info | awk '/Access Point:/ {print $3}'.

It might be nice to also get the “connection speed”, although in some cases this is not known. In those cases, we want to use null, since we’ll have a numeric value otherwise.

  iwinfo $iw assoclist | awk '/expected/ {
    if ($3 == "unknown")
      print "null"
    else
      print $3
  }'

Okay, now we are ready to get the signal/noise/MAC data:

  iwinfo $iw assoclist \
    | awk -v rate=$RATE -v mac=$MAC '/dBm/ { \
      print "mosquitto_pub -t \"wifi-signal/" substr($1, 1) "\"", \
      " -m \x27{ \
        \"signal\":" $2 ", \
        \"noise\":" $5 ", \
        \"rate\":" rate ", \
        \"station\":\"" mac "\" \
      }\x27 -h mqtt.lan"}' \
    | bash
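For reference, the lines matched by /dBm/ in the assoclist output look something like this (from memory, so the exact spacing may differ), which is why $1 is the client MAC, $2 the signal, and $5 the noise:

98:AF:65:E3:16:F1  -70 dBm / -102 dBm (SNR 32)  0 ms ago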

There’s a fair bit going on there, so let’s unpack it.

We get the list of associated clients, and pipe that through awk. To make things easier, we pass the rate and MAC values we captured earlier in as awk variables; then we grab the relevant columns, and build up a shell command that will look something like:

mosquitto_pub -t "wifi-signal/98:AF:65:E3:16:F1"  -m '{
        "signal":-70,
        "noise":-102,
        "rate":null,
        "station":"B0:95:75:E8:C8:32"
      }' -h mqtt.lan

Because the whole awk program is itself wrapped in single quotes, there is no way to include a literal single quote inside it, so we use the escape \x27 to get awk to emit one.

Finally, this is passed to bash to execute the command(s).

And, putting this all together:

#! /bin/sh

for iw in $(iwinfo | awk '/ESSID/ {print $1}')
do
  MAC=$(iwinfo $iw info | awk '/Access Point:/ {print $3}')
  RATE=$(iwinfo $iw assoclist | awk '/expected/ {
    if ($3 == "unknown")
      print "null"
    else
      print $3
  }')
  iwinfo $iw assoclist \
    | awk -v rate=$RATE -v mac=$MAC '/dBm/ { \
      print "mosquitto_pub -t \"wifi-signal/" substr($1, 1) "\"", \
      " -m \x27{ \
        \"signal\":" $2 ", \
        \"noise\":" $5 ", \
        \"rate\":" rate ", \
        \"station\":\"" mac "\" \
      }\x27 -h mqtt.lan"}' \
    | bash
done

Okay, so that’s the easy one. What about getting the data from SNMP?

Since this code is not running on a Mac, I can’t rely on the MIB data being available. Instead, we’ll use the raw numeric OIDs. It turns out that we can get all of the data we need (and then some!) using one command:

snmpwalk -v 2c -c public hostname.local iso.3.6.1.4.1.63.501.3.2.2.1

This outputs a whole bunch of data. The rows we care about have the values 1, 6, 7 and 8 in position 13, so we can extract those. We want to retain a bit more context though, so we sort the data to ensure a consistent ordering: MAC address, signal, noise and then rate for each client. To get a stable sort, we take the last two parts of the OID (which, in my case, were enough to generate a unique value per client), and then the 13th column. Finally, we grab the “value” that was returned for each row; if it was a STRING value, it will already have quotes around it.

  snmpwalk -v 2c -c public $station $WIFI_DATA \
    | awk -F. 'match($31, /: /) {print $30, substr($31, 0, 3), $13, substr($31, RSTART + 2)}' \
    | sort \
    | grep '^.. .. [1678] '

This will build up the data in the correct format, from here we now need to put each client’s data on a single line:

  snmpwalk ...
    | <as-above>
    | awk '/^.. .. 1/{ if (NR!=1) print "";}{printf $4 " "}'

And then turn it into a string that we’ll be able to execute with bash (like before). Putting it all together:

  MAC=$(/usr/sbin/arp -a $station | awk '{print $4}')

  snmpwalk -v 2c -c public $station iso.3.6.1.4.1.63.501.3.2.2.1 \
    | awk -F. 'match($31, /: /) {print $30, substr($31, 0, 3), $13, substr($31, RSTART + 2)}' \
    | sort \
    | grep '^.. .. [1678] ' \
    | awk '/^.. .. 1/ {if (NR!=1) print "";}{printf $4 " "}' \
    | awk -v mac=$MAC '{if ($2 != "") \
      print "mosquitto_pub -t \"wifi-signal/" substr($1, 2), \
        "-m \x27{\
          \"signal\":" $2 ", \
          \"noise\":" $3 ", \
          \"rate\":" $4 ", \
          \"station\": \"" mac "\" \
    }\x27 -h mqtt.lan" \
    }' | bash

Note that I also needed to check that there was a signal value - one of the stations was reporting this as empty, so we skip over any that don’t have a signal value reported.

But… this doesn’t quite work. I mean, it works, but for some reason (after having left it running every minute overnight), it never updates the signal/noise/rate data. It seems that the Airport Express/Extreme SNMP data is not refreshed.

You can, however, force it to update by asking for the number of connected clients!

snmpget -v 2c -c public $station iso.3.6.1.4.1.63.501.3.2.1.0

Thus, our full script can look like:

#! /bin/bash

stations="study.lan
bedroom.lan
dining-room.lan"

CLIENT_COUNT=iso.3.6.1.4.1.63.501.3.2.1.0
WIFI_DATA=iso.3.6.1.4.1.63.501.3.2.2.1

for station in ${stations}
do
  MAC=$(/usr/sbin/arp -a $station | awk '{print $4}')

  snmpget -v 2c -c public $station $CLIENT_COUNT > /dev/null

  snmpwalk -v 2c -c public $station $WIFI_DATA \
    | awk -F. 'match($31, /: /) {print $30, substr($31, 0, 3), $13, substr($31, RSTART + 2)}' \
    | sort \
    | grep '^.. .. [1678] ' \
    | awk '/^.. .. 1/ {if (NR!=1) print "";}{printf $4 " "}' \
    | awk -v mac=$MAC '{if ($2 != "") \
      print "mosquitto_pub -t \"wifi-signal/" substr($1, 2), \
        "-m \x27{\
          \"signal\":" $2 ", \
          \"noise\":" $3 ", \
          \"rate\":" $4 ", \
          \"station\": \"" mac "\" \
    }\x27 -h mqtt.lan" \
    }' | bash
done

These scripts could be more configurable - I’ve hard-coded the MQTT broker name, for instance, but that will be fine for my purposes. I’m also not actually doing anything with this data yet - I will eventually look at pulling this into Home Assistant or somewhere else, and graphing the data.

Locking rows in PostgreSQL

Someone asked a question today in #postgresql about locking rows for a long running task, where multiple tasks could be updating rows. There is a flag in each row that indicates whether a row has been processed, but not one indicating if it is currently being processed.

One solution is to use advisory locks to mark that we are currently working on a given row. The advantage of this over having a column that indicates it is currently being processed (or mis-using the “processed” column by setting it before we’ve finished building the data) is that if our task that generates the data fails, the lock is released when the session is closed.

So, let’s look at the things we need to make this happen. We’ll start with something that represents our long-running process:

CREATE OR REPLACE FUNCTION expensive_operation(INTEGER)
RETURNS JSONB AS $$

  BEGIN
    -- Simulate a slow, expensive computation.
    PERFORM pg_sleep(10);
    RETURN JSONB_BUILD_OBJECT(
        'foo', 'bar',
        'baz', $1
    );
  END;

$$ LANGUAGE plpgsql;

And we need our table to put the tasks in (and some unprocessed rows):

CREATE TABLE tasks (
    id SERIAL PRIMARY KEY,
    processed BOOLEAN DEFAULT FALSE,
    data JSONB
);

INSERT INTO tasks (data) VALUES (NULL), (NULL), (NULL);

Finally, we are ready to process our tasks.

WITH next_task AS (
  SELECT t.id, pg_advisory_lock(t.id)
    FROM (
      SELECT id
        FROM tasks
       WHERE NOT processed
         AND id NOT IN (SELECT objid
                          FROM pg_locks
                         WHERE locktype = 'advisory')
       LIMIT 1
    ) t
)
UPDATE tasks
   SET data = expensive_operation(id),
       processed = true
 WHERE id IN (SELECT id FROM next_task)
   AND NOT processed;

The CTE will get a task.id that has not been processed (and is not currently being processed). It will take an advisory lock on that value, which marks the task as in-progress.

Then, when it has the lock, it performs the expensive operation and updates the data (and the processed status).

I’m not totally sure the AND NOT processed at the bottom of the query is necessary. It seems to me there could be a situation where two workers try to claim the same row at the same time (and therefore both ask for the same lock), and one of them gets it. When the other one finally obtains the lock, it would then see that row as processed, and so would not update anything.

I’m happy to see a better way to do this.
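One candidate, if you can rely on PostgreSQL 9.5 or later, is FOR UPDATE SKIP LOCKED: each worker locks the row it claims, and other workers skip over already-locked rows rather than blocking. A sketch (with the caveat that, unlike the advisory lock, the row lock is held for the duration of the claiming transaction):

UPDATE tasks
   SET data = expensive_operation(id),
       processed = true
 WHERE id IN (SELECT id
                FROM tasks
               WHERE NOT processed
               LIMIT 1
                 FOR UPDATE SKIP LOCKED);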