Automatically expire rows in Postgres

Here’s a fun idea: how do you make a database table where the data is only kept for a certain period of time?

One solution could be to have a column updated_at, which is set to the current timestamp each time a row is updated. Then you need a scheduled task that periodically clears out rows older than the threshold.
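For illustration, the scheduled-task version might look something like this (run from cron or a systemd timer; the table name, column name and connection details are all placeholders, and the retention interval is passed in as text):

import psycopg2


def purge_expired(dsn, keep_for='1 week'):
    # Delete anything that has not been touched within the retention window.
    with psycopg2.connect(dsn) as connection:
        with connection.cursor() as cursor:
            cursor.execute(
                'DELETE FROM events WHERE updated_at < now() - %s::INTERVAL',
                [keep_for],
            )
            return cursor.rowcount


if __name__ == '__main__':
    purge_expired('dbname=mydb')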

Alternatively, we can handle both the timestamp and the cleanup in a single trigger function, and skip the scheduled task entirely:

CREATE OR REPLACE FUNCTION keep_for() RETURNS TRIGGER AS $$
  DECLARE
    primary_key_name TEXT = (
        SELECT attname
        FROM pg_index
        JOIN pg_attribute ON
            attrelid = indrelid
            AND attnum = ANY(indkey)
        WHERE indrelid = TG_RELID AND indisprimary
    );
    primary_key_value TEXT;
  BEGIN
    IF TG_WHEN <> 'BEFORE' THEN
      RAISE EXCEPTION 'keep_for() may only run as a BEFORE trigger';
    END IF;

    IF TG_ARGV[0]::INTERVAL IS NULL THEN
      RAISE EXCEPTION 'keep_for() must be installed with an INTERVAL to keep data for';
    END IF;

    NEW.updated_at = now();

    IF TG_OP = 'INSERT' THEN
      EXECUTE 'DELETE FROM ' || quote_ident(TG_TABLE_NAME) || ' WHERE updated_at < now() - INTERVAL ' || quote_literal(TG_ARGV[0]::TEXT) || ';';
    ELSIF TG_OP = 'UPDATE' THEN

      EXECUTE 'DELETE FROM ' || quote_ident(TG_TABLE_NAME)
                             || ' WHERE updated_at < now() - INTERVAL ' || quote_literal(TG_ARGV[0]::TEXT)
                             || ' AND ' || quote_ident(primary_key_name) || ' <> ' || quote_literal(row_to_json(NEW) ->> primary_key_name) || ';';
    END IF;

    RETURN NEW;
  END;
$$ LANGUAGE plpgsql;

Now, we can install this on a table (that has a single primary key column):

CREATE TRIGGER keep_for_one_week
BEFORE UPDATE OR INSERT ON <target_table>
FOR EACH ROW
EXECUTE PROCEDURE keep_for('1 week');

This only works on tables that are updated somewhat regularly, and that are not too big. You’ll also want an index on that updated_at column.

This was a thought experiment (which I did implement), until my co-worker pointed out that there was no reason to store this data in the database to begin with: since we already use Redis, we can just set a TTL on the key when we set it, and it will expire automatically without any extra work on our part.
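For what it’s worth, the Redis version really is a one-liner with redis-py (the key name, value and TTL here are just placeholders):

import redis

client = redis.Redis()

# Redis will expire the key automatically once the TTL passes.
client.set('session:1234', 'some value', ex=60 * 60 * 24 * 7)  # keep for one week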

Django Implied Relationship

[This post was updated on 2021-07-28].

A little while ago, Alec McGavin put up a post on the Kogan blog about Custom Relationships in Django. This is a really cool way to get a relationship in Django that exists in the data, but cannot be modelled as a regular ForeignKey. For instance, it could be legacy data that has no foreign key constraint, or a situation where either side of the relationship might be added to the database before the other, rather than the normal order where the target table of the FK is always written to first.

However, I have another, slightly different situation where an implied relationship exists, but should not be stored directly.

Consider the following data structures:

import datetime

from django.contrib.postgres.constraints import ExclusionConstraint
from django.contrib.postgres.fields import DateRangeField, RangeOperators
from django.db import models

# Relationship is the ForeignObject subclass shown later in this post.


class Employee(models.Model):
    name = models.TextField()


class EmploymentPeriod(models.Model):
    employee = models.ForeignKey(
        Employee,
        related_name='employment_periods',
        on_delete=models.CASCADE,
    )
    valid_period = DateRangeField()

    class Meta:
        constraints = [
            ExclusionConstraint(
                name='employment_overlap',
                expressions=[
                    ('employee', RangeOperators.EQUAL),
                    ('valid_period', RangeOperators.OVERLAPS),
                ]
            )
        ]


class Shift(models.Model):
    employee = models.ForeignKey(
        Employee,
        related_name='shifts',
        on_delete=models.CASCADE,
    )
    date = models.DateField()
    start_time = models.TimeField()
    duration = models.DurationField()
    employment_period = Relationship(
        EmploymentPeriod,
        from_fields=['employee', 'date'],
        to_fields=['employee', 'valid_period'],
    )

    @property
    def start(self):
        return datetime.datetime.combine(self.date, self.start_time)

    @property
    def finish(self):
        return self.start + self.duration

Now, there is a direct relationship between Shift and Employee, and also between EmploymentPeriod and Employee, but there is also an implied relationship between Shift and EmploymentPeriod. Because of the exclusion constraint, we know there will only ever be one EmploymentPeriod for a given Shift.

It would be really nice to be able to create this relationship, so we can reference the employment period (or lack thereof) directly. The Relationship class above comes close, but it tries to use an equality check between the date and date-range fields.

It turns out, we can add a bit to that class, and teach it how to handle the various ways this relationship can be accessed:

  • Shift().employment_period
  • EmploymentPeriod().shifts
  • Shift.objects.select_related('employment_period')
  • EmploymentPeriod.objects.prefetch_related('shifts')
  • Shift.objects.filter(employment_period=emp)
  • Shift.objects.filter(employment_period__in=[emp1, emp2])
  • Shift.objects.filter(employment_period__isnull=True)
  • Shift.objects.filter(employment_period=None)
  • Shift.objects.filter(employment_period__isnull=False)
  • EmploymentPeriod.objects.filter(shifts__contains=shift)
  • EmploymentPeriod.objects.filter(shifts__contains=[shift1, shift2])
  • EmploymentPeriod.objects.filter(shifts__isnull=True)
  • EmploymentPeriod.objects.filter(shifts__isnull=False)

…plus there are the inverses of a bunch of these, e.g. Shift.objects.exclude(employment_period=emp). In some cases an exclude is equivalent to one of the filters above, but that’s not always possible to determine.
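To make that concrete, here is roughly how the code reads with the models above, once the ImplicitRelationship described below is wired up (a sketch of the intended usage, nothing more):

# Shifts that fall outside any employment period:
orphan_shifts = Shift.objects.filter(employment_period__isnull=True)

# Traverse the relationship without an extra query per shift:
for shift in Shift.objects.select_related('employment_period'):
    print(shift.date, shift.employment_period)

# ...and from the other side:
for period in EmploymentPeriod.objects.prefetch_related('shifts'):
    print(period.valid_period, period.shifts.all())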

Let’s have a look at the original class, and a new subclass for these non-direct relationships:

from django.db import models
from django.db.models.lookups import Lookup


class Relationship(models.ForeignObject):
    """
    Create a django link between models on a field where a foreign key isn't used.
    This class allows that link to be realised through a proper relationship,
    allowing prefetches and select_related.

    https://devblog.kogan.com/blog/custom-relationships-in-django
    """

    def __init__(self, model, from_fields, to_fields, **kwargs):
        super().__init__(
            model,
            on_delete=models.DO_NOTHING,
            from_fields=from_fields,
            to_fields=to_fields,
            null=True,
            blank=True,
            **kwargs,
        )

    def contribute_to_class(self, cls, name, private_only=False, **kwargs):
        # override the default to always make it private
        # this ensures that no additional columns are created
        super().contribute_to_class(cls, name, private_only=True, **kwargs)

There’s not much to this, which is part of the beauty of it. Django pretty much handles composite-key relationships; it just won’t create actual foreign keys based on them. There have been noises about implementing this for years, and maybe eventually it will happen.
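For example, a plain Relationship can glue together a composite “legacy” link where no real foreign key exists (the models and field names here are invented purely for illustration):

class LegacyEmployee(models.Model):
    company_code = models.TextField()
    employee_code = models.TextField()

    class Meta:
        unique_together = [('company_code', 'employee_code')]


class LegacyTimesheet(models.Model):
    company_code = models.TextField()
    employee_code = models.TextField()
    employee = Relationship(
        LegacyEmployee,
        from_fields=['company_code', 'employee_code'],
        to_fields=['company_code', 'employee_code'],
    )

Filtering on employee and using select_related('employee') then behave like a normal foreign key join, but no extra column or database constraint is created.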

But what about a subclass that allows the implicit relationship described above?

from django.core.exceptions import FieldError
from django.db.models import Exists, OuterRef
from django.db.models.query_utils import PathInfo
from django.db.models.sql.where import AND, WhereNode
from django.utils.functional import cached_property


class ImplicitRelationship(Relationship):
    """
    Create a django link between two models where at least one of the fields
    uses a containment (or other type of non-direct) relationship.

    Relationship should be used if this is just a composite key (or a single
    key that is not a real ForeignKey in the database).

    """

    def get_path_info(self, filtered_relation=None):
        """Get path from this field to the related model."""
        opts = self.remote_field.model._meta
        from_opts = self.model._meta
        self.related_fields
        return [
            PathInfo(
                from_opts=from_opts,
                to_opts=opts,
                target_fields=[rhs for lhs, rhs in self.other_related_fields],
                join_field=self,
                m2m=False,
                direct=False,
                filtered_relation=filtered_relation,
            )
        ]

    def get_reverse_path_info(self, filtered_relation=None):
        """Get path from the related model to this field's model."""
        opts = self.model._meta
        from_opts = self.remote_field.model._meta
        self.related_fields
        return [
            PathInfo(
                from_opts=from_opts,
                to_opts=opts,
                target_fields=[lhs for lhs, rhs in self.other_related_fields],
                join_field=self.remote_field,
                m2m=False,
                direct=False,
                filtered_relation=filtered_relation,
            )
        ]

    @cached_property
    def other_related_fields(self):
        return self.resolve_related_fields()

    @cached_property
    def related_fields(self):
        return []

    def get_local_related_value(self, instance):
        """
        Given an instance, determine the value that will be used as
        the key for this value in a dict of related items.

        This is where it starts to get tricky. Django only really expects
        keys to match exactly, but we may have a value that contains a
        date, that needs to be checked for inclusion in a DateRange.

        Whilst psycopg2 does not normalise Range values, it will handle
        <date> in <DateRange> correctly, so we can use that as the
        comparison.
        """
        parts = self.get_instance_value_for_fields(
            instance,
            [lhs for lhs, rhs in self.other_related_fields],
        )

        if not hasattr(self, '_known_instance_keys'):
            return parts

        if parts in self._known_instance_keys:
            return parts

        for keys in self._known_instance_keys:
            for part, key in zip(parts, keys):
                if part == key or getattr(key, '__contains__', None) and part in key:
                    return keys

    @property
    def get_foreign_related_value(self):
        """
        Because we need to use non-exact matching, we need to set up to store
        instances based on known keys. The Django code that uses this builds up
        a dict of keys/values, but since we need to do containment testing in
        get_local_related_value(instance), we have to store a local set of
        key values, which will be used for that containment checking.

        This is implemented as a property that returns a function, after clearing
        out the cache of known instances so that each queryset will have its own
        cache. Otherwise, instances from the last run through would be matched
        in the next run.

        This uses knowledge of the Django internals, where this method is called
        before get_local_related_value, which really is not ideal, but there does
        not seem to be a better way to handle this.
        """
        self._known_instance_keys = set()

        def get_foreign_related_value(instance):
            values = self.get_instance_value_for_fields(
                instance, 
                [rhs for lhs, rhs in self.other_related_fields]
            )

            self._known_instance_keys.add(values)
            return values

        return get_foreign_related_value

    def get_extra_restriction(self, where_class, alias, remote_alias):
        """
        This method is used to get extra JOIN conditions.

        We don't need to include the exact conditions, only those
        that we filtered out from the regular related_fields.
        The exact conditions will already be applied to the JOIN
        by the regular handling.
        """
        if not alias or not remote_alias:
            return

        if self.other_related_fields:
            cond = where_class()

            for local, remote in self.other_related_fields:
                local, remote = local.get_col(remote_alias), remote.get_col(alias)
                lookup_name = JOIN_LOOKUPS.get(get_key(local, remote), 'exact')
                lookup = local.get_lookup(lookup_name)(local, remote)
                cond.add(lookup, 'AND')

            return cond

    def get_extra_descriptor_filter(self, instance):
        """
        The sibling to get_extra_restriction, this one is used to get the extra
        filters that are required to limit to the correct objects.
        """
        extra_filter = {}
        for lhs, rhs in self.other_related_fields:
            lookup = JOIN_LOOKUPS.get(get_key(rhs.cached_col, lhs.cached_col), 'exact')
            extra_filter[f'{rhs.name}__{lookup}'] = getattr(instance, lhs.attname)
        return extra_filter

    def get_where(self, value, alias=None):
        constraint = WhereNode(connector=AND)

        values = self.get_instance_value_for_fields(
            value, 
            [remote for local, remote in self.other_related_fields]
        )

        for (source, target), value in zip(self.other_related_fields, values):
            key = (source.get_internal_type(), target.get_internal_type())
            lookup_type = JOIN_LOOKUPS.get(key, 'exact')
            lookup_class = source.get_lookup(lookup_type)
            lookup = lookup_class(target.get_col(alias or self.model._meta.db_table, source), value)
            constraint.add(lookup, AND)

        return constraint

    def get_exists_subquery_filters(self, inverted=False):
        filters = {}
        for source, target in self.other_related_fields:
            if inverted:
                source, target = target, source
            key = (target.get_internal_type(), source.get_internal_type())
            lookup = JOIN_LOOKUPS.get(key, 'exact')
            filters[f'{target.attname}__{lookup}'] = OuterRef(source.attname)
        return filters

    def exists_subquery(self, negated=False, inverted=False, **filters):
        if inverted:
            exists = Exists(
                self.model.objects.filter(
                    **self.get_exists_subquery_filters(inverted=True), 
                    **filters
                ).values('pk')
            )
        else:
            exists = Exists(
                self.related_model.objects.filter(
                    **self.get_exists_subquery_filters(), 
                    **filters
                ).values('pk')
            )
        return ~exists if negated else exists

    @property
    def target_field(self):
        raise FieldError()


# We should be able to add more pairs here as we need to handle them.
JOIN_LOOKUPS = {
    ('DateField', 'DateRangeField'): 'contained_by',
    ('DateRangeField', 'DateField'): 'contains',
}

def get_key(lhs, rhs):
    return (lhs.output_field.get_internal_type(), rhs.output_field.get_internal_type())

There’s actually a lot more code there than I really wanted; however, it seems mostly to be necessary.

But wait, there’s more. We also need to teach Django how to handle the various lookups that can be performed on these relationships:

@ImplicitRelationship.register_lookup
class RelatedMultipleExact(Lookup):
    """
    Apply each lookup type from each of the fields in an ImplicitRelationship.

    This is for querysets of the form:

    >>> Foo.objects.filter(relationship=instance)

    This gets the relevant operator for each of the lookups, based on
    the field type of the pair of (to/from) fields.
    """

    lookup_name = 'exact'

    def as_sql(self, compiler, connection):
        field = self.lhs.field

        # If we only have a primary key here, and not an instance, then we
        # will need to push the querying back into the database - normally
        # a direct lookup just uses the value as a PK, but here we need to
        # get the database to do a subquery to get the other values.
        if self.rhs_is_direct_value() and not isinstance(self.rhs, models.Model):
            # We can't unref the alias here, because Django will have also put in an IS NOT NULL
            # on the thing, which is referencing the wrong table.
            compiler.query.alias_map[self.lhs.alias] = compiler.query.alias_map[self.lhs.alias].promote()
            return field.exists_subquery(pk=self.rhs).resolve_expression(compiler.query).as_sql(compiler, connection)

        return field.get_where(self.rhs, alias=self.lhs.alias).as_sql(compiler, connection)


@ImplicitRelationship.register_lookup
class RelatedMultipleIn(Lookup):
    """
    Apply each lookup from each of the fields in an ImplicitRelationship.

    This is for querysets of the form:

    >>> Foo.objects.filter(relationship__in=[instance1, instance2])
    >>> Foo.objects.filter(relationship__not_in=[instance1, instance2])

    This builds an EXISTS() clause that uses a subquery to find
    if each instance matches - this is usually better than a bunch
    of clauses that would use OR, because that would preclude the
    use of indexes.
    """

    lookup_name = 'in'
    negated = False

    def as_sql(self, compiler, connection):
        field = self.lhs.field
        if self.negated:
            # We remove one reference to the joined table, so that if we only
            # have this reference, ie no columns, then we don't even join the
            # table in (as we'll be using an EXISTS in WHERE)
            compiler.query.unref_alias(self.lhs.alias)
        return (
            field.exists_subquery(
                negated=self.negated,
                pk__in=[getattr(x, 'pk', x) for x in self.rhs],
            )
            .resolve_expression(compiler.query)
            .as_sql(compiler, connection)
        )


@ImplicitRelationship.register_lookup
class RelatedMultipleNotIn(RelatedMultipleIn):
    negated = True
    lookup_name = 'not_in'


@ImplicitRelationship.register_lookup
class RelatedMultipleNull(Lookup):
    """
    Apply each lookup from each of the fields in an ImplicitRelationship.

    This is for querysets of the form:

    >>> Foo.objects.filter(relationship=None)
    >>> Foo.objects.exclude(relationship=None)
    >>> Foo.objects.filter(relationship__isnull=True)
    >>> Foo.objects.filter(relationship__isnull=False)

    """

    lookup_name = 'isnull'

    def as_sql(self, compiler, connection):
        field = self.lhs.field

        if not isinstance(field, Relationship):
            pk = field.related_model._meta.pk
            lookup = pk.get_lookup('isnull')(pk.get_col(self.lhs.alias), self.rhs)
            return lookup.as_sql(compiler, connection)

        # We remove one reference to the joined table, so that if we only
        # have this reference, ie no columns, then we don't even join the
        # table in (as we'll be using an EXISTS in WHERE)
        compiler.query.unref_alias(self.lhs.alias)
        return field.exists_subquery(negated=self.rhs).resolve_expression(compiler.query).as_sql(compiler, connection)


@ImplicitRelationship.register_lookup
class RelatedMultipleContains(Lookup):
    """
    Apply each lookup from each of the fields in an ImplicitRelationship.

    This is for querysets of the form:

    >>> Foo.objects.filter(reverse_relationship__contains=[x, y])
    >>> Foo.objects.filter(reverse_relationship__contains=x)

    """

    lookup_name = 'contains'
    negated = False

    def as_sql(self, compiler, connection):
        if isinstance(self.lhs.field, Relationship):
            raise TypeError(f'Unable to perform __{self.lookup_name} queries on Relationship, only on reversed')

        field = self.lhs.field.remote_field

        try:
            iter(self.rhs)
        except TypeError:
            value = getattr(self.rhs, 'pk', self.rhs)
            lookup = 'pk'
        else:
            if self.negated:
                raise ValueError('Unable to perform not_contains=list')
            value = [getattr(x, 'pk', x) for x in self.rhs]
            lookup = 'pk__in'

        # We remove one reference to the joined table, so that if we only
        # have this reference, ie no columns, then we don't even join the
        # table in (as we'll be using an EXISTS in WHERE)
        compiler.query.unref_alias(self.lhs.alias)

        return (
            field.exists_subquery(negated=self.negated, inverted=True, **{lookup: value})
            .resolve_expression(compiler.query)
            .as_sql(compiler, connection)
        )


@ImplicitRelationship.register_lookup
class RelatedMultipleNotContains(RelatedMultipleContains):
    lookup_name = 'not_contains'
    negated = True
    

There is one caveat to this - these lookups (under certain conditions) will change the list of FROM tables that are required to be joined into the query. In some cases this adds a required JOIN, in others it actually removes the need for a JOIN because the WHERE clause is all within a subquery, and should not contain a join to the original table.

However, until https://github.com/django/django/pull/14683 is merged, these lookups will not always work - if the list of tables is mutated by the lookup, the SQL query that is generated will not contain this required mutation. Hopefully I can get that PR merged, but it is possible to use patchy to patch your local installation until that is done.

Optus Billing Misleading Chart

I really hate when people use charts or graphs in a misleading manner. It’s even worse when it’s big companies, and in my latest bill from Optus, I noticed a particularly egregious chart.

[Image: chart with really misleading scaling on the axis]

For those that don’t notice at first (it’s taken me a bunch of billing months to notice, actually), the middle line is slightly longer than the other two. In that week, I went over my data plan, and had to pay an extra $10 to get more data.

I don’t have a problem with this, but I do have a problem with the scaling on the axis.

The shorter ones are months where I was billed $35. The longer one is where I was billed $45.

That’s a difference of $10, which is ~30%. However, the middle bar in the chart is not even close to that much longer: it looks to me to be around 20 pixels longer, or around 3.5%.

So, when you overspend on your bill, it looks like you are only paying a small amount extra, when in fact you are paying significantly more on that given bill.

The only way you could have scaling like this would be if you had a logarithmic scale. Which, granted, many more people are familiar with now than before the pandemic, but you don’t use a logarithmic scale when your values are all of the same order of magnitude, unless you want to mislead people.

I find it pretty unlikely that a high proportion of Optus’s customers have monthly billing variations spanning orders of magnitude. I also find it pretty unlikely that this is a mistake - someone at Optus (and I suspect not someone in their tech department, despite the chat support person’s enthusiasm for dumping the blame on them) has decided that their charts should be tweaked so that overspends look like they are “not that big”.

Who wants to bet Optus never get back to me about my complaint?

KnockoutJS Watchdog

I first discovered KnockoutJS many years ago. It’s a really neat way of managing dependencies between values, and also updating HTML accordingly. Recently, I rewrote my fronius-dashboard to use a Quart app and KnockoutJS instead of a Phoenix LiveView, as the latter was much harder to build (and dependency changes seem to break that fairly frequently).

One thing I did notice is that from time to time I’ll come back to a pinned tab version of the app and it will no longer be updating. I’m not sure exactly why that is, but it seems to be connected still (the EventSource object says it is still connected, anyway).

So, I thought maybe a watchdog process would be in order.

Basically, if there are no updates for a certain period of time, then it should just reload the page.

KnockoutJS doesn’t have anything like this built in, but it does have the facility to throttle updates, and specifically to use the “only update when changes stop” behaviour. As such, we can build a watchdog fairly easily using just that:

let watchdog = ko.computed(() => viewModel.grid()).extend({
  rateLimit: {
    timeout: 10000,
    method: "notifyWhenChangesStop"
  }
});
watchdog.subscribe(() => {
  document.location.reload();
});

In this case, I have one value that will almost certainly update every second (or more frequently than this), called grid.

If there are no updates for 10 seconds, then it will reload the page.
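For context, the updates arrive from the Quart app over that EventSource connection. The server side isn’t the point of this post, but a rough sketch of the kind of endpoint involved might look like this (the route, payload shape and read_grid_value() are all invented for illustration):

import asyncio
import json

from quart import Quart, Response

app = Quart(__name__)


async def read_grid_value():
    # Stand-in for however the dashboard actually reads the current grid power.
    return 0


@app.route('/events')
async def events():
    async def stream():
        # Push the latest value roughly once a second.
        while True:
            value = await read_grid_value()
            yield f'data: {json.dumps({"grid": value})}\n\n'.encode()
            await asyncio.sleep(1)

    return Response(stream(), mimetype='text/event-stream')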


I’m not sure that this will actually solve my problem though - I suspect that the stoppage happens because the browser tab is not frontmost, and the JS is suspended, and doesn’t un-suspend properly. Time will tell, I guess.

Extending Django Wizards

I quite like the wizard pattern. Most people are somewhat familiar with the concept of having to complete a logically grouped set of form elements before moving onto the next one, with a final confirmation step, after which the operation is committed.

However, there are some perceived problems with the Django formtools implementation, at least from the perspective of our use case.

The big one my team have identified: when you reload the page with a GET request, it clears out the data from the current run of the wizard. This is problematic when, for instance, something happens to your internet connection and you have to reload the page manually, or if you just happen to reload it through some other means.

Related to this, if you don’t have all of the information you require, there’s no way to “stash” your current iteration and return to it later. That return could be after a logout and login (or even on a different computer).

Additionally, there’s no way to have “extra” steps inserted based on the selections made in a previous step. You could take the other approach, and define all of the steps initially (and then skip those that aren’t required), but in our case we need a repeatable step (I call these sub-steps), where we perform an operation on a bunch of items that were selected in a previous step.

Let’s look at each of these in turn.


The default implementation automatically clears the storage whenever a GET request is handled:

class WizardView(TemplateView):

    def get(self, request, *args, **kwargs):
        self.storage.reset()
        self.storage.current_step = self.steps.first
        return self.render(self.get_form())

We can replace this with an implementation that will only reset when it detects a special GET parameter:

    def get(self, request, *args, **kwargs):
        if request.GET.get('reset'):
            self.storage.reset()
            self.storage.current_step = self.steps.first
        
        step = self.storage.current_step or self.steps.first
        return self.render(self.get_form(
            step,
            data=self.storage.get_step_data(step),
            files=self.storage.get_step_files(step),
        ))

Note that we need to build up the form data to provide to the form - otherwise, when you load up the wizard, it will render the first page as empty, but if you then use the navigation to select the first page, it will correctly render it with data.

You’ll need to have an explicit link or button in your wizard template(s) to enable the user to restart the wizard if they need to.


Next up is persistent storage. For this we will need somewhere to store the data. The formtools implementation makes this easy to swap out - you can define the storage backend you want to use.

class MyWizard(WizardView):
    form_list = [...]
    storage_name = 'storage.wizard.DatabaseStorage'

That’s all you need to do to make your wizard use it - but now we need to build the storage class.

Let’s begin with a model to store the data in.

class WizardStorage(models.Model):
    user = models.ForeignKey('auth.User', related_name='wizard_storage', on_delete=models.CASCADE)
    prefix = models.TextField()
    data = models.JSONField()

We could use extra fields for the various parts, but that just complicates things.

Now let’s see a storage implementation:

class DatabaseStorage(BaseStorage):
    def __init__(self, prefix, request=None, file_storage=None):
        super().__init__(prefix, request, file_storage)
        self.init_data()
    
    def init_data(self):
        self.instance, _create = WizardStorage.objects.get_or_create(
            user=self.request.user,
            prefix=self.prefix,
            defaults={
                'data': {
                    self.step_key: None,
                    self.step_data_key: {},
                    self.step_files_key: {},
                    self.extra_data_key: {},
                }
            }
        )
    
    @property
    def data(self):
        return self.instance.data
    
    def update_response(self, response):
        if hasattr(self, 'instance'):
            self.instance.save()
      
    def reset(self):
        if hasattr(self, 'instance'):
            self.instance.delete()
            del self.instance

You can make it a bit more configurable so that the model could be swapped quite easily.
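For instance (this model hook is my own sketch, not something formtools provides), the model could be pulled from a class attribute so a project can substitute its own:

class ConfigurableDatabaseStorage(DatabaseStorage):
    # Point this (or override it in a subclass) at a different model to store
    # wizard state elsewhere.
    model = WizardStorage

    def init_data(self):
        self.instance, _created = self.model.objects.get_or_create(
            user=self.request.user,
            prefix=self.prefix,
            defaults={
                'data': {
                    self.step_key: None,
                    self.step_data_key: {},
                    self.step_files_key: {},
                    self.extra_data_key: {},
                }
            }
        )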

From here, we can use this in a wizard, and it will persist the step data to the database. I’ve still got a bit to do to ensure it can handle files, but this has not been a requirement of mine as yet.

There could be a bit of fun around having multiple stashes for a given wizard, and allowing the user to select which one they want to work on. As it stands, it just uses the user id and wizard prefix to determine where the data is stored.


The third improvement, allowing sub-steps, is a bit more complicated. To do that, you need to replace a bit more of the internal formtools code, rather than just subclassing/extending it. This involves a bunch of patching of the StepsHelper class from formtools - although you could replace this class by overriding WizardView.dispatch.

That’s beyond the scope of this post.

Django modify_settings and receivers

Sometimes, tests expose weird behaviour.

In this instance, I have a Makefile command that calls a cookiecutter command to create a new integration with a payroll system. We’d noticed that, over time, this infrequently used code had become slightly less than perfect.

So, I wrote a test that runs this code. Not directly using the make new-payroll-system command, but rather using a test case from within Django.

This worked great in development. I was able to have code that was generated, some tests on that code were run (including generating migrations for the new app), and then the code was removed.

def test_cookie_cutter(self):
    from cookiecutter.main import cookiecutter
    
    try:
        cookiecutter(
            'integrations/__template__', 
            extra_context={'system_name': 'new_thing'}, 
            output_dir='integrations/systems',
            no_input=True,
        )
        with modify_settings(INSTALLED_APPS={'append': 'integrations.systems.new_thing'}):
            call_command('makemigrations', 'new_thing', no_input=True, verbosity=0)
    finally:
        shutil.rmtree('integrations/systems/new_thing')

But this failed in CI.

Turns out that our Codeship-based testing infrastructure doesn’t allow for writing the files in the expected location.

Never mind, we can use tempfile.TemporaryDirectory() instead. That will handle the cleanup for us, which is better than removing files ourselves:

def test_cookie_cutter(self):
    from cookiecutter.main import cookiecutter
    
    with tempfile.TemporaryDirectory() as dirname:
        # Put our new temporary directory on the PYTHONPATH.
        sys.path.insert(0, dirname)
        
        cookiecutter(
            'integrations/__template__', 
            extra_context={'system_name': 'new_thing'}, 
            output_dir=dirname,
            no_input=True,
        )
        with modify_settings(INSTALLED_APPS={'append': 'integrations.systems.new_thing'}):
            call_command('makemigrations', 'new_thing', no_input=True, verbosity=0)

There’s a little more to my code, but it’s not really relevant.

What is relevant is that, whilst this test was working fine, a subsequent test was failing. As part of the cookie-cutter template, we install a signal handler by default, and this signal handler was still connected even after the app was removed at the end of the modify_settings context manager.

So, how can we remove the signal handler when we are done? It should be possible to, in the context manager, look at which signals exist before we run, and then compare that to the ones that are connected when we are exiting…

Turns out, we don’t need to. In this context, because we aren’t really running the code from the new django app, we can just prevent the signal handlers being connected in the first place:

@patch('django.dispatch.receiver')
def test_cookie_cutter(self, _receiver):
    from cookiecutter.main import cookiecutter
    
    with tempfile.TemporaryDirectory() as dirname:
        # Put our new temporary directory on the PYTHONPATH.
        sys.path.insert(0, dirname)
        
        cookiecutter(
            'integrations/__template__', 
            extra_context={'system_name': 'new_thing'}, 
            output_dir=dirname,
            no_input=True,
        )
        with modify_settings(INSTALLED_APPS={'append': 'integrations.systems.new_thing'}):
            call_command('makemigrations', 'new_thing', no_input=True, verbosity=0)

That one change of mocking out the @receiver decorator means that it won’t attach the signals that it comes across when doing the modify_settings, which is exactly what I want to happen in this case.

Docker + Makefile

I rewrote one of my projects (Fronius Dashboard) in Python - more because I was no longer able to get the build to work correctly under Elixir. As a side-effect, the image size went down a lot.

Part of this process is to build images for multiple architectures, and publish the manifests. As an aside, being able to version them is also nice.

We’ll start with the versioning, because that’s a bit simpler.

Take a file VERSION. Put the current version into this file (x.y.z format is the only one supported so far).

Now, we can have a few tools to handle this:

.PHONY: bump-major bump-minor

requirements.txt: poetry.lock pyproject.toml
	poetry export -o requirements.txt

bump-major:
	cat VERSION | awk -F. '{print $$1 + 1 ".0.0"}' | tee VERSION
	
bump-minor:
	cat VERSION | awk -F. '{print $$1 "." $$2 + 1  ".0"}' | tee VERSION
	
VERSION: app.py requirements.txt
	cat VERSION | awk -F. '{print $$1 "." $$2 "." $$3 + 1}' | tee VERSION

This allows us to have make bump-major, which adds one to the existing major version and resets the minor and patch versions, and make bump-minor, which adds one to the minor version and resets the patch version.

There is no bump-patch; instead, every file that could possibly affect the code is included in the make VERSION dependencies list. This means that make VERSION will only run if any of those files have changed, and in that case it will increment the patch version.


So, that’s the versioning. How about publishing docker images?

There are a couple of things that we need to do:

  • build for a number of platforms (amd64, armv6 and armv7, because I run stuff on Raspberry Pi hardware).
  • create and push a manifest of all images
  • also create and push a tagged version (ie, not just latest).

IMAGE := <image-name-goes-here>

.PHONY: release bump-major bump-minor
	
release: Dockerfile VERSION
	docker buildx build . -t $(IMAGE):armv6 --platform linux/arm/v6 --push
	docker buildx build . -t $(IMAGE):armv7 --platform linux/arm/v7 --push
	docker buildx build . -t $(IMAGE):amd64 --platform linux/amd64 --push
	
	docker pull $(IMAGE):armv6
	docker pull $(IMAGE):armv7
	docker pull $(IMAGE):amd64
	
	docker image rm --force $(IMAGE):latest
	
	docker manifest create $(IMAGE):latest \
		$(IMAGE):armv6 \
		$(IMAGE):armv7 \
		$(IMAGE):amd64 \
		--amend
	
	docker manifest annotate $(IMAGE):latest $(IMAGE):armv6 --variant v6l
	docker manifest annotate $(IMAGE):latest $(IMAGE):armv7 --variant v7l
	
	docker manifest create $(IMAGE):$(shell cat VERSION) \
		$(IMAGE):armv6 \
		$(IMAGE):armv7 \
		$(IMAGE):amd64 \
		--amend
	
	docker manifest annotate $(IMAGE):$(shell cat VERSION) $(IMAGE):armv6 --variant v6l
	docker manifest annotate $(IMAGE):$(shell cat VERSION) $(IMAGE):armv7 --variant v7l
	
	docker manifest push $(IMAGE):$(shell cat VERSION)
	docker manifest push $(IMAGE):latest

requirements.txt: poetry.lock pyproject.toml
	poetry export -o requirements.txt

bump-major:
	cat VERSION | awk -F. '{print $$1 + 1 ".0.0"}' | tee VERSION
	
bump-minor:
	cat VERSION | awk -F. '{print $$1 "." $$2 + 1  ".0"}' | tee VERSION
	
VERSION: app.py requirements.txt
	cat VERSION | awk -F. '{print $$1 "." $$2 "." $$3 + 1}' | tee VERSION

There is a bit of repetition - I’m sure I could do something using Makefile expansion, but this works for now.

Cleaning up a required process when you quit

Some time ago, I moved a bunch of our devops tools into a Makefile. I’m not sure exactly what the benefit was then, but there have been some challenges.

One of these was related to having to run multiple processes when performing a task, but cleaning up the dependencies when we finish our task. Specifically, we run a websocket server in production that uses Redis for PubSub. We can safely assume the developer is running redis already, since the rest of the platform also depends upon it. However, to collect the changes from Postgres and publish them into Redis, we use a custom django management command. All this does is listen for postgres notifications and forward them to redis.

$ ./manage.py relay

Mostly we don’t keep this running in development, unless you need to be running the websocket code locally for testing. It is a hassle to remember to start a process and then kill it when you are done, so it would be convenient to start the relay command, then start our websocket worker, and clean up both processes when we quit the worker.

This can be done in a Makefile command:

websocket:
    $(shell trap 'kill 0' SIGINT ; \
        ./manage.py relay & \
        gunicorn \
          -k geventwebsocket.gunicorn.workers.GeventWebsocketWorker \
          --workers=1 \
          --reload \
          websocket:application \
    )

The trap command performs the magic - it will kill the background command when the gunicorn worker is killed with Ctrl-C.

I have some other commands which start the background process, and then use kill -0 $(shell ps ax | grep ... | head -n 1 | cut -d ' ' -f 1) to ensure they kill the process on quit, but this sometimes does not work: I should get around to changing them over to this…

ESPHome variables from Home Assistant

One of my key tenets of home automation is that as much as possible should be done locally. Whilst with Home Assistant that means “in the local network”, I feel that “in the device” is even better. However, sometimes values should still be configurable (from within Home Assistant), but the device should still work even if there is not (and has never been) a connection to Home Assistant since boot.

For instance, I have a sensor light. The device contains a PIR sensor, with configurable delay, sensitivity and lux settings. This in turn drives a relay connected to a pair of PAR bulbs, and there is additionally a green LED. Normally the ESP8266 in this device only controls whether the light is allowed to turn on at all, but I hacked it so that it also exposes the PIR sensor and allows triggering the relay.

But the delay between motion no longer being detected and the light turning off should be configurable. I don’t want to have to re-flash the device whenever I decide to change this value, so I wanted the value to be fetched from Home Assistant (if it is set there).

This turned out to work really well. There are a few parts that need to be set up in the YAML (this is not the complete file):

esphome:
  name: ${device_name}
  platform: ESP8266
  board: esp01_1m
  on_boot:
    - logger.log:
        level: DEBUG
        format: 'Light ON time is set to %d'
        args: ['id(light_on_time)']

globals:
  - id: light_on_time
    type: int
    restore_value: true
    initial_value: '30'

sensor:
  - platform: homeassistant
    id: on_time
    entity_id: input_number.${device_name}
    on_value:
      then:
        - globals.set:
            id: light_on_time
            value: !lambda 'return int(x);'

It really is that simple. The first boot will set the light_on_time variable to 30. Then, when it connects to Home Assistant, it will look for an input_number.<device_name> (which matches the device name). If it finds one (or is ever told about this value changing), then it will commit that new value to the flash, and this will be restored after a reboot.

There is one other thing we could do here: avoid writing the value to the flash if it has not changed, to prevent wear (the flash only supports a limited number of writes):

sensor:
  - platform: homeassistant
    id: on_time
    entity_id: input_number.${device_name}
    on_value:
      then:
        if:
          condition:
            lambda: 'return id(light_on_time) != int(x);'
          then:
            - logger.log:
                level: DEBUG
                format: 'Light ON time changed from %d to %d seconds'
                args: ['id(light_on_time)', 'int(x)']
            - globals.set:
                id: light_on_time
                value: !lambda 'return int(x);'

With regards to a similar problem, detecting if it is dark enough to turn the light on should be something like “a bit before sunset to a bit after sunrise”. I could set the lux threshold on the device, but it would be nice to have motion detection work during the day too.

Here, we can have a global variable sun_is_down that defaults to being true, and is only set to false when the sun comes up. However, we would also want this to trigger when first connecting to Home Assistant and the sun is already up.

We can use a wait_until in our on_boot handler to trigger this:

globals:
  - id: sun_is_down
    type: bool
    restore_value: false
    initial_value: 'true'

esphome:
  ...
  on_boot:
    - wait_until:
        api.connected:
    - if:
        condition:
          sun.is_above_horizon:
        then:
          - logger.log:
              level: DEBUG
              format: 'Sun is up, light will not turn on.'
          - globals.set:
              id: sun_is_down
              value: 'false'
        else:
          - logger.log:
              level: DEBUG
              format: 'Sun is down, light will turn on.'

sun:
  on_sunrise:
    - elevation: 10°
      then:
        - logger.log:
            format: 'The sun is now up, light will not turn on.'
        - globals.set:
            id: sun_is_down
            value: 'false'
  on_sunset:
    - elevation: 10°
      then:
        - logger.log:
            format: 'The sun is now down, light will turn on.'
        - globals.set:
            id: sun_is_down
            value: 'true'

I’ve used a 10° elevation to trigger the light to turn on a bit before sunset and still turn on a bit after sunrise when motion is detected. I haven’t figured out a way to get the same check to apply in the sun.is_above_horizon: check.

Mute the rest of this track.

Working from home means that I don’t use my AirPods as much as I used to: I’m happy to mostly listen to the music my partner (who has worked from home for over a decade) wants to listen to. Lately, that has been NonStop80s.

We play this through the IKEA Symphonisk stereo pair in the dining room, several metres away from the office. The sound is good, and it’s nice not having it come out of the computer speaker.

Because I’m a nut, I have all of this hooked up to Home Assistant, and have one nice automation that will turn off the music that is playing on that speaker when the TV in that room is turned on.

Sometimes the tracks that are played on NonStop80s are really bad. For instance, Snooker Loopy, by Chas and Dave is an absolute piece of shit song. If it weren’t a streaming station, I’d just hit skip.

What would be nice is to be able to have the speakers stop playing, and start playing again when the next track comes on.

Enter “Mute the rest of this track”.

alias: Mute the rest of this track
sequence:
  - condition: state
    entity_id: media_player.dining_room_sonos
    state: playing
  - condition: template
    # Only run if the speaker is not already muted.
    value_template: "{{ not state_attr('media_player.dining_room_sonos', 'is_volume_muted') }}"
  - service: media_player.volume_mute
    data:
      is_volume_muted: true
    entity_id: media_player.dining_room_sonos
  - delay: '1'
  - wait_for_trigger:
      - platform: state
        entity_id: media_player.dining_room_sonos
    # Fall back to unmuting after the timeout (default 300 seconds).
    timeout: "{{ timeout | default(300) }}"
  - service: media_player.volume_mute
    data:
      is_volume_muted: false
    entity_id: media_player.dining_room_sonos
mode: single
icon: 'mdi:volume-mute'
description: Mute the media player until the media_title attribute changes.
fields:
  timeout:
    description: Amount of time to stay muted if the track does not change
    example: 300

This is a script that you can use in Home Assistant to do just that. It will (iff the dining_room_sonos is currently playing and not muted) mute that speaker until something else on the speaker changes.

This could be a track change, or a volume change, or anything else. It’s deliberately loose, so that if it was muted, and someone changes the volume, it unmutes immediately.

After a configurable (default 300 seconds) timeout, it will unmute.

I was trying to make it so that you could provide the named media_player entity, but there still seem to be places where you can’t use templates (but need to provide the string directly). I was able to get the conditions to use the variable, but was not able to figure out how to use them in the service calls…and to be honest, I don’t really need it to apply to other players.