Examples

Prerequisites

All of these examples require a separate user-defined module named secrets.py. This secrets module should contain 2 dicts:

  1. wifi_settings consisting of parameter names and values for the WiFi configuration.

  2. mqtt_settings consisting of parameter names and values for the MQTT broker configuration.

secrets.py
"""
This file is where you keep secret settings, passwords, and tokens!
If you put them in the code you risk committing that info or sharing it
"""

wifi_settings = dict(
    ssid = "WiFi_Network_Name",
    password = "WiFi_Password",
)

mqtt_settings = dict(
    broker="openhabian",  # the broker's hostname or IP address
    port=1883,  # the broker's port
    username="user_name",
    password="user_password",
)

The MQTT username and password may not be required if you skipped the "Setting a username and password" step.

Dependencies

Some examples use other third-party libraries (which are available in Adafruit’s CircuitPython Library bundle). These libraries are listed in the examples/requirements.txt file for easy install.

examples/requirements.txt
adafruit-circuitpython-minimqtt
adafruit-circuitpython-dotstar
adafruit-circuitpython-neopixel
adafruit-circuitpython-ntp
adafruit-circuitpython-dht

Simple test

Ensure your device works with this simple test.

This example uses the PropertyRGB class to represent the on-board LED (Neopixel or Dotstar). The color is controlled using OpenHAB.

examples/homie_simpletest.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import time
import board
import socketpool  # type: ignore
import wifi  # type: ignore
from adafruit_minimqtt.adafruit_minimqtt import MQTT, MMQTTException
from circuitpython_homie import HomieDevice, HomieNode
from circuitpython_homie.recipes import PropertyRGB

# Pick the driver that matches the board's single on-board RGB LED.
if hasattr(board, "NEOPIXEL"):
    import neopixel

    pixel = neopixel.NeoPixel(board.NEOPIXEL, 1)
else:
    # no NEOPIXEL pin is defined, so assume the board has a builtin dotstar
    import adafruit_dotstar

    pixel = adafruit_dotstar.DotStar(board.APA102_SCK, board.APA102_MOSI, 1)

# Get wifi details and more from a secrets.py file
try:
    from secrets import wifi_settings, mqtt_settings
except ImportError as exc:
    # re-raise with a hint about where the settings are expected to live
    raise RuntimeError(
        "WiFi and MQTT secrets are kept in secrets.py, please add them there!"
    ) from exc

print("Connecting to", wifi_settings["ssid"])
# every key in wifi_settings is passed on as a keyword argument
wifi.radio.connect(**wifi_settings)
print("Connected successfully!")
print("My IP address is", wifi.radio.ipv4_address)

print("Using MQTT broker: {}:{}".format(mqtt_settings["broker"], mqtt_settings["port"]))
pool = socketpool.SocketPool(wifi.radio)
# every key in mqtt_settings is passed on as a keyword argument
mqtt_client = MQTT(**mqtt_settings, socket_pool=pool)

# create the objects that describe our device
device = HomieDevice(mqtt_client, "my device name", "lib-simple-test-id")
led_node = HomieNode("light", "RGB DotStar")
led_property = PropertyRGB("color", settable=True)

# append the objects to the device's attributes
led_node.properties.append(led_property)
device.nodes.append(led_node)

pixel.fill(led_property.value)  # color the LED with the property's value

# add a callback to remotely control the LED
def change_color(client: MQTT, topic: str, message: str):
    """Change the color of the LED based on the message from the broker.

    :param client: The MQTT client passed to the callback (not used here).
    :param topic: The topic passed to the callback (not used here).
    :param message: The new color as received from the broker.
    """
    print("--> broker said color is now", message)
    # echo confirmation back to broker and convert to a 3-tuple of integers
    color = device.set_property(led_property, message)
    pixel.fill(color)
    print("<-- color is now", repr(color))


# register the function above as the property's callback
led_property.callback = change_color


