42
42
from datetime import datetime , timezone
43
43
44
44
import chip .clusters as Clusters
45
- import chip .discovery as Discovery
46
45
from chip import ChipDeviceCtrl
47
- from chip .exceptions import ChipStackError
48
46
from chip .testing .matter_testing import MatterBaseTest , TestStep , async_test_body , default_matter_test_main
49
47
from mobly import asserts
50
48
49
+
51
50
# Create logger
52
51
logger = logging .getLogger (__name__ )
53
52
@@ -211,7 +210,7 @@ async def run_steps_8_to_9(self, trusted_root_list_original_size: int, is_first_
211
210
logger .info (
212
211
f'Step #12 - Repeated Step #9: After waiting for failsafe timeout the Breadcrumb attribute: { breadcrumb_info } ' )
213
212
214
- def get_current_utc_time_str (self ):
213
+ def get_current_utc_time_str (self , c_time = None ):
215
214
'''
216
215
Get the current time in UTC and return it as a formatted string.
217
216
@@ -222,11 +221,15 @@ def get_current_utc_time_str(self):
222
221
Returns:
223
222
str: The current time formatted as 'dd-mm hh:mm:ss.sss'.
224
223
'''
225
- current_time = time .time () # Get the current timestamp
224
+
225
+ # If c_time is not provided, use the current time
226
+ if c_time is None :
227
+ c_time = time .time ()
228
+
226
229
formatted_time = (
227
- datetime .fromtimestamp (current_time , tz = timezone . utc )
230
+ datetime .fromtimestamp (c_time )
228
231
.strftime ('%d-%m %H:%M:%S.' )
229
- + str (int ((current_time % 1 ) * 1000 )).zfill (3 )
232
+ + str (int ((c_time % 1 ) * 1000 )).zfill (3 )
230
233
)
231
234
return formatted_time
232
235
@@ -236,8 +239,7 @@ def desc_TC_CGEN_2_2(self) -> str:
236
239
def pics_TC_CGEN_2_2 (self ):
237
240
"""Return the PICS definitions associated with this test."""
238
241
pics = [
239
- "CGEN.S" ,
240
- "OPCREDS.S" ,
242
+ "CGEN.S"
241
243
]
242
244
return pics
243
245
@@ -357,7 +359,7 @@ async def test_TC_CGEN_2_2(self):
357
359
logger .info (f'Step #6: The updated size of the num_trusted_roots_original list: { trusted_root_list_original_size_updated } ' )
358
360
359
361
self .step (7 )
360
- if self .check_pics ( 'PICS_SDK_CI_ONLY' ) :
362
+ if self .is_pics_sdk_ci_only :
361
363
# Step 7 - In CI environments, the 'expire_failsafe_timer' function is used to immediately force the failsafe timer to expire,
362
364
# avoiding the original wait time defined in PIXIT.CGEN.FailsafeExpiryLengthSeconds
363
365
# and speeding up test execution by setting the expiration time to 1 second.
@@ -658,14 +660,13 @@ async def test_TC_CGEN_2_2(self):
658
660
# Verify that DebugText is empty or has a maximum length of 512 characters
659
661
debug_text = resp .debugText
660
662
assert debug_text == '' or len (debug_text ) <= 512 , "debugText must be empty or have a maximum length of 512 characters"
661
- logger .info (f'Step #32: { run_type } - ArmFailSafeResponse with ErrorCode as OK({ resp .errorCode } )' )
663
+ logger .info (f'Step #32: ArmFailSafeResponse with ErrorCode as OK({ resp .errorCode } )' )
662
664
663
665
self .step (33 )
664
- if self .check_pics ( 'PICS_SDK_CI_ONLY' ) :
666
+ if self .is_pics_sdk_ci_only :
665
667
# Step 33 - In CI environments avoiding the original wait time defined in PIXIT.CGEN.FailsafeExpiryLengthSeconds
666
668
# and speeding up test execution by setting the expiration time to 2 seconds.
667
669
668
- # Running identifier
669
670
run_type = "CI Test"
670
671
logger .info (
671
672
f'Step 33: { run_type } - Bypassing failsafe expiration to avoid unnecessary delays in CI environment.' )
@@ -680,7 +681,6 @@ async def test_TC_CGEN_2_2(self):
680
681
# Wait PIXIT.CGEN.FailsafeExpiryLengthSeconds time with an additional 0.5-second buffer, not allowing the fully exire (max_fail_safe).
681
682
await asyncio .sleep (failsafe_timeout_less_than_max + .5 )
682
683
else :
683
- # Running identifier
684
684
run_type = "Cert Test"
685
685
686
686
# Set the failsafe expiration timeout to PIXIT.CGEN.FailsafeExpiryLengthSecondsseconds, must be less than maxFailsafe (max_fail_safe).
@@ -739,18 +739,18 @@ async def test_TC_CGEN_2_2(self):
739
739
logger .info (f'Step #37: ArmFailSafeResponse with ErrorCode as OK({ resp .errorCode } )' )
740
740
741
741
self .step (38 )
742
- if self .check_pics ( 'PICS_SDK_CI_ONLY' ) :
742
+ if self .is_pics_sdk_ci_only :
743
743
# In CI environment, bypass the wait for the failsafe expiration to avoid unnecessary delays.
744
744
run_type = "CI Test"
745
745
logger .info (
746
746
f'Step 38: { run_type } - Bypassing due to failsafe expiration workaround to avoid unnecessary delays in CI environment.' )
747
747
else :
748
748
run_type = "Cert Test"
749
- start_time = time .time ()
749
+ t_start = time .time ()
750
750
751
751
# Get the current time and format it for logging
752
- start_time_formatted = self .get_current_utc_time_str ()
753
- logger .info (f'Step #38: { run_type } - TH1 saves the Current time: { start_time_formatted } ' )
752
+ start_time_formatted = self .get_current_utc_time_str (t_start )
753
+ logger .info (f'Step #38: { run_type } - TH1 saves the Current time as t_start : { start_time_formatted } ' )
754
754
755
755
self .step (39 )
756
756
# Reused TrustedRootCertificate created in step #5 - Send command to add new trusted root certificate
@@ -773,24 +773,25 @@ async def test_TC_CGEN_2_2(self):
773
773
self .step (41 )
774
774
# Limit maxFailsafe to 20 seconds to prevent excessively long waits in tests (due maxFailsafe = 900 seconds).
775
775
maxFailsafe = failsafe_expiration_seconds
776
- if self .check_pics ( 'PICS_SDK_CI_ONLY' ) :
776
+ if self .is_pics_sdk_ci_only :
777
777
# In CI environment, bypass the wait for the failsafe expiration to avoid unnecessary delays.
778
778
run_type = "CI Test"
779
779
logger .info (
780
780
f'Step 41: { run_type } - Bypassing due to failsafe expiration workaround to avoid unnecessary delays in CI environment.' )
781
781
else :
782
782
run_type = "Cert Test"
783
783
784
- # Make TH1 wait until the target_time is greater than or equal to half of the maxFailsafe time.
784
+ # Make TH1 wait until the target_time is greater than or equal to half of the maxFailsafe time with an additional 0.5-second buffer .
785
785
target_time = maxFailsafe / 2
786
786
await asyncio .sleep (target_time + .5 )
787
787
788
788
# Verify that at least half of the maxFailsafe time has passed, allowing TH1 to proceed.
789
- current_time = self .get_current_utc_time_str ()
789
+ current_time_formatted = self .get_current_utc_time_str ()
790
790
logger .info (
791
- f'Step #41: { run_type } - TH1 can proceed. '
792
- f'Started expiration time: { start_time_formatted } , Current time: { current_time } , '
793
- f'Half of the maxFailsafe duration ({ target_time } seconds) has passed. '
791
+ f'Step #41: { run_type } - - MaxFailsafe is { maxFailsafe } . '
792
+ f'TH1 can proceed. Started expiration time (t_start): { start_time_formatted } , '
793
+ f'Current time: { current_time_formatted } , '
794
+ f'The target time ({ target_time } seconds) has passed. '
794
795
f'Confirmation that ArmFailSafe has not expired yet.'
795
796
)
796
797
@@ -806,7 +807,7 @@ async def test_TC_CGEN_2_2(self):
806
807
logger .info (f'Step #42: ArmFailSafeResponse with ErrorCode as OK({ resp .errorCode } )' )
807
808
808
809
self .step (43 )
809
- if self .check_pics ( 'PICS_SDK_CI_ONLY' ) :
810
+ if self .is_pics_sdk_ci_only :
810
811
# Step 43 - In CI environments, the 'expire_failsafe_timer' function is used to immediately force the failsafe timer to expire,
811
812
# avoiding the original wait time defined in PIXIT.CGEN.FailsafeExpiryLengthSeconds,
812
813
# and speeding up test execution by setting the expiration time to 1 second.
@@ -830,22 +831,21 @@ async def test_TC_CGEN_2_2(self):
830
831
else :
831
832
run_type = "Cert Test"
832
833
833
- # Calculate the target time (Tstart + maxFailsafe)
834
- target_time = start_time + maxFailsafe
835
- logger .info (f'Step #43: target_time: { target_time } ' )
834
+ # Calculate the target time (Tstart + maxFailsafe) with an additional 0.5-second buffer
835
+ target_time = (t_start + maxFailsafe ) + .5
836
836
837
- # Make TH1 wait until the target_time is greater than or equal to maxFailsafe time.
838
- while time .time () < target_time :
839
- await asyncio .sleep (0.1 )
837
+ # Wait until the target_time is reached using asyncio.sleep to avoid busy-waiting
838
+ await asyncio .sleep (target_time - time .time ())
840
839
841
840
# Checks if the elapsed time from start_time has met or exceeded maxFailsafe
842
841
# TH1 process can proceed
843
- current_time = self .get_current_utc_time_str ()
842
+ current_time_formatted = self .get_current_utc_time_str ()
844
843
logger .info (
845
- f'Step #43: { run_type } - TH1 can proceed. '
846
- f'Started expiration time: { start_time_formatted } , Current time: { current_time } , '
847
- f'MaxFailsafe duration ({ target_time } seconds) has passed. '
848
- f'Confirmation that ArmFailSafe has expired.'
844
+ f'Step #43: { run_type } - - MaxFailsafe is { maxFailsafe } . '
845
+ f'TH1 can proceed. , '
846
+ f'Current time: { current_time_formatted } , '
847
+ f'The target time ({ maxFailsafe } seconds) has passed since expiration time started (t_start): { start_time_formatted } . '
848
+ f'Confirmation that ArmFailSafe has not expired yet.'
849
849
)
850
850
851
851
self .step (44 )
0 commit comments