From 767c7820ecf568562b1af78dd89ddd2c112e15ae Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Wed, 7 Feb 2024 09:28:02 +0100 Subject: [PATCH 1/5] Implement pyzeroconf for operational node discovery --- matter_server/server/device_controller.py | 150 +++++++++++++++++----- pyproject.toml | 3 +- 2 files changed, 122 insertions(+), 31 deletions(-) diff --git a/matter_server/server/device_controller.py b/matter_server/server/device_controller.py index 7c019e00..46ddec61 100644 --- a/matter_server/server/device_controller.py +++ b/matter_server/server/device_controller.py @@ -1,4 +1,5 @@ """Controller that Manages Matter devices.""" + # pylint: disable=too-many-lines from __future__ import annotations @@ -8,7 +9,6 @@ from datetime import datetime from functools import partial import logging -import random from typing import TYPE_CHECKING, Any, Awaitable, Callable, Iterable, TypeVar, cast from chip.ChipDeviceCtrl import DeviceProxyWrapper @@ -16,6 +16,8 @@ from chip.clusters.Attribute import ValueDecodeFailure from chip.clusters.ClusterObjects import ALL_ATTRIBUTES, ALL_CLUSTERS, Cluster from chip.exceptions import ChipStackError +from zeroconf import IPVersion, ServiceStateChange, Zeroconf +from zeroconf.asyncio import AsyncServiceBrowser, AsyncServiceInfo, AsyncZeroconf from matter_server.common.helpers.util import convert_ip_address from matter_server.common.models import CommissionableNodeData, CommissioningParameters @@ -27,6 +29,7 @@ NodeCommissionFailed, NodeInterviewFailed, NodeNotExists, + NodeNotReady, NodeNotResolving, ) from ..common.helpers.api import api_command @@ -61,6 +64,10 @@ MAX_POLL_INTERVAL = 600 MAX_COMMISSION_RETRIES = 3 +MDNS_TYPE_OPERATIONAL_NODE = "_matter._tcp.local." +MDNS_TYPE_COMMISSIONABLE_NODE = "_matterc._udp.local." 
+ + ROUTING_ROLE_ATTRIBUTE_PATH = create_attribute_path_from_attribute( 0, Clusters.ThreadNetworkDiagnostics.Attributes.RoutingRole ) @@ -114,6 +121,9 @@ def __init__( self.compressed_fabric_id: int | None = None self._node_lock: dict[int, asyncio.Lock] = {} self._resolve_lock = asyncio.Lock() + self._aiobrowser: AsyncServiceBrowser | None = None + self._aiozc: AsyncZeroconf | None = None + self._mdns_inprogress: set[int] = set() async def initialize(self) -> None: """Async initialize of controller.""" @@ -150,30 +160,33 @@ async def start(self) -> None: # always mark node as unavailable at startup until subscriptions are ready node.available = False self._nodes[node_id] = node - # setup subscription and (re)interview as task in the background - # as we do not want it to block our startup - if not node_dict.get("available"): - # if the node was not available last time we will delay - # the first attempt to initialize so that we prioritize nodes - # that are probably available so they are back online as soon as - # possible and we're not stuck trying to initialize nodes that are offline - self._schedule_interview(node_id, 30) - else: - asyncio.create_task(self._check_interview_and_subscription(node_id)) # cleanup orhpaned nodes from storage for node_id_str in orphaned_nodes: self.server.storage.remove(DATA_KEY_NODES, node_id_str) LOGGER.info("Loaded %s nodes from stored configuration", len(self._nodes)) + # set-up mdns browser + self._aiozc = AsyncZeroconf(ip_version=IPVersion.All) + services = [MDNS_TYPE_OPERATIONAL_NODE, MDNS_TYPE_COMMISSIONABLE_NODE] + self._aiobrowser = AsyncServiceBrowser( + self._aiozc.zeroconf, + services, + handlers=[self._on_mdns_service_state_change], + ) async def stop(self) -> None: """Handle logic on server stop.""" if self.chip_controller is None: raise RuntimeError("Device Controller not initialized.") - # unsubscribe all node subscriptions for sub in self._subscriptions.values(): await self._call_sdk(sub.Shutdown) self._subscriptions 
= {} + # shutdown mdns browser + if self._aiobrowser: + await self._aiobrowser.async_cancel() + if self._aiozc: + await self._aiozc.async_close() + # shutdown the sdk device controller await self._call_sdk(self.chip_controller.Shutdown) LOGGER.debug("Stopped.") @@ -246,7 +259,7 @@ async def commission_with_code( while retries: try: await self.interview_node(node_id) - except NodeInterviewFailed as err: + except (NodeNotResolving, NodeInterviewFailed) as err: if retries <= 0: raise err retries -= 1 @@ -469,16 +482,16 @@ async def interview_node(self, node_id: int) -> None: fabricFiltered=False, ) ) - except (ChipStackError, NodeNotResolving) as err: + except ChipStackError as err: raise NodeInterviewFailed(f"Failed to interview node {node_id}") from err is_new_node = node_id not in self._nodes existing_info = self._nodes.get(node_id) node = MatterNodeData( node_id=node_id, - date_commissioned=existing_info.date_commissioned - if existing_info - else datetime.utcnow(), + date_commissioned=( + existing_info.date_commissioned if existing_info else datetime.utcnow() + ), last_interview=datetime.utcnow(), interview_version=SCHEMA_VERSION, available=True, @@ -519,7 +532,8 @@ async def send_device_command( """Send a command to a Matter node/device.""" if self.chip_controller is None: raise RuntimeError("Device Controller not initialized.") - + if (node := self._nodes.get(node_id)) is None or not node.available: + raise NodeNotReady(f"Node {node_id} is not (yet) available.") cluster_cls: Cluster = ALL_CLUSTERS[cluster_id] command_cls = getattr(cluster_cls.Commands, command_name) command = dataclass_from_dict(command_cls, payload) @@ -541,6 +555,8 @@ async def read_attribute( """Read a single attribute (or Cluster) on a node.""" if self.chip_controller is None: raise RuntimeError("Device Controller not initialized.") + if (node := self._nodes.get(node_id)) is None or not node.available: + raise NodeNotReady(f"Node {node_id} is not (yet) available.") endpoint_id, cluster_id, 
attribute_id = parse_attribute_path(attribute_path) assert self.server.loop is not None future = self.server.loop.create_future() @@ -580,6 +596,8 @@ async def write_attribute( """Write an attribute(value) on a target node.""" if self.chip_controller is None: raise RuntimeError("Device Controller not initialized.") + if (node := self._nodes.get(node_id)) is None or not node.available: + raise NodeNotReady(f"Node {node_id} is not (yet) available.") endpoint_id, cluster_id, attribute_id = parse_attribute_path(attribute_path) attribute = ALL_ATTRIBUTES[cluster_id][attribute_id]() attribute.value = Clusters.NullValue if value is None else value @@ -803,7 +821,10 @@ async def _subscribe_node(self, node_id: int) -> None: # if so, we need to unsubscribe first unless nothing changed # in the attribute paths we want to subscribe. if prev_sub := self._subscriptions.get(node_id, None): - if self._attr_subscriptions.get(node_id) == attr_subscriptions: + if ( + node.available + and self._attr_subscriptions.get(node_id) == attr_subscriptions + ): # the current subscription already matches, no need to re-setup node_logger.debug("Re-using existing subscription.") return @@ -938,6 +959,8 @@ def resubscription_attempted( # at the second resubscription attempt if node.available and self._last_subscription_attempt[node_id] >= 1: node.available = False + # NOTE: if the node is (re)discovered bt mdns, that callback will + # take care of resubscribing to the node self.server.signal_event(EventType.NODE_UPDATED, node) self._last_subscription_attempt[node_id] += 1 @@ -954,9 +977,7 @@ def resubscription_succeeded( node_logger.info("Setting up attributes and events subscription.") interval_floor = 0 - interval_ceiling = ( - random.randint(60, 300) if battery_powered else random.randint(30, 120) - ) + interval_ceiling = 3600 if battery_powered else 30 self._last_subscription_attempt[node_id] = 0 future = loop.create_future() device = await self._resolve_node(node_id) @@ -1037,6 +1058,13 @@ 
async def _check_interview_and_subscription( ): try: await self.interview_node(node_id) + except NodeNotResolving: + LOGGER.warning( + "Unable to interview Node %s as it is unavailable", + node_id, + ) + # NOTE: the node will be picked up by mdns discovery automatically + # when it becomes available again. except NodeInterviewFailed: LOGGER.warning( "Unable to interview Node %s, will retry later in the background.", @@ -1059,16 +1087,11 @@ async def _check_interview_and_subscription( await self._subscribe_node(node_id) except NodeNotResolving: LOGGER.warning( - "Unable to subscribe to Node %s as it is unavailable, " - "will retry later in the background.", - node_id, - ) - # TODO: fix this once OperationalNodeDiscovery is available: - # https://github.com/project-chip/connectedhomeip/pull/26718 - self._schedule_interview( + "Unable to subscribe to Node %s as it is unavailable", node_id, - min(reschedule_interval + 10, MAX_POLL_INTERVAL), ) + # NOTE: the node will be picked up by mdns discovery automatically + # when it becomes available again. 
def _schedule_interview(self, node_id: int, delay: int) -> None: """(Re)Schedule interview and/or initial subscription for a node.""" @@ -1151,6 +1174,73 @@ async def _handle_endpoints_added( {"node_id": node_id, "endpoint_id": endpoint_id}, ) + def _on_mdns_service_state_change( + self, + zeroconf: Zeroconf, # pylint: disable=unused-argument + service_type: str, + name: str, + state_change: ServiceStateChange, + ) -> None: + if state_change not in (ServiceStateChange.Added, ServiceStateChange.Removed): + # we're not interested in update messages so return early + return + if service_type == MDNS_TYPE_COMMISSIONABLE_NODE: + asyncio.create_task( + self._on_mdns_commissionable_node_state(name, state_change) + ) + return + if service_type == MDNS_TYPE_OPERATIONAL_NODE: + asyncio.create_task( + self._on_mdns_operational_node_state(name, state_change) + ) + + async def _on_mdns_operational_node_state( + self, name: str, state_change: ServiceStateChange + ) -> None: + """Handle a (operational) Matter node MDNS state change.""" + # the mdns name is constructed as [fabricid]-[nodeid]._matter._tcp.local. 
+ # extract the node id from the name + node_id = int(name.split("-")[1].split(".")[0], 16) + if node_id not in self._nodes: + return # should not happen, but just in case + if node_id in self._mdns_inprogress: + # mdns records can potentially arrive multiplied so debounce any duplicates + return + try: + self._mdns_inprogress.add(node_id) + node = self._nodes[node_id] + if state_change == ServiceStateChange.Added: + if node.available: + return # node is already set-up, no action needed + LOGGER.info("Node %s discovered on MDNS", node_id) + # setup the node + await self._check_interview_and_subscription(node_id) + elif state_change == ServiceStateChange.Removed: + if not node.available: + return # node is already offline, nothing to do + LOGGER.info("Node %s vanished according to MDNS", node_id) + # Remove and cancel any existing interview/subscription reschedule timer + if existing := self._sub_retry_timer.pop(node_id, None): + existing.cancel() + # shutdown existing subscriptions + if sub := self._subscriptions.pop(node_id, None): + await self._call_sdk(sub.Shutdown) + # mark node as unavailable + node.available = False + self.server.signal_event(EventType.NODE_UPDATED, node_id) + finally: + self._mdns_inprogress.remove(node_id) + + async def _on_mdns_commissionable_node_state( + self, name: str, state_change: ServiceStateChange + ) -> None: + """Handle a (commissionable) Matter node MDNS state change.""" + if state_change == ServiceStateChange.Added: + info = AsyncServiceInfo(MDNS_TYPE_COMMISSIONABLE_NODE, name) + assert self._aiozc is not None + await info.async_request(self._aiozc.zeroconf, 3000) + LOGGER.debug("Discovered commissionable Matter node using MDNS: %s", info) + def _get_node_lock(self, node_id: int) -> asyncio.Lock: """Return lock for given node.""" if node_id not in self._node_lock: diff --git a/pyproject.toml b/pyproject.toml index 4dec48a5..04ac803a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,7 +34,8 @@ dependencies = [ 
[project.optional-dependencies] server = [ "home-assistant-chip-core==2024.1.0", - "cryptography==42.0.2" + "cryptography==42.0.2", + "zeroconf==0.2.0" ] test = [ "black==23.12.1", From d731dea0989e739f4978f759c4105c0fdd122ab9 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Wed, 7 Feb 2024 09:38:22 +0100 Subject: [PATCH 2/5] typo --- matter_server/server/device_controller.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/matter_server/server/device_controller.py b/matter_server/server/device_controller.py index 46ddec61..4413226c 100644 --- a/matter_server/server/device_controller.py +++ b/matter_server/server/device_controller.py @@ -959,7 +959,7 @@ def resubscription_attempted( # at the second resubscription attempt if node.available and self._last_subscription_attempt[node_id] >= 1: node.available = False - # NOTE: if the node is (re)discovered bt mdns, that callback will + # NOTE: if the node is (re)discovered by mdns, that callback will # take care of resubscribing to the node self.server.signal_event(EventType.NODE_UPDATED, node) self._last_subscription_attempt[node_id] += 1 From 59b95313084ebf28a44e2bdee1f524ea67f3a12b Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Wed, 7 Feb 2024 09:53:00 +0100 Subject: [PATCH 3/5] restructure offline node code --- matter_server/server/device_controller.py | 28 ++++++++++++++--------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/matter_server/server/device_controller.py b/matter_server/server/device_controller.py index 4413226c..3d0b6c95 100644 --- a/matter_server/server/device_controller.py +++ b/matter_server/server/device_controller.py @@ -958,10 +958,9 @@ def resubscription_attempted( # we debounce it a bit so we only mark the node unavailable # at the second resubscription attempt if node.available and self._last_subscription_attempt[node_id] >= 1: - node.available = False # NOTE: if the node is (re)discovered by mdns, that callback will # take care of 
resubscribing to the node - self.server.signal_event(EventType.NODE_UPDATED, node) + asyncio.create_task(self._node_offline(node_id)) self._last_subscription_attempt[node_id] += 1 def resubscription_succeeded( @@ -1219,15 +1218,7 @@ async def _on_mdns_operational_node_state( if not node.available: return # node is already offline, nothing to do LOGGER.info("Node %s vanished according to MDNS", node_id) - # Remove and cancel any existing interview/subscription reschedule timer - if existing := self._sub_retry_timer.pop(node_id, None): - existing.cancel() - # shutdown existing subscriptions - if sub := self._subscriptions.pop(node_id, None): - await self._call_sdk(sub.Shutdown) - # mark node as unavailable - node.available = False - self.server.signal_event(EventType.NODE_UPDATED, node_id) + await self._node_offline(node_id) finally: self._mdns_inprogress.remove(node_id) @@ -1256,3 +1247,18 @@ def _write_node_state(self, node_id: int, force: bool = False) -> None: subkey=str(node_id), force=force, ) + + async def _node_offline(self, node_id: int) -> None: + """Mark node as offline.""" + # Remove and cancel any existing interview/subscription reschedule timer + if existing := self._sub_retry_timer.pop(node_id, None): + existing.cancel() + # shutdown existing subscriptions + if sub := self._subscriptions.pop(node_id, None): + await self._call_sdk(sub.Shutdown) + # mark node as unavailable + node = self._nodes[node_id] + if not node.available: + return + node.available = False + self.server.signal_event(EventType.NODE_UPDATED, node) From c68a8785164849c570f8070da348b30e0870be6c Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Wed, 7 Feb 2024 09:57:09 +0100 Subject: [PATCH 4/5] typo --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 04ac803a..ebe8eff9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,7 +35,7 @@ dependencies = [ server = [ "home-assistant-chip-core==2024.1.0", 
"cryptography==42.0.2", - "zeroconf==0.2.0" + "zeroconf==0.131.0" ] test = [ "black==23.12.1", From f10037f37657d918530cd107f09f5f491011a299 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Wed, 7 Feb 2024 10:39:58 +0100 Subject: [PATCH 5/5] some touches --- matter_server/server/device_controller.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/matter_server/server/device_controller.py b/matter_server/server/device_controller.py index 3d0b6c95..643a1e0d 100644 --- a/matter_server/server/device_controller.py +++ b/matter_server/server/device_controller.py @@ -976,7 +976,7 @@ def resubscription_succeeded( node_logger.info("Setting up attributes and events subscription.") interval_floor = 0 - interval_ceiling = 3600 if battery_powered else 30 + interval_ceiling = 300 if battery_powered else 30 self._last_subscription_attempt[node_id] = 0 future = loop.create_future() device = await self._resolve_node(node_id) @@ -1259,6 +1259,7 @@ async def _node_offline(self, node_id: int) -> None: # mark node as unavailable node = self._nodes[node_id] if not node.available: - return + return # nothing to do node.available = False self.server.signal_event(EventType.NODE_UPDATED, node) + LOGGER.info("Marked node %s as offline", node_id)