Sonoff Touch LAN Mode and MQTT

I’ve had a couple of the Sonoff Basic devices for quite some time. It’s fairly easy to solder some header pins onto these, which makes flashing the firmware somewhat of a non-event, but it’s still a bit of a pain.

The other thing I bought (again, some time ago, but a bit after the Sonoff Basic) is a Sonoff Touch. This is an in-wall light switch replacement, which means you can replace your existing light switches with something that you can control over WiFi. They actually look pretty nice, too.

I wasn’t so keen on mucking around with soldering them, partly because you need to use a 90° header. However, the other day I learned that there is a way to control them (and the Basic) without having to connect to the iTead servers.

When the devices are unable to connect to a remote server (yes, they basically keep a connection to this remote server open 100% of the time, which was part of the rationale behind flashing the firmware), they go into LAN mode.

When they are in LAN mode, they will respond to WebSocket connections over port 8081, making it easy to control them directly.

In my router (running LEDE), I can set a specific range of IP addresses to be unable to connect out to the internet, and then all I need to do is make sure the devices get one of these IP addresses.

The configuration process is something like:

  • Touch the switch toggle for 7 seconds. This puts it into pairing mode, where it acts as an Access Point (AP).
  • Connect to the new WiFi network ITEAD_100000xxxxxx.
  • Get the MAC address of the device at 10.10.7.1.
  • Tell the router to reserve an IP address in the required range for this MAC address.
  • POST data to the device (10.10.7.1) with a JSON object that contains the WiFi credentials. This will trigger the device to disconnect the AP, and connect to the WiFi network. It’s also possible at this time to tell it to connect to a different server (which I may do instead at some point, but this method was quicker for now).
  • Once the device has connected to your WiFi, send JSON messages over a WebSocket connection to the device (at its fixed IP address).
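
As a rough sketch of the POST step above: the snippet below only builds the JSON body that carries the WiFi credentials. The field names (version, ssid, password, serverName, port) are assumptions taken from community write-ups of the stock firmware, not something confirmed here, and build_wifi_payload and the example values are hypothetical, so check them against your own device before relying on them.

```python
import json

DEVICE_AP_ADDRESS = '10.10.7.1'  # address of the device while in pairing mode


def build_wifi_payload(ssid, password, server_name, port):
    """Return the JSON body carrying the WiFi credentials.

    The field names are assumptions (see the note above), not a
    documented API.
    """
    return json.dumps({
        'version': 4,
        'ssid': ssid,
        'password': password,
        # Server the device should try to reach once it joins the network.
        'serverName': server_name,
        'port': port,
    })


# Hypothetical values, for illustration only.
payload = build_wifi_payload('my-network', 'my-password', '10.1.10.1', 8080)
print(payload)
```

The resulting string would then be sent as the body of an HTTP POST to the device while you are still connected to its access point.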

I’m hoping at some point to automate this, but it’s meaningless to do so until I get a bunch more devices.


So, on to the software.

Ultimately, the plan is to control these devices using HomeKit. I started writing a direct bridge (similar to my MQTT HomeKit bridge), but then decided it would be simpler to just bridge to MQTT: I could then use the correct topic names and values to allow it to interact with that MQTT HomeKit bridge.

There are really only two things to do:

  • Connect to the Sonoff device, and wait for events from there as to the switch state. Push these changes to our MQTT topic.
  • Connect to the MQTT broker, and subscribe to our topic. When we get events, push these to the Sonoff device.

I attempted to play around with asyncio to get this to work, but I can’t remember enough about how to use it, so I went for an easier (for me) solution.

At this stage, it’s just a single Sonoff being controlled.

import json
import time
import enum

from websocket import create_connection
from paho.mqtt import client as mqtt

API_KEY = 'bba2e54d-7202-4a75-bd26-307597a1dd7d'
TOPIC = 'HomeKit/sonoff-{}/Lightbulb/On'


class State(enum.Enum):
    ON = 'on'
    OFF = 'off'

    @classmethod
    def parse(cls, data):
        if data in [cls.ON.value, True, 'true', 1, '1']:
            return cls.ON
        elif data in [cls.OFF.value, False, 'false', 0, '0']:
            return cls.OFF
        # Anything else is assumed to be a JSON message from the device.
        value = json.loads(data)['params']['switch']
        if value == cls.ON.value:
            return cls.ON
        return cls.OFF

    def __invert__(self):
        if self == State.ON:
            return State.OFF
        return State.ON

    def __bool__(self):
        return self == State.ON


class Sonoff:
    def __init__(self, host):
        self._state = None
        timestamp = str(time.time()).replace('.', '')
        self.ws = create_connection('ws://{}:8081/'.format(host))
        self.ws.send(json.dumps({
            'action': 'userOnline',
            'ts': timestamp,
            'version': 6,
            'apikey': API_KEY,
            'sequence': timestamp,
            'userAgent': 'HomeKit'
        }))
        self.deviceid = json.loads(self.ws.recv())['deviceid']
        print('Connected to {}'.format(self.deviceid))

        self.client = mqtt.Client()
        self.client.on_connect = self.mqtt_init
        self.client.on_message = self.handle_mqtt_message
        self.client.connect('mqtt.lan', 1883, 60)

        self.state = State.parse(self.ws.recv())
        print('Current state is {}'.format(self._state.name))

    @property
    def topic(self):
        return TOPIC.format(self.deviceid)

    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, value: State):
        if value == self.state:
            return
        timestamp = str(time.time()).replace('.', '')
        self.ws.send(json.dumps({
            "action": "update",
            "deviceid": "nonce",
            "apikey": "nonce",
            "selfApikey": "nonce",
            "params": {
                "switch": value.value
            },
            "sequence": timestamp,
            "userAgent": "app"
        }))
        self._state = value
        self.client.publish(self.topic, int(bool(self.state)), retain=True)

    def on(self):
        self.state = State.ON

    def off(self):
        self.state = State.OFF

    def toggle(self):
        self.state = ~self.state

    def wait_for_ws(self):
        result = self.ws.recv()
        if 'switch' in result:
            self.state = State.parse(result)

    def handle_mqtt_message(self, client, userdata, message):
        self.state = State.parse(message.payload.decode())

    def mqtt_init(self, client, userdata, flags, rc):
        client.subscribe(self.topic)
        print("Subscribed to {}".format(self.topic))

    def start(self):
        self.client.loop_start()
        try:
            while True:
                time.sleep(0.01)
                self.wait_for_ws()
        finally:
            self.client.loop_stop()


if __name__ == '__main__':
    sonoff = Sonoff('10.1.10.140')
    sonoff.start()

I’m still not totally happy with the State stuff: I think I’ll use a simpler mapping there. But this works, and integrates nicely with my MQTT HomeKit bridge.
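
One possible shape for that simpler mapping, as a sketch rather than what I’ve actually settled on: a plain dict from payload text to a bool, with the JSON case handled as a fallback. The names PAYLOAD_TO_SWITCH, parse_switch and switch_value are made up for this example, and unlike State.parse it ignores case and surrounding whitespace.

```python
import json

# Every payload spelling that should count as "on" or "off".
PAYLOAD_TO_SWITCH = {
    'on': True, 'true': True, '1': True,
    'off': False, 'false': False, '0': False,
}


def parse_switch(data):
    """Return True for on and False for off."""
    if isinstance(data, (bool, int)):
        return bool(data)
    key = str(data).strip().lower()
    if key in PAYLOAD_TO_SWITCH:
        return PAYLOAD_TO_SWITCH[key]
    # Otherwise assume a JSON message from the device, as State.parse does.
    return json.loads(data)['params']['switch'] == 'on'


def switch_value(is_on):
    """Return the string the device expects in params.switch."""
    return 'on' if is_on else 'off'
```

With a bool as the state, toggling becomes `not state`, and the value to publish to MQTT is just `int(state)`.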