Skip to content

Commit 64e9da9

Browse files
authored
Restore single subscription to a node instead of dual (#481)
1 parent 7b5b82f commit 64e9da9

File tree

1 file changed

+68
-108
lines changed

1 file changed

+68
-108
lines changed

matter_server/server/device_controller.py

+68-108
Original file line numberDiff line numberDiff line change
@@ -82,10 +82,7 @@ def __init__(
8282
self.server = server
8383
# we keep the last events in memory so we can include them in the diagnostics dump
8484
self.event_history: deque[Attribute.EventReadResult] = deque(maxlen=25)
85-
self._subscriptions: dict[
86-
int,
87-
tuple[Attribute.SubscriptionTransaction, Attribute.SubscriptionTransaction],
88-
] = {}
85+
self._subscriptions: dict[int, Attribute.SubscriptionTransaction] = {}
8986
self._attr_subscriptions: dict[int, list[Attribute.AttributePath]] = {}
9087
self._resub_debounce_timer: dict[int, asyncio.TimerHandle] = {}
9188
self._sub_retry_timer: dict[int, asyncio.TimerHandle] = {}
@@ -148,9 +145,8 @@ async def stop(self) -> None:
148145
raise RuntimeError("Device Controller not initialized.")
149146

150147
# unsubscribe all node subscriptions
151-
for subs in self._subscriptions.values():
152-
for sub in subs:
153-
await self._call_sdk(sub.Shutdown)
148+
for sub in self._subscriptions.values():
149+
await self._call_sdk(sub.Shutdown)
154150
self._subscriptions = {}
155151
await self._call_sdk(self.chip_controller.Shutdown)
156152
LOGGER.debug("Stopped.")
@@ -514,9 +510,8 @@ async def remove_node(self, node_id: int) -> None:
514510
)
515511

516512
# shutdown any existing subscriptions
517-
if attr_subs := self._subscriptions.pop(node_id, None):
518-
for attr_sub in attr_subs:
519-
await self._call_sdk(attr_sub.Shutdown)
513+
if sub := self._subscriptions.pop(node_id, None):
514+
await self._call_sdk(sub.Shutdown)
520515

521516
# pop any existing interview/subscription reschedule timer
522517
self._sub_retry_timer.pop(node_id, None)
@@ -618,7 +613,9 @@ async def _subscribe_node(self, node_id: int) -> None:
618613
node = cast(MatterNodeData, self._nodes[node_id])
619614

620615
# work out all (current) attribute subscriptions
621-
attr_subscriptions: list[Attribute.AttributePath] = []
616+
attr_subscriptions: list[Attribute.AttributePath] = list(
617+
BASE_SUBSCRIBE_ATTRIBUTES
618+
)
622619
for (
623620
endpoint_id,
624621
cluster_id,
@@ -631,35 +628,29 @@ async def _subscribe_node(self, node_id: int) -> None:
631628
)
632629
if attr_path in attr_subscriptions:
633630
continue
634-
if cluster_id in (
635-
Clusters.BridgedDeviceBasicInformation.id,
636-
Clusters.BasicInformation.id,
637-
):
638-
# already watched in base subscription
639-
continue
640631
attr_subscriptions.append(attr_path)
641632

642-
if node.is_bridge or len(attr_subscriptions) > 3:
643-
# a matter device can only handle 3 attribute paths per subscription
644-
# and a maximum of 3 concurrent subscriptions per fabric
645-
# although the device can probably handle more, we play it safe and opt for
646-
# wildcard as soon as we have more than 3 paths to watch for.
647-
# note that we create 2 subscriptions to the device as we we watch some base
648-
# attributes in the first (lifeline) subscription.
633+
if node.is_bridge or len(attr_subscriptions) > 9:
634+
# A matter device can officially only handle 3 attribute paths per subscription
635+
# and a maximum of 3 concurrent subscriptions per fabric.
636+
# We cheat a bit here and use one single subscription for up to 9 paths,
637+
# because in our experience that is more stable than multiple subscriptions
638+
# to the same device. If we have more than 9 paths to watch for a node,
639+
# we switch to a wildcard subscription.
649640
attr_subscriptions = [Attribute.AttributePath()] # wildcard
650641

651642
# check if we already have setup subscriptions for this node,
652643
# if so, we need to unsubscribe first unless nothing changed
653644
# in the attribute paths we want to subscribe.
654-
if prev_subs := self._subscriptions.pop(node_id, None):
645+
if prev_sub := self._subscriptions.get(node_id, None):
655646
if self._attr_subscriptions.get(node_id) == attr_subscriptions:
656647
# the current subscription already matches, no need to re-setup
657648
node_logger.debug("Re-using existing subscription.")
658649
return
659650
async with node_lock:
660651
node_logger.debug("Unsubscribing from existing subscription.")
661-
for prev_sub in prev_subs:
662-
await self._call_sdk(prev_sub.Shutdown)
652+
await self._call_sdk(prev_sub.Shutdown)
653+
del self._subscriptions[node_id]
663654

664655
# store our list of subscriptions for this node
665656
self._attr_subscriptions[node_id] = attr_subscriptions
@@ -671,82 +662,9 @@ async def _subscribe_node(self, node_id: int) -> None:
671662
== Clusters.ThreadNetworkDiagnostics.Enums.RoutingRoleEnum.kSleepyEndDevice
672663
)
673664

