Skip to content

Commit 6efb530

Browse files
committed
Broker compatibility: add option for brokers not 100% mqtt compliant
Some brokers (like AWS iot core) do not support qos=2 or retain messages. Add option to transport to specify the max qos value to use and also if the retain flag is supported by broker. In case of retain flag, behavior of transport is a bit different.
1 parent cff0f16 commit 6efb530

File tree

4 files changed

+83
-5
lines changed

4 files changed

+83
-5
lines changed

python_transport/wirepas_gateway/protocol/mqtt_wrapper.py

+37-3
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ def __init__(
4444
# Variable to keep track of latest published packet
4545
self._timestamp_last_publish = datetime.now()
4646

47+
# load special settings for borker compatibility
48+
self.max_qos = settings.mqtt_max_qos_supported
49+
self.retain_supported = not settings.mqtt_retain_flag_not_supported
50+
4751
self._client = mqtt.Client(
4852
client_id=settings.gateway_id,
4953
clean_session=not settings.mqtt_persist_session,
@@ -65,6 +69,7 @@ def __init__(
6569

6670
self._client.username_pw_set(settings.mqtt_username, settings.mqtt_password)
6771
self._client.on_connect = self._on_connect
72+
self._client.on_disconnect = self._on_disconnect
6873
self._client.on_publish = self._on_publish
6974

7075
if last_will_topic is not None and last_will_data is not None:
@@ -102,6 +107,10 @@ def _on_connect(self, client, userdata, flags, rc):
102107
if self.on_connect_cb is not None:
103108
self.on_connect_cb()
104109

110+
def _on_disconnect(self, client, userdata, rc):
111+
if rc != 0:
112+
self.logger.error("MQTT disconnect: %s (%s)", connack_string(rc), rc)
113+
105114
def _on_publish(self, client, userdata, mid):
106115
self._unpublished_mid_set.remove(mid)
107116
self._timestamp_last_publish = datetime.now()
@@ -196,7 +205,7 @@ def _get_socket(self):
196205

197206
def _set_last_will(self, topic, data):
198207
# Set Last wil message
199-
self._client.will_set(topic, data, qos=2, retain=True)
208+
self._client.will_set(topic, data, qos=1, retain=self.retain_supported)
200209

201210
def run(self):
202211
self.running = True
@@ -238,6 +247,13 @@ def _publish_from_wrapper_thread(self, topic, payload, qos, retain):
238247
retain: Is it a retain message
239248
240249
"""
250+
# Limit qos in case broker has a limit
251+
if qos > self.max_qos:
252+
qos = self.max_qos
253+
254+
# Clear retain flag if not supported
255+
retain = retain and self.retain_supported
256+
241257
mid = self._client.publish(topic, payload, qos=qos, retain=retain).mid
242258
if self.publish_queue_size == 0:
243259
# Reset last published packet
@@ -250,14 +266,32 @@ def publish(self, topic, payload, qos=1, retain=False) -> None:
250266
Args:
251267
topic: Topic to publish on
252268
payload: Payload
253-
qos: Qos to use
254-
retain: Is it a retain message
269+
qos: Qos to use. Can be less than requested if broker does
270+
not support it
271+
retain: Is it a retain message. Can be discarded if broker
272+
does not support it
255273
256274
"""
275+
# No need to check qos or retain at this stage as
276+
# done later in real publish to broker
277+
257278
# Send it to the queue to be published from Mqtt thread
258279
self._publish_queue.put((topic, payload, qos, retain))
259280

260281
def subscribe(self, topic, cb, qos=2) -> None:
282+
""" Method to subscribe to mqtt topic
283+
284+
Args:
285+
topic: Topic to publish on
286+
cb: Callback to call on message reception
287+
qos: Qos to use. Can be less than requested if broker does
288+
not support it
289+
290+
"""
291+
# Limit qos in case broker has a limit
292+
if qos > self.max_qos:
293+
qos = self.max_qos
294+
261295
self.logger.debug("Subscribing to: {}".format(topic))
262296
self._client.subscribe(topic, qos)
263297
self._client.message_callback_add(topic, cb)

python_transport/wirepas_gateway/protocol/topic_helper.py

+4
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ def make_otap_process_scratchpad_request_topic(gw_id="+", sink_id="+"):
6868
def make_get_gateway_info_request_topic(gw_id):
6969
return TopicGenerator._make_request_topic("get_gw_info", [str(gw_id)])
7070

71+
@staticmethod
72+
def make_get_gateway_status_request_topic():
73+
return TopicGenerator._make_request_topic("get_gw_status", [])
74+
7175
##################
7276
# Response Part
7377
##################

python_transport/wirepas_gateway/transport_service.py

+20-2
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,9 @@ def __init__(self, settings, logger=None, **kwargs):
197197
self.gw_model = settings.gateway_model
198198
self.gw_version = settings.gateway_version
199199

200+
# Does broker support retain flag
201+
self.retain_supported = not settings.mqtt_retain_flag_not_supported
202+
200203
self.whitened_ep_filter = settings.whitened_endpoints_filter
201204

202205
last_will_topic = TopicGenerator.make_status_topic(self.gw_id)
@@ -253,8 +256,9 @@ def _set_status(self):
253256
)
254257

255258
topic = TopicGenerator.make_status_topic(self.gw_id)
256-
257-
self.mqtt_wrapper.publish(topic, event_online.payload, qos=1, retain=True)
259+
self.mqtt_wrapper.publish(
260+
topic, event_online.payload, qos=1, retain=self.retain_supported
261+
)
258262

259263
def _on_connect(self):
260264
# Register for get gateway info
@@ -291,6 +295,14 @@ def _on_connect(self):
291295
topic, self._on_otap_process_scratchpad_request_received
292296
)
293297

298+
if not self.retain_supported:
299+
# In case retain flag is not supported, it will allow gateway to resend its status
300+
# every time a backend ask for it
301+
# If retain is supported, no need to subscribe as backend will already receive the last
302+
# retain status
303+
topic = TopicGenerator.make_get_gateway_status_request_topic()
304+
self.mqtt_wrapper.subscribe(topic, self._on_get_status_request_received)
305+
294306
self._set_status()
295307

296308
self.logger.info("MQTT connected!")
@@ -646,6 +658,12 @@ def _on_otap_process_scratchpad_request_received(self, client, userdata, message
646658

647659
self.mqtt_wrapper.publish(topic, response.payload, qos=2)
648660

661+
@deferred_thread
662+
def _on_get_status_request_received(self, client, userdata, message):
663+
# This particular message has no payload
664+
self.logger.debug("Get status request received")
665+
self._set_status()
666+
649667

650668
def parse_setting_list(list_setting):
651669
""" This function parse ep list specified from setting file or cmd line

python_transport/wirepas_gateway/utils/argument_tools.py

+22
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,28 @@ def add_mqtt(self):
344344
),
345345
)
346346

347+
self.mqtt.add_argument(
348+
"--mqtt_max_qos_supported",
349+
default=os.environ.get("WM_SERVICES_MQTT_MAX_QOS_SUPPORTED", 2),
350+
action="store",
351+
type=self.str2int,
352+
help=(
353+
"Max qos supported by broker in case it has limitation"
354+
"compare to MQTT specification that is 2."
355+
),
356+
)
357+
358+
self.mqtt.add_argument(
359+
"--mqtt_retain_flag_not_supported",
360+
default=bool(
361+
os.environ.get("WM_SERVICES_MQTT_RETAIN_FLAG_NOT_SUPPORTED", False)
362+
),
363+
type=self.str2bool,
364+
nargs="?",
365+
const=True,
366+
help=("Set to false if broker do not support retain flag"),
367+
)
368+
347369
def add_buffering_settings(self):
348370
""" Parameters used to avoid black hole case """
349371
self.buffering.add_argument(

0 commit comments

Comments
 (0)