Skip to content

Commit 415fe93

Browse files
authored
Process the mdns messages semi parallel (#537)
1 parent 86d68cc commit 415fe93

File tree

1 file changed

+38
-23
lines changed

1 file changed

+38
-23
lines changed

matter_server/server/device_controller.py

+38-23
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ class MatterDeviceController:
101101
"""Class that manages the Matter devices."""
102102

103103
chip_controller: ChipDeviceController | None
104+
fabric_id_hex: str
104105

105106
def __init__(
106107
self,
@@ -123,7 +124,9 @@ def __init__(
123124
self._resolve_lock = asyncio.Lock()
124125
self._aiobrowser: AsyncServiceBrowser | None = None
125126
self._aiozc: AsyncZeroconf | None = None
126-
self._mdns_inprogress: set[int] = set()
127+
self._mdns_queues: dict[
128+
str, tuple[asyncio.Queue[ServiceStateChange], asyncio.Task]
129+
] = {}
127130

128131
async def initialize(self) -> None:
129132
"""Async initialize of controller."""
@@ -134,9 +137,10 @@ async def initialize(self) -> None:
134137
self.chip_controller = self.server.stack.fabric_admin.NewController(
135138
paaTrustStorePath=str(PAA_ROOT_CERTS_DIR)
136139
)
137-
self.compressed_fabric_id = await self._call_sdk(
138-
self.chip_controller.GetCompressedFabricId
140+
self.compressed_fabric_id = cast(
141+
int, await self._call_sdk(self.chip_controller.GetCompressedFabricId)
139142
)
143+
self.fabric_id_hex = hex(self.compressed_fabric_id)[2:]
140144
LOGGER.debug("CHIP Device Controller Initialized")
141145

142146
async def start(self) -> None:
@@ -181,7 +185,10 @@ async def stop(self) -> None:
181185
for sub in self._subscriptions.values():
182186
await self._call_sdk(sub.Shutdown)
183187
self._subscriptions = {}
184-
# shutdown mdns browser
188+
# shutdown (and cleanup) mdns browser
189+
for key in tuple(self._mdns_queues.keys()):
190+
_, mdns_task = self._mdns_queues.pop(key)
191+
mdns_task.cancel()
185192
if self._aiobrowser:
186193
await self._aiobrowser.async_cancel()
187194
if self._aiozc:
@@ -1189,38 +1196,46 @@ def _on_mdns_service_state_change(
11891196
)
11901197
return
11911198
if service_type == MDNS_TYPE_OPERATIONAL_NODE:
1192-
asyncio.create_task(
1193-
self._on_mdns_operational_node_state(name, state_change)
1194-
)
1195-
1196-
async def _on_mdns_operational_node_state(
1197-
self, name: str, state_change: ServiceStateChange
1199+
name = name.lower()
1200+
if not name.startswith(self.fabric_id_hex):
1201+
# filter out messages that are not for our fabric
1202+
return
1203+
if existing := self._mdns_queues.get(name):
1204+
queue = existing[0]
1205+
else:
1206+
# we want mdns messages to be processes sequentially PER NODE but in
1207+
# PARALLEL overall, hence we create a node specific mdns queue per mdns name.
1208+
queue = asyncio.Queue()
1209+
task = asyncio.create_task(self._process_mdns_queue(name, queue))
1210+
self._mdns_queues[name] = (queue, task)
1211+
queue.put_nowait(state_change)
1212+
1213+
async def _process_mdns_queue(
1214+
self, name: str, queue: asyncio.Queue[ServiceStateChange]
11981215
) -> None:
1199-
"""Handle a (operational) Matter node MDNS state change."""
1216+
"""Process the incoming MDNS messages of an (operational) Matter node."""
12001217
# the mdns name is constructed as [fabricid]-[nodeid]._matter._tcp.local.
12011218
# extract the node id from the name
12021219
node_id = int(name.split("-")[1].split(".")[0], 16)
1203-
if node_id not in self._nodes:
1204-
return # should not happen, but just in case
1205-
if node_id in self._mdns_inprogress:
1206-
# mdns records can potentially arrive multiplied so debounce any duplicates
1207-
return
1208-
try:
1209-
self._mdns_inprogress.add(node_id)
1220+
while True:
1221+
state_change = await queue.get()
1222+
if node_id not in self._nodes:
1223+
continue # this should not happen, but just in case
12101224
node = self._nodes[node_id]
1211-
if state_change in (ServiceStateChange.Added, ServiceStateChange.Updated):
1225+
if state_change in (
1226+
ServiceStateChange.Added,
1227+
ServiceStateChange.Updated,
1228+
):
12121229
if node.available:
1213-
return # node is already set-up, no action needed
1230+
continue # node is already set-up, no action needed
12141231
LOGGER.info("Node %s discovered on MDNS", node_id)
12151232
# setup the node
12161233
await self._check_interview_and_subscription(node_id)
12171234
elif state_change == ServiceStateChange.Removed:
12181235
if not node.available:
1219-
return # node is already offline, nothing to do
1236+
continue # node is already offline, nothing to do
12201237
LOGGER.info("Node %s vanished according to MDNS", node_id)
12211238
await self._node_offline(node_id)
1222-
finally:
1223-
self._mdns_inprogress.remove(node_id)
12241239

12251240
async def _on_mdns_commissionable_node_state(
12261241
self, name: str, state_change: ServiceStateChange

0 commit comments

Comments
 (0)