def on_disconnected(client: MQTT, user_data, rc):
    """Callback invoked when connection to broker is terminated.

    Reconnects ``client`` and then sets the device's state to ``"ready"``.

    :param client: The MQTT client that gets reconnected.
    :param user_data: Not used by this callback.
    :param rc: Not used by this callback.
    """
    print("Reconnecting to the broker.")
    client.reconnect()
    # the state is only set after the reconnect call has returned
    device.set_state("ready")


mqtt_client.on_disconnect = on_disconnected
mqtt_client.on_connect = lambda *args: print("Connected to the MQTT broker!")

# connect to the broker and publish/subscribe the device's topics
device.begin(keep_alive=3000)
# keep_alive must be set to avoid the device's `$state` being considered "lost"

# a forever loop
try:
    # time.monotonic() has sub-second resolution. time.time() only counts whole
    # seconds on CircuitPython, which would stretch the 0.5 second interval
    # below to a full second.
    refresh_last = time.monotonic()
    while True:
        try:
            now = time.monotonic()
            if now - refresh_last > 0.5:  # refresh every 0.5 seconds
                refresh_last = now
                mqtt_client.loop()
        except MMQTTException:
            print("!!! Connection with broker is lost.")
except KeyboardInterrupt:
    device.set_state("disconnected")
    # replace the reconnecting callback before shutting the client down
    mqtt_client.on_disconnect = lambda *args: print("Disconnected from broker")
    mqtt_client.deinit()

Light Sensor test

Demonstrates using the PropertyPercent class to represent a light sensor’s data.

examples/homie_light_sensor_test.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import time
import analogio
import board
import socketpool  # type: ignore
import wifi  # type: ignore
from adafruit_minimqtt.adafruit_minimqtt import MQTT, MMQTTException
from circuitpython_homie import HomieDevice, HomieNode
from circuitpython_homie.recipes import PropertyPercent

# Get wifi details and more from a secrets.py file
try:
    from secrets import wifi_settings, mqtt_settings
except ImportError as exc:
    # re-raise with a hint about where the settings are expected to live
    raise RuntimeError(
        "WiFi and MQTT secrets are kept in secrets.py, please add them there!"
    ) from exc

print("Connecting to", wifi_settings["ssid"])
# every key in wifi_settings is passed on as a keyword argument
wifi.radio.connect(**wifi_settings)
print("Connected successfully!")
print("My IP address is", wifi.radio.ipv4_address)

print("Using MQTT broker: {}:{}".format(mqtt_settings["broker"], mqtt_settings["port"]))
pool = socketpool.SocketPool(wifi.radio)
# every key in mqtt_settings is passed on as a keyword argument
mqtt_client = MQTT(**mqtt_settings, socket_pool=pool)

# create a light_sensor object for analog readings
light_sensor_pin = board.IO4  # change this accordingly
light_sensor = analogio.AnalogIn(light_sensor_pin)

# create the objects that describe our device
device = HomieDevice(mqtt_client, "my device name", "lib-light-sensor-test-id")
ambient_light_node = HomieNode("ambient-light", "Light Sensor")
ambient_light_property = PropertyPercent("brightness")

# append the objects to the device's attributes
ambient_light_node.properties.append(ambient_light_property)
device.nodes.append(ambient_light_node)


def on_disconnected(client: MQTT, user_data, rc):
    """Callback invoked when connection to broker is terminated.

    Reconnects ``client`` and then sets the device's state to ``"ready"``.

    :param client: The MQTT client that gets reconnected.
    :param user_data: Not used by this callback.
    :param rc: Not used by this callback.
    """
    print("Reconnecting to the broker.")
    client.reconnect()
    # the state is only set after the reconnect call has returned
    device.set_state("ready")


mqtt_client.on_disconnect = on_disconnected
mqtt_client.on_connect = lambda *args: print("Connected to the MQTT broker!")

