Examples
Prerequisites
All of these examples require a separate user-defined module named secrets.py
.
In this secrets module should be 2 dict
s:
wifi_settings
consisting of parameter names and values for the WiFi configuration.
mqtt_settings
consisting of parameter names and values for the MQTT broker configuration.
secrets.py
"""
This file is where you keep secret settings, passwords, and tokens!
If you put them in the code you risk committing that info or sharing it
"""
wifi_settings = dict(
ssid = "WiFi_Network_Name",
password = "WiFi_Password",
)
mqtt_settings = dict(
broker="openhabian", # the broker's hostname or IP address
port=1883, # the broker's port
username="user_name",
password="user_password",
)
The MQTT username and password may not be required if you skipped Setting a username and password.
Dependencies
Some examples use other third-party libraries (which are available in Adafruit’s CircuitPython
Library bundle). These libraries are listed in the examples/requirements.txt file for easy install.
examples/requirements.txt
adafruit-circuitpython-minimqtt
adafruit-circuitpython-dotstar
adafruit-circuitpython-neopixel
adafruit-circuitpython-ntp
adafruit-circuitpython-dht
Simple test
Ensure your device works with this simple test.
This example uses the PropertyRGB
class to represent the on-board LED (Neopixel or Dotstar).
The color is controlled using OpenHAB.
examples/homie_simpletest.py
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96 | import time
import board
import socketpool # type: ignore
import wifi # type: ignore
from adafruit_minimqtt.adafruit_minimqtt import MQTT, MMQTTException
from circuitpython_homie import HomieDevice, HomieNode
from circuitpython_homie.recipes import PropertyRGB
if not hasattr(board, "NEOPIXEL"):
# assume board has a builtin dotstar
import adafruit_dotstar
pixel = adafruit_dotstar.DotStar(board.APA102_SCK, board.APA102_MOSI, 1)
else:
import neopixel
pixel = neopixel.NeoPixel(board.NEOPIXEL, 1)
# Get wifi details and more from a secrets.py file
try:
from secrets import wifi_settings, mqtt_settings
except ImportError as exc:
raise RuntimeError(
"WiFi and MQTT secrets are kept in secrets.py, please add them there!"
) from exc
print("Connecting to", wifi_settings["ssid"])
wifi.radio.connect(**wifi_settings)
print("Connected successfully!")
print("My IP address is", wifi.radio.ipv4_address)
print("Using MQTT broker: {}:{}".format(mqtt_settings["broker"], mqtt_settings["port"]))
pool = socketpool.SocketPool(wifi.radio)
mqtt_client = MQTT(**mqtt_settings, socket_pool=pool)
# create the objects that describe our device
device = HomieDevice(mqtt_client, "my device name", "lib-simple-test-id")
led_node = HomieNode("light", "RGB DotStar")
led_property = PropertyRGB("color", settable=True)
# append the objects to the device's attributes
led_node.properties.append(led_property)
device.nodes.append(led_node)
pixel.fill(led_property.value) # color the LED with the property's value
# add a callback to remotely control the LED
def change_color(client: MQTT, topic: str, message: str):
"""Change the color of the LED based on the message from the broker."""
print("--> broker said color is now", message)
# echo confirmation back to broker and convert to an 3-tuple of integers
color = device.set_property(led_property, message)
pixel.fill(color)
print("<-- color is now", repr(color))
led_property.callback = change_color
def on_disconnected(client: MQTT, user_data, rc):
"""Callback invoked when connection to broker is terminated."""
print("Reconnecting to the broker.")
client.reconnect()
device.set_state("ready")
mqtt_client.on_disconnect = on_disconnected
mqtt_client.on_connect = lambda *args: print("Connected to the MQTT broker!")
# connect to the broker and publish/subscribe the device's topics
device.begin(keep_alive=3000)
# keep_alive must be set to avoid the device's `$state` being considered "lost"
# a forever loop
try:
refresh_last = time.time()
while True:
try:
now = time.time()
if now - refresh_last > 0.5: # refresh every 0.5 seconds
refresh_last = now
mqtt_client.loop()
except MMQTTException:
print("!!! Connection with broker is lost.")
except KeyboardInterrupt:
device.set_state("disconnected")
mqtt_client.on_disconnect = lambda *args: print("Disconnected from broker")
mqtt_client.deinit()
|
Light Sensor test
Demonstrates using the PropertyPercent
class to represent a light sensor’s data.
examples/homie_light_sensor_test.py
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79 | import time
import analogio
import board
import socketpool # type: ignore
import wifi # type: ignore
from adafruit_minimqtt.adafruit_minimqtt import MQTT, MMQTTException
from circuitpython_homie import HomieDevice, HomieNode
from circuitpython_homie.recipes import PropertyPercent
# Get wifi details and more from a secrets.py file
try:
from secrets import wifi_settings, mqtt_settings
except ImportError as exc:
raise RuntimeError(
"WiFi and MQTT secrets are kept in secrets.py, please add them there!"
) from exc
print("Connecting to", wifi_settings["ssid"])
wifi.radio.connect(**wifi_settings)
print("Connected successfully!")
print("My IP address is", wifi.radio.ipv4_address)
print("Using MQTT broker: {}:{}".format(mqtt_settings["broker"], mqtt_settings["port"]))
pool = socketpool.SocketPool(wifi.radio)
mqtt_client = MQTT(**mqtt_settings, socket_pool=pool)
# create a light_sensor object for analog readings
light_sensor_pin = board.IO4 # change this accordingly
light_sensor = analogio.AnalogIn(light_sensor_pin)
# create the objects that describe our device
device = HomieDevice(mqtt_client, "my device name", "lib-light-sensor-test-id")
ambient_light_node = HomieNode("ambient-light", "Light Sensor")
ambient_light_property = PropertyPercent("brightness")
# append the objects to the device's attributes
ambient_light_node.properties.append(ambient_light_property)
device.nodes.append(ambient_light_node)
def on_disconnected(client: MQTT, user_data, rc):
"""Callback invoked when connection to broker is terminated."""
print("Reconnecting to the broker.")
client.reconnect()
device.set_state("ready")
mqtt_client.on_disconnect = on_disconnected
mqtt_client.on_connect = lambda *args: print("Connected to the MQTT broker!")
# connect to the broker and publish/subscribe the device's topics
device.begin(keep_alive=3000)
# keep_alive must be set to avoid the device's `$state` being considered "lost"
# a forever loop
try:
refresh_last = time.time()
while True:
try:
now = time.time()
if now - refresh_last > 0.5: # refresh every 0.5 seconds
refresh_last = now
assert mqtt_client.is_connected()
value = device.set_property(
ambient_light_property, light_sensor.value / 65535 * 100
)
print("light sensor value:", value, end="\r")
except MMQTTException:
print("\n!!! Connection with broker is lost.")
except KeyboardInterrupt:
device.set_state("disconnected")
print() # move cursor to next line
mqtt_client.on_disconnect = lambda *args: print("Disconnected from broker")
mqtt_client.disconnect()
|
DHT Sensor test
Demonstrates using the PropertyPercent
and PropertyFloat
classes to represent a DHT11 sensor’s
humidity and temperature data (respectively).
examples/homie_dht_sensor_test.py
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89 | import time
import board
import socketpool # type: ignore
import wifi # type: ignore
from adafruit_minimqtt.adafruit_minimqtt import MQTT, MMQTTException
import adafruit_dht
from circuitpython_homie import HomieDevice, HomieNode
from circuitpython_homie.recipes import PropertyFloat, PropertyPercent
# Get wifi details and more from a secrets.py file
try:
from secrets import wifi_settings, mqtt_settings
except ImportError as exc:
raise RuntimeError(
"WiFi and MQTT secrets are kept in secrets.py, please add them there!"
) from exc
print("Connecting to", wifi_settings["ssid"])
wifi.radio.connect(**wifi_settings)
print("Connected successfully!")
print("My IP address is", wifi.radio.ipv4_address)
print("Using MQTT broker: {}:{}".format(mqtt_settings["broker"], mqtt_settings["port"]))
pool = socketpool.SocketPool(wifi.radio)
mqtt_client = MQTT(**mqtt_settings, socket_pool=pool)
# create a light_sensor object for analog readings
dht_in = board.A0 # change this accordingly
dht_sensor = adafruit_dht.DHT11(dht_in)
dht_sensor.measure() # update current data for Homie init
# create the objects that describe our device
device = HomieDevice(mqtt_client, "my device name", "lib-dht-sensor-test-id")
dht_node = HomieNode("DHT11", "temperature/humidity sensor")
dht_temperature_property = PropertyFloat(
"temperature", init_value=dht_sensor.temperature * (9 / 5) + 32, unit="°F"
)
dht_humidity_property = PropertyPercent(
"humidity", datatype="integer", init_value=dht_sensor.humidity
)
# append the objects to the device's attributes
dht_node.properties.extend([dht_temperature_property, dht_humidity_property])
device.nodes.append(dht_node)
def on_disconnected(client: MQTT, user_data, rc):
"""Callback invoked when connection to broker is terminated."""
print("Reconnecting to the broker.")
client.reconnect()
device.set_state("ready")
mqtt_client.on_disconnect = on_disconnected
mqtt_client.on_connect = lambda *args: print("Connected to the MQTT broker!")
# connect to the broker and publish/subscribe the device's topics
device.begin(keep_alive=3000)
# keep_alive must be set to avoid the device's `$state` being considered "lost"
# a forever loop
try:
refresh_last = time.time()
while True:
try:
now = time.time()
if now - refresh_last > 2: # refresh every 2 seconds
refresh_last = now
mqtt_client.loop()
dht_sensor.measure() # update current data
temp = device.set_property(
dht_temperature_property, dht_sensor.temperature * (9 / 5) + 32
)
print("Temp:", temp, dht_temperature_property.unit[-1], end=" ")
humid = device.set_property(dht_humidity_property, dht_sensor.humidity)
print("Humidity:", humid, dht_humidity_property.unit, end="\r")
except MMQTTException:
print("\n!!! Connection with broker is lost.")
except KeyboardInterrupt:
device.set_state("disconnected")
print() # move cursor to next line
mqtt_client.on_disconnect = lambda *args: print("Disconnected from broker")
mqtt_client.deinit()
dht_sensor.exit()
|
Clock test
A simple clock app that broadcasts the current date and time using the PropertyDateTime
class.
The real time clock is synchronized using the adafruit_ntp
library.
examples/homie_clock_test.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105 | import time
try:
import socket
CIR_PY = False
except ImportError:
CIR_PY = True
import socketpool # type: ignore
import wifi # type: ignore
import rtc # type: ignore
from adafruit_minimqtt.adafruit_minimqtt import MQTT, MMQTTException
from adafruit_ntp import NTP
from circuitpython_homie import HomieDevice, HomieNode
from circuitpython_homie.recipes import PropertyDateTime
# Get wifi details and more from a secrets.py file
try:
from secrets import wifi_settings, mqtt_settings
except ImportError as exc:
raise RuntimeError(
"WiFi and MQTT secrets are kept in secrets.py, please add them there!"
) from exc
if CIR_PY: # only on CircuitPython
print("Connecting to", wifi_settings["ssid"])
wifi.radio.connect(**wifi_settings)
print("Connected successfully!")
print("My IP address is", wifi.radio.ipv4_address)
pool = socketpool.SocketPool(wifi.radio)
# adjust this according to your time zone
# https://en.wikipedia.org/wiki/List_of_time_zone_abbreviations
TIME_ZONE = -7 # for Mountain Standard Time (North America)
ntp = NTP(pool, tz_offset=TIME_ZONE)
# sync the system clock (accessible via `time.localtime()`)
rtc.RTC().datetime = ntp.datetime
else: # on CPython
print("My IP address is", socket.gethostbyname(socket.gethostname()))
print("Using MQTT broker: {}:{}".format(mqtt_settings["broker"], mqtt_settings["port"]))
if CIR_PY: # only on CircuitPython
mqtt_client = MQTT(**mqtt_settings, socket_pool=pool)
else: # on CPython
mqtt_client = MQTT(**mqtt_settings, socket_pool=socket)
# create the objects that describe our device
device = HomieDevice(mqtt_client, "my device name", "lib-clock-test-id")
clock_node = HomieNode("clock", "clock")
clock_property = PropertyDateTime(
"current-time",
init_value=PropertyDateTime.convert(time.localtime()),
)
# append the objects to the device's attributes
clock_node.properties.append(clock_property)
device.nodes.append(clock_node)
def on_disconnected(client: MQTT, user_data, rc):
"""Callback invoked when connection to broker is terminated."""
print("Reconnecting to the broker.")
client.reconnect()
device.set_state("ready")
mqtt_client.on_disconnect = on_disconnected
mqtt_client.on_connect = lambda *args: print("Connected to the MQTT broker!")
# connect to the broker and publish/subscribe the device's topics
device.begin(keep_alive=3000)
# keep_alive must be set to avoid the device's `$state` being considered "lost"
# a forever loop
try:
refresh_last = time.time()
time_fmt = clock_property.convert(time.localtime())
while True:
try:
now = time.time()
print(time_fmt, end="\r")
if now - refresh_last >= 1: # refresh every 1 second
refresh_last = now
assert mqtt_client.is_connected()
time_fmt = device.set_property(clock_property, time.localtime())
except MMQTTException:
print("\n!!! Connection with broker is lost.")
except KeyboardInterrupt:
device.set_state("disconnected")
print() # move cursor to next line
mqtt_client.on_disconnect = lambda *args: print("Disconnected from broker")
mqtt_client.deinit()
|
A simple app that broadcasts the state of a switch using the PropertyBool
class.
The on-board LED is used as visual feedback (to emulate a lamp).
examples/homie_button_test.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115 | import time
import board
import digitalio
import socketpool # type: ignore
import wifi # type: ignore
from adafruit_minimqtt.adafruit_minimqtt import MQTT, MMQTTException
from circuitpython_homie import HomieDevice, HomieNode
from circuitpython_homie.recipes import PropertyBool
if not hasattr(board, "NEOPIXEL"):
# assume board has a builtin dotstar
import adafruit_dotstar
pixel = adafruit_dotstar.DotStar(board.APA102_SCK, board.APA102_MOSI, 1)
else:
import neopixel
pixel = neopixel.NeoPixel(board.NEOPIXEL, 1)
pixel.brightness = 0.5 # go easy on the eyes
pixel.fill((255, 0, 0)) # this example's initial value of the switch state is False
# Get wifi details and more from a secrets.py file
try:
from secrets import wifi_settings, mqtt_settings
except ImportError as exc:
raise RuntimeError(
"WiFi and MQTT secrets are kept in secrets.py, please add them there!"
) from exc
print("Connecting to", wifi_settings["ssid"])
wifi.radio.connect(**wifi_settings)
print("Connected successfully!")
print("My IP address is", wifi.radio.ipv4_address)
print("Using MQTT broker: {}:{}".format(mqtt_settings["broker"], mqtt_settings["port"]))
pool = socketpool.SocketPool(wifi.radio)
mqtt_client = MQTT(**mqtt_settings, socket_pool=pool)
# instantiate the button and it's state
# declare the state as a list because using `global` is a bad habit
button_pin = board.BUTTON # change this accordingly
button = digitalio.DigitalInOut(button_pin)
button.switch_to_input(digitalio.Pull.UP)
button_value = button.value # for edge detection
# create the objects that describe our device
device = HomieDevice(mqtt_client, "my device name", "lib-button-id")
switch_node = HomieNode("light", "Typical Lamp")
switch_property = PropertyBool("switch", settable=True)
# append the objects to the device's attributes
switch_node.properties.append(switch_property)
device.nodes.append(switch_node)
# add a callback to remotely control the switch_property
def change_state(client: MQTT, topic: str, message: str):
"""Change the switch's state according to message from the broker."""
print("--> broker said switch_state is now", message)
# echo confirmation back to broker
switch_state = device.set_property(switch_property, message)
pixel.fill((0, 255, 0) if switch_state else (255, 0, 0))
print("<-- switch state is now", switch_state)
switch_property.callback = change_state
def on_disconnected(client: MQTT, user_data, rc):
"""Callback invoked when connection to broker is terminated."""
print("Reconnecting to the broker.")
client.reconnect()
device.set_state("ready")
mqtt_client.on_disconnect = on_disconnected
mqtt_client.on_connect = lambda *args: print("Connected to the MQTT broker!")
# connect to the broker and publish/subscribe the device's topics
device.begin(keep_alive=3000)
# keep_alive must be set to avoid the device's `$state` being considered "lost"
# a forever loop
try:
refresh_last = time.time()
while True:
try:
# toggle the switch's state on button press
if button_value != button.value and not button.value:
button_value = button.value
state = device.set_property(switch_property, not switch_property.value)
print("button was pressed", "<(!)>" if state else " (-) ")
pixel.fill((0, 255, 0) if state else (255, 0, 0))
elif button.value and button_value != button.value:
button_value = button.value
now = time.time()
if now - refresh_last >= 1: # refresh every 1 seconds
refresh_last = now
assert mqtt_client.is_connected()
mqtt_client.loop()
except MMQTTException:
print("!!! Connection with broker is lost.")
except KeyboardInterrupt:
device.set_state("disconnected")
mqtt_client.on_disconnect = lambda *args: print("Disconnected from broker")
mqtt_client.deinit()
|