674-
async with node_lock:
675-
node_logger.info("Setting up attributes and events subscription.")
676-
interval_floor = 0
677-
interval_ceiling = (
678-
random.randint(60, 300) if battery_powered else random.randint(30, 60)
679-
)
680-
# we set-up 2 subscriptions to the node (we may maximum use 3 subs per node)
681-
# the first subscription is a base subscription with the mandatory clusters/attributes
682-
# we need to watch and can be considered as a lifeline to quickly notice if the
683-
# device is online/offline while the second interval actually subscribes to
684-
# the attributes and/or events.
685-
base_sub = await self._setup_subscription(
686-
node,
687-
attr_subscriptions=list(BASE_SUBSCRIBE_ATTRIBUTES),
688-
interval_floor=interval_floor,
689-
interval_ceiling=interval_ceiling,
690-
# subscribe to urgent device events only (e.g. button press etc.) only
691-
event_subscriptions=[
692-
Attribute.EventPath(
693-
EndpointId=None, Cluster=None, Event=None, Urgent=1
694-
)
695-
],
696-
)
697-
attr_sub = await self._setup_subscription(
698-
node,
699-
attr_subscriptions=attr_subscriptions,
700-
interval_floor=interval_floor,
701-
interval_ceiling=interval_ceiling,
702-
)
703-
# if we reach this point, it means the node could be resolved
704-
# and the initial subscription succeeded, mark the node available.
705-
self._subscriptions[node_id] = (base_sub, attr_sub)
706-
node.available = True
707-
# update attributes with current state from read request
708-
# NOTE: Make public method upstream for retrieving the attributeTLVCache
709-
# pylint: disable=protected-access
710-
for sub in (base_sub, attr_sub):
711-
tlv_attributes = sub._readTransaction._cache.attributeTLVCache
712-
node.attributes.update(parse_attributes_from_read_result(tlv_attributes))
713-
node_logger.info("Subscription succeeded")
714-
self.server.signal_event(EventType.NODE_UPDATED, node)
715-
716-
async def _setup_subscription(
717-
self,
718-
node: MatterNodeData,
719-
attr_subscriptions: list[Attribute.AttributePath],
720-
interval_floor: int = 0,
721-
interval_ceiling: int = 60,
722-
event_subscriptions: list[Attribute.EventPath] | None = None,
723-
) -> Attribute.SubscriptionTransaction:
724-
"""Handle Setup of a single Node AttributePath(s) subscription."""
725-
node_id = node.node_id
726-
node_logger = LOGGER.getChild(f"[node {node_id}]")
727-
assert self.chip_controller is not None
728-
node_logger.debug("Setting up attributes and events subscription.")
729-
self._last_subscription_attempt[node_id] = 0
730665
loop = cast(asyncio.AbstractEventLoop, self.server.loop)
731-
future = loop.create_future()
732-
device = await self._resolve_node(node_id)
733-
Attribute.Read(
734-
future=future,
735-
eventLoop=loop,
736-
device=device.deviceProxy,
737-
devCtrl=self.chip_controller,
738-
attributes=attr_subscriptions,
739-
events=event_subscriptions,
740-
returnClusterObject=False,
741-
subscriptionParameters=Attribute.SubscriptionParameters(
742-
interval_floor, interval_ceiling
743-
),
744-
# Use fabricfiltered as False to detect changes made by other controllers
745-
# and to be able to provide a list of all fabrics attached to the device
746-
fabricFiltered=False,
747-
autoResubscribe=True,
748-
).raise_on_error()
749-
sub: Attribute.SubscriptionTransaction = await future
666+
667+
# set-up the actual subscription
750668

