@@ -106,7 +106,7 @@ def stash_globally(o: object) -> str:
106
106
return id
107
107
108
108
109
- def unstash_globally (id : str ) -> object :
109
+ def unstash_globally (id : str ) -> Any :
110
110
return _GLOBAL_DATA .get (id )
111
111
112
112
@@ -205,7 +205,7 @@ def utc_datetime_from_posix_time_ms(posix_time_ms: int) -> datetime:
205
205
return datetime .fromtimestamp (seconds , timezone .utc ) + timedelta (milliseconds = millis )
206
206
207
207
208
- def compare_time (received : int , offset : timedelta = timedelta (), utc : int = None , tolerance : timedelta = timedelta (seconds = 5 )) -> None :
208
+ def compare_time (received : int , offset : timedelta = timedelta (), utc : Optional [ int ] = None , tolerance : timedelta = timedelta (seconds = 5 )) -> None :
209
209
if utc is None :
210
210
utc = utc_time_in_matter_epoch ()
211
211
@@ -243,7 +243,7 @@ def __init__(self, expected_cluster: ClusterObjects.Cluster):
243
243
"""This class creates a queue to store received event callbacks, that can be checked by the test script
244
244
expected_cluster: is the cluster from which the events are expected
245
245
"""
246
- self ._q = queue .Queue ()
246
+ self ._q : queue . Queue = queue .Queue ()
247
247
self ._expected_cluster = expected_cluster
248
248
249
249
async def start (self , dev_ctrl , node_id : int , endpoint : int , fabric_filtered : bool = False , min_interval_sec : int = 0 , max_interval_sec : int = 30 ) -> Any :
@@ -976,7 +976,7 @@ def get_test_steps(self, test: str) -> list[TestStep]:
976
976
steps = self .get_defined_test_steps (test )
977
977
return [TestStep (1 , "Run entire test" )] if steps is None else steps
978
978
979
- def get_defined_test_steps (self , test : str ) -> list [TestStep ]:
979
+ def get_defined_test_steps (self , test : str ) -> Optional [ list [TestStep ] ]:
980
980
steps_name = f'steps_{ test .removeprefix ("test_" )} '
981
981
try :
982
982
fn = getattr (self , steps_name )
@@ -996,7 +996,7 @@ def get_test_pics(self, test: str) -> list[str]:
996
996
pics = self ._get_defined_pics (test )
997
997
return [] if pics is None else pics
998
998
999
- def _get_defined_pics (self , test : str ) -> list [TestStep ]:
999
+ def _get_defined_pics (self , test : str ) -> Optional [ list [str ] ]:
1000
1000
steps_name = f'pics_{ test .removeprefix ("test_" )} '
1001
1001
try :
1002
1002
fn = getattr (self , steps_name )
@@ -1093,7 +1093,7 @@ def matter_test_config(self) -> MatterTestConfig:
1093
1093
return unstash_globally (self .user_params .get ("matter_test_config" ))
1094
1094
1095
1095
@property
1096
- def default_controller (self ) -> ChipDeviceCtrl :
1096
+ def default_controller (self ) -> ChipDeviceCtrl . ChipDeviceController :
1097
1097
return unstash_globally (self .user_params .get ("default_controller" ))
1098
1098
1099
1099
@property
@@ -1162,7 +1162,7 @@ def check_pics(self, pics_key: str) -> bool:
1162
1162
def is_pics_sdk_ci_only (self ) -> bool :
1163
1163
return self .check_pics ('PICS_SDK_CI_ONLY' )
1164
1164
1165
- async def open_commissioning_window (self , dev_ctrl : Optional [ChipDeviceCtrl ] = None , node_id : Optional [int ] = None , timeout : int = 900 ) -> CustomCommissioningParameters :
1165
+ async def open_commissioning_window (self , dev_ctrl : Optional [ChipDeviceCtrl . ChipDeviceController ] = None , node_id : Optional [int ] = None , timeout : int = 900 ) -> CustomCommissioningParameters :
1166
1166
rnd_discriminator = random .randint (0 , 4095 )
1167
1167
if dev_ctrl is None :
1168
1168
dev_ctrl = self .default_controller
@@ -1178,14 +1178,14 @@ async def open_commissioning_window(self, dev_ctrl: Optional[ChipDeviceCtrl] = N
1178
1178
asserts .fail (e .status , 'Failed to open commissioning window' )
1179
1179
1180
1180
async def read_single_attribute (
1181
- self , dev_ctrl : ChipDeviceCtrl , node_id : int , endpoint : int , attribute : object , fabricFiltered : bool = True ) -> object :
1181
+ self , dev_ctrl : ChipDeviceCtrl . ChipDeviceController , node_id : int , endpoint : int , attribute : object , fabricFiltered : bool = True ) -> object :
1182
1182
result = await dev_ctrl .ReadAttribute (node_id , [(endpoint , attribute )], fabricFiltered = fabricFiltered )
1183
1183
data = result [endpoint ]
1184
1184
return list (data .values ())[0 ][attribute ]
1185
1185
1186
1186
async def read_single_attribute_check_success (
1187
1187
self , cluster : Clusters .ClusterObjects .ClusterCommand , attribute : Clusters .ClusterObjects .ClusterAttributeDescriptor ,
1188
- dev_ctrl : ChipDeviceCtrl = None , node_id : int = None , endpoint : int = None , fabric_filtered : bool = True , assert_on_error : bool = True , test_name : str = "" ) -> object :
1188
+ dev_ctrl : Optional [ ChipDeviceCtrl . ChipDeviceController ] = None , node_id : Optional [ int ] = None , endpoint : Optional [ int ] = None , fabric_filtered : bool = True , assert_on_error : bool = True , test_name : str = "" ) -> object :
1189
1189
if dev_ctrl is None :
1190
1190
dev_ctrl = self .default_controller
1191
1191
if node_id is None :
@@ -1216,7 +1216,7 @@ async def read_single_attribute_check_success(
1216
1216
1217
1217
async def read_single_attribute_expect_error (
1218
1218
self , cluster : object , attribute : object ,
1219
- error : Status , dev_ctrl : ChipDeviceCtrl = None , node_id : int = None , endpoint : int = None ,
1219
+ error : Status , dev_ctrl : Optional [ ChipDeviceCtrl . ChipDeviceController ] = None , node_id : Optional [ int ] = None , endpoint : Optional [ int ] = None ,
1220
1220
fabric_filtered : bool = True , assert_on_error : bool = True , test_name : str = "" ) -> object :
1221
1221
if dev_ctrl is None :
1222
1222
dev_ctrl = self .default_controller
@@ -1241,7 +1241,7 @@ async def read_single_attribute_expect_error(
1241
1241
1242
1242
return attr_ret
1243
1243
1244
- async def write_single_attribute (self , attribute_value : object , endpoint_id : int = None , expect_success : bool = True ) -> Status :
1244
+ async def write_single_attribute (self , attribute_value : object , endpoint_id : Optional [ int ] = None , expect_success : bool = True ) -> Status :
1245
1245
"""Write a single `attribute_value` on a given `endpoint_id` and assert on failure.
1246
1246
1247
1247
If `endpoint_id` is None, the default DUT endpoint for the test is selected.
@@ -1263,7 +1263,7 @@ async def write_single_attribute(self, attribute_value: object, endpoint_id: int
1263
1263
1264
1264
async def send_single_cmd (
1265
1265
self , cmd : Clusters .ClusterObjects .ClusterCommand ,
1266
- dev_ctrl : ChipDeviceCtrl = None , node_id : int = None , endpoint : int = None ,
1266
+ dev_ctrl : Optional [ ChipDeviceCtrl . ChipDeviceController ] = None , node_id : Optional [ int ] = None , endpoint : Optional [ int ] = None ,
1267
1267
timedRequestTimeoutMs : typing .Union [None , int ] = None ,
1268
1268
payloadCapability : int = ChipDeviceCtrl .TransportPayloadCapability .MRP_PAYLOAD ) -> object :
1269
1269
if dev_ctrl is None :
0 commit comments