Skip to content

Commit 8e2cff4

Browse files
committed
Broker compatibility: add option for brokers not 100% mqtt compliant
Some brokers (like AWS iot core) do not support qos=2 or retain messages. Add option to transport to specify the max qos value to use and also if the retain flag is supported by broker. In case of retain flag, behavior of transport is a bit different.
1 parent cff0f16 commit 8e2cff4

File tree

3 files changed

+131
-4
lines changed

3 files changed

+131
-4
lines changed

python_transport/wirepas_gateway/protocol/mqtt_wrapper.py

+39-3
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ def __init__(
3030
logger,
3131
on_termination_cb=None,
3232
on_connect_cb=None,
33+
on_disconnect_cb=None,
3334
last_will_topic=None,
3435
last_will_data=None,
3536
):
@@ -39,11 +40,16 @@ def __init__(
3940
self.logger = logger
4041
self.on_termination_cb = on_termination_cb
4142
self.on_connect_cb = on_connect_cb
43+
self.on_disconnect_cb = on_disconnect_cb
4244
# Set to track the unpublished packets
4345
self._unpublished_mid_set = set()
4446
# Variable to keep track of latest published packet
4547
self._timestamp_last_publish = datetime.now()
4648

49+
# load special settings for borker compatibility
50+
self.max_qos = settings.mqtt_max_qos_supported
51+
self.retain_not_supported = settings.mqtt_retain_flag_not_supported
52+
4753
self._client = mqtt.Client(
4854
client_id=settings.gateway_id,
4955
clean_session=not settings.mqtt_persist_session,
@@ -65,6 +71,7 @@ def __init__(
6571

6672
self._client.username_pw_set(settings.mqtt_username, settings.mqtt_password)
6773
self._client.on_connect = self._on_connect
74+
self._client.on_disconnect = self._on_disconnect
6875
self._client.on_publish = self._on_publish
6976

7077
if last_will_topic is not None and last_will_data is not None:
@@ -102,6 +109,13 @@ def _on_connect(self, client, userdata, flags, rc):
102109
if self.on_connect_cb is not None:
103110
self.on_connect_cb()
104111

112+
def _on_disconnect(self, client, userdata, rc):
113+
if rc != 0:
114+
self.logger.error("MQTT disconnect: %s (%s)", connack_string(rc), rc)
115+
116+
if self.on_disconnect_cb is not None:
117+
self.on_disconnect_cb()
118+
105119
def _on_publish(self, client, userdata, mid):
106120
self._unpublished_mid_set.remove(mid)
107121
self._timestamp_last_publish = datetime.now()
@@ -196,7 +210,7 @@ def _get_socket(self):
196210

197211
def _set_last_will(self, topic, data):
198212
# Set Last wil message
199-
self._client.will_set(topic, data, qos=2, retain=True)
213+
self._client.will_set(topic, data, qos=1, retain=not self.retain_not_supported)
200214

201215
def run(self):
202216
self.running = True
@@ -238,6 +252,14 @@ def _publish_from_wrapper_thread(self, topic, payload, qos, retain):
238252
retain: Is it a retain message
239253
240254
"""
255+
# Limit qos in case broker has a limit
256+
if qos > self.max_qos:
257+
qos = self.max_qos
258+
259+
# Clear retain flag if not supported
260+
if self.retain_not_supported:
261+
retain = False
262+
241263
mid = self._client.publish(topic, payload, qos=qos, retain=retain).mid
242264
if self.publish_queue_size == 0:
243265
# Reset last published packet
@@ -250,14 +272,28 @@ def publish(self, topic, payload, qos=1, retain=False) -> None:
250272
Args:
251273
topic: Topic to publish on
252274
payload: Payload
253-
qos: Qos to use
254-
retain: Is it a retain message
275+
qos: Qos to use. Can be less than requested if broker does not support it
276+
retain: Is it a retain message. Can be discarded if broker does not support it
255277
256278
"""
279+
# No need to check qos or retain at this stage as done in real publish to broker
280+
257281
# Send it to the queue to be published from Mqtt thread
258282
self._publish_queue.put((topic, payload, qos, retain))
259283

260284
def subscribe(self, topic, cb, qos=2) -> None:
285+
""" Method to subscribe to mqtt topic
286+
287+
Args:
288+
topic: Topic to publish on
289+
cb: Callback to call on message reception
290+
qos: Qos to use. Can be less than requested if broker does not support it
291+
292+
"""
293+
# Limit qos in case broker has a limit
294+
if qos > self.max_qos:
295+
qos = self.max_qos
296+
261297
self.logger.debug("Subscribing to: {}".format(topic))
262298
self._client.subscribe(topic, qos)
263299
self._client.message_callback_add(topic, cb)

python_transport/wirepas_gateway/transport_service.py

+70-1
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,48 @@ def initialize_sink(self, name):
168168
sink.cost = self.minimum_sink_cost
169169

170170

171+
class PeriodicPublishStatusThread(Thread):
172+
def __init__(self, logger, period, mqtt_wrapper, topic, payload):
173+
"""
174+
Thread to periodically publish gateway status if retain flag is not
175+
supported by broker.
176+
177+
Args:
178+
logger: logger used to log messages
179+
period: the period to check the buffer status
180+
mqtt_wrapper: the mqtt wrapper to get access to queue level
181+
"""
182+
Thread.__init__(self)
183+
184+
self.logger = logger
185+
186+
# Daemonize thread to exit with full process
187+
self.daemon = True
188+
self.running = False
189+
190+
# How often to publish
191+
self.period = period
192+
self.mqtt_wrapper = mqtt_wrapper
193+
self.topic = topic
194+
self.payload = payload
195+
196+
def run(self):
197+
"""
198+
Main loop to publish status
199+
"""
200+
self.running = True
201+
while self.running:
202+
self.logger.debug("Publish status")
203+
self.mqtt_wrapper.publish(self.topic, self.payload, qos=1, retain=False)
204+
sleep(self.period)
205+
206+
def stop(self):
207+
"""
208+
Stop the publish script
209+
"""
210+
self.running = False
211+
212+
171213
class TransportService(BusClient):
172214
"""
173215
Implementation of gateway to backend protocol
@@ -182,6 +224,9 @@ class TransportService(BusClient):
182224
# Period in s to check for black hole issue
183225
MONITORING_BUFFERING_PERIOD_S = 1
184226

227+
# Period to send status in case broker do not support retain flag
228+
GATEWAY_STATUS_PERIOD_S = 30
229+
185230
def __init__(self, settings, logger=None, **kwargs):
186231
self.logger = logger or logging.getLogger(__name__)
187232
self.logger.info("Version is: %s", transport_version)
@@ -197,6 +242,10 @@ def __init__(self, settings, logger=None, **kwargs):
197242
self.gw_model = settings.gateway_model
198243
self.gw_version = settings.gateway_version
199244

245+
# Does broker support retain flag
246+
self.retain_not_supported = settings.mqtt_retain_flag_not_supported
247+
self.status_publish_thread = None
248+
200249
self.whitened_ep_filter = settings.whitened_endpoints_filter
201250

202251
last_will_topic = TopicGenerator.make_status_topic(self.gw_id)
@@ -209,6 +258,7 @@ def __init__(self, settings, logger=None, **kwargs):
209258
self.logger,
210259
self._on_mqtt_wrapper_termination_cb,
211260
self._on_connect,
261+
self._on_disconnect,
212262
last_will_topic,
213263
last_will_message,
214264
)
@@ -254,7 +304,20 @@ def _set_status(self):
254304

255305
topic = TopicGenerator.make_status_topic(self.gw_id)
256306

257-
self.mqtt_wrapper.publish(topic, event_online.payload, qos=1, retain=True)
307+
if self.retain_not_supported:
308+
if self.status_publish_thread is None:
309+
self.status_publish_thread = PeriodicPublishStatusThread(
310+
self.logger,
311+
self.GATEWAY_STATUS_PERIOD_S,
312+
self.mqtt_wrapper,
313+
topic,
314+
event_online.payload,
315+
)
316+
self.status_publish_thread.start()
317+
else:
318+
self.logger.warning("Publish thread already running")
319+
else:
320+
self.mqtt_wrapper.publish(topic, event_online.payload, qos=1, retain=True)
258321

259322
def _on_connect(self):
260323
# Register for get gateway info
@@ -295,6 +358,12 @@ def _on_connect(self):
295358

296359
self.logger.info("MQTT connected!")
297360

361+
def _on_disconnect(self):
362+
self.logger.info("MQTT disconnected!")
363+
if self.status_publish_thread is not None:
364+
self.status_publish_thread.stop()
365+
self.status_publish_thread = None
366+
298367
def on_data_received(
299368
self,
300369
sink_id,

python_transport/wirepas_gateway/utils/argument_tools.py

+22
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,28 @@ def add_mqtt(self):
344344
),
345345
)
346346

347+
self.mqtt.add_argument(
348+
"--mqtt_max_qos_supported",
349+
default=os.environ.get("WM_SERVICES_MQTT_MAX_QOS_SUPPORTED", 2),
350+
action="store",
351+
type=self.str2int,
352+
help=(
353+
"Max qos supported by broker in case it has limitation"
354+
"compare to MQTT specification that is 2."
355+
),
356+
)
357+
358+
self.mqtt.add_argument(
359+
"--mqtt_retain_flag_not_supported",
360+
default=bool(
361+
os.environ.get("WM_SERVICES_MQTT_RETAIN_FLAG_SUPPORTED", False)
362+
),
363+
type=self.str2bool,
364+
nargs="?",
365+
const=True,
366+
help=("Set to false if broker do not support retain flag"),
367+
)
368+
347369
def add_buffering_settings(self):
348370
""" Parameters used to avoid black hole case """
349371
self.buffering.add_argument(

0 commit comments

Comments
 (0)