# connect to the broker and publish/subscribe the device's topics
device.begin(keep_alive=3000)
# keep_alive must be set to avoid the device's `$state` being considered "lost"

# a forever loop
try:
    # time.monotonic() has sub-second resolution. time.time() only counts whole
    # seconds on CircuitPython, which would stretch the 0.5 second interval
    # below to a full second.
    refresh_last = time.monotonic()
    while True:
        try:
            now = time.monotonic()
            if now - refresh_last > 0.5:  # refresh every 0.5 seconds
                refresh_last = now
                # NOTE(review): depending on the installed adafruit_minimqtt
                # version, is_connected() may raise MMQTTException or return
                # False when disconnected -- confirm which applies here.
                assert mqtt_client.is_connected()
                # scale the raw reading by its 65535 full-scale value to percent
                value = device.set_property(
                    ambient_light_property, light_sensor.value / 65535 * 100
                )
                print("light sensor value:", value, end="\r")
        except MMQTTException:
            print("\n!!! Connection with broker is lost.")
except KeyboardInterrupt:
    device.set_state("disconnected")
    print()  # move cursor to next line
    # replace the reconnecting callback before disconnecting on purpose
    mqtt_client.on_disconnect = lambda *args: print("Disconnected from broker")
    mqtt_client.disconnect()

DHT Sensor test

Demonstrates using the PropertyPercent and PropertyFloat classes to represent a DHT11 sensor’s humidity and temperature data (respectively).

examples/homie_dht_sensor_test.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import time
import board
import socketpool  # type: ignore
import wifi  # type: ignore
from adafruit_minimqtt.adafruit_minimqtt import MQTT, MMQTTException
import adafruit_dht
from circuitpython_homie import HomieDevice, HomieNode
from circuitpython_homie.recipes import PropertyFloat, PropertyPercent

# Get wifi details and more from a secrets.py file
try:
    from secrets import wifi_settings, mqtt_settings
except ImportError as exc:
    # re-raise with a hint about where the settings are expected to live
    raise RuntimeError(
        "WiFi and MQTT secrets are kept in secrets.py, please add them there!"
    ) from exc

print("Connecting to", wifi_settings["ssid"])
# every key in wifi_settings is passed on as a keyword argument
wifi.radio.connect(**wifi_settings)
print("Connected successfully!")
print("My IP address is", wifi.radio.ipv4_address)

print("Using MQTT broker: {}:{}".format(mqtt_settings["broker"], mqtt_settings["port"]))
pool = socketpool.SocketPool(wifi.radio)
# every key in mqtt_settings is passed on as a keyword argument
mqtt_client = MQTT(**mqtt_settings, socket_pool=pool)

# create a DHT11 sensor object for temperature and humidity readings
dht_in = board.A0  # change this accordingly
dht_sensor = adafruit_dht.DHT11(dht_in)
dht_sensor.measure()  # update current data for Homie init

# create the objects that describe our device
device = HomieDevice(mqtt_client, "my device name", "lib-dht-sensor-test-id")
dht_node = HomieNode("DHT11", "temperature/humidity sensor")
# the reading is converted with the Celsius-to-Fahrenheit formula
# (presumably the sensor reports degrees Celsius -- confirm with the driver docs)
dht_temperature_property = PropertyFloat(
    "temperature", init_value=dht_sensor.temperature * (9 / 5) + 32, unit="°F"
)
dht_humidity_property = PropertyPercent(
    "humidity", datatype="integer", init_value=dht_sensor.humidity
)

# append the objects to the device's attributes
dht_node.properties.extend([dht_temperature_property, dht_humidity_property])
device.nodes.append(dht_node)


