Skip to content

Commit efeccd8

Browse files
authoredFeb 7, 2024
Implement zeroconf for operational node discovery (#531)
1 parent 7982b6f commit efeccd8

File tree

2 files changed

+131
-33
lines changed

2 files changed

+131
-33
lines changed
 

‎matter_server/server/device_controller.py

+129-32
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
"""Controller that Manages Matter devices."""
2+
23
# pylint: disable=too-many-lines
34

45
from __future__ import annotations
@@ -8,14 +9,15 @@
89
from datetime import datetime
910
from functools import partial
1011
import logging
11-
import random
1212
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Iterable, TypeVar, cast
1313

1414
from chip.ChipDeviceCtrl import DeviceProxyWrapper
1515
from chip.clusters import Attribute, Objects as Clusters
1616
from chip.clusters.Attribute import ValueDecodeFailure
1717
from chip.clusters.ClusterObjects import ALL_ATTRIBUTES, ALL_CLUSTERS, Cluster
1818
from chip.exceptions import ChipStackError
19+
from zeroconf import IPVersion, ServiceStateChange, Zeroconf
20+
from zeroconf.asyncio import AsyncServiceBrowser, AsyncServiceInfo, AsyncZeroconf
1921

2022
from matter_server.common.helpers.util import convert_ip_address
2123
from matter_server.common.models import CommissionableNodeData, CommissioningParameters
@@ -27,6 +29,7 @@
2729
NodeCommissionFailed,
2830
NodeInterviewFailed,
2931
NodeNotExists,
32+
NodeNotReady,
3033
NodeNotResolving,
3134
)
3235
from ..common.helpers.api import api_command
@@ -61,6 +64,10 @@
6164
MAX_POLL_INTERVAL = 600
6265
MAX_COMMISSION_RETRIES = 3
6366

67+
MDNS_TYPE_OPERATIONAL_NODE = "_matter._tcp.local."
68+
MDNS_TYPE_COMMISSIONABLE_NODE = "_matterc._udp.local."
69+
70+
6471
ROUTING_ROLE_ATTRIBUTE_PATH = create_attribute_path_from_attribute(
6572
0, Clusters.ThreadNetworkDiagnostics.Attributes.RoutingRole
6673
)
@@ -114,6 +121,9 @@ def __init__(
114121
self.compressed_fabric_id: int | None = None
115122
self._node_lock: dict[int, asyncio.Lock] = {}
116123
self._resolve_lock = asyncio.Lock()
124+
self._aiobrowser: AsyncServiceBrowser | None = None
125+
self._aiozc: AsyncZeroconf | None = None
126+
self._mdns_inprogress: set[int] = set()
117127

118128
async def initialize(self) -> None:
119129
"""Async initialize of controller."""
@@ -150,30 +160,33 @@ async def start(self) -> None:
150160
# always mark node as unavailable at startup until subscriptions are ready
151161
node.available = False
152162
self._nodes[node_id] = node
153-
# setup subscription and (re)interview as task in the background
154-
# as we do not want it to block our startup
155-
if not node_dict.get("available"):
156-
# if the node was not available last time we will delay
157-
# the first attempt to initialize so that we prioritize nodes
158-
# that are probably available so they are back online as soon as
159-
# possible and we're not stuck trying to initialize nodes that are offline
160-
self._schedule_interview(node_id, 30)
161-
else:
162-
asyncio.create_task(self._check_interview_and_subscription(node_id))
163163
# cleanup orhpaned nodes from storage
164164
for node_id_str in orphaned_nodes:
165165
self.server.storage.remove(DATA_KEY_NODES, node_id_str)
166166
LOGGER.info("Loaded %s nodes from stored configuration", len(self._nodes))
167+
# set-up mdns browser
168+
self._aiozc = AsyncZeroconf(ip_version=IPVersion.All)
169+
services = [MDNS_TYPE_OPERATIONAL_NODE, MDNS_TYPE_COMMISSIONABLE_NODE]
170+
self._aiobrowser = AsyncServiceBrowser(
171+
self._aiozc.zeroconf,
172+
services,
173+
handlers=[self._on_mdns_service_state_change],
174+
)
167175

168176
async def stop(self) -> None:
169177
"""Handle logic on server stop."""
170178
if self.chip_controller is None:
171179
raise RuntimeError("Device Controller not initialized.")
172-
173180
# unsubscribe all node subscriptions
174181
for sub in self._subscriptions.values():
175182
await self._call_sdk(sub.Shutdown)
176183
self._subscriptions = {}
184+
# shutdown mdns browser
185+
if self._aiobrowser:
186+
await self._aiobrowser.async_cancel()
187+
if self._aiozc:
188+
await self._aiozc.async_close()
189+
# shutdown the sdk device controller
177190
await self._call_sdk(self.chip_controller.Shutdown)
178191
LOGGER.debug("Stopped.")
179192

@@ -246,7 +259,7 @@ async def commission_with_code(
246259
while retries:
247260
try:
248261
await self.interview_node(node_id)
249-
except NodeInterviewFailed as err:
262+
except (NodeNotResolving, NodeInterviewFailed) as err:
250263
if retries <= 0:
251264
raise err
252265
retries -= 1
@@ -469,16 +482,16 @@ async def interview_node(self, node_id: int) -> None:
469482
fabricFiltered=False,
470483
)
471484
)
472-
except (ChipStackError, NodeNotResolving) as err:
485+
except ChipStackError as err:
473486
raise NodeInterviewFailed(f"Failed to interview node {node_id}") from err
474487

