From 7e5021bdfef70e0ee3a2b9c31b2d840d7a0552e9 Mon Sep 17 00:00:00 2001 From: Gwendal Raoul Date: Thu, 2 Apr 2020 11:56:07 +0200 Subject: [PATCH] Broker compatibility: add option for brokers not 100% mqtt compliant Some brokers (like AWS iot core) do not support qos=2 or retain messages. Add option to transport to specify the max qos value to use and also if the retain flag is supported by broker. In case of retain flag, behavior of transport is a bit different. --- .../wirepas_gateway/protocol/mqtt_wrapper.py | 40 +++++++++++++++++-- .../wirepas_gateway/protocol/topic_helper.py | 4 ++ .../wirepas_gateway/transport_service.py | 22 +++++++++- .../wirepas_gateway/utils/argument_tools.py | 22 ++++++++++ 4 files changed, 83 insertions(+), 5 deletions(-) diff --git a/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py b/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py index 6f2dce2c..8ec96a4a 100644 --- a/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py +++ b/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py @@ -44,6 +44,10 @@ def __init__( # Variable to keep track of latest published packet self._timestamp_last_publish = datetime.now() + # load special settings for borker compatibility + self.max_qos = settings.mqtt_max_qos_supported + self.retain_supported = not settings.mqtt_retain_flag_not_supported + self._client = mqtt.Client( client_id=settings.gateway_id, clean_session=not settings.mqtt_persist_session, @@ -65,6 +69,7 @@ def __init__( self._client.username_pw_set(settings.mqtt_username, settings.mqtt_password) self._client.on_connect = self._on_connect + self._client.on_disconnect = self._on_disconnect self._client.on_publish = self._on_publish if last_will_topic is not None and last_will_data is not None: @@ -102,6 +107,10 @@ def _on_connect(self, client, userdata, flags, rc): if self.on_connect_cb is not None: self.on_connect_cb() + def _on_disconnect(self, client, userdata, rc): + if rc != 0: + self.logger.error("MQTT disconnect: %s (%s)", connack_string(rc), rc) + def _on_publish(self, client, userdata, mid): self._unpublished_mid_set.remove(mid) self._timestamp_last_publish = datetime.now() @@ -196,7 +205,7 @@ def _get_socket(self): def _set_last_will(self, topic, data): # Set Last wil message - self._client.will_set(topic, data, qos=2, retain=True) + self._client.will_set(topic, data, qos=1, retain=self.retain_supported) def run(self): self.running = True @@ -238,6 +247,13 @@ def _publish_from_wrapper_thread(self, topic, payload, qos, retain): retain: Is it a retain message """ + # Limit qos in case broker has a limit + if qos > self.max_qos: + qos = self.max_qos + + # Clear retain flag if not supported + retain = retain and self.retain_supported + mid = self._client.publish(topic, payload, qos=qos, retain=retain).mid if self.publish_queue_size == 0: # Reset last published packet @@ -250,14 +266,32 @@ def publish(self, topic, payload, qos=1, retain=False) -> None: Args: topic: Topic to publish on payload: Payload - qos: Qos to use - retain: Is it a retain message + qos: Qos to use. Can be less than requested if broker does + not support it + retain: Is it a retain message. Can be discarded if broker + does not support it """ + # No need to check qos or retain at this stage as + # done later in real publish to broker + # Send it to the queue to be published from Mqtt thread self._publish_queue.put((topic, payload, qos, retain)) def subscribe(self, topic, cb, qos=2) -> None: + """ Method to subscribe to mqtt topic + + Args: + topic: Topic to publish on + cb: Callback to call on message reception + qos: Qos to use. Can be less than requested if broker does + not support it + + """ + # Limit qos in case broker has a limit + if qos > self.max_qos: + qos = self.max_qos + self.logger.debug("Subscribing to: {}".format(topic)) self._client.subscribe(topic, qos) self._client.message_callback_add(topic, cb) diff --git a/python_transport/wirepas_gateway/protocol/topic_helper.py b/python_transport/wirepas_gateway/protocol/topic_helper.py index 84a5f51d..76e4cf47 100644 --- a/python_transport/wirepas_gateway/protocol/topic_helper.py +++ b/python_transport/wirepas_gateway/protocol/topic_helper.py @@ -68,6 +68,10 @@ def make_otap_process_scratchpad_request_topic(gw_id="+", sink_id="+"): def make_get_gateway_info_request_topic(gw_id): return TopicGenerator._make_request_topic("get_gw_info", [str(gw_id)]) + @staticmethod + def make_get_gateway_status_request_topic(): + return TopicGenerator._make_request_topic("get_gw_status", []) + ################## # Response Part ################## diff --git a/python_transport/wirepas_gateway/transport_service.py b/python_transport/wirepas_gateway/transport_service.py index 9448f2e4..a6bb5497 100644 --- a/python_transport/wirepas_gateway/transport_service.py +++ b/python_transport/wirepas_gateway/transport_service.py @@ -197,6 +197,9 @@ def __init__(self, settings, logger=None, **kwargs): self.gw_model = settings.gateway_model self.gw_version = settings.gateway_version + # Does broker support retain flag + self.retain_supported = not settings.mqtt_retain_flag_not_supported + self.whitened_ep_filter = settings.whitened_endpoints_filter last_will_topic = TopicGenerator.make_status_topic(self.gw_id) @@ -253,8 +256,9 @@ def _set_status(self): ) topic = TopicGenerator.make_status_topic(self.gw_id) - - self.mqtt_wrapper.publish(topic, event_online.payload, qos=1, retain=True) + self.mqtt_wrapper.publish( + topic, event_online.payload, qos=1, retain=self.retain_supported + ) def _on_connect(self): # Register for get gateway info @@ -291,6 +295,14 @@ def _on_connect(self): topic, self._on_otap_process_scratchpad_request_received ) + if not self.retain_supported: + # In case retain flag is not supported, it will allow gateway to + # resend its status every time a backend ask for it + # If retain is supported, no need to subscribe as backend will + # already receive the last retained status + topic = TopicGenerator.make_get_gateway_status_request_topic() + self.mqtt_wrapper.subscribe(topic, self._on_get_status_request_received) + self._set_status() self.logger.info("MQTT connected!") @@ -646,6 +658,12 @@ def _on_otap_process_scratchpad_request_received(self, client, userdata, message self.mqtt_wrapper.publish(topic, response.payload, qos=2) + @deferred_thread + def _on_get_status_request_received(self, client, userdata, message): + # This particular message has no payload + self.logger.debug("Get status request received") + self._set_status() + def parse_setting_list(list_setting): """ This function parse ep list specified from setting file or cmd line diff --git a/python_transport/wirepas_gateway/utils/argument_tools.py b/python_transport/wirepas_gateway/utils/argument_tools.py index f1651f21..a9cc57b2 100644 --- a/python_transport/wirepas_gateway/utils/argument_tools.py +++ b/python_transport/wirepas_gateway/utils/argument_tools.py @@ -344,6 +344,28 @@ def add_mqtt(self): ), ) + self.mqtt.add_argument( + "--mqtt_max_qos_supported", + default=os.environ.get("WM_SERVICES_MQTT_MAX_QOS_SUPPORTED", 2), + action="store", + type=self.str2int, + help=( + "Max qos supported by broker in case it has limitation" + "compare to MQTT specification that is 2." + ), + ) + + self.mqtt.add_argument( + "--mqtt_retain_flag_not_supported", + default=bool( + os.environ.get("WM_SERVICES_MQTT_RETAIN_FLAG_NOT_SUPPORTED", False) + ), + type=self.str2bool, + nargs="?", + const=True, + help=("Set to false if broker do not support retain flag"), + ) + def add_buffering_settings(self): """ Parameters used to avoid black hole case """ self.buffering.add_argument(