Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement zeroconf for operational node discovery #531

Merged
merged 5 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 129 additions & 32 deletions matter_server/server/device_controller.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Controller that Manages Matter devices."""

# pylint: disable=too-many-lines

from __future__ import annotations
Expand All @@ -8,14 +9,15 @@
from datetime import datetime
from functools import partial
import logging
import random
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Iterable, TypeVar, cast

from chip.ChipDeviceCtrl import DeviceProxyWrapper
from chip.clusters import Attribute, Objects as Clusters
from chip.clusters.Attribute import ValueDecodeFailure
from chip.clusters.ClusterObjects import ALL_ATTRIBUTES, ALL_CLUSTERS, Cluster
from chip.exceptions import ChipStackError
from zeroconf import IPVersion, ServiceStateChange, Zeroconf
from zeroconf.asyncio import AsyncServiceBrowser, AsyncServiceInfo, AsyncZeroconf

from matter_server.common.helpers.util import convert_ip_address
from matter_server.common.models import CommissionableNodeData, CommissioningParameters
Expand All @@ -27,6 +29,7 @@
NodeCommissionFailed,
NodeInterviewFailed,
NodeNotExists,
NodeNotReady,
NodeNotResolving,
)
from ..common.helpers.api import api_command
Expand Down Expand Up @@ -61,6 +64,10 @@
MAX_POLL_INTERVAL = 600
MAX_COMMISSION_RETRIES = 3

MDNS_TYPE_OPERATIONAL_NODE = "_matter._tcp.local."
MDNS_TYPE_COMMISSIONABLE_NODE = "_matterc._udp.local."


ROUTING_ROLE_ATTRIBUTE_PATH = create_attribute_path_from_attribute(
0, Clusters.ThreadNetworkDiagnostics.Attributes.RoutingRole
)
Expand Down Expand Up @@ -114,6 +121,9 @@ def __init__(
self.compressed_fabric_id: int | None = None
self._node_lock: dict[int, asyncio.Lock] = {}
self._resolve_lock = asyncio.Lock()
self._aiobrowser: AsyncServiceBrowser | None = None
self._aiozc: AsyncZeroconf | None = None
self._mdns_inprogress: set[int] = set()

async def initialize(self) -> None:
"""Async initialize of controller."""
Expand Down Expand Up @@ -150,30 +160,33 @@ async def start(self) -> None:
# always mark node as unavailable at startup until subscriptions are ready
node.available = False
self._nodes[node_id] = node
# setup subscription and (re)interview as task in the background
# as we do not want it to block our startup
if not node_dict.get("available"):
# if the node was not available last time we will delay
# the first attempt to initialize so that we prioritize nodes
# that are probably available so they are back online as soon as
# possible and we're not stuck trying to initialize nodes that are offline
self._schedule_interview(node_id, 30)
else:
asyncio.create_task(self._check_interview_and_subscription(node_id))
# cleanup orphaned nodes from storage
for node_id_str in orphaned_nodes:
self.server.storage.remove(DATA_KEY_NODES, node_id_str)
LOGGER.info("Loaded %s nodes from stored configuration", len(self._nodes))
# set-up mdns browser
self._aiozc = AsyncZeroconf(ip_version=IPVersion.All)
services = [MDNS_TYPE_OPERATIONAL_NODE, MDNS_TYPE_COMMISSIONABLE_NODE]
self._aiobrowser = AsyncServiceBrowser(
self._aiozc.zeroconf,
services,
handlers=[self._on_mdns_service_state_change],
)

async def stop(self) -> None:
    """Tear down the device controller on server shutdown."""
    if self.chip_controller is None:
        raise RuntimeError("Device Controller not initialized.")

    # drop all active node subscriptions first so no callbacks fire mid-shutdown
    for subscription in list(self._subscriptions.values()):
        await self._call_sdk(subscription.Shutdown)
    self._subscriptions = {}
    # stop mdns discovery (browser before the zeroconf instance itself)
    if self._aiobrowser:
        await self._aiobrowser.async_cancel()
    if self._aiozc:
        await self._aiozc.async_close()
    # finally shut down the sdk device controller itself
    await self._call_sdk(self.chip_controller.Shutdown)
    LOGGER.debug("Stopped.")

Expand Down Expand Up @@ -246,7 +259,7 @@ async def commission_with_code(
while retries:
try:
await self.interview_node(node_id)
except NodeInterviewFailed as err:
except (NodeNotResolving, NodeInterviewFailed) as err:
if retries <= 0:
raise err
retries -= 1
Expand Down Expand Up @@ -469,16 +482,16 @@ async def interview_node(self, node_id: int) -> None:
fabricFiltered=False,
)
)
except (ChipStackError, NodeNotResolving) as err:
except ChipStackError as err:
raise NodeInterviewFailed(f"Failed to interview node {node_id}") from err

is_new_node = node_id not in self._nodes
existing_info = self._nodes.get(node_id)
node = MatterNodeData(
node_id=node_id,
date_commissioned=existing_info.date_commissioned
if existing_info
else datetime.utcnow(),
date_commissioned=(
existing_info.date_commissioned if existing_info else datetime.utcnow()
),
last_interview=datetime.utcnow(),
interview_version=SCHEMA_VERSION,
available=True,
Expand Down Expand Up @@ -519,7 +532,8 @@ async def send_device_command(
"""Send a command to a Matter node/device."""
if self.chip_controller is None:
raise RuntimeError("Device Controller not initialized.")

if (node := self._nodes.get(node_id)) is None or not node.available:
raise NodeNotReady(f"Node {node_id} is not (yet) available.")
cluster_cls: Cluster = ALL_CLUSTERS[cluster_id]
command_cls = getattr(cluster_cls.Commands, command_name)
command = dataclass_from_dict(command_cls, payload)
Expand All @@ -541,6 +555,8 @@ async def read_attribute(
"""Read a single attribute (or Cluster) on a node."""
if self.chip_controller is None:
raise RuntimeError("Device Controller not initialized.")
if (node := self._nodes.get(node_id)) is None or not node.available:
raise NodeNotReady(f"Node {node_id} is not (yet) available.")
endpoint_id, cluster_id, attribute_id = parse_attribute_path(attribute_path)
assert self.server.loop is not None
future = self.server.loop.create_future()
Expand Down Expand Up @@ -580,6 +596,8 @@ async def write_attribute(
"""Write an attribute(value) on a target node."""
if self.chip_controller is None:
raise RuntimeError("Device Controller not initialized.")
if (node := self._nodes.get(node_id)) is None or not node.available:
raise NodeNotReady(f"Node {node_id} is not (yet) available.")
endpoint_id, cluster_id, attribute_id = parse_attribute_path(attribute_path)
attribute = ALL_ATTRIBUTES[cluster_id][attribute_id]()
attribute.value = Clusters.NullValue if value is None else value
Expand Down Expand Up @@ -803,7 +821,10 @@ async def _subscribe_node(self, node_id: int) -> None:
# if so, we need to unsubscribe first unless nothing changed
# in the attribute paths we want to subscribe.
if prev_sub := self._subscriptions.get(node_id, None):
if self._attr_subscriptions.get(node_id) == attr_subscriptions:
if (
node.available
and self._attr_subscriptions.get(node_id) == attr_subscriptions
):
# the current subscription already matches, no need to re-setup
node_logger.debug("Re-using existing subscription.")
return
Expand Down Expand Up @@ -937,8 +958,9 @@ def resubscription_attempted(
# we debounce it a bit so we only mark the node unavailable
# at the second resubscription attempt
if node.available and self._last_subscription_attempt[node_id] >= 1:
node.available = False
self.server.signal_event(EventType.NODE_UPDATED, node)
# NOTE: if the node is (re)discovered by mdns, that callback will
# take care of resubscribing to the node
asyncio.create_task(self._node_offline(node_id))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a bit worried that we get false positives here. But let's see.

Btw, you should store the task somewhere, see also https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task and https://bugs.python.org/issue44665.

Important: Save a reference to the result of this function, to avoid a task disappearing mid-execution. The event loop only keeps weak references to tasks. A task that isn’t referenced elsewhere may get garbage collected at any time, even before it’s done. For reliable “fire-and-forget” background tasks, gather them in a collection:

self._last_subscription_attempt[node_id] += 1

def resubscription_succeeded(
Expand All @@ -954,9 +976,7 @@ def resubscription_succeeded(

node_logger.info("Setting up attributes and events subscription.")
interval_floor = 0
interval_ceiling = (
random.randint(60, 300) if battery_powered else random.randint(30, 120)
)
interval_ceiling = 300 if battery_powered else 30
self._last_subscription_attempt[node_id] = 0
future = loop.create_future()
device = await self._resolve_node(node_id)
Expand Down Expand Up @@ -1037,6 +1057,13 @@ async def _check_interview_and_subscription(
):
try:
await self.interview_node(node_id)
except NodeNotResolving:
LOGGER.warning(
"Unable to interview Node %s as it is unavailable",
node_id,
)
# NOTE: the node will be picked up by mdns discovery automatically
# when it becomes available again.
except NodeInterviewFailed:
LOGGER.warning(
"Unable to interview Node %s, will retry later in the background.",
Expand All @@ -1059,16 +1086,11 @@ async def _check_interview_and_subscription(
await self._subscribe_node(node_id)
except NodeNotResolving:
LOGGER.warning(
"Unable to subscribe to Node %s as it is unavailable, "
"will retry later in the background.",
node_id,
)
# TODO: fix this once OperationalNodeDiscovery is available:
# https://github.com/project-chip/connectedhomeip/pull/26718
self._schedule_interview(
"Unable to subscribe to Node %s as it is unavailable",
node_id,
min(reschedule_interval + 10, MAX_POLL_INTERVAL),
)
# NOTE: the node will be picked up by mdns discovery automatically
# when it becomes available again.

def _schedule_interview(self, node_id: int, delay: int) -> None:
"""(Re)Schedule interview and/or initial subscription for a node."""
Expand Down Expand Up @@ -1151,6 +1173,65 @@ async def _handle_endpoints_added(
{"node_id": node_id, "endpoint_id": endpoint_id},
)

def _on_mdns_service_state_change(
    self,
    zeroconf: Zeroconf,  # pylint: disable=unused-argument
    service_type: str,
    name: str,
    state_change: ServiceStateChange,
) -> None:
    """Dispatch an mDNS state change to the matching async handler.

    Called synchronously by zeroconf; the actual handling is scheduled
    as an asyncio task on the running event loop.
    """
    if state_change not in (ServiceStateChange.Added, ServiceStateChange.Removed):
        # we're not interested in update messages so return early
        return
    if service_type == MDNS_TYPE_COMMISSIONABLE_NODE:
        coro = self._on_mdns_commissionable_node_state(name, state_change)
    elif service_type == MDNS_TYPE_OPERATIONAL_NODE:
        coro = self._on_mdns_operational_node_state(name, state_change)
    else:
        return
    # NOTE: the event loop only keeps weak references to tasks, so we must
    # hold a strong reference until the task completes or it may be
    # garbage-collected mid-execution (see asyncio.create_task docs).
    task = asyncio.create_task(coro)
    tasks: set = self.__dict__.setdefault("_mdns_tasks", set())
    tasks.add(task)
    task.add_done_callback(tasks.discard)

async def _on_mdns_operational_node_state(
    self, name: str, state_change: ServiceStateChange
) -> None:
    """Handle a (operational) Matter node MDNS state change."""
    # the mdns name is constructed as [fabricid]-[nodeid]._matter._tcp.local.
    # extract the node id (hex) from the name; guard against malformed
    # records so a bad advertisement cannot crash this background task
    try:
        node_id = int(name.split("-")[1].split(".")[0], 16)
    except (IndexError, ValueError):
        LOGGER.warning("Received invalid operational node MDNS record: %s", name)
        return
    if node_id not in self._nodes:
        return  # should not happen, but just in case
    if node_id in self._mdns_inprogress:
        # mdns records can potentially arrive multiplied so debounce any duplicates
        return
    try:
        self._mdns_inprogress.add(node_id)
        node = self._nodes[node_id]
        if state_change == ServiceStateChange.Added:
            if node.available:
                return  # node is already set-up, no action needed
            LOGGER.info("Node %s discovered on MDNS", node_id)
            # setup the node (interview if needed + subscription)
            await self._check_interview_and_subscription(node_id)
        elif state_change == ServiceStateChange.Removed:
            if not node.available:
                return  # node is already offline, nothing to do
            LOGGER.info("Node %s vanished according to MDNS", node_id)
            await self._node_offline(node_id)
    finally:
        self._mdns_inprogress.remove(node_id)

async def _on_mdns_commissionable_node_state(
    self, name: str, state_change: ServiceStateChange
) -> None:
    """Handle a (commissionable) Matter node MDNS state change."""
    # only newly-added advertisements are of interest here
    if state_change != ServiceStateChange.Added:
        return
    assert self._aiozc is not None
    info = AsyncServiceInfo(MDNS_TYPE_COMMISSIONABLE_NODE, name)
    await info.async_request(self._aiozc.zeroconf, 3000)
    LOGGER.debug("Discovered commissionable Matter node using MDNS: %s", info)

def _get_node_lock(self, node_id: int) -> asyncio.Lock:
"""Return lock for given node."""
if node_id not in self._node_lock:
Expand All @@ -1166,3 +1247,19 @@ def _write_node_state(self, node_id: int, force: bool = False) -> None:
subkey=str(node_id),
force=force,
)

async def _node_offline(self, node_id: int) -> None:
    """Mark node as offline and tear down its active subscription."""
    # cancel any pending interview/subscription reschedule timer
    retry_timer = self._sub_retry_timer.pop(node_id, None)
    if retry_timer:
        retry_timer.cancel()
    # shut down the existing sdk subscription, if present
    subscription = self._subscriptions.pop(node_id, None)
    if subscription:
        await self._call_sdk(subscription.Shutdown)
    # flag the node itself as unavailable and notify listeners
    node = self._nodes[node_id]
    if not node.available:
        return  # already offline, nothing left to do
    node.available = False
    self.server.signal_event(EventType.NODE_UPDATED, node)
    LOGGER.info("Marked node %s as offline", node_id)
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ dependencies = [
[project.optional-dependencies]
server = [
"home-assistant-chip-core==2024.1.0",
"cryptography==42.0.2"
"cryptography==42.0.2",
"zeroconf==0.131.0"
]
test = [
"black==23.12.1",
Expand Down
Loading