751669
def attribute_updated_callback(
752670
path: Attribute.TypedAttributePath,
@@ -874,12 +792,54 @@ def resubscription_succeeded(
874792
node.available = True
875793
self.server.signal_event(EventType.NODE_UPDATED, node)
876794

877-
sub.SetAttributeUpdateCallback(attribute_updated_callback)
878-
sub.SetEventUpdateCallback(event_callback)
879-
sub.SetErrorCallback(error_callback)
880-
sub.SetResubscriptionAttemptedCallback(resubscription_attempted)
881-
sub.SetResubscriptionSucceededCallback(resubscription_succeeded)
882-
return sub
795+
async with node_lock:
796+
node_logger.info("Setting up attributes and events subscription.")
797+
interval_floor = 0
798+
interval_ceiling = (
799+
random.randint(60, 300) if battery_powered else random.randint(30, 120)
800+
)
801+
self._last_subscription_attempt[node_id] = 0
802+
future = loop.create_future()
803+
device = await self._resolve_node(node_id)
804+
Attribute.Read(
805+
future=future,
806+
eventLoop=loop,
807+
device=device.deviceProxy,
808+
devCtrl=self.chip_controller,
809+
attributes=attr_subscriptions,
810+
events=[
811+
Attribute.EventPath(
812+
EndpointId=None, Cluster=None, Event=None, Urgent=1
813+
)
814+
],
815+
returnClusterObject=False,
816+
subscriptionParameters=Attribute.SubscriptionParameters(
817+
interval_floor, interval_ceiling
818+
),
819+
# Use fabricfiltered as False to detect changes made by other controllers
820+
# and to be able to provide a list of all fabrics attached to the device
821+
fabricFiltered=False,
822+
autoResubscribe=True,
823+
).raise_on_error()
824+
sub: Attribute.SubscriptionTransaction = await future
825+
826+
sub.SetAttributeUpdateCallback(attribute_updated_callback)
827+
sub.SetEventUpdateCallback(event_callback)
828+
sub.SetErrorCallback(error_callback)
829+
sub.SetResubscriptionAttemptedCallback(resubscription_attempted)
830+
sub.SetResubscriptionSucceededCallback(resubscription_succeeded)
831+
832+
# if we reach this point, it means the node could be resolved
833+
# and the initial subscription succeeded, mark the node available.
834+
self._subscriptions[node_id] = sub
835+
node.available = True
836+
# update attributes with current state from read request
837+
# NOTE: Make public method upstream for retrieving the attributeTLVCache
838+
# pylint: disable=protected-access
839+
tlv_attributes = sub._readTransaction._cache.attributeTLVCache
840+
node.attributes.update(parse_attributes_from_read_result(tlv_attributes))
841+
node_logger.info("Subscription succeeded")
842+
self.server.signal_event(EventType.NODE_UPDATED, node)
883843

884844
def _get_next_node_id(self) -> int:
885845
"""Return next node_id."""

0 commit comments

Comments
 (0)