def on_disconnected(client: MQTT, user_data, rc):
    """Callback invoked when connection to broker is terminated.

    Reconnects ``client`` and then sets the device's state to ``"ready"``.

    :param client: The MQTT client that gets reconnected.
    :param user_data: Not used by this callback.
    :param rc: Not used by this callback.
    """
    print("Reconnecting to the broker.")
    client.reconnect()
    # the state is only set after the reconnect call has returned
    device.set_state("ready")


mqtt_client.on_disconnect = on_disconnected
mqtt_client.on_connect = lambda *args: print("Connected to the MQTT broker!")

# connect to the broker and publish/subscribe the device's topics
device.begin(keep_alive=3000)
# keep_alive must be set to avoid the device's `$state` being considered "lost"

# a forever loop
try:
    # time.monotonic() has sub-second resolution. time.time() only counts whole
    # seconds on CircuitPython, so `> 2` would only trigger every 3 seconds.
    refresh_last = time.monotonic()
    while True:
        try:
            now = time.monotonic()
            if now - refresh_last > 2:  # refresh every 2 seconds
                refresh_last = now
                mqtt_client.loop()
                try:
                    dht_sensor.measure()  # update current data
                    celsius = dht_sensor.temperature
                    humidity = dht_sensor.humidity
                except RuntimeError as exc:
                    # a failed DHT reading should not end the example;
                    # skip this cycle and try again on the next refresh
                    print("\n!!! DHT sensor read failed:", exc)
                    continue
                temp = device.set_property(
                    dht_temperature_property, celsius * (9 / 5) + 32
                )
                # only the last character of the unit is printed
                print("Temp:", temp, dht_temperature_property.unit[-1], end=" ")
                humid = device.set_property(dht_humidity_property, humidity)
                print("Humidity:", humid, dht_humidity_property.unit, end="\r")
        except MMQTTException:
            print("\n!!! Connection with broker is lost.")
except KeyboardInterrupt:
    device.set_state("disconnected")
    print()  # move cursor to next line
    # replace the reconnecting callback before shutting the client down
    mqtt_client.on_disconnect = lambda *args: print("Disconnected from broker")
    mqtt_client.deinit()
    dht_sensor.exit()

Clock test

A simple clock app that broadcasts the current date and time using the PropertyDateTime class.

The real time clock is synchronized using the adafruit_ntp library.

examples/homie_clock_test.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
import time

try:
    import socket

    CIR_PY = False
except ImportError:
    CIR_PY = True

    import socketpool  # type: ignore
    import wifi  # type: ignore
    import rtc  # type: ignore

from adafruit_minimqtt.adafruit_minimqtt import MQTT, MMQTTException
from adafruit_ntp import NTP
from circuitpython_homie import HomieDevice, HomieNode
from circuitpython_homie.recipes import PropertyDateTime

# Get wifi details and more from a secrets.py file
try:
    from secrets import wifi_settings, mqtt_settings
except ImportError as exc:
    # re-raise with a hint about where the settings are expected to live
    raise RuntimeError(
        "WiFi and MQTT secrets are kept in secrets.py, please add them there!"
    ) from exc

# CIR_PY is True when the standard `socket` module could not be imported
if CIR_PY:  # only on CircuitPython
    print("Connecting to", wifi_settings["ssid"])
    # every key in wifi_settings is passed on as a keyword argument
    wifi.radio.connect(**wifi_settings)
    print("Connected successfully!")
    print("My IP address is", wifi.radio.ipv4_address)

    pool = socketpool.SocketPool(wifi.radio)

    # adjust this according to your time zone
    # https://en.wikipedia.org/wiki/List_of_time_zone_abbreviations
    TIME_ZONE = -7  # for Mountain Standard Time (North America)
    ntp = NTP(pool, tz_offset=TIME_ZONE)

    # sync the system clock (accessible via `time.localtime()`)
    rtc.RTC().datetime = ntp.datetime
else:  # on CPython
    print("My IP address is", socket.gethostbyname(socket.gethostname()))

