Skip to content

Commit d66652c

Browse files
committed
[Python] Process attribute cache updates in Python thread
Instead of processing the attribute update in the SDK thread, process them on request in the Python thread. This avoids acks being sent back too late to the device after the last DataReport if there are many attribute updates sent at once. Currently still the same data model and processing is done. There is certainly also room for optimization to make this more efficient.
1 parent d04a667 commit d66652c

File tree

2 files changed

+101
-94
lines changed

2 files changed

+101
-94
lines changed

src/controller/python/chip/ChipDeviceCtrl.py

+72-57
Original file line numberDiff line numberDiff line change
@@ -1433,20 +1433,23 @@ def _parseEventPathTuple(self, pathTuple: typing.Union[
14331433
else:
14341434
raise ValueError("Unsupported Attribute Path")
14351435

1436-
async def Read(self, nodeid: int, attributes: typing.Optional[typing.List[typing.Union[
1437-
None, # Empty tuple, all wildcard
1438-
typing.Tuple[int], # Endpoint
1439-
# Wildcard endpoint, Cluster id present
1440-
typing.Tuple[typing.Type[ClusterObjects.Cluster]],
1441-
# Wildcard endpoint, Cluster + Attribute present
1442-
typing.Tuple[typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
1443-
# Wildcard attribute id
1444-
typing.Tuple[int, typing.Type[ClusterObjects.Cluster]],
1445-
# Concrete path
1446-
typing.Tuple[int, typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
1447-
# Directly specified attribute path
1448-
ClusterAttribute.AttributePath
1449-
]]] = None,
1436+
async def Read(
1437+
self,
1438+
nodeid: int,
1439+
attributes: typing.Optional[typing.List[typing.Union[
1440+
None, # Empty tuple, all wildcard
1441+
typing.Tuple[int], # Endpoint
1442+
# Wildcard endpoint, Cluster id present
1443+
typing.Tuple[typing.Type[ClusterObjects.Cluster]],
1444+
# Wildcard endpoint, Cluster + Attribute present
1445+
typing.Tuple[typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
1446+
# Wildcard attribute id
1447+
typing.Tuple[int, typing.Type[ClusterObjects.Cluster]],
1448+
# Concrete path
1449+
typing.Tuple[int, typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
1450+
# Directly specified attribute path
1451+
ClusterAttribute.AttributePath
1452+
]]] = None,
14501453
dataVersionFilters: typing.Optional[typing.List[typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int]]] = None, events: typing.Optional[typing.List[
14511454
typing.Union[
14521455
None, # Empty tuple, all wildcard
@@ -1461,10 +1464,11 @@ async def Read(self, nodeid: int, attributes: typing.Optional[typing.List[typing
14611464
# Concrete path
14621465
typing.Tuple[int, typing.Type[ClusterObjects.ClusterEvent], int]
14631466
]]] = None,
1464-
eventNumberFilter: typing.Optional[int] = None,
1465-
returnClusterObject: bool = False, reportInterval: typing.Optional[typing.Tuple[int, int]] = None,
1466-
fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True,
1467-
payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD):
1467+
eventNumberFilter: typing.Optional[int] = None,
1468+
returnClusterObject: bool = False, reportInterval: typing.Optional[typing.Tuple[int, int]] = None,
1469+
fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True,
1470+
payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD
1471+
):
14681472
'''
14691473
Read a list of attributes and/or events from a target node
14701474
@@ -1534,33 +1538,40 @@ async def Read(self, nodeid: int, attributes: typing.Optional[typing.List[typing
15341538
eventPaths = [self._parseEventPathTuple(
15351539
v) for v in events] if events else None
15361540

1537-
ClusterAttribute.Read(future=future, eventLoop=eventLoop, device=device.deviceProxy, devCtrl=self,
1541+
transaction = ClusterAttribute.AsyncReadTransaction(future, eventLoop, self, returnClusterObject)
1542+
ClusterAttribute.Read(transaction,
15381543
attributes=attributePaths, dataVersionFilters=clusterDataVersionFilters, events=eventPaths,
1539-
eventNumberFilter=eventNumberFilter, returnClusterObject=returnClusterObject,
1544+
eventNumberFilter=eventNumberFilter,
15401545
subscriptionParameters=ClusterAttribute.SubscriptionParameters(
15411546
reportInterval[0], reportInterval[1]) if reportInterval else None,
15421547
fabricFiltered=fabricFiltered,
15431548
keepSubscriptions=keepSubscriptions, autoResubscribe=autoResubscribe).raise_on_error()
1544-
return await future
1549+
await future
15451550

1546-
async def ReadAttribute(self, nodeid: int, attributes: typing.Optional[typing.List[typing.Union[
1547-
None, # Empty tuple, all wildcard
1548-
typing.Tuple[int], # Endpoint
1549-
# Wildcard endpoint, Cluster id present
1550-
typing.Tuple[typing.Type[ClusterObjects.Cluster]],
1551-
# Wildcard endpoint, Cluster + Attribute present
1552-
typing.Tuple[typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
1553-
# Wildcard attribute id
1554-
typing.Tuple[int, typing.Type[ClusterObjects.Cluster]],
1555-
# Concrete path
1556-
typing.Tuple[int, typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
1557-
# Directly specified attribute path
1558-
ClusterAttribute.AttributePath
1559-
]]], dataVersionFilters: typing.Optional[typing.List[typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int]]] = None,
1560-
returnClusterObject: bool = False,
1561-
reportInterval: typing.Optional[typing.Tuple[int, int]] = None,
1562-
fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True,
1563-
payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD):
1551+
return transaction.GetReadResponse()
1552+
1553+
async def ReadAttribute(
1554+
self,
1555+
nodeid: int,
1556+
attributes: typing.Optional[typing.List[typing.Union[
1557+
None, # Empty tuple, all wildcard
1558+
typing.Tuple[int], # Endpoint
1559+
# Wildcard endpoint, Cluster id present
1560+
typing.Tuple[typing.Type[ClusterObjects.Cluster]],
1561+
# Wildcard endpoint, Cluster + Attribute present
1562+
typing.Tuple[typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
1563+
# Wildcard attribute id
1564+
typing.Tuple[int, typing.Type[ClusterObjects.Cluster]],
1565+
# Concrete path
1566+
typing.Tuple[int, typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
1567+
# Directly specified attribute path
1568+
ClusterAttribute.AttributePath
1569+
]]], dataVersionFilters: typing.Optional[typing.List[typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int]]] = None,
1570+
returnClusterObject: bool = False,
1571+
reportInterval: typing.Optional[typing.Tuple[int, int]] = None,
1572+
fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True,
1573+
payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD
1574+
):
15641575
'''
15651576
Read a list of attributes from a target node, this is a wrapper of DeviceController.Read()
15661577
@@ -1629,24 +1640,28 @@ async def ReadAttribute(self, nodeid: int, attributes: typing.Optional[typing.Li
16291640
else:
16301641
return res.attributes
16311642

1632-
async def ReadEvent(self, nodeid: int, events: typing.List[typing.Union[
1633-
None, # Empty tuple, all wildcard
1634-
typing.Tuple[str, int], # all wildcard with urgency set
1635-
typing.Tuple[int, int], # Endpoint,
1636-
# Wildcard endpoint, Cluster id present
1637-
typing.Tuple[typing.Type[ClusterObjects.Cluster], int],
1638-
# Wildcard endpoint, Cluster + Event present
1639-
typing.Tuple[typing.Type[ClusterObjects.ClusterEvent], int],
1640-
# Wildcard event id
1641-
typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int],
1642-
# Concrete path
1643-
typing.Tuple[int, typing.Type[ClusterObjects.ClusterEvent], int]
1644-
]], eventNumberFilter: typing.Optional[int] = None,
1645-
fabricFiltered: bool = True,
1646-
reportInterval: typing.Optional[typing.Tuple[int, int]] = None,
1647-
keepSubscriptions: bool = False,
1648-
autoResubscribe: bool = True,
1649-
payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD):
1643+
async def ReadEvent(
1644+
self,
1645+
nodeid: int,
1646+
events: typing.List[typing.Union[
1647+
None, # Empty tuple, all wildcard
1648+
typing.Tuple[str, int], # all wildcard with urgency set
1649+
typing.Tuple[int, int], # Endpoint,
1650+
# Wildcard endpoint, Cluster id present
1651+
typing.Tuple[typing.Type[ClusterObjects.Cluster], int],
1652+
# Wildcard endpoint, Cluster + Event present
1653+
typing.Tuple[typing.Type[ClusterObjects.ClusterEvent], int],
1654+
# Wildcard event id
1655+
typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int],
1656+
# Concrete path
1657+
typing.Tuple[int, typing.Type[ClusterObjects.ClusterEvent], int]
1658+
]], eventNumberFilter: typing.Optional[int] = None,
1659+
fabricFiltered: bool = True,
1660+
reportInterval: typing.Optional[typing.Tuple[int, int]] = None,
1661+
keepSubscriptions: bool = False,
1662+
autoResubscribe: bool = True,
1663+
payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD
1664+
):
16501665
'''
16511666
Read a list of events from a target node, this is a wrapper of DeviceController.Read()
16521667

src/controller/python/chip/clusters/Attribute.py

+29-37
Original file line numberDiff line numberDiff line change
@@ -314,14 +314,17 @@ class AttributeCache:
314314
returnClusterObject: bool = False
315315
attributeTLVCache: Dict[int, Dict[int, Dict[int, bytes]]] = field(
316316
default_factory=lambda: {})
317-
attributeCache: Dict[int, List[Cluster]] = field(
318-
default_factory=lambda: {})
319317
versionList: Dict[int, Dict[int, Dict[int, int]]] = field(
320318
default_factory=lambda: {})
321319

320+
_attributeCacheUpdateNeeded: set[AttributePath] = field(
321+
default_factory=lambda: set())
322+
_attributeCache: Dict[int, List[Cluster]] = field(
323+
default_factory=lambda: {})
324+
322325
def UpdateTLV(self, path: AttributePath, dataVersion: int, data: Union[bytes, ValueDecodeFailure]):
323326
''' Store data in TLV since that makes it easiest to eventually convert to either the
324-
cluster or attribute view representations (see below in UpdateCachedData).
327+
cluster or attribute view representations (see below in GetUpdatedAttributeCache()).
325328
'''
326329
if (path.EndpointId not in self.attributeTLVCache):
327330
self.attributeTLVCache[path.EndpointId] = {}
@@ -344,7 +347,10 @@ def UpdateTLV(self, path: AttributePath, dataVersion: int, data: Union[bytes, V
344347

345348
clusterCache[path.AttributeId] = data
346349

347-
def UpdateCachedData(self, changedPathSet: set[AttributePath]):
350+
# For this path the attribute cache still requires an update.
351+
self._attributeCacheUpdateNeeded.add(path)
352+
353+
def GetUpdatedAttributeCache(self) -> Dict[int, List[Cluster]]:
348354
''' This converts the raw TLV data into a cluster object format.
349355
350356
Two formats are available:
@@ -381,12 +387,12 @@ def handle_attribute_view(endpointId, clusterId, attributeId, attributeType):
381387
except Exception as ex:
382388
return ValueDecodeFailure(value, ex)
383389

384-
for attributePath in changedPathSet:
390+
for attributePath in self._attributeCacheUpdateNeeded:
385391
endpointId, clusterId, attributeId = attributePath.EndpointId, attributePath.ClusterId, attributePath.AttributeId
386392

387-
if endpointId not in self.attributeCache:
388-
self.attributeCache[endpointId] = {}
389-
endpointCache = self.attributeCache[endpointId]
393+
if endpointId not in self._attributeCache:
394+
self._attributeCache[endpointId] = {}
395+
endpointCache = self._attributeCache[endpointId]
390396

391397
if clusterId not in _ClusterIndex:
392398
#
@@ -414,6 +420,8 @@ def handle_attribute_view(endpointId, clusterId, attributeId, attributeType):
414420

415421
attributeType = _AttributeIndex[(clusterId, attributeId)][0]
416422
clusterCache[attributeType] = handle_attribute_view(endpointId, clusterId, attributeId, attributeType)
423+
self._attributeCacheUpdateNeeded.clear()
424+
return self._attributeCache
417425

418426

419427
class SubscriptionTransaction:
@@ -434,12 +442,12 @@ def __init__(self, transaction: AsyncReadTransaction, subscriptionId, devCtrl):
434442
def GetAttributes(self):
435443
''' Returns the attribute value cache tracking the latest state on the publisher.
436444
'''
437-
return self._readTransaction._cache.attributeCache
445+
return self._readTransaction._cache.GetUpdatedAttributeCache()
438446

439447
def GetAttribute(self, path: TypedAttributePath) -> Any:
440448
''' Returns a specific attribute given a TypedAttributePath.
441449
'''
442-
data = self._readTransaction._cache.attributeCache
450+
data = self._readTransaction._cache.GetUpdatedAttributeCache()
443451

444452
if (self._readTransaction._cache.returnClusterObject):
445453
return eval(f'data[path.Path.EndpointId][path.ClusterType].{path.AttributeName}')
@@ -650,6 +658,14 @@ def SetClientObjPointers(self, pReadClient, pReadCallback):
650658
def GetAllEventValues(self):
651659
return self._events
652660

661+
def GetReadResponse(self) -> AsyncReadTransaction.ReadResponse:
662+
"""Prepares and returns the ReadResponse object."""
663+
return self.ReadResponse(
664+
attributes=self._cache.GetUpdatedAttributeCache(),
665+
events=self._events,
666+
tlvAttributes=self._cache.attributeTLVCache
667+
)
668+
653669
def handleAttributeData(self, path: AttributePath, dataVersion: int, status: int, data: bytes):
654670
try:
655671
imStatus = chip.interaction_model.Status(status)
@@ -745,8 +761,6 @@ def _handleReportBegin(self):
745761
pass
746762

747763
def _handleReportEnd(self):
748-
self._cache.UpdateCachedData(self._changedPathSet)
749-
750764
if (self._subscription_handler is not None):
751765
for change in self._changedPathSet:
752766
try:
@@ -772,8 +786,7 @@ def _handleDone(self):
772786
if self._resultError is not None:
773787
self._future.set_exception(self._resultError.to_exception())
774788
else:
775-
self._future.set_result(AsyncReadTransaction.ReadResponse(
776-
attributes=self._cache.attributeCache, events=self._events, tlvAttributes=self._cache.attributeTLVCache))
789+
self._future.set_result(self)
777790

778791
#
779792
# Decrement the ref on ourselves to match the increment that happened at allocation.
@@ -1001,18 +1014,16 @@ def WriteGroupAttributes(groupId: int, devCtrl: c_void_p, attributes: List[Attri
10011014
)
10021015

10031016

1004-
def Read(future: Future, eventLoop, device, devCtrl,
1017+
def Read(transaction: AsyncReadTransaction,
10051018
attributes: Optional[List[AttributePath]] = None, dataVersionFilters: Optional[List[DataVersionFilter]] = None,
1006-
events: Optional[List[EventPath]] = None, eventNumberFilter: Optional[int] = None, returnClusterObject: bool = True,
1019+
events: Optional[List[EventPath]] = None, eventNumberFilter: Optional[int] = None,
10071020
subscriptionParameters: Optional[SubscriptionParameters] = None,
10081021
fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True) -> PyChipError:
10091022
if (not attributes) and dataVersionFilters:
10101023
raise ValueError(
10111024
"Must provide valid attribute list when data version filters is not null")
10121025

10131026
handle = chip.native.GetLibraryHandle()
1014-
transaction = AsyncReadTransaction(
1015-
future, eventLoop, devCtrl, returnClusterObject)
10161027

10171028
attributePathsForCffi = None
10181029
if attributes is not None:
@@ -1119,25 +1130,6 @@ def Read(future: Future, eventLoop, device, devCtrl,
11191130
return res
11201131

11211132

1122-
def ReadAttributes(future: Future, eventLoop, device, devCtrl,
1123-
attributes: List[AttributePath], dataVersionFilters: Optional[List[DataVersionFilter]] = None,
1124-
returnClusterObject: bool = True,
1125-
subscriptionParameters: Optional[SubscriptionParameters] = None, fabricFiltered: bool = True) -> int:
1126-
return Read(future=future, eventLoop=eventLoop, device=device,
1127-
devCtrl=devCtrl, attributes=attributes, dataVersionFilters=dataVersionFilters,
1128-
events=None, returnClusterObject=returnClusterObject,
1129-
subscriptionParameters=subscriptionParameters, fabricFiltered=fabricFiltered)
1130-
1131-
1132-
def ReadEvents(future: Future, eventLoop, device, devCtrl,
1133-
events: List[EventPath], eventNumberFilter=None, returnClusterObject: bool = True,
1134-
subscriptionParameters: Optional[SubscriptionParameters] = None, fabricFiltered: bool = True) -> int:
1135-
return Read(future=future, eventLoop=eventLoop, device=device, devCtrl=devCtrl, attributes=None,
1136-
dataVersionFilters=None, events=events, eventNumberFilter=eventNumberFilter,
1137-
returnClusterObject=returnClusterObject,
1138-
subscriptionParameters=subscriptionParameters, fabricFiltered=fabricFiltered)
1139-
1140-
11411133
def Init():
11421134
handle = chip.native.GetLibraryHandle()
11431135

0 commit comments

Comments
 (0)