From efa04e034081dca11fd59072cb48ead466109598 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Tue, 5 Mar 2024 15:15:25 +0100 Subject: [PATCH 01/12] Remove Timeout --- matter_server/server/device_controller.py | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/matter_server/server/device_controller.py b/matter_server/server/device_controller.py index 9504faa0..cfc8bd7b 100644 --- a/matter_server/server/device_controller.py +++ b/matter_server/server/device_controller.py @@ -14,7 +14,6 @@ import time from typing import TYPE_CHECKING, Any, Callable, Iterable, TypeVar, cast -import async_timeout from chip.ChipDeviceCtrl import DeviceProxyWrapper from chip.clusters import Attribute, Objects as Clusters from chip.clusters.Attribute import ValueDecodeFailure @@ -1112,14 +1111,7 @@ async def _setup_node(self, node_id: int) -> None: return # setup subscriptions for the node try: - async with async_timeout.timeout(15 * 60): - await self._subscribe_node(node_id) - except TimeoutError: - LOGGER.warning( - "Setting up subscriptions for node %s did not " - "succeed after 15 minutes!", - node_id, - ) + await self._subscribe_node(node_id) except (NodeNotResolving, ChipStackError) as err: LOGGER.warning( "Unable to subscribe to Node %s: %s", From 3f7e2472f266b5a080f14e98db2157349f830709 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Tue, 5 Mar 2024 15:21:21 +0100 Subject: [PATCH 02/12] Allow cached ip in background scanner --- matter_server/server/device_controller.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/matter_server/server/device_controller.py b/matter_server/server/device_controller.py index cfc8bd7b..0216902f 100644 --- a/matter_server/server/device_controller.py +++ b/matter_server/server/device_controller.py @@ -1288,8 +1288,9 @@ async def _fallback_node_scanner(self) -> None: last_seen = self._node_last_seen.get(node_id, 0) if now - last_seen < FALLBACK_NODE_SCANNER_INTERVAL: continue - if await self.ping_node(node_id, attempts=3, allow_cached_ips=False): + if await self.ping_node(node_id, attempts=3, allow_cached_ips=True): LOGGER.info("Node %s discovered using fallback ping", node_id) + self._node_last_seen[node_id] = now await self._setup_node(node_id) def reschedule_self() -> None: From 488d98182ca1db6871c80d467313306e85f030f9 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Tue, 5 Mar 2024 15:25:02 +0100 Subject: [PATCH 03/12] Remove cached ips params --- matter_server/server/device_controller.py | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/matter_server/server/device_controller.py b/matter_server/server/device_controller.py index 0216902f..2d0feb0b 100644 --- a/matter_server/server/device_controller.py +++ b/matter_server/server/device_controller.py @@ -727,9 +727,7 @@ async def subscribe_attribute( ) @api_command(APICommand.PING_NODE) - async def ping_node( - self, node_id: int, attempts: int = 1, allow_cached_ips: bool = True - ) -> NodePingResult: + async def ping_node(self, node_id: int, attempts: int = 1) -> NodePingResult: """Ping node on the currently known IP-adress(es).""" result: NodePingResult = {} node = self._nodes.get(node_id) @@ -763,7 +761,7 @@ async def _do_ping(ip_address: str) -> None: result[clean_ip] = await ping_ip(ip_address, timeout, attempts=attempts) ip_addresses = await self.get_node_ip_addresses( - node_id, prefer_cache=False, scoped=True, allow_cache=allow_cached_ips + node_id, prefer_cache=False, scoped=True ) tasks = [_do_ping(x) for x in ip_addresses] # TODO: replace this gather with a taskgroup once we bump our py version @@ -792,7 +790,6 @@ async def get_node_ip_addresses( node_id: int, prefer_cache: bool = False, scoped: bool = False, - allow_cache: bool = True, ) -> list[str]: """Return the currently known (scoped) IP-adress(es).""" cached_info = self._last_known_ip_addresses.get(node_id, []) @@ -810,7 +807,7 @@ async def get_node_ip_addresses( info = AsyncServiceInfo(MDNS_TYPE_OPERATIONAL_NODE, mdns_name) if TYPE_CHECKING: assert self._aiozc is not None - if not await info.async_request(self._aiozc.zeroconf, 3000) and allow_cache: + if not await info.async_request(self._aiozc.zeroconf, 3000): node_logger.info( "Node could not be discovered on the network, returning cached IP's" ) @@ -1076,9 +1073,7 @@ async def _setup_node(self, node_id: int) -> None: # Ping the node to rule out stale mdns reports and to prevent that we # send an unreachable node to the sdk which is very slow with resolving it. # This will also precache the ip addresses of the node for later use. - ping_result = await self.ping_node( - node_id, attempts=3, allow_cached_ips=False - ) + ping_result = await self.ping_node(node_id, attempts=3) if not any(ping_result.values()): LOGGER.warning( "Skip set-up for node %s because it does not appear to be reachable...", @@ -1288,7 +1283,7 @@ async def _fallback_node_scanner(self) -> None: last_seen = self._node_last_seen.get(node_id, 0) if now - last_seen < FALLBACK_NODE_SCANNER_INTERVAL: continue - if await self.ping_node(node_id, attempts=3, allow_cached_ips=True): + if await self.ping_node(node_id, attempts=3): LOGGER.info("Node %s discovered using fallback ping", node_id) self._node_last_seen[node_id] = now await self._setup_node(node_id) From 3da50b0231d73e46130141e8a7aaca90c7658256 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Tue, 5 Mar 2024 15:43:04 +0100 Subject: [PATCH 04/12] Use SDK resolve in node setup --- matter_server/server/device_controller.py | 28 ++++++++++++++--------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/matter_server/server/device_controller.py b/matter_server/server/device_controller.py index 2d0feb0b..c9382592 100644 --- a/matter_server/server/device_controller.py +++ b/matter_server/server/device_controller.py @@ -501,7 +501,6 @@ async def interview_node(self, node_id: int) -> None: try: if not (node := self._nodes.get(node_id)) or not node.available: - await self._resolve_node(node_id=node_id) LOGGER.info("Interviewing node: %s", node_id) read_response: Attribute.AsyncReadTransaction.ReadResponse = ( await self.chip_controller.Read( @@ -1070,17 +1069,23 @@ async def _setup_node(self, node_id: int) -> None: self._nodes_in_setup.add(node_id) try: async with self._node_setup_throttle: - # Ping the node to rule out stale mdns reports and to prevent that we - # send an unreachable node to the sdk which is very slow with resolving it. - # This will also precache the ip addresses of the node for later use. - ping_result = await self.ping_node(node_id, attempts=3) - if not any(ping_result.values()): + LOGGER.info("Setting-up node %s...", node_id) + + # try to resolve the node using the sdk first before do anything else + try: + await self._resolve_node(node_id=node_id) + except NodeNotResolving as err: LOGGER.warning( - "Skip set-up for node %s because it does not appear to be reachable...", + "Setup for node %s failed: %s", node_id, + str(err) or err.__class__.__name__, + # log full stack trace if debug logging is enabled + exc_info=err if LOGGER.isEnabledFor(logging.DEBUG) else None, ) + # NOTE: the node will be picked up by mdns discovery automatically + # when it comes available again. return - LOGGER.info("Setting-up node %s...", node_id) + # (re)interview node (only) if needed node_data = self._nodes[node_id] if ( @@ -1091,9 +1096,9 @@ async def _setup_node(self, node_id: int) -> None: ): try: await self.interview_node(node_id) - except (NodeNotResolving, NodeInterviewFailed) as err: + except NodeInterviewFailed as err: LOGGER.warning( - "Unable to interview Node %s: %s", + "Setup for node %s failed: %s", node_id, str(err) or err.__class__.__name__, # log full stack trace if debug logging is enabled @@ -1104,10 +1109,11 @@ async def _setup_node(self, node_id: int) -> None: # NOTE: the node will be picked up by mdns discovery automatically # when it comes available again. return + # setup subscriptions for the node try: await self._subscribe_node(node_id) - except (NodeNotResolving, ChipStackError) as err: + except ChipStackError as err: LOGGER.warning( "Unable to subscribe to Node %s: %s", node_id, From 0d83c04aa5db2044d1dd26cdbf757a8c0813bcb6 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Tue, 5 Mar 2024 21:21:32 +0100 Subject: [PATCH 05/12] Allow running the loop in debug mode on dev setup --- matter_server/server/server.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/matter_server/server/server.py b/matter_server/server/server.py index 029e7b68..bd4338b8 100644 --- a/matter_server/server/server.py +++ b/matter_server/server/server.py @@ -9,7 +9,7 @@ import os from pathlib import Path import traceback -from typing import Any, Callable, Set, cast +from typing import TYPE_CHECKING, Any, Callable, Set, cast import weakref from aiohttp import web @@ -133,6 +133,7 @@ async def start(self) -> None: ) self.loop = asyncio.get_running_loop() self.loop.set_exception_handler(_global_loop_exception_handler) + self.loop.set_debug(os.environ.get("PYTHONDEBUG") == "1") await self.device_controller.initialize() await self.storage.start() await self.device_controller.start() @@ -219,11 +220,13 @@ def get_diagnostics(self) -> ServerDiagnostics: def signal_event(self, evt: EventType, data: Any = None) -> None: """Signal event to listeners.""" + if TYPE_CHECKING: + assert self.loop for callback in self._subscribers: if asyncio.iscoroutinefunction(callback): asyncio.create_task(callback(evt, data)) else: - callback(evt, data) + self.loop.call_soon_threadsafe(callback, evt, data) def scope_ipv6_lla(self, ip_addr: str) -> str: """Scope IPv6 link-local addresses to primary interface. From 00a9ac9fa01308942caee46acdd7d95b71fbb61c Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Tue, 5 Mar 2024 21:22:03 +0100 Subject: [PATCH 06/12] Optimizations --- matter_server/server/device_controller.py | 51 +++++++++++++++-------- 1 file changed, 34 insertions(+), 17 deletions(-) diff --git a/matter_server/server/device_controller.py b/matter_server/server/device_controller.py index c9382592..b05c5e5d 100644 --- a/matter_server/server/device_controller.py +++ b/matter_server/server/device_controller.py @@ -114,6 +114,7 @@ def __init__( self._aiobrowser: AsyncServiceBrowser | None = None self._aiozc: AsyncZeroconf | None = None self._fallback_node_scanner_timer: asyncio.TimerHandle | None = None + self._fallback_node_scanner_task: asyncio.Task | None = None self._sdk_executor = ThreadPoolExecutor( max_workers=1, thread_name_prefix="SDKExecutor" ) @@ -181,7 +182,7 @@ async def start(self) -> None: handlers=[self._on_mdns_service_state_change], ) # set-up fallback node scanner - asyncio.create_task(self._fallback_node_scanner()) + self._schedule_fallback_scanner() async def stop(self) -> None: """Handle logic on server stop.""" @@ -192,6 +193,8 @@ async def stop(self) -> None: await self._aiobrowser.async_cancel() if self._fallback_node_scanner_timer: self._fallback_node_scanner_timer.cancel() + if (scan_task := self._fallback_node_scanner_task) and not scan_task.done(): + scan_task.cancel() if self._aiozc: await self._aiozc.async_close() # unsubscribe all node subscriptions @@ -583,7 +586,6 @@ async def read_attribute( if (node := self._nodes.get(node_id)) is None or not node.available: raise NodeNotReady(f"Node {node_id} is not (yet) available.") endpoint_id, cluster_id, attribute_id = parse_attribute_path(attribute_path) - device = await self._resolve_node(node_id) # Read a list of attributes and/or events from a target node. # This is basically a re-implementation of the chip controller's Read function # but one that allows us to send/request custom attributes. @@ -908,7 +910,7 @@ def attribute_updated_callback( node.attributes[attr_path] = new_value # schedule save to persistent storage - self._write_node_state(node_id) + loop.call_soon_threadsafe(self._write_node_state, node_id) # This callback is running in the CHIP stack thread loop.call_soon_threadsafe( @@ -1067,17 +1069,17 @@ async def _setup_node(self, node_id: int) -> None: # prevent duplicate setup actions return self._nodes_in_setup.add(node_id) + node_logger = LOGGER.getChild(f"node_{node_id}") try: async with self._node_setup_throttle: - LOGGER.info("Setting-up node %s...", node_id) + node_logger.info("Setting-up node...") # try to resolve the node using the sdk first before do anything else try: await self._resolve_node(node_id=node_id) except NodeNotResolving as err: - LOGGER.warning( - "Setup for node %s failed: %s", - node_id, + node_logger.warning( + "Setup for node failed: %s", str(err) or err.__class__.__name__, # log full stack trace if debug logging is enabled exc_info=err if LOGGER.isEnabledFor(logging.DEBUG) else None, @@ -1097,9 +1099,8 @@ async def _setup_node(self, node_id: int) -> None: try: await self.interview_node(node_id) except NodeInterviewFailed as err: - LOGGER.warning( - "Setup for node %s failed: %s", - node_id, + node_logger.warning( + "Setup for node failed: %s", str(err) or err.__class__.__name__, # log full stack trace if debug logging is enabled exc_info=err @@ -1114,9 +1115,8 @@ async def _setup_node(self, node_id: int) -> None: try: await self._subscribe_node(node_id) except ChipStackError as err: - LOGGER.warning( - "Unable to subscribe to Node %s: %s", - node_id, + node_logger.warning( + "Unable to subscribe to Node: %s", str(err) or err.__class__.__name__, # log full stack trace if debug logging is enabled exc_info=err if LOGGER.isEnabledFor(logging.DEBUG) else None, @@ -1141,6 +1141,7 @@ async def _resolve_node( attempt, retries, ) + time_start = time.time() return await self._call_sdk( self.chip_controller.GetConnectedDeviceSync, nodeid=node_id, @@ -1156,6 +1157,11 @@ async def _resolve_node( return await self._resolve_node( node_id=node_id, retries=retries, attempt=attempt + 1 ) + finally: + resolve_time_seconds = int(time.time() - time_start) + LOGGER.debug( + "Resolving node %s took %s seconds", node_id, resolve_time_seconds + ) def _handle_endpoints_removed(self, node_id: int, endpoints: Iterable[int]) -> None: """Handle callback for when bridge endpoint(s) get deleted.""" @@ -1294,13 +1300,24 @@ async def _fallback_node_scanner(self) -> None: self._node_last_seen[node_id] = now await self._setup_node(node_id) - def reschedule_self() -> None: + # reschedule self to run at next interval + self._schedule_fallback_scanner() + + def _schedule_fallback_scanner(self) -> None: + """Schedule running the fallback node scanner at X interval.""" + if existing := self._fallback_node_scanner_timer: + existing.cancel() + + def run_fallback_node_scanner() -> None: self._fallback_node_scanner_timer = None - asyncio.create_task(self._fallback_node_scanner()) + if (existing := self._fallback_node_scanner_task) and not existing.done(): + existing.cancel() + self._fallback_node_scanner_task = asyncio.create_task( + self._fallback_node_scanner() + ) - # reschedule task to run at next interval if TYPE_CHECKING: assert self.server.loop self._fallback_node_scanner_timer = self.server.loop.call_later( - FALLBACK_NODE_SCANNER_INTERVAL, reschedule_self + FALLBACK_NODE_SCANNER_INTERVAL, run_fallback_node_scanner ) From 5720d4be31799dcdf5fb6cea1473a051828d8d51 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Tue, 5 Mar 2024 21:40:55 +0100 Subject: [PATCH 07/12] fix typo in interview_node --- matter_server/server/device_controller.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/matter_server/server/device_controller.py b/matter_server/server/device_controller.py index b05c5e5d..7cff8a3a 100644 --- a/matter_server/server/device_controller.py +++ b/matter_server/server/device_controller.py @@ -503,15 +503,14 @@ async def interview_node(self, node_id: int) -> None: raise RuntimeError("Device Controller not initialized.") try: - if not (node := self._nodes.get(node_id)) or not node.available: - LOGGER.info("Interviewing node: %s", node_id) - read_response: Attribute.AsyncReadTransaction.ReadResponse = ( - await self.chip_controller.Read( - nodeid=node_id, - attributes="*", - fabricFiltered=False, - ) + LOGGER.info("Interviewing node: %s", node_id) + read_response: Attribute.AsyncReadTransaction.ReadResponse = ( + await self.chip_controller.Read( + nodeid=node_id, + attributes="*", + fabricFiltered=False, ) + ) except ChipStackError as err: raise NodeInterviewFailed(f"Failed to interview node {node_id}") from err From a282370417cb33f3e6c8a3464afa1cbcc00b6f91 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Tue, 5 Mar 2024 21:58:44 +0100 Subject: [PATCH 08/12] handle race condition in ping --- matter_server/server/helpers/utils.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/matter_server/server/helpers/utils.py b/matter_server/server/helpers/utils.py index 517cb5f4..7b23093b 100644 --- a/matter_server/server/helpers/utils.py +++ b/matter_server/server/helpers/utils.py @@ -1,6 +1,7 @@ """Utils for Matter server.""" import asyncio +from contextlib import suppress import platform import async_timeout @@ -44,7 +45,8 @@ async def check_output(shell_cmd: str) -> tuple[int | None, bytes]: try: stdout, _ = await proc.communicate() except asyncio.CancelledError: - proc.terminate() - await proc.communicate() + with suppress(ProcessLookupError): + proc.terminate() + await proc.communicate() raise return (proc.returncode, stdout) From acb33e36ec97c69235029381496b7205a5669e0a Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Tue, 5 Mar 2024 23:11:18 +0100 Subject: [PATCH 09/12] better debouncing of mdns --- matter_server/server/device_controller.py | 90 ++++++++++++++++------- 1 file changed, 63 insertions(+), 27 deletions(-) diff --git a/matter_server/server/device_controller.py b/matter_server/server/device_controller.py index 7cff8a3a..3b3b94c1 100644 --- a/matter_server/server/device_controller.py +++ b/matter_server/server/device_controller.py @@ -20,7 +20,7 @@ from chip.clusters.ClusterObjects import ALL_ATTRIBUTES, ALL_CLUSTERS, Cluster from chip.exceptions import ChipStackError from chip.native import PyChipError -from zeroconf import IPVersion, ServiceStateChange, Zeroconf +from zeroconf import BadTypeInNameException, IPVersion, ServiceStateChange, Zeroconf from zeroconf.asyncio import AsyncServiceBrowser, AsyncServiceInfo, AsyncZeroconf from matter_server.common.models import CommissionableNodeData, CommissioningParameters @@ -72,7 +72,7 @@ NODE_RESUBSCRIBE_TIMEOUT_OFFLINE = 30 * 60 * 1000 NODE_PING_TIMEOUT = 10 NODE_PING_TIMEOUT_BATTERY_POWERED = 60 -NODE_MDNS_BACKOFF = 60 +NODE_MDNS_BACKOFF = 300 FALLBACK_NODE_SCANNER_INTERVAL = 1800 MDNS_TYPE_OPERATIONAL_NODE = "_matter._tcp.local." @@ -119,6 +119,7 @@ def __init__( max_workers=1, thread_name_prefix="SDKExecutor" ) self._node_setup_throttle = asyncio.Semaphore(10) + self._mdns_event_timer: dict[str, asyncio.TimerHandle] = {} async def initialize(self) -> None: """Async initialize of controller.""" @@ -1198,29 +1199,46 @@ def _on_mdns_service_state_change( name: str, state_change: ServiceStateChange, ) -> None: - LOGGER.debug("Received %s MDNS event for %s", state_change, name) + # mdns events may arrive in bursts of (duplicate) messages + # so we debounce this with a timer handle. + if state_change == ServiceStateChange.Removed: + # if we have an existing timer for this name, cancel it. + if cancel := self._mdns_event_timer.pop(name, None): + cancel.cancel() + if service_type == MDNS_TYPE_OPERATIONAL_NODE: + # we're not interested in operational node removals, + # this is already handled by the subscription logic + return + + if name in self._mdns_event_timer: + # We already have a timer to resolve this service, so ignore this callback. + return + + if TYPE_CHECKING: + assert self.server.loop + if service_type == MDNS_TYPE_COMMISSIONABLE_NODE: - asyncio.create_task( - self._on_mdns_commissionable_node_state(name, state_change) + # process the event with a debounce timer + self._mdns_event_timer[name] = self.server.loop.call_later( + 0.5, self._on_mdns_commissionable_node_state, name, state_change ) return + if service_type == MDNS_TYPE_OPERATIONAL_NODE: - self._on_mdns_operational_node_state(name, state_change) + if self.fabric_id_hex not in name.lower(): + # filter out messages that are not for our fabric + return + # process the event with a debounce timer + self._mdns_event_timer[name] = self.server.loop.call_later( + 0.5, self._on_mdns_operational_node_state, name, state_change + ) def _on_mdns_operational_node_state( self, name: str, state_change: ServiceStateChange ) -> None: """Handle a (operational) Matter node MDNS state change.""" - name = name.lower() - if self.fabric_id_hex not in name: - # filter out messages that are not for our fabric - return - - if state_change == ServiceStateChange.Removed: - # we're not interested in removals as this is already - # handled in the subscription logic - return - + self._mdns_event_timer.pop(name, None) + logger = LOGGER.getChild("mdns") # the mdns name is constructed as [fabricid]-[nodeid]._matter._tcp.local. # extract the node id from the name node_id = int(name.split("-")[1].split(".")[0], 16) @@ -1228,16 +1246,11 @@ def _on_mdns_operational_node_state( if not (node := self._nodes.get(node_id)): return # this should not happen, but guard just in case - # mdns events for matter devices arrive in bursts of (duplicate) messages - # so we debounce this as we only use the mdns messages for operational node discovery - # and we have other logic in place to determine node aliveness now = time.time() last_seen = self._node_last_seen.get(node_id, 0) - if node.available and now - last_seen < NODE_MDNS_BACKOFF: - return self._node_last_seen[node_id] = now - # we treat UPDATE state changes as ADD if the node is marked as + # we only treat UPDATE state changes as ADD if the node is marked as # unavailable to ensure we catch a node being operational if node.available and state_change == ServiceStateChange.Updated: return @@ -1245,19 +1258,42 @@ def _on_mdns_operational_node_state( if node_id in self._nodes_in_setup: # prevent duplicate setup actions return - LOGGER.info("Node %s (re)discovered on MDNS", node_id) + + if node_id not in self._subscriptions: + logger.info("Node %s discovered on MDNS", node_id) + elif (now - last_seen) > NODE_MDNS_BACKOFF: + # node came back online after being offline for a while or restarted + logger.info("Node %s re-discovered on MDNS", node_id) + else: + # ignore all other cases + return + # setup the node - this will (re) setup the subscriptions etc. asyncio.create_task(self._setup_node(node_id)) - async def _on_mdns_commissionable_node_state( + def _on_mdns_commissionable_node_state( self, name: str, state_change: ServiceStateChange ) -> None: """Handle a (commissionable) Matter node MDNS state change.""" - if state_change == ServiceStateChange.Added: + self._mdns_event_timer.pop(name, None) + logger = LOGGER.getChild("mdns") + + try: info = AsyncServiceInfo(MDNS_TYPE_COMMISSIONABLE_NODE, name) - assert self._aiozc is not None + except BadTypeInNameException as ex: + logger.debug("Ignoring record with bad type in name: %s: %s", name, ex) + return + + async def handle_commissionable_node_added() -> None: + if TYPE_CHECKING: + assert self._aiozc is not None await info.async_request(self._aiozc.zeroconf, 3000) - LOGGER.debug("Discovered commissionable Matter node using MDNS: %s", info) + logger.debug("Discovered commissionable Matter node: %s", info) + + if state_change == ServiceStateChange.Added: + asyncio.create_task(handle_commissionable_node_added()) + elif state_change == ServiceStateChange.Removed: + logger.debug("Commissionable Matter node disappeared: %s", info) def _write_node_state(self, node_id: int, force: bool = False) -> None: """Schedule the write of the current node state to persistent storage.""" From 707db6f0b4f1fdb741e2bd0f31d52d8a57524e95 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Wed, 6 Mar 2024 09:54:37 +0100 Subject: [PATCH 10/12] fix typo in remove node --- matter_server/server/device_controller.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/matter_server/server/device_controller.py b/matter_server/server/device_controller.py index 3b3b94c1..866ddb7f 100644 --- a/matter_server/server/device_controller.py +++ b/matter_server/server/device_controller.py @@ -682,14 +682,12 @@ async def remove_node(self, node_id: int) -> None: return result: Clusters.OperationalCredentials.Commands.NOCResponse | None = None try: - result = await self._call_sdk( - self.chip_controller.SendCommand( - nodeid=node_id, - endpoint=0, - payload=Clusters.OperationalCredentials.Commands.RemoveFabric( - fabricIndex=fabric_index, - ), - ) + result = await self.chip_controller.SendCommand( + nodeid=node_id, + endpoint=0, + payload=Clusters.OperationalCredentials.Commands.RemoveFabric( + fabricIndex=fabric_index, + ), ) except ChipStackError as err: LOGGER.warning( From ae793c17327641ae966eef44ce6e7fdd07be79ca Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Wed, 6 Mar 2024 09:56:32 +0100 Subject: [PATCH 11/12] add extra guard for race condition --- matter_server/server/device_controller.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/matter_server/server/device_controller.py b/matter_server/server/device_controller.py index 866ddb7f..d1ad19ae 100644 --- a/matter_server/server/device_controller.py +++ b/matter_server/server/device_controller.py @@ -1295,6 +1295,8 @@ async def handle_commissionable_node_added() -> None: def _write_node_state(self, node_id: int, force: bool = False) -> None: """Schedule the write of the current node state to persistent storage.""" + if node_id not in self._nodes: + return # guard node = self._nodes[node_id] self.server.storage.set( DATA_KEY_NODES, From 4f270a4f820edefce5e0673d3877a262add00a89 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Wed, 6 Mar 2024 10:16:38 +0100 Subject: [PATCH 12/12] adjust PYTHONDEBUG check --- matter_server/server/server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/matter_server/server/server.py b/matter_server/server/server.py index bd4338b8..5b29aeb8 100644 --- a/matter_server/server/server.py +++ b/matter_server/server/server.py @@ -133,7 +133,7 @@ async def start(self) -> None: ) self.loop = asyncio.get_running_loop() self.loop.set_exception_handler(_global_loop_exception_handler) - self.loop.set_debug(os.environ.get("PYTHONDEBUG") == "1") + self.loop.set_debug(os.environ.get("PYTHONDEBUG", "") != "") await self.device_controller.initialize() await self.storage.start() await self.device_controller.start()