print("Using MQTT broker: {}:{}".format(mqtt_settings["broker"], mqtt_settings["port"]))
# every key in mqtt_settings is passed on as a keyword argument; only the
# object given as `socket_pool` differs between the two platforms
if CIR_PY:  # only on CircuitPython
    mqtt_client = MQTT(**mqtt_settings, socket_pool=pool)
else:  # on CPython
    mqtt_client = MQTT(**mqtt_settings, socket_pool=socket)

# create the objects that describe our device
device = HomieDevice(mqtt_client, "my device name", "lib-clock-test-id")
clock_node = HomieNode("clock", "clock")
clock_property = PropertyDateTime(
    "current-time",
    init_value=PropertyDateTime.convert(time.localtime()),
)

# append the objects to the device's attributes
clock_node.properties.append(clock_property)
device.nodes.append(clock_node)


def on_disconnected(client: MQTT, user_data, rc):
    """Callback invoked when connection to broker is terminated.

    Reconnects ``client`` and then sets the device's state to ``"ready"``.

    :param client: The MQTT client that gets reconnected.
    :param user_data: Not used by this callback.
    :param rc: Not used by this callback.
    """
    print("Reconnecting to the broker.")
    client.reconnect()
    # the state is only set after the reconnect call has returned
    device.set_state("ready")


mqtt_client.on_disconnect = on_disconnected
mqtt_client.on_connect = lambda *args: print("Connected to the MQTT broker!")

# connect to the broker and publish/subscribe the device's topics
device.begin(keep_alive=3000)
# keep_alive must be set to avoid the device's `$state` being considered "lost"

# a forever loop
try:
    refresh_last = time.time()
    time_fmt = clock_property.convert(time.localtime())
    print(time_fmt, end="\r")  # show the initial time once
    while True:
        try:
            now = time.time()
            if now - refresh_last >= 1:  # refresh every 1 second
                refresh_last = now
                # NOTE(review): depending on the installed adafruit_minimqtt
                # version, is_connected() may raise MMQTTException or return
                # False when disconnected -- confirm which applies here.
                assert mqtt_client.is_connected()
                time_fmt = device.set_property(clock_property, time.localtime())
                # re-print only when the value changed instead of flooding the
                # console with the same text on every pass of the loop
                print(time_fmt, end="\r")
        except MMQTTException:
            print("\n!!! Connection with broker is lost.")
except KeyboardInterrupt:
    device.set_state("disconnected")
    print()  # move cursor to next line
    # replace the reconnecting callback before shutting the client down
    mqtt_client.on_disconnect = lambda *args: print("Disconnected from broker")
    mqtt_client.deinit()

Button test

A simple app that broadcasts the state of a switch using the PropertyBool class.

The on-board LED is used as visual feedback (to emulate a lamp).

examples/homie_button_test.py
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import time
import board
import digitalio
import socketpool  # type: ignore
import wifi  # type: ignore
from adafruit_minimqtt.adafruit_minimqtt import MQTT, MMQTTException
from circuitpython_homie import HomieDevice, HomieNode
from circuitpython_homie.recipes import PropertyBool

# Pick the driver that matches the board's single on-board RGB LED.
if hasattr(board, "NEOPIXEL"):
    import neopixel

    pixel = neopixel.NeoPixel(board.NEOPIXEL, 1)
else:
    # no NEOPIXEL pin is defined, so assume the board has a builtin dotstar
    import adafruit_dotstar

    pixel = adafruit_dotstar.DotStar(board.APA102_SCK, board.APA102_MOSI, 1)

# go easy on the eyes
pixel.brightness = 0.5
# this example's initial value of the switch state is False, so start out red
pixel.fill((255, 0, 0))

# Get wifi details and more from a secrets.py file
try:
    from secrets import wifi_settings, mqtt_settings
except ImportError as exc:
    # re-raise with a hint about where the settings are expected to live
    raise RuntimeError(
        "WiFi and MQTT secrets are kept in secrets.py, please add them there!"
    ) from exc

