Implement zeroconf for operational node discovery #531
Merged
Changes from all commits (5 commits)
@@ -1,4 +1,5 @@
 """Controller that Manages Matter devices."""
+# pylint: disable=too-many-lines

 from __future__ import annotations
@@ -8,14 +9,15 @@
 from datetime import datetime
 from functools import partial
 import logging
-import random
 from typing import TYPE_CHECKING, Any, Awaitable, Callable, Iterable, TypeVar, cast

 from chip.ChipDeviceCtrl import DeviceProxyWrapper
 from chip.clusters import Attribute, Objects as Clusters
 from chip.clusters.Attribute import ValueDecodeFailure
 from chip.clusters.ClusterObjects import ALL_ATTRIBUTES, ALL_CLUSTERS, Cluster
 from chip.exceptions import ChipStackError
+from zeroconf import IPVersion, ServiceStateChange, Zeroconf
+from zeroconf.asyncio import AsyncServiceBrowser, AsyncServiceInfo, AsyncZeroconf

 from matter_server.common.helpers.util import convert_ip_address
 from matter_server.common.models import CommissionableNodeData, CommissioningParameters
@@ -27,6 +29,7 @@
     NodeCommissionFailed,
     NodeInterviewFailed,
     NodeNotExists,
+    NodeNotReady,
     NodeNotResolving,
 )
 from ..common.helpers.api import api_command
@@ -61,6 +64,10 @@
 MAX_POLL_INTERVAL = 600
 MAX_COMMISSION_RETRIES = 3

+MDNS_TYPE_OPERATIONAL_NODE = "_matter._tcp.local."
+MDNS_TYPE_COMMISSIONABLE_NODE = "_matterc._udp.local."
+
+
 ROUTING_ROLE_ATTRIBUTE_PATH = create_attribute_path_from_attribute(
     0, Clusters.ThreadNetworkDiagnostics.Attributes.RoutingRole
 )
@@ -114,6 +121,9 @@ def __init__(
         self.compressed_fabric_id: int | None = None
         self._node_lock: dict[int, asyncio.Lock] = {}
         self._resolve_lock = asyncio.Lock()
+        self._aiobrowser: AsyncServiceBrowser | None = None
+        self._aiozc: AsyncZeroconf | None = None
+        self._mdns_inprogress: set[int] = set()

     async def initialize(self) -> None:
         """Async initialize of controller."""
@@ -150,30 +160,33 @@ async def start(self) -> None:
             # always mark node as unavailable at startup until subscriptions are ready
             node.available = False
             self._nodes[node_id] = node
-            # setup subscription and (re)interview as task in the background
-            # as we do not want it to block our startup
-            if not node_dict.get("available"):
-                # if the node was not available last time we will delay
-                # the first attempt to initialize so that we prioritize nodes
-                # that are probably available so they are back online as soon as
-                # possible and we're not stuck trying to initialize nodes that are offline
-                self._schedule_interview(node_id, 30)
-            else:
-                asyncio.create_task(self._check_interview_and_subscription(node_id))
         # cleanup orphaned nodes from storage
         for node_id_str in orphaned_nodes:
             self.server.storage.remove(DATA_KEY_NODES, node_id_str)
         LOGGER.info("Loaded %s nodes from stored configuration", len(self._nodes))
+        # set-up mdns browser
+        self._aiozc = AsyncZeroconf(ip_version=IPVersion.All)
+        services = [MDNS_TYPE_OPERATIONAL_NODE, MDNS_TYPE_COMMISSIONABLE_NODE]
+        self._aiobrowser = AsyncServiceBrowser(
+            self._aiozc.zeroconf,
+            services,
+            handlers=[self._on_mdns_service_state_change],
+        )

     async def stop(self) -> None:
         """Handle logic on server stop."""
         if self.chip_controller is None:
             raise RuntimeError("Device Controller not initialized.")

         # unsubscribe all node subscriptions
         for sub in self._subscriptions.values():
             await self._call_sdk(sub.Shutdown)
         self._subscriptions = {}
+        # shutdown mdns browser
+        if self._aiobrowser:
+            await self._aiobrowser.async_cancel()
+        if self._aiozc:
+            await self._aiozc.async_close()
         # shutdown the sdk device controller
         await self._call_sdk(self.chip_controller.Shutdown)
         LOGGER.debug("Stopped.")
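For orientation, here is a minimal, self-contained sketch of the zeroconf browse/shutdown pattern used in start() and stop() above. The two service type strings come from this PR; everything else (handler body, the 60-second run time) is illustrative only:

```python
import asyncio

from zeroconf import IPVersion, ServiceStateChange, Zeroconf
from zeroconf.asyncio import AsyncServiceBrowser, AsyncZeroconf

MDNS_TYPE_OPERATIONAL_NODE = "_matter._tcp.local."
MDNS_TYPE_COMMISSIONABLE_NODE = "_matterc._udp.local."


def on_state_change(
    zeroconf: Zeroconf,
    service_type: str,
    name: str,
    state_change: ServiceStateChange,
) -> None:
    # zeroconf invokes this callback for every service add/update/remove
    print(f"{state_change}: {name} ({service_type})")


async def main() -> None:
    aiozc = AsyncZeroconf(ip_version=IPVersion.All)
    browser = AsyncServiceBrowser(
        aiozc.zeroconf,
        [MDNS_TYPE_OPERATIONAL_NODE, MDNS_TYPE_COMMISSIONABLE_NODE],
        handlers=[on_state_change],
    )
    try:
        await asyncio.sleep(60)  # browse for a minute
    finally:
        # mirror the shutdown order used in stop() above
        await browser.async_cancel()
        await aiozc.async_close()


if __name__ == "__main__":
    asyncio.run(main())
```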
@@ -246,7 +259,7 @@ async def commission_with_code(
         while retries:
             try:
                 await self.interview_node(node_id)
-            except NodeInterviewFailed as err:
+            except (NodeNotResolving, NodeInterviewFailed) as err:
                 if retries <= 0:
                     raise err
                 retries -= 1
@@ -469,16 +482,16 @@ async def interview_node(self, node_id: int) -> None:
                     fabricFiltered=False,
                 )
             )
-        except (ChipStackError, NodeNotResolving) as err:
+        except ChipStackError as err:
             raise NodeInterviewFailed(f"Failed to interview node {node_id}") from err

         is_new_node = node_id not in self._nodes
         existing_info = self._nodes.get(node_id)
         node = MatterNodeData(
             node_id=node_id,
-            date_commissioned=existing_info.date_commissioned
-            if existing_info
-            else datetime.utcnow(),
+            date_commissioned=(
+                existing_info.date_commissioned if existing_info else datetime.utcnow()
+            ),
             last_interview=datetime.utcnow(),
             interview_version=SCHEMA_VERSION,
             available=True,
@@ -519,7 +532,8 @@ async def send_device_command(
         """Send a command to a Matter node/device."""
         if self.chip_controller is None:
             raise RuntimeError("Device Controller not initialized.")
-
+        if (node := self._nodes.get(node_id)) is None or not node.available:
+            raise NodeNotReady(f"Node {node_id} is not (yet) available.")
         cluster_cls: Cluster = ALL_CLUSTERS[cluster_id]
         command_cls = getattr(cluster_cls.Commands, command_name)
         command = dataclass_from_dict(command_cls, payload)
@@ -541,6 +555,8 @@ async def read_attribute(
         """Read a single attribute (or Cluster) on a node."""
         if self.chip_controller is None:
             raise RuntimeError("Device Controller not initialized.")
+        if (node := self._nodes.get(node_id)) is None or not node.available:
+            raise NodeNotReady(f"Node {node_id} is not (yet) available.")
         endpoint_id, cluster_id, attribute_id = parse_attribute_path(attribute_path)
         assert self.server.loop is not None
         future = self.server.loop.create_future()
@@ -580,6 +596,8 @@ async def write_attribute(
         """Write an attribute(value) on a target node."""
         if self.chip_controller is None:
             raise RuntimeError("Device Controller not initialized.")
+        if (node := self._nodes.get(node_id)) is None or not node.available:
+            raise NodeNotReady(f"Node {node_id} is not (yet) available.")
         endpoint_id, cluster_id, attribute_id = parse_attribute_path(attribute_path)
         attribute = ALL_ATTRIBUTES[cluster_id][attribute_id]()
         attribute.value = Clusters.NullValue if value is None else value
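The same two-line availability guard now appears in send_device_command, read_attribute and write_attribute. If one wanted to avoid the repetition, a small helper on the controller class could centralize it; this is a sketch only, with a hypothetical name (the PR itself inlines the check):

```python
    def _get_ready_node(self, node_id: int) -> MatterNodeData:
        """Return the node if it is known and currently available (hypothetical helper)."""
        if (node := self._nodes.get(node_id)) is None or not node.available:
            raise NodeNotReady(f"Node {node_id} is not (yet) available.")
        return node
```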
@@ -803,7 +821,10 @@ async def _subscribe_node(self, node_id: int) -> None:
         # if so, we need to unsubscribe first unless nothing changed
         # in the attribute paths we want to subscribe.
         if prev_sub := self._subscriptions.get(node_id, None):
-            if self._attr_subscriptions.get(node_id) == attr_subscriptions:
+            if (
+                node.available
+                and self._attr_subscriptions.get(node_id) == attr_subscriptions
+            ):
                 # the current subscription already matches, no need to re-setup
                 node_logger.debug("Re-using existing subscription.")
                 return
@@ -937,8 +958,9 @@ def resubscription_attempted(
             # we debounce it a bit so we only mark the node unavailable
             # at the second resubscription attempt
             if node.available and self._last_subscription_attempt[node_id] >= 1:
-                node.available = False
-                self.server.signal_event(EventType.NODE_UPDATED, node)
+                # NOTE: if the node is (re)discovered by mdns, that callback will
+                # take care of resubscribing to the node
+                asyncio.create_task(self._node_offline(node_id))
             self._last_subscription_attempt[node_id] += 1

         def resubscription_succeeded(
@@ -954,9 +976,7 @@ def resubscription_succeeded(

         node_logger.info("Setting up attributes and events subscription.")
         interval_floor = 0
-        interval_ceiling = (
-            random.randint(60, 300) if battery_powered else random.randint(30, 120)
-        )
+        interval_ceiling = 300 if battery_powered else 30
         self._last_subscription_attempt[node_id] = 0
         future = loop.create_future()
         device = await self._resolve_node(node_id)
@@ -1037,6 +1057,13 @@ async def _check_interview_and_subscription(
         ):
             try:
                 await self.interview_node(node_id)
+            except NodeNotResolving:
+                LOGGER.warning(
+                    "Unable to interview Node %s as it is unavailable",
+                    node_id,
+                )
+                # NOTE: the node will be picked up by mdns discovery automatically
+                # when it becomes available again.
             except NodeInterviewFailed:
                 LOGGER.warning(
                     "Unable to interview Node %s, will retry later in the background.",
@@ -1059,16 +1086,11 @@ async def _check_interview_and_subscription(
             await self._subscribe_node(node_id)
         except NodeNotResolving:
             LOGGER.warning(
-                "Unable to subscribe to Node %s as it is unavailable, "
-                "will retry later in the background.",
+                "Unable to subscribe to Node %s as it is unavailable",
                 node_id,
             )
-            # TODO: fix this once OperationalNodeDiscovery is available:
-            # https://github.com/project-chip/connectedhomeip/pull/26718
-            self._schedule_interview(
-                node_id,
-                min(reschedule_interval + 10, MAX_POLL_INTERVAL),
-            )
+            # NOTE: the node will be picked up by mdns discovery automatically
+            # when it becomes available again.

     def _schedule_interview(self, node_id: int, delay: int) -> None:
         """(Re)Schedule interview and/or initial subscription for a node."""
@@ -1151,6 +1173,65 @@ async def _handle_endpoints_added(
             {"node_id": node_id, "endpoint_id": endpoint_id},
         )

+    def _on_mdns_service_state_change(
+        self,
+        zeroconf: Zeroconf,  # pylint: disable=unused-argument
+        service_type: str,
+        name: str,
+        state_change: ServiceStateChange,
+    ) -> None:
+        if state_change not in (ServiceStateChange.Added, ServiceStateChange.Removed):
+            # we're not interested in update messages so return early
+            return
+        if service_type == MDNS_TYPE_COMMISSIONABLE_NODE:
+            asyncio.create_task(
+                self._on_mdns_commissionable_node_state(name, state_change)
+            )
+            return
+        if service_type == MDNS_TYPE_OPERATIONAL_NODE:
+            asyncio.create_task(
+                self._on_mdns_operational_node_state(name, state_change)
+            )
+
+    async def _on_mdns_operational_node_state(
+        self, name: str, state_change: ServiceStateChange
+    ) -> None:
+        """Handle a (operational) Matter node MDNS state change."""
+        # the mdns name is constructed as [fabricid]-[nodeid]._matter._tcp.local.
+        # extract the node id from the name
+        node_id = int(name.split("-")[1].split(".")[0], 16)
+        if node_id not in self._nodes:
+            return  # should not happen, but just in case
+        if node_id in self._mdns_inprogress:
+            # mdns records can potentially arrive multiplied so debounce any duplicates
+            return
+        try:
+            self._mdns_inprogress.add(node_id)
+            node = self._nodes[node_id]
+            if state_change == ServiceStateChange.Added:
+                if node.available:
+                    return  # node is already set-up, no action needed
+                LOGGER.info("Node %s discovered on MDNS", node_id)
+                # setup the node
+                await self._check_interview_and_subscription(node_id)
+            elif state_change == ServiceStateChange.Removed:
+                if not node.available:
+                    return  # node is already offline, nothing to do
+                LOGGER.info("Node %s vanished according to MDNS", node_id)
+                await self._node_offline(node_id)
+        finally:
+            self._mdns_inprogress.remove(node_id)
+
+    async def _on_mdns_commissionable_node_state(
+        self, name: str, state_change: ServiceStateChange
+    ) -> None:
+        """Handle a (commissionable) Matter node MDNS state change."""
+        if state_change == ServiceStateChange.Added:
+            info = AsyncServiceInfo(MDNS_TYPE_COMMISSIONABLE_NODE, name)
+            assert self._aiozc is not None
+            await info.async_request(self._aiozc.zeroconf, 3000)
+            LOGGER.debug("Discovered commissionable Matter node using MDNS: %s", info)
+
     def _get_node_lock(self, node_id: int) -> asyncio.Lock:
         """Return lock for given node."""
         if node_id not in self._node_lock:
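The node id parsing in _on_mdns_operational_node_state can be checked in isolation. The instance name below is made up, but follows the [fabricid]-[nodeid] layout the comment describes (both parts hex-encoded):

```python
# illustrative operational instance name: <compressed fabric id>-<node id>
name = "87E1B004E235A130-0000000000000005._matter._tcp.local."

node_id = int(name.split("-")[1].split(".")[0], 16)
assert node_id == 5  # "0000000000000005" parsed as hex
```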
@@ -1166,3 +1247,19 @@ def _write_node_state(self, node_id: int, force: bool = False) -> None:
             subkey=str(node_id),
             force=force,
         )
+
+    async def _node_offline(self, node_id: int) -> None:
+        """Mark node as offline."""
+        # Remove and cancel any existing interview/subscription reschedule timer
+        if existing := self._sub_retry_timer.pop(node_id, None):
+            existing.cancel()
+        # shutdown existing subscriptions
+        if sub := self._subscriptions.pop(node_id, None):
+            await self._call_sdk(sub.Shutdown)
+        # mark node as unavailable
+        node = self._nodes[node_id]
+        if not node.available:
+            return  # nothing to do
+        node.available = False
+        self.server.signal_event(EventType.NODE_UPDATED, node)
+        LOGGER.info("Marked node %s as offline", node_id)

Review comment (on lines +1253 to +1258):
This code is also used when deleting the node; I think we should extract it into a common function.
Review comment:
I am a bit worried that we get false positives here. But let's see.
Btw, you should store the task somewhere, see also https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task and https://bugs.python.org/issue44665.
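The pattern the linked asyncio docs recommend is to keep a strong reference to each fire-and-forget task so the event loop cannot garbage-collect it before it finishes; a minimal sketch, with _background_tasks as an assumed attribute name (not present in this PR):

```python
import asyncio
from typing import Any, Coroutine


class DeviceController:
    """Excerpt only; shows the task-retention pattern, not the real class."""

    def __init__(self) -> None:
        self._background_tasks: set[asyncio.Task] = set()

    def _create_background_task(self, coro: Coroutine[Any, Any, Any]) -> asyncio.Task:
        """Create a background task and keep a reference until it completes."""
        task = asyncio.create_task(coro)
        # keep a strong reference so the task is not garbage-collected early
        self._background_tasks.add(task)
        # drop the reference once the task is done so the set does not grow
        task.add_done_callback(self._background_tasks.discard)
        return task
```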