9
9
from datetime import datetime
10
10
from functools import partial
11
11
import logging
12
+ import time
12
13
from typing import TYPE_CHECKING , Any , Awaitable , Callable , Iterable , TypeVar , cast
13
14
14
15
from chip .ChipDeviceCtrl import DeviceProxyWrapper
60
61
DATA_KEY_LAST_NODE_ID = "last_node_id"
61
62
62
63
LOGGER = logging .getLogger (__name__ )
63
- MAX_POLL_INTERVAL = 600
64
+ NODE_SUBSCRIPTION_CEILING = 30
65
+ NODE_SUBSCRIPTION_CEILING_BATTERY_POWERED = 1800
64
66
MAX_COMMISSION_RETRIES = 3
65
67
NODE_RESUBSCRIBE_ATTEMPTS_UNAVAILABLE = 3
66
68
NODE_RESUBSCRIBE_TIMEOUT_OFFLINE = 30 * 60 * 1000
@@ -92,6 +94,7 @@ def __init__(
92
94
# we keep the last events in memory so we can include them in the diagnostics dump
93
95
self .event_history : deque [Attribute .EventReadResult ] = deque (maxlen = 25 )
94
96
self ._subscriptions : dict [int , Attribute .SubscriptionTransaction ] = {}
97
+ self ._mdns_last_seen : dict [int , float ] = {}
95
98
self ._nodes : dict [int , MatterNodeData ] = {}
96
99
self ._last_subscription_attempt : dict [int , int ] = {}
97
100
self .wifi_credentials_set : bool = False
@@ -101,9 +104,6 @@ def __init__(
101
104
self ._resolve_lock = asyncio .Lock ()
102
105
self ._aiobrowser : AsyncServiceBrowser | None = None
103
106
self ._aiozc : AsyncZeroconf | None = None
104
- self ._mdns_queues : dict [
105
- str , tuple [asyncio .Queue [ServiceStateChange ], asyncio .Task ]
106
- ] = {}
107
107
108
108
async def initialize (self ) -> None :
109
109
"""Async initialize of controller."""
@@ -175,9 +175,6 @@ async def stop(self) -> None:
175
175
await self ._call_sdk (sub .Shutdown )
176
176
self ._subscriptions = {}
177
177
# shutdown (and cleanup) mdns browser
178
- for key in tuple (self ._mdns_queues .keys ()):
179
- _ , mdns_task = self ._mdns_queues .pop (key )
180
- mdns_task .cancel ()
181
178
if self ._aiobrowser :
182
179
await self ._aiobrowser .async_cancel ()
183
180
if self ._aiozc :
@@ -673,7 +670,7 @@ async def subscribe_attribute(
673
670
The given attribute path(s) will be added to the list of attributes that
674
671
are watched for the given node. This is persistent over restarts.
675
672
"""
676
- LOGGER .warning (
673
+ LOGGER .debug (
677
674
"The subscribe_attribute command has been deprecated and will be removed from"
678
675
" a future version. You no longer need to call this to subscribe to attribute changes."
679
676
)
@@ -755,11 +752,10 @@ async def _subscribe_node(self, node_id: int) -> None:
755
752
node = self ._nodes [node_id ]
756
753
757
754
# check if we already have setup subscriptions for this node,
758
- # if so, we need to unsubscribe first unless nothing changed
759
- # in the attribute paths we want to subscribe.
755
+ # if so, we need to unsubscribe
760
756
if prev_sub := self ._subscriptions .get (node_id , None ):
761
757
async with node_lock :
762
- node_logger .info ("Unsubscribing from existing subscription." )
758
+ node_logger .debug ("Unsubscribing from existing subscription." )
763
759
await self ._call_sdk (prev_sub .Shutdown )
764
760
del self ._subscriptions [node_id ]
765
761
@@ -778,6 +774,7 @@ def attribute_updated_callback(
778
774
path : Attribute .TypedAttributePath ,
779
775
transaction : Attribute .SubscriptionTransaction ,
780
776
) -> None :
777
+ self ._mdns_last_seen [node_id ] = time .time ()
781
778
assert loop is not None
782
779
new_value = transaction .GetAttribute (path )
783
780
# failsafe: ignore ValueDecodeErrors
@@ -905,6 +902,7 @@ def resubscription_succeeded(
905
902
transaction : Attribute .SubscriptionTransaction ,
906
903
) -> None :
907
904
# pylint: disable=unused-argument, invalid-name
905
+ self ._mdns_last_seen [node_id ] = time .time ()
908
906
node_logger .info ("Re-Subscription succeeded" )
909
907
self ._last_subscription_attempt [node_id ] = 0
910
908
# mark node as available and signal consumers
@@ -914,7 +912,11 @@ def resubscription_succeeded(
914
912
915
913
node_logger .info ("Setting up attributes and events subscription." )
916
914
interval_floor = 0
917
- interval_ceiling = 600 if battery_powered else 120
915
+ interval_ceiling = (
916
+ NODE_SUBSCRIPTION_CEILING_BATTERY_POWERED
917
+ if battery_powered
918
+ else NODE_SUBSCRIPTION_CEILING
919
+ )
918
920
self ._last_subscription_attempt [node_id ] = 0
919
921
future = loop .create_future ()
920
922
device = await self ._resolve_node (node_id )
@@ -957,6 +959,7 @@ def resubscription_succeeded(
957
959
tlv_attributes = sub ._readTransaction ._cache .attributeTLVCache
958
960
node .attributes .update (parse_attributes_from_read_result (tlv_attributes ))
959
961
node_logger .info ("Subscription succeeded" )
962
+ self ._mdns_last_seen [node_id ] = time .time ()
960
963
self .server .signal_event (EventType .NODE_UPDATED , node )
961
964
962
965
def _get_next_node_id (self ) -> int :
@@ -1078,58 +1081,54 @@ def _on_mdns_service_state_change(
1078
1081
name : str ,
1079
1082
state_change : ServiceStateChange ,
1080
1083
) -> None :
1084
+ LOGGER .debug ("Received %s MDNS event for %s" , state_change , name )
1081
1085
if service_type == MDNS_TYPE_COMMISSIONABLE_NODE :
1082
1086
asyncio .create_task (
1083
1087
self ._on_mdns_commissionable_node_state (name , state_change )
1084
1088
)
1085
1089
return
1086
1090
if service_type == MDNS_TYPE_OPERATIONAL_NODE :
1087
- name = name .lower ()
1088
- if self .fabric_id_hex not in name :
1089
- # filter out messages that are not for our fabric
1090
- return
1091
- LOGGER .debug ("Received %s MDNS event for %s" , state_change , name )
1092
- if state_change not in (
1093
- ServiceStateChange .Added ,
1094
- ServiceStateChange .Updated ,
1095
- ):
1096
- # we're not interested in removals as this is already
1097
- # handled in the subscription logic
1098
- return
1099
- if existing := self ._mdns_queues .get (name ):
1100
- queue = existing [0 ]
1101
- else :
1102
- # we want mdns messages to be processes sequentially PER NODE but in
1103
- # PARALLEL overall, hence we create a node specific mdns queue per mdns name.
1104
- queue = asyncio .Queue ()
1105
- task = asyncio .create_task (self ._process_mdns_queue (name , queue ))
1106
- self ._mdns_queues [name ] = (queue , task )
1107
- queue .put_nowait (state_change )
1108
-
1109
- async def _process_mdns_queue (
1110
- self , name : str , queue : asyncio .Queue [ServiceStateChange ]
1091
+ self ._on_mdns_operational_node_state (name , state_change )
1092
+
1093
+ def _on_mdns_operational_node_state (
1094
+ self , name : str , state_change : ServiceStateChange
1111
1095
) -> None :
1112
- """Process the incoming MDNS messages of an (operational) Matter node."""
1096
+ """Handle a (operational) Matter node MDNS state change."""
1097
+ name = name .lower ()
1098
+ if self .fabric_id_hex not in name :
1099
+ # filter out messages that are not for our fabric
1100
+ return
1101
+
1102
+ if state_change == ServiceStateChange .Removed :
1103
+ # we're not interested in removals as this is already
1104
+ # handled in the subscription logic
1105
+ return
1106
+
1113
1107
# the mdns name is constructed as [fabricid]-[nodeid]._matter._tcp.local.
1114
1108
# extract the node id from the name
1115
1109
node_id = int (name .split ("-" )[1 ].split ("." )[0 ], 16 )
1116
- while True :
1117
- state_change = await queue .get ()
1118
- if node_id not in self ._nodes :
1119
- continue # this should not happen, but just in case
1120
- node = self ._nodes [node_id ]
1121
- if state_change not in (
1122
- ServiceStateChange .Added ,
1123
- ServiceStateChange .Updated ,
1124
- ):
1125
- # this should be already filtered out, but just in case
1126
- continue
1127
- if node .available :
1128
- # if the node is already set-up, no action is needed
1129
- continue
1130
- LOGGER .info ("Node %s discovered on MDNS" , node_id )
1131
- # setup the node
1132
- await self ._setup_node (node_id )
1110
+
1111
+ if not (node := self ._nodes .get (node_id )):
1112
+ return # this should not happen, but guard just in case
1113
+
1114
+ # mdns events for matter devices arrive in bursts of (duplicate) messages
1115
+ # so we debounce this as we only use the mdns messages for operational node discovery
1116
+ # and we have other logic in place to determine node aliveness
1117
+
1118
+ now = time .time ()
1119
+ last_seen = self ._mdns_last_seen .get (node_id , 0 )
1120
+ self ._mdns_last_seen [node_id ] = now
1121
+ if now - last_seen < NODE_SUBSCRIPTION_CEILING :
1122
+ return
1123
+
1124
+ # we treat UPDATE state changes as ADD if the node is marked as
1125
+ # unavailable to ensure we catch a node being operational
1126
+ if node .available and state_change == ServiceStateChange .Updated :
1127
+ return
1128
+
1129
+ LOGGER .info ("Node %s (re)discovered on MDNS" , node_id )
1130
+ # setup the node - this will (re) setup the subscriptions etc.
1131
+ asyncio .create_task (self ._setup_node (node_id ))
1133
1132
1134
1133
async def _on_mdns_commissionable_node_state (
1135
1134
self , name : str , state_change : ServiceStateChange
0 commit comments