print("Connecting to", wifi_settings["ssid"])
# every key in wifi_settings is passed on as a keyword argument
wifi.radio.connect(**wifi_settings)
print("Connected successfully!")
print("My IP address is", wifi.radio.ipv4_address)

print("Using MQTT broker: {}:{}".format(mqtt_settings["broker"], mqtt_settings["port"]))
pool = socketpool.SocketPool(wifi.radio)
# every key in mqtt_settings is passed on as a keyword argument
mqtt_client = MQTT(**mqtt_settings, socket_pool=pool)

# instantiate the button and its state
# the last seen pin level is kept in a module-level variable
button_pin = board.BUTTON  # change this accordingly
button = digitalio.DigitalInOut(button_pin)
button.switch_to_input(digitalio.Pull.UP)
button_value = button.value  # for edge detection

# create the objects that describe our device
device = HomieDevice(mqtt_client, "my device name", "lib-button-id")
switch_node = HomieNode("light", "Typical Lamp")
switch_property = PropertyBool("switch", settable=True)

# append the objects to the device's attributes
switch_node.properties.append(switch_property)
device.nodes.append(switch_node)

# add a callback to remotely control the switch_property
def change_state(client: MQTT, topic: str, message: str):
    """Change the switch's state according to message from the broker.

    :param client: The MQTT client passed to the callback (not used here).
    :param topic: The topic passed to the callback (not used here).
    :param message: The new switch state as received from the broker.
    """
    print("--> broker said switch_state is now", message)
    # echo confirmation back to broker
    switch_state = device.set_property(switch_property, message)
    # green when the resulting state is truthy, red otherwise
    pixel.fill((0, 255, 0) if switch_state else (255, 0, 0))
    print("<-- switch state is now", switch_state)


# register the function above as the property's callback
switch_property.callback = change_state


def on_disconnected(client: MQTT, user_data, rc):
    """Callback invoked when connection to broker is terminated.

    Reconnects ``client`` and then sets the device's state to ``"ready"``.

    :param client: The MQTT client that gets reconnected.
    :param user_data: Not used by this callback.
    :param rc: Not used by this callback.
    """
    print("Reconnecting to the broker.")
    client.reconnect()
    # the state is only set after the reconnect call has returned
    device.set_state("ready")


mqtt_client.on_disconnect = on_disconnected
mqtt_client.on_connect = lambda *args: print("Connected to the MQTT broker!")

# connect to the broker and publish/subscribe the device's topics
device.begin(keep_alive=3000)
# keep_alive must be set to avoid the device's `$state` being considered "lost"

# a forever loop
try:
    refresh_last = time.time()
    while True:
        try:
            # Sample the pin once per pass so the comparison, the stored level
            # and the pressed/released decision all use the same reading
            # (separate reads could disagree while the contact bounces).
            current = button.value
            if current != button_value:
                button_value = current  # remember the level for edge detection
                if not current:
                    # the level went from True to False: treat as a button
                    # press and toggle the switch's state
                    state = device.set_property(
                        switch_property, not switch_property.value
                    )
                    print("button was pressed", "<(!)>" if state else " (-) ")
                    pixel.fill((0, 255, 0) if state else (255, 0, 0))
            now = time.time()
            if now - refresh_last >= 1:  # refresh every 1 seconds
                refresh_last = now
                # NOTE(review): depending on the installed adafruit_minimqtt
                # version, is_connected() may raise MMQTTException or return
                # False when disconnected -- confirm which applies here.
                assert mqtt_client.is_connected()
                mqtt_client.loop()
        except MMQTTException:
            print("!!! Connection with broker is lost.")
except KeyboardInterrupt:
    device.set_state("disconnected")
    # replace the reconnecting callback before shutting the client down
    mqtt_client.on_disconnect = lambda *args: print("Disconnected from broker")
    mqtt_client.deinit()