Skip to content

Broker compatibility: add option for brokers not 100% mqtt compliant #177

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 37 additions & 3 deletions python_transport/wirepas_gateway/protocol/mqtt_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ def __init__(
# Variable to keep track of latest published packet
self._timestamp_last_publish = datetime.now()

# load special settings for borker compatibility
self.max_qos = settings.mqtt_max_qos_supported
self.retain_supported = not settings.mqtt_retain_flag_not_supported

self._client = mqtt.Client(
client_id=settings.gateway_id,
clean_session=not settings.mqtt_persist_session,
Expand All @@ -65,6 +69,7 @@ def __init__(

self._client.username_pw_set(settings.mqtt_username, settings.mqtt_password)
self._client.on_connect = self._on_connect
self._client.on_disconnect = self._on_disconnect
self._client.on_publish = self._on_publish

if last_will_topic is not None and last_will_data is not None:
Expand Down Expand Up @@ -102,6 +107,10 @@ def _on_connect(self, client, userdata, flags, rc):
if self.on_connect_cb is not None:
self.on_connect_cb()

def _on_disconnect(self, client, userdata, rc):
if rc != 0:
self.logger.error("MQTT disconnect: %s (%s)", connack_string(rc), rc)

def _on_publish(self, client, userdata, mid):
self._unpublished_mid_set.remove(mid)
self._timestamp_last_publish = datetime.now()
Expand Down Expand Up @@ -196,7 +205,7 @@ def _get_socket(self):

def _set_last_will(self, topic, data):
# Set Last wil message
self._client.will_set(topic, data, qos=2, retain=True)
self._client.will_set(topic, data, qos=1, retain=self.retain_supported)

def run(self):
self.running = True
Expand Down Expand Up @@ -238,6 +247,13 @@ def _publish_from_wrapper_thread(self, topic, payload, qos, retain):
retain: Is it a retain message

"""
# Limit qos in case broker has a limit
if qos > self.max_qos:
qos = self.max_qos

# Clear retain flag if not supported
retain = retain and self.retain_supported

mid = self._client.publish(topic, payload, qos=qos, retain=retain).mid
if self.publish_queue_size == 0:
# Reset last published packet
Expand All @@ -250,14 +266,32 @@ def publish(self, topic, payload, qos=1, retain=False) -> None:
Args:
topic: Topic to publish on
payload: Payload
qos: Qos to use
retain: Is it a retain message
qos: Qos to use. Can be less than requested if broker does
not support it
retain: Is it a retain message. Can be discarded if broker
does not support it

"""
# No need to check qos or retain at this stage as
# done later in real publish to broker

# Send it to the queue to be published from Mqtt thread
self._publish_queue.put((topic, payload, qos, retain))

def subscribe(self, topic, cb, qos=2) -> None:
""" Method to subscribe to mqtt topic

Args:
topic: Topic to publish on
cb: Callback to call on message reception
qos: Qos to use. Can be less than requested if broker does
not support it

"""
# Limit qos in case broker has a limit
if qos > self.max_qos:
qos = self.max_qos

self.logger.debug("Subscribing to: {}".format(topic))
self._client.subscribe(topic, qos)
self._client.message_callback_add(topic, cb)
Expand Down
4 changes: 4 additions & 0 deletions python_transport/wirepas_gateway/protocol/topic_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ def make_otap_process_scratchpad_request_topic(gw_id="+", sink_id="+"):
def make_get_gateway_info_request_topic(gw_id):
return TopicGenerator._make_request_topic("get_gw_info", [str(gw_id)])

@staticmethod
def make_get_gateway_status_request_topic():
return TopicGenerator._make_request_topic("get_gw_status", [])

##################
# Response Part
##################
Expand Down
22 changes: 20 additions & 2 deletions python_transport/wirepas_gateway/transport_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@ def __init__(self, settings, logger=None, **kwargs):
self.gw_model = settings.gateway_model
self.gw_version = settings.gateway_version

# Does broker support retain flag
self.retain_supported = not settings.mqtt_retain_flag_not_supported

self.whitened_ep_filter = settings.whitened_endpoints_filter

last_will_topic = TopicGenerator.make_status_topic(self.gw_id)
Expand Down Expand Up @@ -253,8 +256,9 @@ def _set_status(self):
)

topic = TopicGenerator.make_status_topic(self.gw_id)

self.mqtt_wrapper.publish(topic, event_online.payload, qos=1, retain=True)
self.mqtt_wrapper.publish(
topic, event_online.payload, qos=1, retain=self.retain_supported
)

def _on_connect(self):
# Register for get gateway info
Expand Down Expand Up @@ -291,6 +295,14 @@ def _on_connect(self):
topic, self._on_otap_process_scratchpad_request_received
)

if not self.retain_supported:
# In case retain flag is not supported, it will allow gateway to
# resend its status every time a backend ask for it
# If retain is supported, no need to subscribe as backend will
# already receive the last retained status
topic = TopicGenerator.make_get_gateway_status_request_topic()
self.mqtt_wrapper.subscribe(topic, self._on_get_status_request_received)

self._set_status()

self.logger.info("MQTT connected!")
Expand Down Expand Up @@ -646,6 +658,12 @@ def _on_otap_process_scratchpad_request_received(self, client, userdata, message

self.mqtt_wrapper.publish(topic, response.payload, qos=2)

@deferred_thread
def _on_get_status_request_received(self, client, userdata, message):
# This particular message has no payload
self.logger.debug("Get status request received")
self._set_status()


def parse_setting_list(list_setting):
""" This function parse ep list specified from setting file or cmd line
Expand Down
22 changes: 22 additions & 0 deletions python_transport/wirepas_gateway/utils/argument_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,28 @@ def add_mqtt(self):
),
)

self.mqtt.add_argument(
"--mqtt_max_qos_supported",
default=os.environ.get("WM_SERVICES_MQTT_MAX_QOS_SUPPORTED", 2),
action="store",
type=self.str2int,
help=(
"Max qos supported by broker in case it has limitation"
"compare to MQTT specification that is 2."
),
)

self.mqtt.add_argument(
"--mqtt_retain_flag_not_supported",
default=bool(
os.environ.get("WM_SERVICES_MQTT_RETAIN_FLAG_NOT_SUPPORTED", False)
),
type=self.str2bool,
nargs="?",
const=True,
help=("Set to false if broker do not support retain flag"),
)

def add_buffering_settings(self):
""" Parameters used to avoid black hole case """
self.buffering.add_argument(
Expand Down