Form and Formset

Sometimes, you’ll have an object that you want to save, and, at the same time, some related objects that should also be updated, created and/or deleted.

Django has really nice tools for doing both of these operations: ModelForm for the individual instance, and inline formsets for the group of related objects. Both are really well documented. However, it is nice to be able to encapsulate these operations into a single functional unit.

We can leverage the fact that all request data is passed to a form class when it is instantiated, along with some judicious use of Django’s cached_property decorator, to make this really quite neat.

Let’s consider this model structure: we have a Person, and each Person may have zero or more Addresses. Every Person has a name, and an optional date of birth. All of the fields for the address are required:

class Person(models.Model):
    name = models.TextField()
    date_of_birth = models.DateField(null=True, blank=True)


class Address(models.Model):
    person = models.ForeignKey(
        Person, related_name='addresses', on_delete=models.CASCADE
    )
    street = models.TextField()
    suburb = models.TextField()
    postcode = models.TextField()
    country = django_countries.fields.CountryField()

We can have a view for updating the Person model instance that is very simple:

class PersonForm(forms.ModelForm):
    class Meta:
        model = Person
        fields = ('name', 'date_of_birth')
        widgets = {
            # name is a TextField on the model, which would otherwise
            # render as a Textarea.
            'name': forms.TextInput,
            'date_of_birth': forms.DateInput,
        }


class UpdatePerson(UpdateView):
    model = Person
    form_class = PersonForm

Likewise, we can have a view for updating a person’s addresses:

AddressFormSet = inlineformset_factory(
    Person,
    Address,
    fields=('street', 'suburb', 'postcode', 'country'),
)


class UpdateAddresses(UpdateView):
    model = Person
    form_class = AddressFormSet

As mentioned above, we’d like to have a page where a Person’s name, date of birth and addresses may all be modified in one go, rather than needing two separate pages.

from django.utils.functional import cached_property
from django.utils.translation import gettext as _


class PersonForm(forms.ModelForm):
    class Meta:
        model = Person
        fields = ('name', 'date_of_birth')
        widgets = {
            'name': forms.TextInput,
            'date_of_birth': forms.DateInput,
        }

    @cached_property
    def addresses(self):
        # Build the formset once per form instance. Only pass the bound
        # data through when this form is itself bound, so that a GET
        # request renders an unbound formset rather than failing
        # management-form validation.
        return inlineformset_factory(
            Person, Address, fields=('street', 'suburb', 'postcode', 'country')
        )(
            data=self.data if self.is_bound else None,
            files=self.files if self.is_bound else None,
            instance=self.instance,
            prefix='address',
        )

    def clean(self):
        # Just in case we are subclassing some other form that does something in `clean`.
        super().clean()
        if not self.addresses.is_valid():
            self.add_error(None, _('Please check the addresses'))

    def save(self, commit=True):
        result = super().save(commit=commit)
        self.addresses.save(commit=commit)
        return result


class UpdatePerson(UpdateView):
    model = Person
    form_class = PersonForm

So, how does this work?

When the form.addresses attribute is first accessed, the factory generates a formset class, which is instantiated with the arguments shown, and cached_property stores the result on the form instance. Every subsequent access (in clean(), in save(), and in the template) returns that same formset object rather than building a new one. Since a form instance only lives for a single request-response cycle, the cache can never become stale.
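
As a minimal illustration of that caching behaviour (separate from the form above):

from django.utils.functional import cached_property


class Example:
    @cached_property
    def expensive(self):
        print('computing...')
        return 42


example = Example()
example.expensive  # prints 'computing...' and returns 42
example.expensive  # returns 42 directly from the instance's __dict__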

Within our template, we can just render the formset normally; however, we may want to use some fancy JavaScript to make it dynamic. In this case, I’ll just use the default rendering, as seen in the Django formset documentation.

<form action="{% url 'person:update' form.instance.pk %}"
      method="POST">
  {% csrf_token %}

  {{ form }}
  {{ form.addresses }}

  <button type="submit">
    {% trans 'Save' %}
  </button>
</form>

Query Zen is no queries at all

Performing no queries is always going to be faster than performing a query.

Today I had two instances of the same problem: I have two tables, one of which essentially stores data calculated from the other (and from data in other tables, or via a process that runs in application code, and so cannot be determined purely within the database).

In one case, we have an audit logging table (which is handled purely within Postgres), and a related table that stores a string representation of what the audited object looked like, according to the application, at that point in time; this needs to be calculated after the fact, in Django.

The other case stores some cached values that can be calculated in the database: essentially some metadata about a shift, according to the location that the shift is at. Changes to the shift table automatically update this value, but we have several million existing shifts that don’t yet have it, and we need to create the annotation for all of them.

In both cases, we have a celery task that will create a (relatively small, to prevent locks and other performance issues) number of the related objects, but only for those that don’t already have one. The tricky bit is that we need to trigger another instance of the celery task if there are still objects in the database that don’t yet have the related item.

@app.task
def update_missing_items(batch_size=100):
    missing_items = AuditLog.objects.filter(instance_repr=None)
    InstanceRepr.objects.bulk_create([
        InstanceRepr(
            audit_log=log,
            # ...
        ) for log in missing_items[:batch_size]
    ])

    # Reschedule ourselves if there is still work left to do.
    if missing_items.exists():
        update_missing_items.apply_async(kwargs={'batch_size': batch_size}, countdown=1)

Since we have some 15 million audit logs (so far), it turns out that this missing_items.exists() call was taking several seconds to run. I tried to write an optimised version, but was not able to improve the performance.

Then it occurred to me (thanks mlt- on #postgres) that we can look at the number of items we created, and compare it to the batch_size. If it was smaller than the batch size, then we know we are up to date, and don’t need to reschedule our task.

@app.task
def update_missing_items(batch_size=100):
    missing_items = AuditLog.objects.filter(instance_repr=None)
    created = InstanceRepr.objects.bulk_create([
        InstanceRepr(
            audit_log=log,
            # ...
        ) for log in missing_items[:batch_size]
    ])

    # A full batch means there are probably more rows left to process.
    if len(created) == batch_size:
        update_missing_items.apply_async(kwargs={'batch_size': batch_size}, countdown=1)

Bingo: since we needed to execute the query to fetch the objects to begin with, we are now doing no extra work to see if we need to run our task again.


The other situation can be handled entirely in the database; however, a single query over several million rows would block other things from happening, so we want to run the update in batches. There is a trigger on the table so that new or updated rows already get a value, which actually makes it the same problem, but in SQL:

WITH step_1 AS (
  SELECT shift_id, ...
    FROM shifts
    JOIN ... ON (...)
    LEFT OUTER JOIN annotations USING (shift_id)
    WHERE annotations.shift_id IS NULL
    LIMIT 1000
), step_2 AS (
  ...
),
..., step_N AS (
  ...
)
INSERT INTO annotations (shift_id, ...) SELECT * FROM step_N;

There’s actually a bunch more to this, but it’s not really important: building up the annotations hits a couple of other tables, and I’ve used a CTE because each value is based on a previous annotation.

We can put this query into a task too, but we need some way of determining how many inserts we did. Luckily, Postgres has the RETURNING clause on an INSERT. It would be really nice if we could do:

WITH step_1 AS (...)
INSERT INTO annotations (shift_id, ...)
SELECT * FROM step_N
RETURNING COUNT(*)

Alas, that’s not possible. However, we can just extend our CTE:

WITH step_1 AS (
  SELECT shift_id, ...
    FROM shifts
    ...
    LEFT OUTER JOIN annotations USING (shift_id)
    WHERE annotations.shift_id IS NULL
    -- NOTE: the LIMIT value is a parameter!
    LIMIT %s
),
...,
step_N AS (...),
inserts AS (
  INSERT INTO annotations(shift_id, ...)
  SELECT * FROM step_N
  RETURNING shift_id
)
SELECT COUNT(*) FROM inserts

Then, we can write our celery task:

from django.db import connection

# QUERY is the batched INSERT ... RETURNING statement shown above,
# with the LIMIT left as a %s parameter.

@app.task
def update_missing_annotations(batch_size):
    with connection.cursor() as cursor:
        cursor.execute(QUERY, [batch_size])
        # A full batch means there are probably more rows left to annotate.
        if cursor.fetchone()[0] == batch_size:
            update_missing_annotations.apply_async(kwargs={'batch_size': batch_size}, countdown=1)

Partially failing tests

$ ack unittest.expected | grep expect | wc -l
        12

I’m not proud to say it, but we have about (exactly?) a dozen tests in our system that for one reason or another are currently failing. These were determined to not be enough to block development, so they are still there. As sort-of shown in the command line above, it’s possible to mark these tests as “Expected to Fail”. From time to time, I’ll go back and revisit these, and see if there’s anything that we can fix in the short term.
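
Marking an entire test as expected-to-fail is just the standard unittest decorator; a contrived, self-contained example:

import unittest


class LegacyBehaviour(unittest.TestCase):
    @unittest.expectedFailure
    def test_known_broken(self):
        # Reported as an "expected failure" rather than a failure, and as
        # an "unexpected success" if it ever starts passing.
        self.assertEqual(1 + 1, 3)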

Today, however, I was working on something, and since it was going to touch some of the same code that my big query performance branch is already doing, I decided to work from there. I merged it all in, and wrote some new tests, and updated the queries.

But it turns out that the query performance branch is missing some bits that are required for one of the queries. I didn’t realise this until I’d written the tests, and the migrations to update the queries.

class TestNewThing(TestCase):
    def test_new_thing(self):
        # complex set up that creates a bunch of things that are used by all assertions.

        # This assertion should be passing...and we want to be notified if it fails in the future.
        self.assertEqual('foo', bar)

        # This assertion will fail, but we don't care right now.
        self.assertEqual('baz', qux)

I was able to remove the migration easily enough, but I didn’t really want to lose the tests.

I could have just commented them out, but I thought maybe I’d just try to mark those parts as expectedFailure:

class TestNewThing(TestCase):
    def test_new_thing(self):
        # complex set up that creates a bunch of things that are used by all assertions.

        # This assertion should be passing...and we want to be notified if it fails in the future.
        self.assertEqual('foo', bar)

        with unittest.expectedFailure:
          # This assertion will fail, but we don't care right now.
          self.assertEqual('baz', qux)

However, this doesn’t work: expectedFailure is a decorator, not a context manager.

Hmm, I thought, what if we put that into a separate method, not prefixed with test_:

class TestNewThing(TestCase):
    def test_new_thing(self):
        # complex set up that creates a bunch of things that are used by all assertions.

        # This assertion should be passing...and we want to be notified if it fails in the future.
        self.assertEqual('foo', bar)

        self.optional_tests()

    @unittest.expectedFailure
    def optional_tests(self):
        # This assertion will fail, but we don't care right now.
        self.assertEqual('baz', qux)

And this actually works: a failure in the inner method will not trigger a failed build, but a failure in the actual test method will.

Hallelujah

I have been listening to Malcolm Gladwell’s podcast Revisionist History. It’s been quite entertaining, but I sometimes do feel that he is a little less than honest with some of his opinions. I know there are people who have strong opinions about Gladwell, but I’m not really one of them. I have read some of his stuff, and find myself sometimes in agreement with his hypotheses, but not always. I think not blindly accepting what he talks about is prudent: but the same is true of everyone.

In the episode Hallelujah, the discussion starts with a foray into Elvis Costello, but at some point pivots to the Leonard Cohen song, covered by almost everyone, but most famously by Jeff Buckley. Gladwell’s point is that some songs (or works of art, or other works of genius) are “fully formed”, but others take iteration, and in some cases, a bunch of it.

Specifically, he states that the song Hallelujah had this really long, really unlikely string of events that all needed to happen in order for the song to become recognised as the magical entity that it is. I’m not denying that, but he does miss the point that there are probably many more chains of events that could have resulted in songs just as good as this; maybe even better.

We only know about Hallelujah because in our universe, there was a completed chain of events that led to the song being released by Buckley, and then his unfortunate demise. I also wonder if perhaps the song may still have become as famous as it did even if he had not drowned: my iTunes library tells me another song of his was in the Triple J Hottest 100 the year before he died.

I’m not doubting at all that Leonard Cohen, Elvis Costello or Jeff Buckley were all musical geniuses: but this selection bias hides the fact that there could be many more musical geniuses out there who wrote songs that faded into oblivion.

I’m still glad we got Hallelujah, though.

Sonoff Touch LAN Mode and MQTT

I’ve had a couple of the Sonoff Basic devices for quite some time. It’s fairly easy to solder some header pins onto these, which makes flashing the firmware somewhat of a non-event, but it’s still a bit of a pain.

The other thing I bought (again, some time ago, but a bit after the Sonoff Basic) is a Sonoff Touch. This is an in-wall light switch replacement, which means you can replace your existing light switches with something that you can control over WiFi. They actually look pretty nice, too.

I wasn’t so keen on mucking around with soldering them, partly because you need to use a 90° header. However, the other day I learned that there is a way to control them (and the Basic) without having to connect to the iTead servers.

When the devices are unable to connect to a remote server (yes, they basically keep a connection to this remote server open 100% of the time, which was part of the rationale behind flashing the firmware), they go into LAN mode.

When they are in LAN mode, they will respond to WebSocket connections over port 8081, making it easy to control them directly.

In my router (running LEDE), I can set a specific range of IP addresses to be unable to connect out to the internet, and then all I need to do is make sure the devices get one of these IP addresses.

The configuration process is something like:

  • Touch the switch toggle for 7 seconds. This puts it into pairing mode, where it acts as an Access Point (AP).
  • Connect to the new WiFi network ITEAD_100000xxxxxx.
  • Get the MAC address of the device at 10.10.7.1
  • Tell the router to reserve an IP address in the required range for this MAC address.
  • POST data to the device (10.10.7.1) with a JSON object that contains the WiFi credentials (see the sketch after this list). This will trigger the device to disconnect the AP, and connect to the WiFi network. It’s also possible at this time to tell it to connect to a different server (which I may do instead at some point, but this method was quicker for now).
  • Once connected to your WiFi, send JSON messages over a WebSocket connection to the device (at its fixed IP address).
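
For the POST step, something like the following should work while connected to the device’s AP. Note that the /device and /ap endpoints, and the payload shape, are assumptions based on other write-ups of the stock firmware, not something I’ve verified against iTead documentation:

import requests

# Assumed endpoint: returns device info (including the deviceid) in pairing mode.
print(requests.get('http://10.10.7.1/device').json())

# Assumed endpoint and payload: hand the device our WiFi credentials.
requests.post('http://10.10.7.1/ap', json={
    'version': 4,
    'ssid': 'MyNetwork',          # your WiFi network name
    'password': 'wifi-password',  # your WiFi password
    'serverName': '10.1.10.1',    # the "cloud" server; unreachable on purpose in my setup
    'port': 8080,
})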

I’m hoping at some point to automate this, but it’s meaningless to do so until I get a bunch more devices.


So, on to the software.

Ultimately, the plan is to control these devices using HomeKit. I started writing a direct bridge (similar to my MQTT HomeKit bridge), but then decided it would be simpler to just bridge to MQTT: I could then use the correct topic names and values to allow it to interact with that MQTT HomeKit bridge.

There are really only two things to do:

  • Connect to the Sonoff device, and wait for events from there as to the switch state. Push these changes to our MQTT topic.
  • Connect to the MQTT broker, and subscribe to our topic. When we get events, push these to the Sonoff device.

I attempted to play around with asyncio to get this to work, but I can’t remember enough about how to use it, so I went for an easier (for me) solution.

At this stage, it’s just a single Sonoff being controlled.

import json
import time
import enum

from websocket import create_connection
from paho.mqtt import client as mqtt

API_KEY = 'bba2e54d-7202-4a75-bd26-307597a1dd7d'
TOPIC = 'HomeKit/sonoff-{}/Lightbulb/On'


class State(enum.Enum):
    ON = 'on'
    OFF = 'off'

    @classmethod
    def parse(cls, data):
        # Accept both raw MQTT payloads ('1', 'true', ...) and the JSON
        # messages the Sonoff sends over its WebSocket.
        if data in [cls.ON.value, True, 'true', 1, '1']:
            return cls.ON
        elif data in [cls.OFF.value, False, 'false', 0, '0']:
            return cls.OFF
        value = json.loads(data)['params']['switch']
        if value == cls.ON.value:
            return cls.ON
        return cls.OFF

    def __invert__(self):
        if self == State.ON:
            return State.OFF
        return State.ON

    def __bool__(self):
        return self == State.ON


class Sonoff:
    def __init__(self, host):
        self._state = None
        timestamp = str(time.time()).replace('.', '')
        # Connect to the WebSocket the device exposes in LAN mode, and
        # announce ourselves with the "userOnline" handshake it expects.
        self.ws = create_connection('ws://{}:8081/'.format(host))
        self.ws.send(json.dumps({
            'action': 'userOnline',
            'ts': timestamp,
            'version': 6,
            'apikey': API_KEY,
            'sequence': timestamp,
            'userAgent': 'HomeKit'
        }))
        self.deviceid = json.loads(self.ws.recv())['deviceid']
        print('Connected to {}'.format(self.deviceid))

        self.client = mqtt.Client()
        self.client.on_connect = self.mqtt_init
        self.client.on_message = self.handle_mqtt_message
        self.client.connect('mqtt.lan', 1883, 60)

        # The device reports its current state on connect; run it through
        # the setter so the initial value is also published to MQTT.
        self.state = State.parse(self.ws.recv())
        print('Current state is {}'.format(self._state.name))

    @property
    def topic(self):
        return TOPIC.format(self.deviceid)

    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, value: State):
        # Changing the state pushes an update to the device over the
        # WebSocket, and publishes the new value to MQTT (retained).
        if value == self.state:
            return
        timestamp = (str(time.time())).replace('.', '')
        self.ws.send(json.dumps({
            "action": "update",
            "deviceid": "nonce",
            "apikey": "nonce",
            "selfApikey": "nonce",
            "params": {
                "switch": value.value
            },
            "sequence": timestamp,
            "userAgent": "app"
        }))
        self._state = value
        self.client.publish(self.topic, int(bool(self.state)), retain=1)

    def on(self):
        self.state = State.ON

    def off(self):
        self.state = State.OFF

    def toggle(self):
        self.state = ~self.state

    def wait_for_ws(self):
        # Block until the device sends a message; state changes (such as
        # the physical switch being touched) include a 'switch' key.
        result = self.ws.recv()
        if 'switch' in result:
            self.state = State.parse(result)

    def handle_mqtt_message(self, client, userdata, message):
        self.state = State.parse(message.payload.decode())

    def mqtt_init(self, client, userdata, flags, rc):
        client.subscribe(self.topic)
        print("Subscribed to {}".format(self.topic))

    def start(self):
        self.client.loop_start()
        try:
            while True:
                time.sleep(0.01)
                self.wait_for_ws()
        finally:
            self.client.loop_stop()


if __name__ == '__main__':
    sonoff = Sonoff('10.1.10.140')
    sonoff.start()

I’m still not totally happy with the State stuff: I think I’ll use a simpler mapping there. But this works, and integrates nicely with my MQTT HomeKit bridge.
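
For what it’s worth, that simpler mapping might be nothing more than a pair of dicts; this is just a sketch of the direction, not what the bridge above uses:

# Map incoming MQTT payloads onto the switch values the Sonoff expects...
PAYLOAD_TO_SWITCH = {
    '1': 'on', 'true': 'on', 'on': 'on',
    '0': 'off', 'false': 'off', 'off': 'off',
}

# ...and switch values back onto the payloads we publish to MQTT.
SWITCH_TO_PAYLOAD = {'on': '1', 'off': '0'}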

Postgres ALTER TABLE ... USING

Helped out a person in #django IRC some time ago, and learned something new about Postgres.

I’ve had data type migrations in the past (where you change a database column in some way, and need to alter the data that is already in that column); previously, though, I’ve always created a function to do so.

It turns out you can just write an expression, and that works too:

  ALTER TABLE defect_defect
 ALTER COLUMN risk_rating
          SET DATA TYPE INTEGER
              USING CASE WHEN risk_rating = 'HIGH'   THEN 3
                         WHEN risk_rating = 'MEDIUM' THEN 2
                         WHEN risk_rating = 'LOW'    THEN 1
                         END;

Of course, this would still cause issues if you had code running from the old version (that expected the text values). However, it is nice to know.
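
If you wanted to apply this from a Django migration rather than in psql directly, wrapping it in a RunSQL operation is one way to do it. A sketch, where the app label and dependency are illustrative:

from django.db import migrations


class Migration(migrations.Migration):
    dependencies = [
        ('defect', '0001_initial'),  # illustrative dependency
    ]

    operations = [
        migrations.RunSQL(
            sql="""
                ALTER TABLE defect_defect
                ALTER COLUMN risk_rating
                SET DATA TYPE INTEGER
                USING CASE WHEN risk_rating = 'HIGH'   THEN 3
                           WHEN risk_rating = 'MEDIUM' THEN 2
                           WHEN risk_rating = 'LOW'    THEN 1
                      END;
            """,
            # Not reversible as written; going back to text would need its own USING.
            reverse_sql=migrations.RunSQL.noop,
        ),
    ]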