Skip to content

Commit b81eca2

Browse files
committed
Merge branch 'wildcard' into resub-fixes
2 parents af5f02f + 0ca4099 commit b81eca2

File tree

1 file changed

+4
-103
lines changed

1 file changed

+4
-103
lines changed

matter_server/server/device_controller.py

+4-103
Original file line numberDiff line numberDiff line change
@@ -73,27 +73,6 @@
7373
0, Clusters.ThreadNetworkDiagnostics.Attributes.RoutingRole
7474
)
7575

76-
BASE_SUBSCRIBE_ATTRIBUTES: tuple[Attribute.AttributePath, ...] = (
77-
# all endpoints, BasicInformation cluster
78-
Attribute.AttributePath(
79-
EndpointId=None, ClusterId=Clusters.BasicInformation.id, Attribute=None
80-
),
81-
# all endpoints, BridgedDeviceBasicInformation (bridges only)
82-
Attribute.AttributePath(
83-
EndpointId=None,
84-
ClusterId=Clusters.BridgedDeviceBasicInformation.id,
85-
Attribute=None,
86-
),
87-
# networkinterfaces attribute on general diagnostics cluster,
88-
# so we have the most accurate IP addresses for ping/diagnostics
89-
Attribute.AttributePath(
90-
EndpointId=0, Attribute=Clusters.GeneralDiagnostics.Attributes.NetworkInterfaces
91-
),
92-
# active fabrics attribute - to speedup node diagnostics
93-
Attribute.AttributePath(
94-
EndpointId=0, Attribute=Clusters.OperationalCredentials.Attributes.Fabrics
95-
),
96-
)
9776

9877
# pylint: disable=too-many-lines,too-many-locals,too-many-statements,too-many-branches,too-many-instance-attributes
9978

