-
Notifications
You must be signed in to change notification settings - Fork 2
/
mqtt_if.py
179 lines (148 loc) · 6.49 KB
/
mqtt_if.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
"""Manage MQTT connexion, subscription and publishing."""
import os
import ssl
from typing import Optional, Literal
import uritools
import paho.mqtt.client as mqtt
MYUSERNAME = "admin"
MYPASSWORD = "changeme"
DEFAULT_INSECURE_MQTT_PORT = 1883
DEFAULT_TLS_MQTT_PORT = 8883
QosValue = Literal[0, 1, 2]
# pylint: disable=too-many-instance-attributes
class MQTTInterface:
"""MQTT Interface management."""
__instance = None
def __new__(cls):
if MQTTInterface.__instance is None:
MQTTInterface.__instance = object.__new__(cls)
return MQTTInterface.__instance
def __init__(self):
if "server" in self.__dict__:
return
super().__init__()
self.server = None
self.port = None
self.mqtts = False
self.client_name = f"enki_{os.getpid()}"
self.client = mqtt.Client(client_id=self.client_name, protocol=mqtt.MQTTv311,
clean_session=True, reconnect_on_failure=True,
userdata=self, transport="tcp")
self.client.on_connect = self.on_connect
self.client.on_disconnect = self.on_disconnect
self.client.on_message = self.on_message
self.client.username_pw_set(MYUSERNAME, MYPASSWORD)
self.client.reconnect_delay_set(1, 10)
self.subscribed_topics = ["spBv1.0/#"]
self.forwarded_topics = {}
self.connect_callback = None
self.disconnect_callback = None
self.message_callback = None
self._is_connected = False
self._will_topic = None
self._will_payload = None
self._will_qos = 0
@property
def is_connected(self):
"""True if connected to the MQTT broker."""
return self._is_connected
def set_server(self, server, port, certificate=None):
"""Set mqtt server address and port."""
self.server = server
self.port = port
if certificate is not None:
self.client.tls_set(certificate, tls_version=ssl.PROTOCOL_TLSv1_2)
def set_server_from_uri(self, server_uri, certificate=None):
"""Set mqtt server from its URI."""
assert uritools.isuri(server_uri), f"{server_uri} is not a valid URI"
uri = uritools.urisplit(server_uri)
self.mqtts = uri.scheme == "mqtts"
if self.mqtts:
port = uri.getport(default=DEFAULT_TLS_MQTT_PORT)
else:
port = uri.getport(default=DEFAULT_INSECURE_MQTT_PORT)
host = str(uri.gethost())
self.set_server(host, port, certificate)
def get_uri(self) -> str:
"""Returns the broker's URI."""
scheme = "mqtts" if self.mqtts else "mqtt"
return f"{scheme}://{self.server}:{self.port}"
def get_subscribed_topics(self):
"""Get list of topics subscribed to."""
return self.subscribed_topics
def set_will(self, will_topic: str, will_payload: Optional[bytearray], qos: QosValue) -> None:
"""Sets the MQTT Last Will and Testament topic and payload.
The payload will be automatically published on the given topic
by the MQTT broker if the client is unexpectedly disconnected.
:param will_topic: The topic to publish to.
:param will_payload: Optional payload to publish. If None, a zero-length message
will be published.
:param qos: The QoS to use to publish the Last Will and Testament."""
self._will_topic = will_topic
self._will_payload = will_payload
self.client.will_set(self._will_topic, self._will_payload, qos)
def get_will(self) -> (Optional[str], Optional[bytearray], QosValue):
"""Returns the MQTT Last Will and Testament topic and payload
that are set for the MQTT client.
:return: A tuple with an optional topic, an optional payload and a qos"""
return self._will_topic, self._will_payload, self._will_qos
def subscribe(self, topic):
"""Subscribe to a topic."""
if topic not in self.subscribed_topics:
self.client.subscribe(topic)
self.subscribed_topics.append(topic)
def unsubscribe(self, topic):
"""Unsubscribe from topic."""
if topic in self.subscribed_topics:
self.client.unsubscribe(topic)
self.subscribed_topics.remove(topic)
def publish(self, topic, byte_array, qos, retain):
"""Publish message on topic."""
self.client.publish(topic, byte_array, qos, retain)
def forward_topic(self, topic, q_io):
"""Forward messages received on topic to queue"""
self.forwarded_topics[topic] = q_io
def stop_forwarding(self, topic):
"""Stop forwarding messages received on topic"""
self.forwarded_topics.pop(topic)
@staticmethod
def on_connect(client, userdata, _flags, ret):
"""The callback for when the client receives a CONNACK response from the server."""
if ret == 0:
# pylint: disable-next=protected-access
userdata._is_connected = True
if userdata.connect_callback is not None:
userdata.connect_callback(ret)
if ret == 0:
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
for topic in userdata.subscribed_topics:
client.subscribe(topic)
@staticmethod
def on_disconnect(_client, userdata, result):
"""The callback for when the client is disconnected from the server
or a connection attempt fails."""
# pylint: disable-next=protected-access
userdata._is_connected = False
if userdata.disconnect_callback is not None:
userdata.disconnect_callback(result)
@staticmethod
def on_message(_client, userdata, msg):
"""The callback for when a PUBLISH message is received from the server."""
if userdata.message_callback:
userdata.message_callback(userdata, msg)
for (topic, q_io) in userdata.forwarded_topics.items():
if mqtt.topic_matches_sub(topic, msg.topic):
q_io.put(msg)
def join(self, _timeout=None):
"""Wait for the MQTT loop to stop."""
self.client.loop_stop()
MQTTInterface.__instance = None
def start(self):
"""Connect to the broker and start mqtt loop."""
self.client.connect_async(self.server, self.port, 60)
self.client.loop_start()
def stop(self):
"""Stop loop and disconnect from the broker."""
self.client.loop_stop()
self.client.disconnect()