Skip to content

Commit 1964ffa

Browse files
committed
Fix unawaited critical_tasks cancel, use None for infinite reconnects.
1 parent 0ee7067 commit 1964ffa

File tree

7 files changed

+49
-21
lines changed

7 files changed

+49
-21
lines changed

Makefile

+4-4
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,12 @@ graphs:
1212
dot -Tsvg -O interrupt_callbacks.dot
1313

1414
coverage:
15-
coverage run --source "." --omit "mqtt_io/tests/*,mqtt_io/modules/*" -m behave mqtt_io/tests/features -t ~skip
16-
coverage report -m
15+
poetry run coverage run --source "." --omit "mqtt_io/tests/*,mqtt_io/modules/*" -m behave mqtt_io/tests/features -t ~skip
16+
poetry run coverage report -m
1717

1818
lint:
19-
pylint -d fixme mqtt_io
20-
mypy --show-error-codes --strict --no-warn-unused-ignores mqtt_io
19+
poetry run pylint -d fixme mqtt_io
20+
poetry run mypy --show-error-codes --strict --no-warn-unused-ignores mqtt_io
2121

2222
build:
2323
poetry build

config.stdio.yml

+14-1
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
mqtt:
2-
host: test.mosquitto.org
2+
host: localhost
33
topic_prefix: mqtt_io
44
clean_session: yes
55
protocol: 3.1.1
6+
keepalive: 10
7+
reconnect_delay: 4
8+
reconnect_count: 4
69

710
gpio_modules:
811
- name: stdio
@@ -25,6 +28,7 @@ digital_inputs:
2528
interrupt: rising
2629
interrupt_for:
2730
- test
31+
poll_interval: 2
2832

2933
digital_outputs:
3034
- name: bell
@@ -35,6 +39,15 @@ digital_outputs:
3539
publish_initial: yes
3640
initial: high
3741

42+
sensor_modules:
43+
- name: mock
44+
module: mock
45+
46+
sensor_inputs:
47+
- name: mock_sensor
48+
module: mock
49+
interval: 2
50+
3851
logging:
3952
version: 1
4053
handlers:

docs/config/reference/mqtt/README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -433,13 +433,13 @@ Default: 2
433433
*mqtt*.**reconnect_count**
434434

435435
Max number of retries of connections before giving up and exiting.
436-
-1 (the default) means infinite reconnects.
436+
Null value means infinite reconnects (default).
437437
The counter is reset when the connection is reestablished successfully.
438438

439439

440440
```yaml
441441
Type: integer
442442
Required: False
443-
Default: -1
443+
Default: None
444444
```
445445

docs/schema.json

+4-3
Original file line numberDiff line numberDiff line change
@@ -284,13 +284,14 @@
284284
},
285285
"reconnect_count": {
286286
"meta": {
287-
"description": "Max number of retries of connections before giving up and exiting.\n-1 (the default) means infinite reconnects.\nThe counter is reset when the connection is reestablished successfully.\n",
287+
"description": "Max number of retries of connections before giving up and exiting.\nNull value means infinite reconnects (default).\nThe counter is reset when the connection is reestablished successfully.\n",
288288
"title_id": "mqtt-reconnect_count"
289289
},
290290
"type": "integer",
291291
"required": false,
292-
"default": -1,
293-
"min": -1
292+
"default": null,
293+
"nullable": true,
294+
"min": 0
294295
}
295296
}
296297
},

mqtt_io/config/config.schema.yml

+4-3
Original file line numberDiff line numberDiff line change
@@ -262,12 +262,13 @@ mqtt:
262262
meta:
263263
description: |
264264
Max number of retries of connections before giving up and exiting.
265-
-1 (the default) means infinite reconnects.
265+
Null value means infinite reconnects (default).
266266
The counter is reset when the connection is reestablished successfully.
267267
type: integer
268268
required: no
269-
default: -1
270-
min: -1
269+
default: null
270+
nullable: yes
271+
min: 0
271272
gpio_modules:
272273
meta:
273274
description: |

mqtt_io/mqtt/asyncio_mqtt.py

+1
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ def __init__(self, options: MQTTClientOptions):
6666
username=options.username,
6767
password=options.password,
6868
client_id=options.client_id,
69+
# keepalive=options.keepalive, # This isn't implemented yet on 0.8.1
6970
tls_context=tls_context,
7071
protocol=protocol_map[options.protocol],
7172
will=will,

mqtt_io/server.py

+20-8
Original file line numberDiff line numberDiff line change
@@ -1030,6 +1030,8 @@ async def _mqtt_task_loop(self) -> None:
10301030
)
10311031
try:
10321032
await entry.coro
1033+
except MQTTException:
1034+
raise
10331035
except Exception: # pylint: disable=broad-except
10341036
_LOG.exception("Exception while handling MQTT task:")
10351037
self.mqtt_task_queue.task_done()
@@ -1149,13 +1151,12 @@ async def stream_output_loop(
11491151
self.event_bus.fire(StreamDataSentEvent(stream_conf["name"], data))
11501152

11511153
async def _main_loop(self) -> None:
1152-
counter = self.config["mqtt"].get("reconnect_count")
11531154
reconnect = True
1155+
reconnect_delay = self.config["mqtt"]["reconnect_delay"]
11541156
while reconnect:
11551157
try:
11561158
await self._connect_mqtt()
1157-
# Reset counter
1158-
counter = self.config["mqtt"].get("reconnect_count")
1159+
reconnects_remaining = self.config["mqtt"]["reconnect_count"]
11591160
self.critical_tasks = [
11601161
self.loop.create_task(coro)
11611162
for coro in (
@@ -1179,18 +1180,29 @@ async def _main_loop(self) -> None:
11791180
except asyncio.CancelledError:
11801181
break
11811182
except MQTTException:
1182-
reconnect = counter > 0
1183-
if counter > 0:
1184-
counter -= 1
1183+
if reconnects_remaining is not None:
1184+
reconnect = reconnects_remaining > 0
1185+
reconnects_remaining -= 1
11851186
_LOG.exception("Connection to MQTT broker failed")
11861187

11871188
finally:
1189+
_LOG.debug("Clearing events and cancelling 'critical_tasks'")
11881190
self.running.clear()
11891191
self.mqtt_connected.clear()
11901192
if self.critical_tasks:
1191-
asyncio.gather(*self.critical_tasks, return_exceptions=True).cancel()
1193+
for task in self.critical_tasks:
1194+
task.cancel()
1195+
await asyncio.gather(*self.critical_tasks, return_exceptions=True)
1196+
_LOG.debug("'critical_tasks' cancelled and gathered")
11921197
if reconnect:
1193-
await asyncio.sleep(self.config["mqtt"].get("reconnect_delay"))
1198+
_LOG.debug(
1199+
"Waiting for MQTT reconnect delay (%s second(s))", reconnect_delay
1200+
)
1201+
await asyncio.sleep(reconnect_delay)
1202+
_LOG.info(
1203+
"Reconnecting to MQTT broker (%s retries remaining)",
1204+
"infinite" if reconnects_remaining is None else reconnects_remaining,
1205+
)
11941206
await self.shutdown()
11951207

11961208
# Main entry point

0 commit comments

Comments
 (0)