1
1
"""Controller that Manages Matter devices."""
2
+
2
3
# pylint: disable=too-many-lines
3
4
4
5
from __future__ import annotations
8
9
from datetime import datetime
9
10
from functools import partial
10
11
import logging
11
- import random
12
12
from typing import TYPE_CHECKING , Any , Awaitable , Callable , Iterable , TypeVar , cast
13
13
14
14
from chip .ChipDeviceCtrl import DeviceProxyWrapper
15
15
from chip .clusters import Attribute , Objects as Clusters
16
16
from chip .clusters .Attribute import ValueDecodeFailure
17
17
from chip .clusters .ClusterObjects import ALL_ATTRIBUTES , ALL_CLUSTERS , Cluster
18
18
from chip .exceptions import ChipStackError
19
+ from zeroconf import IPVersion , ServiceStateChange , Zeroconf
20
+ from zeroconf .asyncio import AsyncServiceBrowser , AsyncServiceInfo , AsyncZeroconf
19
21
20
22
from matter_server .common .helpers .util import convert_ip_address
21
23
from matter_server .common .models import CommissionableNodeData , CommissioningParameters
27
29
NodeCommissionFailed ,
28
30
NodeInterviewFailed ,
29
31
NodeNotExists ,
32
+ NodeNotReady ,
30
33
NodeNotResolving ,
31
34
)
32
35
from ..common .helpers .api import api_command
61
64
MAX_POLL_INTERVAL = 600
62
65
MAX_COMMISSION_RETRIES = 3
63
66
67
+ MDNS_TYPE_OPERATIONAL_NODE = "_matter._tcp.local."
68
+ MDNS_TYPE_COMMISSIONABLE_NODE = "_matterc._udp.local."
69
+
70
+
64
71
ROUTING_ROLE_ATTRIBUTE_PATH = create_attribute_path_from_attribute (
65
72
0 , Clusters .ThreadNetworkDiagnostics .Attributes .RoutingRole
66
73
)
@@ -114,6 +121,9 @@ def __init__(
114
121
self .compressed_fabric_id : int | None = None
115
122
self ._node_lock : dict [int , asyncio .Lock ] = {}
116
123
self ._resolve_lock = asyncio .Lock ()
124
+ self ._aiobrowser : AsyncServiceBrowser | None = None
125
+ self ._aiozc : AsyncZeroconf | None = None
126
+ self ._mdns_inprogress : set [int ] = set ()
117
127
118
128
async def initialize (self ) -> None :
119
129
"""Async initialize of controller."""
@@ -150,30 +160,33 @@ async def start(self) -> None:
150
160
# always mark node as unavailable at startup until subscriptions are ready
151
161
node .available = False
152
162
self ._nodes [node_id ] = node
153
- # setup subscription and (re)interview as task in the background
154
- # as we do not want it to block our startup
155
- if not node_dict .get ("available" ):
156
- # if the node was not available last time we will delay
157
- # the first attempt to initialize so that we prioritize nodes
158
- # that are probably available so they are back online as soon as
159
- # possible and we're not stuck trying to initialize nodes that are offline
160
- self ._schedule_interview (node_id , 30 )
161
- else :
162
- asyncio .create_task (self ._check_interview_and_subscription (node_id ))
163
163
# cleanup orhpaned nodes from storage
164
164
for node_id_str in orphaned_nodes :
165
165
self .server .storage .remove (DATA_KEY_NODES , node_id_str )
166
166
LOGGER .info ("Loaded %s nodes from stored configuration" , len (self ._nodes ))
167
+ # set-up mdns browser
168
+ self ._aiozc = AsyncZeroconf (ip_version = IPVersion .All )
169
+ services = [MDNS_TYPE_OPERATIONAL_NODE , MDNS_TYPE_COMMISSIONABLE_NODE ]
170
+ self ._aiobrowser = AsyncServiceBrowser (
171
+ self ._aiozc .zeroconf ,
172
+ services ,
173
+ handlers = [self ._on_mdns_service_state_change ],
174
+ )
167
175
168
176
async def stop (self ) -> None :
169
177
"""Handle logic on server stop."""
170
178
if self .chip_controller is None :
171
179
raise RuntimeError ("Device Controller not initialized." )
172
-
173
180
# unsubscribe all node subscriptions
174
181
for sub in self ._subscriptions .values ():
175
182
await self ._call_sdk (sub .Shutdown )
176
183
self ._subscriptions = {}
184
+ # shutdown mdns browser
185
+ if self ._aiobrowser :
186
+ await self ._aiobrowser .async_cancel ()
187
+ if self ._aiozc :
188
+ await self ._aiozc .async_close ()
189
+ # shutdown the sdk device controller
177
190
await self ._call_sdk (self .chip_controller .Shutdown )
178
191
LOGGER .debug ("Stopped." )
179
192
@@ -246,7 +259,7 @@ async def commission_with_code(
246
259
while retries :
247
260
try :
248
261
await self .interview_node (node_id )
249
- except NodeInterviewFailed as err :
262
+ except ( NodeNotResolving , NodeInterviewFailed ) as err :
250
263
if retries <= 0 :
251
264
raise err
252
265
retries -= 1
@@ -469,16 +482,16 @@ async def interview_node(self, node_id: int) -> None:
469
482
fabricFiltered = False ,
470
483
)
471
484
)
472
- except ( ChipStackError , NodeNotResolving ) as err :
485
+ except ChipStackError as err :
473
486
raise NodeInterviewFailed (f"Failed to interview node { node_id } " ) from err
474
487
475
488
is_new_node = node_id not in self ._nodes
476
489
existing_info = self ._nodes .get (node_id )
477
490
node = MatterNodeData (
478
491
node_id = node_id ,
479
- date_commissioned = existing_info . date_commissioned
480
- if existing_info
481
- else datetime . utcnow ( ),
492
+ date_commissioned = (
493
+ existing_info . date_commissioned if existing_info else datetime . utcnow ()
494
+ ),
482
495
last_interview = datetime .utcnow (),
483
496
interview_version = SCHEMA_VERSION ,
484
497
available = True ,
@@ -519,7 +532,8 @@ async def send_device_command(
519
532
"""Send a command to a Matter node/device."""
520
533
if self .chip_controller is None :
521
534
raise RuntimeError ("Device Controller not initialized." )
522
-
535
+ if (node := self ._nodes .get (node_id )) is None or not node .available :
536
+ raise NodeNotReady (f"Node { node_id } is not (yet) available." )
523
537
cluster_cls : Cluster = ALL_CLUSTERS [cluster_id ]
524
538
command_cls = getattr (cluster_cls .Commands , command_name )
525
539
command = dataclass_from_dict (command_cls , payload )
@@ -541,6 +555,8 @@ async def read_attribute(
541
555
"""Read a single attribute (or Cluster) on a node."""
542
556
if self .chip_controller is None :
543
557
raise RuntimeError ("Device Controller not initialized." )
558
+ if (node := self ._nodes .get (node_id )) is None or not node .available :
559
+ raise NodeNotReady (f"Node { node_id } is not (yet) available." )
544
560
endpoint_id , cluster_id , attribute_id = parse_attribute_path (attribute_path )
545
561
assert self .server .loop is not None
546
562
future = self .server .loop .create_future ()
@@ -580,6 +596,8 @@ async def write_attribute(
580
596
"""Write an attribute(value) on a target node."""
581
597
if self .chip_controller is None :
582
598
raise RuntimeError ("Device Controller not initialized." )
599
+ if (node := self ._nodes .get (node_id )) is None or not node .available :
600
+ raise NodeNotReady (f"Node { node_id } is not (yet) available." )
583
601
endpoint_id , cluster_id , attribute_id = parse_attribute_path (attribute_path )
584
602
attribute = ALL_ATTRIBUTES [cluster_id ][attribute_id ]()
585
603
attribute .value = Clusters .NullValue if value is None else value
@@ -803,7 +821,10 @@ async def _subscribe_node(self, node_id: int) -> None:
803
821
# if so, we need to unsubscribe first unless nothing changed
804
822
# in the attribute paths we want to subscribe.
805
823
if prev_sub := self ._subscriptions .get (node_id , None ):
806
- if self ._attr_subscriptions .get (node_id ) == attr_subscriptions :
824
+ if (
825
+ node .available
826
+ and self ._attr_subscriptions .get (node_id ) == attr_subscriptions
827
+ ):
807
828
# the current subscription already matches, no need to re-setup
808
829
node_logger .debug ("Re-using existing subscription." )
809
830
return
@@ -938,6 +959,8 @@ def resubscription_attempted(
938
959
# at the second resubscription attempt
939
960
if node .available and self ._last_subscription_attempt [node_id ] >= 1 :
940
961
node .available = False
962
+ # NOTE: if the node is (re)discovered bt mdns, that callback will
963
+ # take care of resubscribing to the node
941
964
self .server .signal_event (EventType .NODE_UPDATED , node )
942
965
self ._last_subscription_attempt [node_id ] += 1
943
966
@@ -954,9 +977,7 @@ def resubscription_succeeded(
954
977
955
978
node_logger .info ("Setting up attributes and events subscription." )
956
979
interval_floor = 0
957
- interval_ceiling = (
958
- random .randint (60 , 300 ) if battery_powered else random .randint (30 , 120 )
959
- )
980
+ interval_ceiling = 3600 if battery_powered else 30
960
981
self ._last_subscription_attempt [node_id ] = 0
961
982
future = loop .create_future ()
962
983
device = await self ._resolve_node (node_id )
@@ -1037,6 +1058,13 @@ async def _check_interview_and_subscription(
1037
1058
):
1038
1059
try :
1039
1060
await self .interview_node (node_id )
1061
+ except NodeNotResolving :
1062
+ LOGGER .warning (
1063
+ "Unable to interview Node %s as it is unavailable" ,
1064
+ node_id ,
1065
+ )
1066
+ # NOTE: the node will be picked up by mdns discovery automatically
1067
+ # when it becomes available again.
1040
1068
except NodeInterviewFailed :
1041
1069
LOGGER .warning (
1042
1070
"Unable to interview Node %s, will retry later in the background." ,
@@ -1059,16 +1087,11 @@ async def _check_interview_and_subscription(
1059
1087
await self ._subscribe_node (node_id )
1060
1088
except NodeNotResolving :
1061
1089
LOGGER .warning (
1062
- "Unable to subscribe to Node %s as it is unavailable, "
1063
- "will retry later in the background." ,
1064
- node_id ,
1065
- )
1066
- # TODO: fix this once OperationalNodeDiscovery is available:
1067
- # https://github.com/project-chip/connectedhomeip/pull/26718
1068
- self ._schedule_interview (
1090
+ "Unable to subscribe to Node %s as it is unavailable" ,
1069
1091
node_id ,
1070
- min (reschedule_interval + 10 , MAX_POLL_INTERVAL ),
1071
1092
)
1093
+ # NOTE: the node will be picked up by mdns discovery automatically
1094
+ # when it becomes available again.
1072
1095
1073
1096
def _schedule_interview (self , node_id : int , delay : int ) -> None :
1074
1097
"""(Re)Schedule interview and/or initial subscription for a node."""
@@ -1151,6 +1174,73 @@ async def _handle_endpoints_added(
1151
1174
{"node_id" : node_id , "endpoint_id" : endpoint_id },
1152
1175
)
1153
1176
1177
+ def _on_mdns_service_state_change (
1178
+ self ,
1179
+ zeroconf : Zeroconf , # pylint: disable=unused-argument
1180
+ service_type : str ,
1181
+ name : str ,
1182
+ state_change : ServiceStateChange ,
1183
+ ) -> None :
1184
+ if state_change not in (ServiceStateChange .Added , ServiceStateChange .Removed ):
1185
+ # we're not interested in update messages so return early
1186
+ return
1187
+ if service_type == MDNS_TYPE_COMMISSIONABLE_NODE :
1188
+ asyncio .create_task (
1189
+ self ._on_mdns_commissionable_node_state (name , state_change )
1190
+ )
1191
+ return
1192
+ if service_type == MDNS_TYPE_OPERATIONAL_NODE :
1193
+ asyncio .create_task (
1194
+ self ._on_mdns_operational_node_state (name , state_change )
1195
+ )
1196
+
1197
+ async def _on_mdns_operational_node_state (
1198
+ self , name : str , state_change : ServiceStateChange
1199
+ ) -> None :
1200
+ """Handle a (operational) Matter node MDNS state change."""
1201
+ # the mdns name is constructed as [fabricid]-[nodeid]._matter._tcp.local.
1202
+ # extract the node id from the name
1203
+ node_id = int (name .split ("-" )[1 ].split ("." )[0 ], 16 )
1204
+ if node_id not in self ._nodes :
1205
+ return # should not happen, but just in case
1206
+ if node_id in self ._mdns_inprogress :
1207
+ # mdns records can potentially arrive multiplied so debounce any duplicates
1208
+ return
1209
+ try :
1210
+ self ._mdns_inprogress .add (node_id )
1211
+ node = self ._nodes [node_id ]
1212
+ if state_change == ServiceStateChange .Added :
1213
+ if node .available :
1214
+ return # node is already set-up, no action needed
1215
+ LOGGER .info ("Node %s discovered on MDNS" , node_id )
1216
+ # setup the node
1217
+ await self ._check_interview_and_subscription (node_id )
1218
+ elif state_change == ServiceStateChange .Removed :
1219
+ if not node .available :
1220
+ return # node is already offline, nothing to do
1221
+ LOGGER .info ("Node %s vanished according to MDNS" , node_id )
1222
+ # Remove and cancel any existing interview/subscription reschedule timer
1223
+ if existing := self ._sub_retry_timer .pop (node_id , None ):
1224
+ existing .cancel ()
1225
+ # shutdown existing subscriptions
1226
+ if sub := self ._subscriptions .pop (node_id , None ):
1227
+ await self ._call_sdk (sub .Shutdown )
1228
+ # mark node as unavailable
1229
+ node .available = False
1230
+ self .server .signal_event (EventType .NODE_UPDATED , node_id )
1231
+ finally :
1232
+ self ._mdns_inprogress .remove (node_id )
1233
+
1234
+ async def _on_mdns_commissionable_node_state (
1235
+ self , name : str , state_change : ServiceStateChange
1236
+ ) -> None :
1237
+ """Handle a (commissionable) Matter node MDNS state change."""
1238
+ if state_change == ServiceStateChange .Added :
1239
+ info = AsyncServiceInfo (MDNS_TYPE_COMMISSIONABLE_NODE , name )
1240
+ assert self ._aiozc is not None
1241
+ await info .async_request (self ._aiozc .zeroconf , 3000 )
1242
+ LOGGER .debug ("Discovered commissionable Matter node using MDNS: %s" , info )
1243
+
1154
1244
def _get_node_lock (self , node_id : int ) -> asyncio .Lock :
1155
1245
"""Return lock for given node."""
1156
1246
if node_id not in self ._node_lock :
0 commit comments