Skip to content

Commit d292892

Browse files
authored
[Python] Process attribute cache updates in Python thread (#35557)
* [Python] Process attribute cache updates in Python thread Instead of processing the attribute update in the SDK thread, process them on request in the Python thread. This avoids acks being sent back too late to the device after the last DataReport if there are many attribute updates sent at once. Currently still the same data model and processing is done. There is certainly also room for optimization to make this more efficient. * Get updated attribute values Make sure to get the attribute values again after each command to get the updated attribute cache. * Reference ReadEvent/ReadAttribute APIs on dev controller object
1 parent 64f859b commit d292892

File tree

4 files changed

+116
-101
lines changed

4 files changed

+116
-101
lines changed

docs/guides/repl/Matter_Basic_Interactions.ipynb

+2-2
Original file line numberDiff line numberDiff line change
@@ -3504,7 +3504,7 @@
35043504
"source": [
35053505
"#### Read Events:\n",
35063506
"\n",
3507-
"A `ReadEvents` API exists that behaves similarly to the `ReadAttributes` API. It permits the same degrees of wildcard expression as its counterpart and follows the same format for expressing all wildcard permutations."
3507+
"A `ReadEvent` API exists that behaves similarly to the `ReadAttribute` API. It permits the same degrees of wildcard expression as its counterpart and follows the same format for expressing all wildcard permutations."
35083508
]
35093509
},
35103510
{
@@ -3609,7 +3609,7 @@
36093609
"source": [
36103610
"### Subscription Interaction\n",
36113611
"\n",
3612-
"To subscribe to a Node, the same `ReadAttributes` API is used to trigger a subscription, with a valid `reportInterval` tuple passed in being used as a way to indicate the request to create a subscription."
3612+
"To subscribe to a Node, the same `ReadAttribute` API is used to trigger a subscription, with a valid `reportInterval` tuple passed in being used as a way to indicate the request to create a subscription."
36133613
]
36143614
},
36153615
{

src/controller/python/chip/ChipDeviceCtrl.py

+75-57
Original file line numberDiff line numberDiff line change
@@ -1433,20 +1433,23 @@ def _parseEventPathTuple(self, pathTuple: typing.Union[
14331433
else:
14341434
raise ValueError("Unsupported Attribute Path")
14351435

1436-
async def Read(self, nodeid: int, attributes: typing.Optional[typing.List[typing.Union[
1437-
None, # Empty tuple, all wildcard
1438-
typing.Tuple[int], # Endpoint
1439-
# Wildcard endpoint, Cluster id present
1440-
typing.Tuple[typing.Type[ClusterObjects.Cluster]],
1441-
# Wildcard endpoint, Cluster + Attribute present
1442-
typing.Tuple[typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
1443-
# Wildcard attribute id
1444-
typing.Tuple[int, typing.Type[ClusterObjects.Cluster]],
1445-
# Concrete path
1446-
typing.Tuple[int, typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
1447-
# Directly specified attribute path
1448-
ClusterAttribute.AttributePath
1449-
]]] = None,
1436+
async def Read(
1437+
self,
1438+
nodeid: int,
1439+
attributes: typing.Optional[typing.List[typing.Union[
1440+
None, # Empty tuple, all wildcard
1441+
typing.Tuple[int], # Endpoint
1442+
# Wildcard endpoint, Cluster id present
1443+
typing.Tuple[typing.Type[ClusterObjects.Cluster]],
1444+
# Wildcard endpoint, Cluster + Attribute present
1445+
typing.Tuple[typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
1446+
# Wildcard attribute id
1447+
typing.Tuple[int, typing.Type[ClusterObjects.Cluster]],
1448+
# Concrete path
1449+
typing.Tuple[int, typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
1450+
# Directly specified attribute path
1451+
ClusterAttribute.AttributePath
1452+
]]] = None,
14501453
dataVersionFilters: typing.Optional[typing.List[typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int]]] = None, events: typing.Optional[typing.List[
14511454
typing.Union[
14521455
None, # Empty tuple, all wildcard
@@ -1461,10 +1464,11 @@ async def Read(self, nodeid: int, attributes: typing.Optional[typing.List[typing
14611464
# Concrete path
14621465
typing.Tuple[int, typing.Type[ClusterObjects.ClusterEvent], int]
14631466
]]] = None,
1464-
eventNumberFilter: typing.Optional[int] = None,
1465-
returnClusterObject: bool = False, reportInterval: typing.Optional[typing.Tuple[int, int]] = None,
1466-
fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True,
1467-
payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD):
1467+
eventNumberFilter: typing.Optional[int] = None,
1468+
returnClusterObject: bool = False, reportInterval: typing.Optional[typing.Tuple[int, int]] = None,
1469+
fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True,
1470+
payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD
1471+
):
14681472
'''
14691473
Read a list of attributes and/or events from a target node
14701474
@@ -1534,33 +1538,43 @@ async def Read(self, nodeid: int, attributes: typing.Optional[typing.List[typing
15341538
eventPaths = [self._parseEventPathTuple(
15351539
v) for v in events] if events else None
15361540

1537-
ClusterAttribute.Read(future=future, eventLoop=eventLoop, device=device.deviceProxy, devCtrl=self,
1541+
transaction = ClusterAttribute.AsyncReadTransaction(future, eventLoop, self, returnClusterObject)
1542+
ClusterAttribute.Read(transaction, device=device.deviceProxy,
15381543
attributes=attributePaths, dataVersionFilters=clusterDataVersionFilters, events=eventPaths,
1539-
eventNumberFilter=eventNumberFilter, returnClusterObject=returnClusterObject,
1544+
eventNumberFilter=eventNumberFilter,
15401545
subscriptionParameters=ClusterAttribute.SubscriptionParameters(
15411546
reportInterval[0], reportInterval[1]) if reportInterval else None,
15421547
fabricFiltered=fabricFiltered,
15431548
keepSubscriptions=keepSubscriptions, autoResubscribe=autoResubscribe).raise_on_error()
1544-
return await future
1549+
await future
15451550

1546-
async def ReadAttribute(self, nodeid: int, attributes: typing.Optional[typing.List[typing.Union[
1547-
None, # Empty tuple, all wildcard
1548-
typing.Tuple[int], # Endpoint
1549-
# Wildcard endpoint, Cluster id present
1550-
typing.Tuple[typing.Type[ClusterObjects.Cluster]],
1551-
# Wildcard endpoint, Cluster + Attribute present
1552-
typing.Tuple[typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
1553-
# Wildcard attribute id
1554-
typing.Tuple[int, typing.Type[ClusterObjects.Cluster]],
1555-
# Concrete path
1556-
typing.Tuple[int, typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
1557-
# Directly specified attribute path
1558-
ClusterAttribute.AttributePath
1559-
]]], dataVersionFilters: typing.Optional[typing.List[typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int]]] = None,
1560-
returnClusterObject: bool = False,
1561-
reportInterval: typing.Optional[typing.Tuple[int, int]] = None,
1562-
fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True,
1563-
payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD):
1551+
if result := transaction.GetSubscriptionHandler():
1552+
return result
1553+
else:
1554+
return transaction.GetReadResponse()
1555+
1556+
async def ReadAttribute(
1557+
self,
1558+
nodeid: int,
1559+
attributes: typing.Optional[typing.List[typing.Union[
1560+
None, # Empty tuple, all wildcard
1561+
typing.Tuple[int], # Endpoint
1562+
# Wildcard endpoint, Cluster id present
1563+
typing.Tuple[typing.Type[ClusterObjects.Cluster]],
1564+
# Wildcard endpoint, Cluster + Attribute present
1565+
typing.Tuple[typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
1566+
# Wildcard attribute id
1567+
typing.Tuple[int, typing.Type[ClusterObjects.Cluster]],
1568+
# Concrete path
1569+
typing.Tuple[int, typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
1570+
# Directly specified attribute path
1571+
ClusterAttribute.AttributePath
1572+
]]], dataVersionFilters: typing.Optional[typing.List[typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int]]] = None,
1573+
returnClusterObject: bool = False,
1574+
reportInterval: typing.Optional[typing.Tuple[int, int]] = None,
1575+
fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True,
1576+
payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD
1577+
):
15641578
'''
15651579
Read a list of attributes from a target node, this is a wrapper of DeviceController.Read()
15661580
@@ -1629,24 +1643,28 @@ async def ReadAttribute(self, nodeid: int, attributes: typing.Optional[typing.Li
16291643
else:
16301644
return res.attributes
16311645

1632-
async def ReadEvent(self, nodeid: int, events: typing.List[typing.Union[
1633-
None, # Empty tuple, all wildcard
1634-
typing.Tuple[str, int], # all wildcard with urgency set
1635-
typing.Tuple[int, int], # Endpoint,
1636-
# Wildcard endpoint, Cluster id present
1637-
typing.Tuple[typing.Type[ClusterObjects.Cluster], int],
1638-
# Wildcard endpoint, Cluster + Event present
1639-
typing.Tuple[typing.Type[ClusterObjects.ClusterEvent], int],
1640-
# Wildcard event id
1641-
typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int],
1642-
# Concrete path
1643-
typing.Tuple[int, typing.Type[ClusterObjects.ClusterEvent], int]
1644-
]], eventNumberFilter: typing.Optional[int] = None,
1645-
fabricFiltered: bool = True,
1646-
reportInterval: typing.Optional[typing.Tuple[int, int]] = None,
1647-
keepSubscriptions: bool = False,
1648-
autoResubscribe: bool = True,
1649-
payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD):
1646+
async def ReadEvent(
1647+
self,
1648+
nodeid: int,
1649+
events: typing.List[typing.Union[
1650+
None, # Empty tuple, all wildcard
1651+
typing.Tuple[str, int], # all wildcard with urgency set
1652+
typing.Tuple[int, int], # Endpoint,
1653+
# Wildcard endpoint, Cluster id present
1654+
typing.Tuple[typing.Type[ClusterObjects.Cluster], int],
1655+
# Wildcard endpoint, Cluster + Event present
1656+
typing.Tuple[typing.Type[ClusterObjects.ClusterEvent], int],
1657+
# Wildcard event id
1658+
typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int],
1659+
# Concrete path
1660+
typing.Tuple[int, typing.Type[ClusterObjects.ClusterEvent], int]
1661+
]], eventNumberFilter: typing.Optional[int] = None,
1662+
fabricFiltered: bool = True,
1663+
reportInterval: typing.Optional[typing.Tuple[int, int]] = None,
1664+
keepSubscriptions: bool = False,
1665+
autoResubscribe: bool = True,
1666+
payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD
1667+
):
16501668
'''
16511669
Read a list of events from a target node, this is a wrapper of DeviceController.Read()
16521670

src/controller/python/chip/clusters/Attribute.py

+34-38
Original file line numberDiff line numberDiff line change
@@ -314,14 +314,17 @@ class AttributeCache:
314314
returnClusterObject: bool = False
315315
attributeTLVCache: Dict[int, Dict[int, Dict[int, bytes]]] = field(
316316
default_factory=lambda: {})
317-
attributeCache: Dict[int, List[Cluster]] = field(
318-
default_factory=lambda: {})
319317
versionList: Dict[int, Dict[int, Dict[int, int]]] = field(
320318
default_factory=lambda: {})
321319

320+
_attributeCacheUpdateNeeded: set[AttributePath] = field(
321+
default_factory=lambda: set())
322+
_attributeCache: Dict[int, List[Cluster]] = field(
323+
default_factory=lambda: {})
324+
322325
def UpdateTLV(self, path: AttributePath, dataVersion: int, data: Union[bytes, ValueDecodeFailure]):
323326
''' Store data in TLV since that makes it easiest to eventually convert to either the
324-
cluster or attribute view representations (see below in UpdateCachedData).
327+
cluster or attribute view representations (see below in GetUpdatedAttributeCache()).
325328
'''
326329
if (path.EndpointId not in self.attributeTLVCache):
327330
self.attributeTLVCache[path.EndpointId] = {}
@@ -344,7 +347,10 @@ def UpdateTLV(self, path: AttributePath, dataVersion: int, data: Union[bytes, V
344347

345348
clusterCache[path.AttributeId] = data
346349

347-
def UpdateCachedData(self, changedPathSet: set[AttributePath]):
350+
# For this path the attribute cache still requires an update.
351+
self._attributeCacheUpdateNeeded.add(path)
352+
353+
def GetUpdatedAttributeCache(self) -> Dict[int, List[Cluster]]:
348354
''' This converts the raw TLV data into a cluster object format.
349355
350356
Two formats are available:
@@ -381,12 +387,12 @@ def handle_attribute_view(endpointId, clusterId, attributeId, attributeType):
381387
except Exception as ex:
382388
return ValueDecodeFailure(value, ex)
383389

384-
for attributePath in changedPathSet:
390+
for attributePath in self._attributeCacheUpdateNeeded:
385391
endpointId, clusterId, attributeId = attributePath.EndpointId, attributePath.ClusterId, attributePath.AttributeId
386392

387-
if endpointId not in self.attributeCache:
388-
self.attributeCache[endpointId] = {}
389-
endpointCache = self.attributeCache[endpointId]
393+
if endpointId not in self._attributeCache:
394+
self._attributeCache[endpointId] = {}
395+
endpointCache = self._attributeCache[endpointId]
390396

391397
if clusterId not in _ClusterIndex:
392398
#
@@ -414,6 +420,8 @@ def handle_attribute_view(endpointId, clusterId, attributeId, attributeType):
414420

415421
attributeType = _AttributeIndex[(clusterId, attributeId)][0]
416422
clusterCache[attributeType] = handle_attribute_view(endpointId, clusterId, attributeId, attributeType)
423+
self._attributeCacheUpdateNeeded.clear()
424+
return self._attributeCache
417425

418426

419427
class SubscriptionTransaction:
@@ -434,12 +442,12 @@ def __init__(self, transaction: AsyncReadTransaction, subscriptionId, devCtrl):
434442
def GetAttributes(self):
435443
''' Returns the attribute value cache tracking the latest state on the publisher.
436444
'''
437-
return self._readTransaction._cache.attributeCache
445+
return self._readTransaction._cache.GetUpdatedAttributeCache()
438446

439447
def GetAttribute(self, path: TypedAttributePath) -> Any:
440448
''' Returns a specific attribute given a TypedAttributePath.
441449
'''
442-
data = self._readTransaction._cache.attributeCache
450+
data = self._readTransaction._cache.GetUpdatedAttributeCache()
443451

444452
if (self._readTransaction._cache.returnClusterObject):
445453
return eval(f'data[path.Path.EndpointId][path.ClusterType].{path.AttributeName}')
@@ -650,6 +658,18 @@ def SetClientObjPointers(self, pReadClient, pReadCallback):
650658
def GetAllEventValues(self):
651659
return self._events
652660

661+
def GetReadResponse(self) -> AsyncReadTransaction.ReadResponse:
662+
"""Prepares and returns the ReadResponse object."""
663+
return self.ReadResponse(
664+
attributes=self._cache.GetUpdatedAttributeCache(),
665+
events=self._events,
666+
tlvAttributes=self._cache.attributeTLVCache
667+
)
668+
669+
def GetSubscriptionHandler(self) -> SubscriptionTransaction | None:
670+
"""Returns subscription transaction."""
671+
return self._subscription_handler
672+
653673
def handleAttributeData(self, path: AttributePath, dataVersion: int, status: int, data: bytes):
654674
try:
655675
imStatus = chip.interaction_model.Status(status)
@@ -716,7 +736,7 @@ def _handleSubscriptionEstablished(self, subscriptionId):
716736
if not self._future.done():
717737
self._subscription_handler = SubscriptionTransaction(
718738
self, subscriptionId, self._devCtrl)
719-
self._future.set_result(self._subscription_handler)
739+
self._future.set_result(self)
720740
else:
721741
self._subscription_handler._subscriptionId = subscriptionId
722742
if self._subscription_handler._onResubscriptionSucceededCb is not None:
@@ -745,8 +765,6 @@ def _handleReportBegin(self):
745765
pass
746766

747767
def _handleReportEnd(self):
748-
self._cache.UpdateCachedData(self._changedPathSet)
749-
750768
if (self._subscription_handler is not None):
751769
for change in self._changedPathSet:
752770
try:
@@ -772,8 +790,7 @@ def _handleDone(self):
772790
if self._resultError is not None:
773791
self._future.set_exception(self._resultError.to_exception())
774792
else:
775-
self._future.set_result(AsyncReadTransaction.ReadResponse(
776-
attributes=self._cache.attributeCache, events=self._events, tlvAttributes=self._cache.attributeTLVCache))
793+
self._future.set_result(self)
777794

778795
#
779796
# Decrement the ref on ourselves to match the increment that happened at allocation.
@@ -1001,18 +1018,16 @@ def WriteGroupAttributes(groupId: int, devCtrl: c_void_p, attributes: List[Attri
10011018
)
10021019

10031020

1004-
def Read(future: Future, eventLoop, device, devCtrl,
1021+
def Read(transaction: AsyncReadTransaction, device,
10051022
attributes: Optional[List[AttributePath]] = None, dataVersionFilters: Optional[List[DataVersionFilter]] = None,
1006-
events: Optional[List[EventPath]] = None, eventNumberFilter: Optional[int] = None, returnClusterObject: bool = True,
1023+
events: Optional[List[EventPath]] = None, eventNumberFilter: Optional[int] = None,
10071024
subscriptionParameters: Optional[SubscriptionParameters] = None,
10081025
fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True) -> PyChipError:
10091026
if (not attributes) and dataVersionFilters:
10101027
raise ValueError(
10111028
"Must provide valid attribute list when data version filters is not null")
10121029

10131030
handle = chip.native.GetLibraryHandle()
1014-
transaction = AsyncReadTransaction(
1015-
future, eventLoop, devCtrl, returnClusterObject)
10161031

10171032
attributePathsForCffi = None
10181033
if attributes is not None:
@@ -1119,25 +1134,6 @@ def Read(future: Future, eventLoop, device, devCtrl,
11191134
return res
11201135

11211136

1122-
def ReadAttributes(future: Future, eventLoop, device, devCtrl,
1123-
attributes: List[AttributePath], dataVersionFilters: Optional[List[DataVersionFilter]] = None,
1124-
returnClusterObject: bool = True,
1125-
subscriptionParameters: Optional[SubscriptionParameters] = None, fabricFiltered: bool = True) -> int:
1126-
return Read(future=future, eventLoop=eventLoop, device=device,
1127-
devCtrl=devCtrl, attributes=attributes, dataVersionFilters=dataVersionFilters,
1128-
events=None, returnClusterObject=returnClusterObject,
1129-
subscriptionParameters=subscriptionParameters, fabricFiltered=fabricFiltered)
1130-
1131-
1132-
def ReadEvents(future: Future, eventLoop, device, devCtrl,
1133-
events: List[EventPath], eventNumberFilter=None, returnClusterObject: bool = True,
1134-
subscriptionParameters: Optional[SubscriptionParameters] = None, fabricFiltered: bool = True) -> int:
1135-
return Read(future=future, eventLoop=eventLoop, device=device, devCtrl=devCtrl, attributes=None,
1136-
dataVersionFilters=None, events=events, eventNumberFilter=eventNumberFilter,
1137-
returnClusterObject=returnClusterObject,
1138-
subscriptionParameters=subscriptionParameters, fabricFiltered=fabricFiltered)
1139-
1140-
11411137
def Init():
11421138
handle = chip.native.GetLibraryHandle()
11431139

0 commit comments

Comments
 (0)