[Python] Process attribute cache updates in Python thread (#35557)
* [Python] Process attribute cache updates in Python thread
Instead of processing the attribute update in the SDK thread, process
them on request in the Python thread. This avoids acks being sent back
too late to the device after the last DataReport if there are many
attribute updates sent at once.
Currently still the same data model and processing is done. There is
certainly also room for optimization to make this more efficient.
* Get updated attribute values
Make sure to get the attribute values again after each command to get
the updated attribute cache.
* Reference ReadEvent/ReadAttribute APIs on dev controller object
diff --git a/docs/guides/repl/Matter_Basic_Interactions.ipynb b/docs/guides/repl/Matter_Basic_Interactions.ipynb
index 41c1c78..bc021ae 100644
--- a/docs/guides/repl/Matter_Basic_Interactions.ipynb
+++ b/docs/guides/repl/Matter_Basic_Interactions.ipynb
@@ -3504,7 +3504,7 @@
"source": [
"#### Read Events:\n",
"\n",
- "A `ReadEvents` API exists that behaves similarly to the `ReadAttributes` API. It permits the same degrees of wildcard expression as its counterpart and follows the same format for expressing all wildcard permutations."
+ "A `ReadEvent` API exists that behaves similarly to the `ReadAttribute` API. It permits the same degrees of wildcard expression as its counterpart and follows the same format for expressing all wildcard permutations."
]
},
{
@@ -3609,7 +3609,7 @@
"source": [
"### Subscription Interaction\n",
"\n",
- "To subscribe to a Node, the same `ReadAttributes` API is used to trigger a subscription, with a valid `reportInterval` tuple passed in being used as a way to indicate the request to create a subscription."
+ "To subscribe to a Node, the same `ReadAttribute` API is used to trigger a subscription, with a valid `reportInterval` tuple passed in being used as a way to indicate the request to create a subscription."
]
},
{
diff --git a/src/controller/python/chip/ChipDeviceCtrl.py b/src/controller/python/chip/ChipDeviceCtrl.py
index d76d415..8c751f7 100644
--- a/src/controller/python/chip/ChipDeviceCtrl.py
+++ b/src/controller/python/chip/ChipDeviceCtrl.py
@@ -1433,20 +1433,23 @@
else:
raise ValueError("Unsupported Attribute Path")
- async def Read(self, nodeid: int, attributes: typing.Optional[typing.List[typing.Union[
- None, # Empty tuple, all wildcard
- typing.Tuple[int], # Endpoint
- # Wildcard endpoint, Cluster id present
- typing.Tuple[typing.Type[ClusterObjects.Cluster]],
- # Wildcard endpoint, Cluster + Attribute present
- typing.Tuple[typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
- # Wildcard attribute id
- typing.Tuple[int, typing.Type[ClusterObjects.Cluster]],
- # Concrete path
- typing.Tuple[int, typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
- # Directly specified attribute path
- ClusterAttribute.AttributePath
- ]]] = None,
+ async def Read(
+ self,
+ nodeid: int,
+ attributes: typing.Optional[typing.List[typing.Union[
+ None, # Empty tuple, all wildcard
+ typing.Tuple[int], # Endpoint
+ # Wildcard endpoint, Cluster id present
+ typing.Tuple[typing.Type[ClusterObjects.Cluster]],
+ # Wildcard endpoint, Cluster + Attribute present
+ typing.Tuple[typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
+ # Wildcard attribute id
+ typing.Tuple[int, typing.Type[ClusterObjects.Cluster]],
+ # Concrete path
+ typing.Tuple[int, typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
+ # Directly specified attribute path
+ ClusterAttribute.AttributePath
+ ]]] = None,
dataVersionFilters: typing.Optional[typing.List[typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int]]] = None, events: typing.Optional[typing.List[
typing.Union[
None, # Empty tuple, all wildcard
@@ -1461,10 +1464,11 @@
# Concrete path
typing.Tuple[int, typing.Type[ClusterObjects.ClusterEvent], int]
]]] = None,
- eventNumberFilter: typing.Optional[int] = None,
- returnClusterObject: bool = False, reportInterval: typing.Optional[typing.Tuple[int, int]] = None,
- fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True,
- payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD):
+ eventNumberFilter: typing.Optional[int] = None,
+ returnClusterObject: bool = False, reportInterval: typing.Optional[typing.Tuple[int, int]] = None,
+ fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True,
+ payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD
+ ):
'''
Read a list of attributes and/or events from a target node
@@ -1534,33 +1538,43 @@
eventPaths = [self._parseEventPathTuple(
v) for v in events] if events else None
- ClusterAttribute.Read(future=future, eventLoop=eventLoop, device=device.deviceProxy, devCtrl=self,
+ transaction = ClusterAttribute.AsyncReadTransaction(future, eventLoop, self, returnClusterObject)
+ ClusterAttribute.Read(transaction, device=device.deviceProxy,
attributes=attributePaths, dataVersionFilters=clusterDataVersionFilters, events=eventPaths,
- eventNumberFilter=eventNumberFilter, returnClusterObject=returnClusterObject,
+ eventNumberFilter=eventNumberFilter,
subscriptionParameters=ClusterAttribute.SubscriptionParameters(
reportInterval[0], reportInterval[1]) if reportInterval else None,
fabricFiltered=fabricFiltered,
keepSubscriptions=keepSubscriptions, autoResubscribe=autoResubscribe).raise_on_error()
- return await future
+ await future
- async def ReadAttribute(self, nodeid: int, attributes: typing.Optional[typing.List[typing.Union[
- None, # Empty tuple, all wildcard
- typing.Tuple[int], # Endpoint
- # Wildcard endpoint, Cluster id present
- typing.Tuple[typing.Type[ClusterObjects.Cluster]],
- # Wildcard endpoint, Cluster + Attribute present
- typing.Tuple[typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
- # Wildcard attribute id
- typing.Tuple[int, typing.Type[ClusterObjects.Cluster]],
- # Concrete path
- typing.Tuple[int, typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
- # Directly specified attribute path
- ClusterAttribute.AttributePath
- ]]], dataVersionFilters: typing.Optional[typing.List[typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int]]] = None,
- returnClusterObject: bool = False,
- reportInterval: typing.Optional[typing.Tuple[int, int]] = None,
- fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True,
- payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD):
+ if result := transaction.GetSubscriptionHandler():
+ return result
+ else:
+ return transaction.GetReadResponse()
+
+ async def ReadAttribute(
+ self,
+ nodeid: int,
+ attributes: typing.Optional[typing.List[typing.Union[
+ None, # Empty tuple, all wildcard
+ typing.Tuple[int], # Endpoint
+ # Wildcard endpoint, Cluster id present
+ typing.Tuple[typing.Type[ClusterObjects.Cluster]],
+ # Wildcard endpoint, Cluster + Attribute present
+ typing.Tuple[typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
+ # Wildcard attribute id
+ typing.Tuple[int, typing.Type[ClusterObjects.Cluster]],
+ # Concrete path
+ typing.Tuple[int, typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
+ # Directly specified attribute path
+ ClusterAttribute.AttributePath
+ ]]], dataVersionFilters: typing.Optional[typing.List[typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int]]] = None,
+ returnClusterObject: bool = False,
+ reportInterval: typing.Optional[typing.Tuple[int, int]] = None,
+ fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True,
+ payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD
+ ):
'''
Read a list of attributes from a target node, this is a wrapper of DeviceController.Read()
@@ -1629,24 +1643,28 @@
else:
return res.attributes
- async def ReadEvent(self, nodeid: int, events: typing.List[typing.Union[
- None, # Empty tuple, all wildcard
- typing.Tuple[str, int], # all wildcard with urgency set
- typing.Tuple[int, int], # Endpoint,
- # Wildcard endpoint, Cluster id present
- typing.Tuple[typing.Type[ClusterObjects.Cluster], int],
- # Wildcard endpoint, Cluster + Event present
- typing.Tuple[typing.Type[ClusterObjects.ClusterEvent], int],
- # Wildcard event id
- typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int],
- # Concrete path
- typing.Tuple[int, typing.Type[ClusterObjects.ClusterEvent], int]
- ]], eventNumberFilter: typing.Optional[int] = None,
- fabricFiltered: bool = True,
- reportInterval: typing.Optional[typing.Tuple[int, int]] = None,
- keepSubscriptions: bool = False,
- autoResubscribe: bool = True,
- payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD):
+ async def ReadEvent(
+ self,
+ nodeid: int,
+ events: typing.List[typing.Union[
+ None, # Empty tuple, all wildcard
+ typing.Tuple[str, int], # all wildcard with urgency set
+ typing.Tuple[int, int], # Endpoint,
+ # Wildcard endpoint, Cluster id present
+ typing.Tuple[typing.Type[ClusterObjects.Cluster], int],
+ # Wildcard endpoint, Cluster + Event present
+ typing.Tuple[typing.Type[ClusterObjects.ClusterEvent], int],
+ # Wildcard event id
+ typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int],
+ # Concrete path
+ typing.Tuple[int, typing.Type[ClusterObjects.ClusterEvent], int]
+ ]], eventNumberFilter: typing.Optional[int] = None,
+ fabricFiltered: bool = True,
+ reportInterval: typing.Optional[typing.Tuple[int, int]] = None,
+ keepSubscriptions: bool = False,
+ autoResubscribe: bool = True,
+ payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD
+ ):
'''
Read a list of events from a target node, this is a wrapper of DeviceController.Read()
diff --git a/src/controller/python/chip/clusters/Attribute.py b/src/controller/python/chip/clusters/Attribute.py
index 8412481..4d5bc1d 100644
--- a/src/controller/python/chip/clusters/Attribute.py
+++ b/src/controller/python/chip/clusters/Attribute.py
@@ -314,14 +314,17 @@
returnClusterObject: bool = False
attributeTLVCache: Dict[int, Dict[int, Dict[int, bytes]]] = field(
default_factory=lambda: {})
- attributeCache: Dict[int, List[Cluster]] = field(
- default_factory=lambda: {})
versionList: Dict[int, Dict[int, Dict[int, int]]] = field(
default_factory=lambda: {})
+ _attributeCacheUpdateNeeded: set[AttributePath] = field(
+ default_factory=lambda: set())
+ _attributeCache: Dict[int, List[Cluster]] = field(
+ default_factory=lambda: {})
+
def UpdateTLV(self, path: AttributePath, dataVersion: int, data: Union[bytes, ValueDecodeFailure]):
''' Store data in TLV since that makes it easiest to eventually convert to either the
- cluster or attribute view representations (see below in UpdateCachedData).
+ cluster or attribute view representations (see below in GetUpdatedAttributeCache()).
'''
if (path.EndpointId not in self.attributeTLVCache):
self.attributeTLVCache[path.EndpointId] = {}
@@ -344,7 +347,10 @@
clusterCache[path.AttributeId] = data
- def UpdateCachedData(self, changedPathSet: set[AttributePath]):
+ # For this path the attribute cache still requires an update.
+ self._attributeCacheUpdateNeeded.add(path)
+
+ def GetUpdatedAttributeCache(self) -> Dict[int, List[Cluster]]:
''' This converts the raw TLV data into a cluster object format.
Two formats are available:
@@ -381,12 +387,12 @@
except Exception as ex:
return ValueDecodeFailure(value, ex)
- for attributePath in changedPathSet:
+ for attributePath in self._attributeCacheUpdateNeeded:
endpointId, clusterId, attributeId = attributePath.EndpointId, attributePath.ClusterId, attributePath.AttributeId
- if endpointId not in self.attributeCache:
- self.attributeCache[endpointId] = {}
- endpointCache = self.attributeCache[endpointId]
+ if endpointId not in self._attributeCache:
+ self._attributeCache[endpointId] = {}
+ endpointCache = self._attributeCache[endpointId]
if clusterId not in _ClusterIndex:
#
@@ -414,6 +420,8 @@
attributeType = _AttributeIndex[(clusterId, attributeId)][0]
clusterCache[attributeType] = handle_attribute_view(endpointId, clusterId, attributeId, attributeType)
+ self._attributeCacheUpdateNeeded.clear()
+ return self._attributeCache
class SubscriptionTransaction:
@@ -434,12 +442,12 @@
def GetAttributes(self):
''' Returns the attribute value cache tracking the latest state on the publisher.
'''
- return self._readTransaction._cache.attributeCache
+ return self._readTransaction._cache.GetUpdatedAttributeCache()
def GetAttribute(self, path: TypedAttributePath) -> Any:
''' Returns a specific attribute given a TypedAttributePath.
'''
- data = self._readTransaction._cache.attributeCache
+ data = self._readTransaction._cache.GetUpdatedAttributeCache()
if (self._readTransaction._cache.returnClusterObject):
return eval(f'data[path.Path.EndpointId][path.ClusterType].{path.AttributeName}')
@@ -650,6 +658,18 @@
def GetAllEventValues(self):
return self._events
+ def GetReadResponse(self) -> AsyncReadTransaction.ReadResponse:
+ """Prepares and returns the ReadResponse object."""
+ return self.ReadResponse(
+ attributes=self._cache.GetUpdatedAttributeCache(),
+ events=self._events,
+ tlvAttributes=self._cache.attributeTLVCache
+ )
+
+ def GetSubscriptionHandler(self) -> SubscriptionTransaction | None:
+ """Returns subscription transaction."""
+ return self._subscription_handler
+
def handleAttributeData(self, path: AttributePath, dataVersion: int, status: int, data: bytes):
try:
imStatus = chip.interaction_model.Status(status)
@@ -716,7 +736,7 @@
if not self._future.done():
self._subscription_handler = SubscriptionTransaction(
self, subscriptionId, self._devCtrl)
- self._future.set_result(self._subscription_handler)
+ self._future.set_result(self)
else:
self._subscription_handler._subscriptionId = subscriptionId
if self._subscription_handler._onResubscriptionSucceededCb is not None:
@@ -745,8 +765,6 @@
pass
def _handleReportEnd(self):
- self._cache.UpdateCachedData(self._changedPathSet)
-
if (self._subscription_handler is not None):
for change in self._changedPathSet:
try:
@@ -772,8 +790,7 @@
if self._resultError is not None:
self._future.set_exception(self._resultError.to_exception())
else:
- self._future.set_result(AsyncReadTransaction.ReadResponse(
- attributes=self._cache.attributeCache, events=self._events, tlvAttributes=self._cache.attributeTLVCache))
+ self._future.set_result(self)
#
# Decrement the ref on ourselves to match the increment that happened at allocation.
@@ -1001,9 +1018,9 @@
)
-def Read(future: Future, eventLoop, device, devCtrl,
+def Read(transaction: AsyncReadTransaction, device,
attributes: Optional[List[AttributePath]] = None, dataVersionFilters: Optional[List[DataVersionFilter]] = None,
- events: Optional[List[EventPath]] = None, eventNumberFilter: Optional[int] = None, returnClusterObject: bool = True,
+ events: Optional[List[EventPath]] = None, eventNumberFilter: Optional[int] = None,
subscriptionParameters: Optional[SubscriptionParameters] = None,
fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True) -> PyChipError:
if (not attributes) and dataVersionFilters:
@@ -1011,8 +1028,6 @@
"Must provide valid attribute list when data version filters is not null")
handle = chip.native.GetLibraryHandle()
- transaction = AsyncReadTransaction(
- future, eventLoop, devCtrl, returnClusterObject)
attributePathsForCffi = None
if attributes is not None:
@@ -1119,25 +1134,6 @@
return res
-def ReadAttributes(future: Future, eventLoop, device, devCtrl,
- attributes: List[AttributePath], dataVersionFilters: Optional[List[DataVersionFilter]] = None,
- returnClusterObject: bool = True,
- subscriptionParameters: Optional[SubscriptionParameters] = None, fabricFiltered: bool = True) -> int:
- return Read(future=future, eventLoop=eventLoop, device=device,
- devCtrl=devCtrl, attributes=attributes, dataVersionFilters=dataVersionFilters,
- events=None, returnClusterObject=returnClusterObject,
- subscriptionParameters=subscriptionParameters, fabricFiltered=fabricFiltered)
-
-
-def ReadEvents(future: Future, eventLoop, device, devCtrl,
- events: List[EventPath], eventNumberFilter=None, returnClusterObject: bool = True,
- subscriptionParameters: Optional[SubscriptionParameters] = None, fabricFiltered: bool = True) -> int:
- return Read(future=future, eventLoop=eventLoop, device=device, devCtrl=devCtrl, attributes=None,
- dataVersionFilters=None, events=events, eventNumberFilter=eventNumberFilter,
- returnClusterObject=returnClusterObject,
- subscriptionParameters=subscriptionParameters, fabricFiltered=fabricFiltered)
-
-
def Init():
handle = chip.native.GetLibraryHandle()
diff --git a/src/controller/python/test/test_scripts/cluster_objects.py b/src/controller/python/test/test_scripts/cluster_objects.py
index 10c2ad2..e18d119 100644
--- a/src/controller/python/test/test_scripts/cluster_objects.py
+++ b/src/controller/python/test/test_scripts/cluster_objects.py
@@ -216,12 +216,12 @@
sub.SetAttributeUpdateCallback(subUpdate)
try:
- data = sub.GetAttributes()
req = Clusters.OnOff.Commands.On()
await devCtrl.SendCommand(nodeid=NODE_ID, endpoint=1, payload=req)
await asyncio.wait_for(event.wait(), timeout=11)
+ data = sub.GetAttributes()
if (data[1][Clusters.OnOff][Clusters.OnOff.Attributes.OnOff] != 1):
raise ValueError("Current On/Off state should be 1")
@@ -232,6 +232,7 @@
await asyncio.wait_for(event.wait(), timeout=11)
+ data = sub.GetAttributes()
if (data[1][Clusters.OnOff][Clusters.OnOff.Attributes.OnOff] != 0):
raise ValueError("Current On/Off state should be 0")
@@ -254,13 +255,12 @@
sub.SetAttributeUpdateCallback(subUpdate)
try:
- data = sub.GetAttributes()
-
req = Clusters.OnOff.Commands.On()
await devCtrl.SendCommand(nodeid=NODE_ID, endpoint=1, payload=req)
await asyncio.wait_for(event.wait(), timeout=11)
+ data = sub.GetAttributes()
cluster: Clusters.OnOff = data[1][Clusters.OnOff]
if (not cluster.onOff):
raise ValueError("Current On/Off state should be True")
@@ -272,6 +272,7 @@
await asyncio.wait_for(event.wait(), timeout=11)
+ data = sub.GetAttributes()
cluster: Clusters.OnOff = data[1][Clusters.OnOff]
if (cluster.onOff):
raise ValueError("Current On/Off state should be False")
@@ -298,7 +299,6 @@
logger.info("Test Subscription With MinInterval of 0")
sub = await devCtrl.ReadAttribute(nodeid=NODE_ID,
attributes=[Clusters.OnOff, Clusters.LevelControl], reportInterval=(0, 60))
- data = sub.GetAttributes()
logger.info("Sending off command")
@@ -315,6 +315,7 @@
logger.info("Checking read back value is indeed 254")
+ data = sub.GetAttributes()
if (data[1][Clusters.LevelControl][Clusters.LevelControl.Attributes.CurrentLevel] != 254):
raise ValueError("Current Level should have been 254")