14
14
)
15
15
from matter_server .common .models import MatterNodeData
16
16
17
- from .device_type_instance import MatterDeviceTypeInstance
18
- from . device_types import ALL_TYPES as DEVICE_TYPES , Aggregator , BridgedDevice , RootNode
19
- from . node_device import (
20
- AbstractMatterNodeDevice ,
21
- MatterBridgedNodeDevice ,
22
- MatterNodeDevice ,
17
+ from .device_types import (
18
+ ALL_TYPES as DEVICE_TYPES ,
19
+ Aggregator ,
20
+ BridgedDevice ,
21
+ DeviceType ,
22
+ RootNode ,
23
23
)
24
24
25
25
LOGGER = logging .getLogger (__name__ )
@@ -53,9 +53,33 @@ def __init__(
53
53
self .node = node
54
54
self .endpoint_id = endpoint_id
55
55
self .clusters : dict [int , Clusters .Cluster ] = {}
56
- # unwrap cluster and clusterattributes from raw node data attributes
57
- for attribute_path , attribute_value in attributes_data .items ():
58
- self .set_attribute_value (attribute_path , attribute_value )
56
+ self .device_types : set [DeviceType ] = set ()
57
+ self .update (attributes_data )
58
+
59
+ @property
60
+ def is_bridged_device (self ) -> bool :
61
+ """Return if this endpoint represents a Bridged device."""
62
+ return BridgedDevice in self .device_types
63
+
64
+ @property
65
+ def is_composed_device (self ) -> bool :
66
+ """Return if this endpoint belons to a composed device."""
67
+ return self .node .get_compose_parent (self .endpoint_id ) is not None
68
+
69
+ @property
70
+ def device_info (self ) -> Clusters .BasicInformation | Clusters .BridgedDeviceBasic :
71
+ """
72
+ Return device info.
73
+
74
+ If this endpoint represents a BridgedDevice, returns BridgedDeviceBasic.
75
+ If this endpoint represents a ComposedDevice, returns the info of the compose device.
76
+ Otherwise, returns BasicInformation from the Node itself (endpoint 0).
77
+ """
78
+ if self .is_bridged_device :
79
+ return self .get_cluster (Clusters .BridgedDeviceBasic )
80
+ if compose_parent := self .node .get_compose_parent (self .endpoint_id ):
81
+ return compose_parent .device_info
82
+ return self .node .device_info
59
83
60
84
def has_cluster (self , cluster : type [_CLUSTER_T ] | int ) -> bool :
61
85
"""Check if endpoint has a specific cluster."""
@@ -70,8 +94,8 @@ def get_cluster(self, cluster: type[_CLUSTER_T] | int) -> _CLUSTER_T | None:
70
94
Return None if the Cluster is not present on the node.
71
95
"""
72
96
if isinstance (cluster , type ):
73
- return self .clusters [ cluster .id ] # type: ignore[no-any-return]
74
- return self .clusters [ cluster ] # type: ignore[no-any-return]
97
+ return self .clusters . get ( cluster .id )
98
+ return self .clusters . get ( cluster )
75
99
76
100
def get_attribute_value (
77
101
self ,
@@ -155,91 +179,69 @@ def set_attribute_value(self, attribute_path: str, attribute_value: Any) -> None
155
179
)
156
180
setattr (cluster_instance , attribute_name , attribute_value )
157
181
182
+ def update (self , attributes_data : dict [str , Any ]) -> None :
183
+ """Update MatterEndpoint from (endpoint-specific) raw Attributes data."""
184
+ # unwrap cluster and clusterattributes from raw node data attributes
185
+ for attribute_path , attribute_value in attributes_data .items ():
186
+ self .set_attribute_value (attribute_path , attribute_value )
187
+ # extract device types from Descriptor Cluster
188
+ cluster = self .get_cluster (Clusters .Descriptor )
189
+ assert cluster is not None
190
+ for dev_info in cluster .deviceTypeList : # type: ignore[unreachable]
191
+ device_type = DEVICE_TYPES .get (dev_info .type )
192
+ if device_type is None :
193
+ LOGGER .debug ("Found unknown device type %s" , dev_info )
194
+ continue
195
+ self .device_types .add (device_type )
196
+
197
+ def __repr__ (self ) -> str :
198
+ """Return the representation."""
199
+ return f"<MatterEndoint { self .endpoint_id } (node { self .node .node_id } )>"
200
+
158
201
159
202
class MatterNode :
160
203
"""Representation of a Matter Node."""
161
204
162
205
def __init__ (self , node_data : MatterNodeData ) -> None :
163
206
"""Initialize MatterNode from MatterNodeData."""
164
207
self .endpoints : dict [int , MatterEndpoint ] = {}
165
- self .root_device_type_instance : MatterDeviceTypeInstance [RootNode ] | None = None
166
- self .aggregator_device_type_instance : MatterDeviceTypeInstance [
167
- Aggregator
168
- ] | None = None
169
- self .node_devices : list [AbstractMatterNodeDevice ] = []
170
- self .device_type_instances : list [MatterDeviceTypeInstance ] = []
208
+ self ._is_bridge_device : bool = False
209
+ # composed devices reference to other endpoints through the partsList attribute
210
+ # create a mapping table
211
+ self ._composed_endpoints : dict [int , int ] = {}
171
212
self .update (node_data )
172
213
173
- def update (self , node_data : MatterNodeData ) -> None :
174
- """Update MatterNode from MatterNodeData."""
175
- # pylint: disable=too-many-branches
176
- self .node_data = node_data
177
- # collect per endpoint data
178
- endpoint_data : dict [int , dict [str , Any ]] = {}
179
- for attribute_path , attribute_data in node_data .attributes .items ():
180
- endpoint_id = int (attribute_path .split ("/" )[0 ])
181
- endpoint_data .setdefault (endpoint_id , {})
182
- endpoint_data [endpoint_id ][attribute_path ] = attribute_data
183
- # TODO: Should we update existing endpoints instead of overwriting them?
184
- for endpoint_id , attributes_data in endpoint_data .items ():
185
- self .endpoints [endpoint_id ] = MatterEndpoint (
186
- endpoint_id = endpoint_id , attributes_data = attributes_data , node = self
187
- )
188
- # lookup device types from node data
189
- for endpoint in self .endpoints .values ():
190
- # get DeviceTypeList Attribute on the Descriptor cluster
191
- cluster = endpoint .get_cluster (Clusters .Descriptor )
192
- if not cluster :
193
- LOGGER .debug (
194
- "No Descriptor cluster found on endpoint %s, Node %s" ,
195
- endpoint .endpoint_id ,
196
- endpoint .node .node_id ,
197
- )
198
- continue
199
-
200
- for dev_info in cluster .deviceTypeList : # type: ignore[unreachable]
201
- device_type = DEVICE_TYPES .get (dev_info .type )
202
- if device_type is None :
203
- LOGGER .debug ("Found unknown device type %s" , dev_info )
204
- continue
205
-
206
- instance : MatterDeviceTypeInstance [Any ] = MatterDeviceTypeInstance (
207
- self , device_type , endpoint , dev_info .revision
208
- )
209
- if device_type is RootNode :
210
- self .root_device_type_instance = instance
211
- elif device_type is Aggregator :
212
- self .aggregator_device_type_instance = instance
213
- else :
214
- self .device_type_instances .append (instance )
215
-
216
- if self .root_device_type_instance is None :
217
- raise ValueError ("No root device found" )
218
-
219
- # parse node devices
220
- self .node_devices = []
221
- if self .aggregator_device_type_instance :
222
- for instance in self .device_type_instances :
223
- if instance .device_type == BridgedDevice :
224
- self .node_devices .append (MatterBridgedNodeDevice (instance ))
225
- else :
226
- self .node_devices .append (MatterNodeDevice (self ))
227
-
228
- def update_attribute (self , attribute_path : str , new_value : Any ) -> None :
229
- """Handle Attribute value update."""
230
- endpoint_id = int (attribute_path .split ("/" )[0 ])
231
- self .endpoints [endpoint_id ].set_attribute_value (attribute_path , new_value )
232
-
233
214
@property
234
215
def node_id (self ) -> int :
235
216
"""Return Node ID."""
236
217
return self .node_data .node_id
237
218
219
+ @property
220
+ def name (self ) -> str | None :
221
+ """Return friendly name for this node."""
222
+ if info := self .device_info :
223
+ return cast (str , info .nodeLabel )
224
+ return None
225
+
238
226
@property
239
227
def available (self ) -> bool :
240
228
"""Return availability of the node."""
241
229
return self .node_data .available
242
230
231
+ @property
232
+ def device_info (self ) -> Clusters .BasicInformation :
233
+ """
234
+ Return device info for this Node.
235
+
236
+ Returns BasicInformation from the Node itself (endpoint 0).
237
+ """
238
+ return self .get_cluster (0 , Clusters .BasicInformation )
239
+
240
+ @property
241
+ def is_bridge_device (self ) -> bool :
242
+ """Return if this Node is a Bridge/Aggregator device."""
243
+ return self ._is_bridge_device
244
+
243
245
def get_attribute_value (
244
246
self ,
245
247
endpoint : int ,
@@ -270,18 +272,56 @@ def get_cluster(
270
272
"""
271
273
return self .endpoints [endpoint ].get_cluster (cluster )
272
274
273
- @property
274
- def name (self ) -> str | None :
275
- """Return friendly name for this node."""
276
- if self .root_device_type_instance is None :
277
- return None
278
- return cast (
279
- str ,
280
- self .root_device_type_instance .endpoint .get_attribute_value (
281
- None ,
282
- Clusters .BasicInformation .Attributes .NodeLabel ,
283
- ),
275
+ def get_compose_parent (self , endpoint_id : int ) -> MatterEndpoint | None :
276
+ """Return endpoint of parent if the endpoint belongs to a Composed device."""
277
+ if parent_id := self ._composed_endpoints .get (endpoint_id ):
278
+ return self .endpoints [parent_id ]
279
+ return None
280
+
281
+ def get_compose_child_ids (self , endpoint_id : int ) -> tuple [int , ...] | None :
282
+ """Return endpoint ID's of any childs if the endpoint represents a Composed device."""
283
+ return tuple (x for x , y in self ._composed_endpoints .items () if y == endpoint_id )
284
+
285
+ def update (self , node_data : MatterNodeData ) -> None :
286
+ """Update MatterNode from MatterNodeData."""
287
+ self .node_data = node_data
288
+ # collect per endpoint data
289
+ endpoint_data : dict [int , dict [str , Any ]] = {}
290
+ for attribute_path , attribute_data in node_data .attributes .items ():
291
+ endpoint_id = int (attribute_path .split ("/" )[0 ])
292
+ if endpoint_id not in endpoint_data :
293
+ endpoint_data [endpoint_id ] = {}
294
+ endpoint_data [endpoint_id ][attribute_path ] = attribute_data
295
+ for endpoint_id , attributes_data in endpoint_data .items ():
296
+ if endpoint_id in self .endpoints :
297
+ self .endpoints [endpoint_id ].update (attributes_data )
298
+ else :
299
+ self .endpoints [endpoint_id ] = MatterEndpoint (
300
+ endpoint_id = endpoint_id , attributes_data = attributes_data , node = self
301
+ )
302
+ # lookup if this is a bridge device
303
+ self ._is_bridge_device = any (
304
+ Aggregator in x .device_types for x in self .endpoints .values ()
284
305
)
306
+ # composed devices reference to other endpoints through the partsList attribute
307
+ # create a mapping table to quickly map this
308
+ for endpoint in self .endpoints .values ():
309
+ if RootNode in endpoint .device_types :
310
+ # ignore root endoint
311
+ continue
312
+ if Aggregator in endpoint .device_types :
313
+ # ignore Bridge endpoint (as that will also use partsList to indicate its childs)
314
+ continue
315
+ descriptor = endpoint .get_cluster (Clusters .Descriptor )
316
+ assert descriptor is not None
317
+ if descriptor .partsList : # type: ignore[unreachable]
318
+ for endpoint_id in descriptor .partsList :
319
+ self ._composed_endpoints [endpoint_id ] = endpoint .endpoint_id
320
+
321
+ def update_attribute (self , attribute_path : str , new_value : Any ) -> None :
322
+ """Handle Attribute value update."""
323
+ endpoint_id = int (attribute_path .split ("/" )[0 ])
324
+ self .endpoints [endpoint_id ].set_attribute_value (attribute_path , new_value )
285
325
286
326
def __repr__ (self ) -> str :
287
327
"""Return the representation."""
0 commit comments