@@ -113,9 +92,6 @@ def __init__(
11392
# we keep the last events in memory so we can include them in the diagnostics dump
11493
self.event_history: deque[Attribute.EventReadResult] = deque(maxlen=25)
11594
self._subscriptions: dict[int, Attribute.SubscriptionTransaction] = {}
116-
self._attr_subscriptions: dict[int, list[Attribute.AttributePath]] = {}
117-
self._resub_debounce_timer: dict[int, asyncio.TimerHandle] = {}
118-
self._sub_retry_timer: dict[int, asyncio.TimerHandle] = {}
11995
self._nodes: dict[int, MatterNodeData] = {}
12096
self._last_subscription_attempt: dict[int, int] = {}
12197
self.wifi_credentials_set: bool = False
@@ -640,10 +616,6 @@ async def remove_node(self, node_id: int) -> None:
640616

641617
LOGGER.info("Removing Node ID %s.", node_id)
642618

643-
# Remove and cancel any existing interview/subscription reschedule timer
644-
if existing := self._sub_retry_timer.pop(node_id, None):
645-
existing.cancel()
646-
647619
# shutdown any existing subscriptions
648620
if sub := self._subscriptions.pop(node_id, None):
649621
await self._call_sdk(sub.Shutdown)
@@ -701,40 +673,9 @@ async def subscribe_attribute(
701673
The given attribute path(s) will be added to the list of attributes that
702674
are watched for the given node. This is persistent over restarts.
703675
"""
704-
if self.chip_controller is None:
705-
raise RuntimeError("Device Controller not initialized.")
706-
707-
if node_id not in self._nodes:
708-
raise NodeNotExists(
709-
f"Node {node_id} does not exist or has not been interviewed."
710-
)
711-
712-
node = self._nodes[node_id]
713-
assert node is not None
714-
715-
# work out added subscriptions
716-
if not isinstance(attribute_path, list):
717-
attribute_path = [attribute_path]
718-
attribute_paths = {parse_attribute_path(x) for x in attribute_path}
719-
prev_subs = set(node.attribute_subscriptions)
720-
node.attribute_subscriptions.update(attribute_paths)
721-
if prev_subs == node.attribute_subscriptions:
722-
return # nothing to do
723-
# save updated node data
724-
self._write_node_state(node_id)
725-
726-
# (re)setup node subscription
727-
# this could potentially be called multiple times within a short timeframe
728-
# so debounce it a bit
729-
def resubscribe() -> None:
730-
self._resub_debounce_timer.pop(node_id, None)
731-
asyncio.create_task(self._subscribe_node(node_id))
732-
733-
if existing_timer := self._resub_debounce_timer.pop(node_id, None):
734-
existing_timer.cancel()
735-
assert self.server.loop is not None
736-
self._resub_debounce_timer[node_id] = self.server.loop.call_later(
737-
5, resubscribe
676+
LOGGER.warning(
677+
"The subscribe_attribute command has been deprecated and will be removed from"
678+
" a future version. You no longer need to call this to subscribe to attribute changes."
738679
)
739680

740681
@api_command(APICommand.PING_NODE)
@@ -813,52 +754,15 @@ async def _subscribe_node(self, node_id: int) -> None:
813754
node_lock = self._get_node_lock(node_id)
814755
node = self._nodes[node_id]
815756

816-
# work out all (current) attribute subscriptions
817-
attr_subscriptions: list[Attribute.AttributePath] = list(
818-
BASE_SUBSCRIBE_ATTRIBUTES
819-
)
820-
for (
821-
endpoint_id,
822-
cluster_id,
823-
attribute_id,
824-
) in node.attribute_subscriptions:
825-
attr_path = Attribute.AttributePath(
826-
EndpointId=endpoint_id,
827-
ClusterId=cluster_id,
828-
AttributeId=attribute_id,
829-
)
830-
if attr_path in attr_subscriptions:
831-
continue
832-
attr_subscriptions.append(attr_path)
833-
834-
if node.is_bridge or len(attr_subscriptions) > 9:
835-
# A matter device can officially only handle 3 attribute paths per subscription
836-
# and a maximum of 3 concurrent subscriptions per fabric.
837-
# We cheat a bit here and use one single subscription for up to 9 paths,
838-
# because in our experience that is more stable than multiple subscriptions
839-
# to the same device. If we have more than 9 paths to watch for a node,
840-
# we switch to a wildcard subscription.
841-
attr_subscriptions = [Attribute.AttributePath()] # wildcard
842-
843757
# check if we already have setup subscriptions for this node,
844758
# if so, we need to unsubscribe first unless nothing changed
845759
# in the attribute paths we want to subscribe.
846760
if prev_sub := self._subscriptions.get(node_id, None):
847-
if (
848-
node.available
849-
and self._attr_subscriptions.get(node_id) == attr_subscriptions
850-
):
851-
# the current subscription already matches, no need to re-setup
852-
node_logger.debug("Re-using existing subscription.")
853-
return
854761
async with node_lock:
855762
node_logger.info("Unsubscribing from existing subscription.")
856763
await self._call_sdk(prev_sub.Shutdown)
857764
del self._subscriptions[node_id]
858765

859-
# store our list of subscriptions for this node
860-
self._attr_subscriptions[node_id] = attr_subscriptions
861-
862766
# determine if node is battery powered sleeping device
863767
# Endpoint 0, ThreadNetworkDiagnostics Cluster, routingRole attribute
864768
battery_powered = (
@@ -1020,7 +924,7 @@ def resubscription_succeeded(
1020924
eventLoop=loop,
1021925
device=device.deviceProxy,
1022926
devCtrl=self.chip_controller,
1023-
attributes=attr_subscriptions,
927+
attributes=[Attribute.AttributePath()], # wildcard
1024928
events=[
1025929
Attribute.EventPath(
1026930
EndpointId=None, Cluster=None, Event=None, Urgent=1
@@ -1255,9 +1159,6 @@ def _write_node_state(self, node_id: int, force: bool = False) -> None:
12551159

12561160
async def _node_offline(self, node_id: int) -> None:
12571161
"""Mark node as offline."""
1258-
# Remove and cancel any existing interview/subscription reschedule timer
1259-
if existing := self._sub_retry_timer.pop(node_id, None):
1260-
existing.cancel()
12611162
# shutdown existing subscriptions
12621163
if sub := self._subscriptions.pop(node_id, None):
12631164
await self._call_sdk(sub.Shutdown)

0 commit comments

Comments
 (0)