475488
is_new_node = node_id not in self._nodes
476489
existing_info = self._nodes.get(node_id)
477490
node = MatterNodeData(
478491
node_id=node_id,
479-
date_commissioned=existing_info.date_commissioned
480-
if existing_info
481-
else datetime.utcnow(),
492+
date_commissioned=(
493+
existing_info.date_commissioned if existing_info else datetime.utcnow()
494+
),
482495
last_interview=datetime.utcnow(),
483496
interview_version=SCHEMA_VERSION,
484497
available=True,
@@ -519,7 +532,8 @@ async def send_device_command(
519532
"""Send a command to a Matter node/device."""
520533
if self.chip_controller is None:
521534
raise RuntimeError("Device Controller not initialized.")
522-
535+
if (node := self._nodes.get(node_id)) is None or not node.available:
536+
raise NodeNotReady(f"Node {node_id} is not (yet) available.")
523537
cluster_cls: Cluster = ALL_CLUSTERS[cluster_id]
524538
command_cls = getattr(cluster_cls.Commands, command_name)
525539
command = dataclass_from_dict(command_cls, payload)
@@ -541,6 +555,8 @@ async def read_attribute(
541555
"""Read a single attribute (or Cluster) on a node."""
542556
if self.chip_controller is None:
543557
raise RuntimeError("Device Controller not initialized.")
558+
if (node := self._nodes.get(node_id)) is None or not node.available:
559+
raise NodeNotReady(f"Node {node_id} is not (yet) available.")
544560
endpoint_id, cluster_id, attribute_id = parse_attribute_path(attribute_path)
545561
assert self.server.loop is not None
546562
future = self.server.loop.create_future()
@@ -580,6 +596,8 @@ async def write_attribute(
580596
"""Write an attribute(value) on a target node."""
581597
if self.chip_controller is None:
582598
raise RuntimeError("Device Controller not initialized.")
599+
if (node := self._nodes.get(node_id)) is None or not node.available:
600+
raise NodeNotReady(f"Node {node_id} is not (yet) available.")
583601
endpoint_id, cluster_id, attribute_id = parse_attribute_path(attribute_path)
584602
attribute = ALL_ATTRIBUTES[cluster_id][attribute_id]()
585603
attribute.value = Clusters.NullValue if value is None else value
@@ -803,7 +821,10 @@ async def _subscribe_node(self, node_id: int) -> None:
803821
# if so, we need to unsubscribe first unless nothing changed
804822
# in the attribute paths we want to subscribe.
805823
if prev_sub := self._subscriptions.get(node_id, None):
806-
if self._attr_subscriptions.get(node_id) == attr_subscriptions:
824+
if (
825+
node.available
826+
and self._attr_subscriptions.get(node_id) == attr_subscriptions
827+
):
807828
# the current subscription already matches, no need to re-setup
808829
node_logger.debug("Re-using existing subscription.")
809830
return
@@ -937,8 +958,9 @@ def resubscription_attempted(
937958
# we debounce it a bit so we only mark the node unavailable
938959
# at the second resubscription attempt
939960
if node.available and self._last_subscription_attempt[node_id] >= 1:
940-
node.available = False
941-
self.server.signal_event(EventType.NODE_UPDATED, node)
961+
# NOTE: if the node is (re)discovered by mdns, that callback will
962+
# take care of resubscribing to the node
963+
asyncio.create_task(self._node_offline(node_id))
942964
self._last_subscription_attempt[node_id] += 1
943965

944966
def resubscription_succeeded(
@@ -954,9 +976,7 @@ def resubscription_succeeded(
954976

955977
node_logger.info("Setting up attributes and events subscription.")
956978
interval_floor = 0
957-
interval_ceiling = (
958-
random.randint(60, 300) if battery_powered else random.randint(30, 120)
959-
)
979+
interval_ceiling = 300 if battery_powered else 30
960980
self._last_subscription_attempt[node_id] = 0
961981
future = loop.create_future()
962982
device = await self._resolve_node(node_id)
@@ -1037,6 +1057,13 @@ async def _check_interview_and_subscription(
10371057
):
10381058
try:
10391059
await self.interview_node(node_id)
1060+
except NodeNotResolving:
1061+
LOGGER.warning(
1062+
"Unable to interview Node %s as it is unavailable",
1063+
node_id,
1064+
)
1065+
# NOTE: the node will be picked up by mdns discovery automatically
1066+
# when it becomes available again.
10401067
except NodeInterviewFailed:
10411068
LOGGER.warning(
10421069
"Unable to interview Node %s, will retry later in the background.",
@@ -1059,16 +1086,11 @@ async def _check_interview_and_subscription(
10591086
await self._subscribe_node(node_id)
10601087
except NodeNotResolving:
10611088
LOGGER.warning(
1062-
"Unable to subscribe to Node %s as it is unavailable, "
1063-
"will retry later in the background.",
1064-
node_id,
1065-
)
1066-
# TODO: fix this once OperationalNodeDiscovery is available:
1067-
# https://github.com/project-chip/connectedhomeip/pull/26718
1068-
self._schedule_interview(
1089+
"Unable to subscribe to Node %s as it is unavailable",
10691090
node_id,
1070-
min(reschedule_interval + 10, MAX_POLL_INTERVAL),
10711091
)
1092+
# NOTE: the node will be picked up by mdns discovery automatically
1093+
# when it becomes available again.
10721094

10731095
def _schedule_interview(self, node_id: int, delay: int) -> None:
10741096
"""(Re)Schedule interview and/or initial subscription for a node."""
@@ -1151,6 +1173,65 @@ async def _handle_endpoints_added(
11511173
{"node_id": node_id, "endpoint_id": endpoint_id},
11521174
)
11531175

1176+
def _on_mdns_service_state_change(
1177+
self,
1178+
zeroconf: Zeroconf, # pylint: disable=unused-argument
1179+
service_type: str,
1180+
name: str,
1181+
state_change: ServiceStateChange,
1182+
) -> None:
1183+
if state_change not in (ServiceStateChange.Added, ServiceStateChange.Removed):
1184+
# we're not interested in update messages so return early
1185+
return
1186+
if service_type == MDNS_TYPE_COMMISSIONABLE_NODE:
1187+
asyncio.create_task(
1188+
self._on_mdns_commissionable_node_state(name, state_change)
1189+
)
1190+
return
1191+
if service_type == MDNS_TYPE_OPERATIONAL_NODE:
1192+
asyncio.create_task(
1193+
self._on_mdns_operational_node_state(name, state_change)
1194+
)
1195+
1196+
async def _on_mdns_operational_node_state(
1197+
self, name: str, state_change: ServiceStateChange
1198+
) -> None:
1199+
"""Handle a (operational) Matter node MDNS state change."""
1200+
# the mdns name is constructed as [fabricid]-[nodeid]._matter._tcp.local.
1201+
# extract the node id from the name
1202+
node_id = int(name.split("-")[1].split(".")[0], 16)
1203+
if node_id not in self._nodes:
1204+
return # should not happen, but just in case
1205+
if node_id in self._mdns_inprogress:
1206+
# mdns records can potentially arrive multiplied so debounce any duplicates
1207+
return
1208+
try:
1209+
self._mdns_inprogress.add(node_id)
1210+
node = self._nodes[node_id]
1211+
if state_change == ServiceStateChange.Added:
1212+
if node.available:
1213+
return # node is already set-up, no action needed
1214+
LOGGER.info("Node %s discovered on MDNS", node_id)
1215+
# setup the node
1216+
await self._check_interview_and_subscription(node_id)
1217+
elif state_change == ServiceStateChange.Removed:
1218+
if not node.available:
1219+
return # node is already offline, nothing to do
1220+
LOGGER.info("Node %s vanished according to MDNS", node_id)
1221+
await self._node_offline(node_id)
1222+
finally:
1223+
self._mdns_inprogress.remove(node_id)
1224+
1225+
async def _on_mdns_commissionable_node_state(
1226+
self, name: str, state_change: ServiceStateChange
1227+
) -> None:
1228+
"""Handle a (commissionable) Matter node MDNS state change."""
1229+
if state_change == ServiceStateChange.Added:
1230+
info = AsyncServiceInfo(MDNS_TYPE_COMMISSIONABLE_NODE, name)
1231+
assert self._aiozc is not None
1232+
await info.async_request(self._aiozc.zeroconf, 3000)
1233+
LOGGER.debug("Discovered commissionable Matter node using MDNS: %s", info)
1234+
11541235
def _get_node_lock(self, node_id: int) -> asyncio.Lock:
11551236
"""Return lock for given node."""
11561237
if node_id not in self._node_lock:
@@ -1166,3 +1247,19 @@ def _write_node_state(self, node_id: int, force: bool = False) -> None:
11661247
subkey=str(node_id),
11671248
force=force,
11681249
)
1250+
1251+
async def _node_offline(self, node_id: int) -> None:
1252+
"""Mark node as offline."""
1253+
# Remove and cancel any existing interview/subscription reschedule timer
1254+
if existing := self._sub_retry_timer.pop(node_id, None):
1255+
existing.cancel()
1256+
# shutdown existing subscriptions
1257+
if sub := self._subscriptions.pop(node_id, None):
1258+
await self._call_sdk(sub.Shutdown)
1259+
# mark node as unavailable
1260+
node = self._nodes[node_id]
1261+
if not node.available:
1262+
return # nothing to do to
1263+
node.available = False
1264+
self.server.signal_event(EventType.NODE_UPDATED, node)
1265+
LOGGER.info("Marked node %s as offline", node_id)

‎pyproject.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ dependencies = [
3434
[project.optional-dependencies]
3535
server = [
3636
"home-assistant-chip-core==2024.1.0",
37-
"cryptography==42.0.2"
37+
"cryptography==42.0.2",
38+
"zeroconf==0.131.0"
3839
]
3940
test = [
4041
"black==23.12.1",

0 commit comments

Comments
 (0)