Skip to content

Commit 140840c

Browse files
Steps progress
1 parent fb7ba4a commit 140840c

File tree

1 file changed

+225
-8
lines changed

1 file changed

+225
-8
lines changed

src/python_testing/TC_SC_4_3.py

+225-8
Original file line numberDiff line numberDiff line change
@@ -15,24 +15,241 @@
1515
# limitations under the License.
1616
#
1717

18-
import asyncio
1918
import logging
20-
import queue
21-
import time
22-
from threading import Event
19+
import re
2320

2421
import chip.clusters as Clusters
25-
from chip.clusters import ClusterObjects as ClustersObjects
26-
from chip.clusters.Attribute import SubscriptionTransaction, TypedAttributePath
27-
from chip.utils import CommissioningBuildingBlocks
2822
from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main
2923
from mobly import asserts
24+
from mdns_discovery.mdns_discovery import MdnsDiscovery, MdnsServiceType
3025

3126

27+
'''
28+
Category
29+
Functional conformance
30+
31+
Purpose
32+
The purpose of this test case is to verify that a Matter node is discoverable
33+
and can advertise its services in a Matter network.
34+
35+
Test Plan
36+
https://github.com/CHIP-Specifications/chip-test-plans/blob/master/src/securechannel.adoc#343-tc-sc-43-discovery-dut_commissionee
37+
'''
38+
3239
class TC_SC_4_3(MatterBaseTest):
40+
41+
ONE_HOUR_IN_MS = 3600000
42+
MAX_SAT_VALUE = 65535
43+
MAX_T_VALUE = 6
44+
45+
async def get_descriptor_server_list(self):
46+
return await self.read_single_attribute_check_success(
47+
endpoint=0,
48+
dev_ctrl=self.default_controller,
49+
cluster=Clusters.Descriptor,
50+
attribute=Clusters.Descriptor.Attributes.ServerList
51+
)
52+
async def get_idle_mode_threshhold_ms(self):
53+
return await self.read_single_attribute_check_success(
54+
endpoint=0,
55+
dev_ctrl=self.default_controller,
56+
cluster=Clusters.IcdManagement,
57+
attribute=Clusters.IcdManagement.Attributes.ActiveModeThreshold
58+
)
59+
async def get_icd_feature_map(self):
60+
return await self.read_single_attribute_check_success(
61+
endpoint=0,
62+
dev_ctrl=self.default_controller,
63+
cluster=Clusters.IcdManagement,
64+
attribute=Clusters.IcdManagement.Attributes.FeatureMap
65+
)
66+
def get_dut_instance_name(self) -> str:
67+
node_id = self.dut_node_id
68+
compressed_fabric_id = self.default_controller.GetCompressedFabricId()
69+
instance_name = f'{compressed_fabric_id:016X}-{node_id:016X}'
70+
return instance_name
71+
def get_operational_subtype(self) -> str:
72+
compressed_fabric_id = self.default_controller.GetCompressedFabricId()
73+
service_name = f'_I{compressed_fabric_id:016X}._sub.{MdnsServiceType.OPERATIONAL.value}'
74+
return service_name
75+
@staticmethod
76+
def verify_decimal_value(input_value, comparison_value: int):
77+
try:
78+
input_float = float(input_value)
79+
input_int = int(input_float)
80+
81+
if str(input_value).startswith("0") and input_int != 0:
82+
return (False, f"Input ({input_value}) has leading zeros.")
83+
84+
if input_float != input_int:
85+
return (False, f"Input ({input_value}) is not an integer.")
86+
87+
if input_int <= comparison_value:
88+
return (True, f"Input ({input_value}) is valid.")
89+
else:
90+
return (False, f"Input ({input_value}) exceeds the allowed value {comparison_value}.")
91+
except ValueError:
92+
return (False, f"Input ({input_value}) is not a valid decimal number.")
93+
def verify_t_value(self, t_value):
94+
# Verify t_value is a decimal number without leading zeros and less than or equal to 6
95+
try:
96+
T_int = int(t_value)
97+
if T_int < 0 or T_int > self.MAX_T_VALUE:
98+
return False, f"T value ({t_value}) is not in the range 0 to 6. ({t_value})"
99+
if str(t_value).startswith("0") and T_int != 0:
100+
return False, f"T value ({t_value}) has leading zeros."
101+
if T_int != float(t_value):
102+
return False, f"T value ({t_value}) is not an integer."
103+
104+
# Convert to bitmap and verify bit 0 is clear
105+
if T_int & 1 == 0:
106+
return True, f"T value ({t_value}) is valid and bit 0 is clear."
107+
else:
108+
return False, f"Bit 0 is not clear. T value ({t_value})"
109+
except ValueError:
110+
return False, "T value ({t_value}) is not a valid decimal number."
111+
@staticmethod
112+
def contains_ipv6_address(addresses):
113+
# IPv6 pattern for basic validation
114+
ipv6_pattern = re.compile(r'(?:(?:[0-9a-fA-F]{1,4}:){7}(?:[0-9a-fA-F]{1,4}|:))|(?:[0-9a-fA-F]{1,4}:){6}(?::[0-9a-fA-F]{1,4}|(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)|:)|(?:[0-9a-fA-F]{1,4}:){5}(?:(?::[0-9a-fA-F]{1,4}){1,2}|:((?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)|:)|(?:[0-9a-fA-F]{1,4}:){4}(?:(?::[0-9a-fA-F]{1,4}){1,3}|:((?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)|:)|(?:[0-9a-fA-F]{1,4}:){3}(?:(?::[0-9a-fA-F]{1,4}){1,4}|:((?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)|:)|(?:[0-9a-fA-F]{1,4}:){2}(?:(?::[0-9a-fA-F]{1,4}){1,5}|:((?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)|:)|(?:[0-9a-fA-F]{1,4}:){1}(?:(?::[0-9a-fA-F]{1,4}){1,6}|:((?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)|:)|(?::(?::[0-9a-fA-F]{1,4}){1,7}|:)', re.VERBOSE)
115+
116+
for address in addresses:
117+
if ipv6_pattern.match(address):
118+
return True, "At least one IPv6 address is present."
119+
120+
return False, "No IPv6 addresses found."
121+
33122
@async_test_body
34123
async def test_TC_SC_4_3(self):
35-
print()
124+
print(f"\n"*10)
125+
126+
supports_icd = None
127+
supports_lit = None
128+
active_mode_threshold_ms = None
129+
instance_name = None
130+
icd_value = None
131+
sit_mode = None
132+
133+
# *** STEP 1 ***
134+
self.print_step("1", "DUT is commissioned on the same fabric as TH.")
135+
136+
# *** STEP 2 ***
137+
self.print_step("2", "TH reads ServerList attribute from the Descriptor cluster on EP0. If the ICD Management cluster ID (70,0x46) is present in the list, set supports_icd to true, otherwise set supports_icd to false.")
138+
ep0_servers = await self.get_descriptor_server_list()
139+
140+
# Check if ep0_servers contains the ICD Management cluster ID (0x0046)
141+
supports_icd = Clusters.IcdManagement.id in ep0_servers
142+
logging.info(f"\n\n\tsupports_icd: {supports_icd}\n\n")
143+
144+
# *** STEP 3 ***
145+
self.print_step("3", "If supports_icd is true, TH reads ActiveModeThreshold from the ICD Management cluster on EP0 and saves as active_mode_threshold.")
146+
if supports_icd:
147+
active_mode_threshold_ms = await self.get_idle_mode_threshhold_ms()
148+
logging.info(f"\n\n\tactive_mode_threshold_ms: {active_mode_threshold_ms}\n\n")
149+
150+
# *** STEP 4 ***
151+
self.print_step("4", "If supports_icd is true, TH reads FeatureMap from the ICD Management cluster on EP0. If the LITS feature is set, set supports_lit to true. Otherwise set supports_lit to false.")
152+
if supports_icd:
153+
feature_map = await self.get_icd_feature_map()
154+
LITS = Clusters.IcdManagement.Bitmaps.Feature.kLongIdleTimeSupport
155+
supports_lit = bool(feature_map & LITS == LITS)
156+
logging.info(f"\n\n\tkLongIdleTimeSupport set: {supports_lit}\n\n")
157+
158+
# *** STEP 5 ***
159+
self.print_step("5", "TH constructs the instance name for the DUT as the 64-bit compressed Fabric identifier, and the assigned 64-bit Node identifier, each expressed as a fixed-length sixteen-character hexadecimal string, encoded as ASCII (UTF-8) text using capital letters, separated by a hyphen.")
160+
instance_name = self.get_dut_instance_name()
161+
162+
163+
164+
# PENDING STEPS 6-8
165+
166+
167+
mdns = MdnsDiscovery()
168+
operational = await mdns.get_operational_service(
169+
service_name=f"{instance_name}.{MdnsServiceType.OPERATIONAL.value}",
170+
service_type=MdnsServiceType.OPERATIONAL.value,
171+
log_output=True
172+
)
173+
174+
# *** STEP 9 ***
175+
self.print_step("9", "TH verifies ICD, SII, SAI, SAT, and T TXT record keys/vales of the returned record.")
176+
177+
# ICD TXT KEY
178+
if supports_lit:
179+
logging.info(f"supports_lit is true, verify the ICD key IS present in the TXT record, and it has the value of 0 or 1 (ASCII).")
180+
181+
# Verify the ICD key IS present
182+
asserts.assert_in('ICD', operational.txt_record, "ICD key is NOT present in the TXT record.")
183+
184+
# Verify it has the value of 0 or 1 (ASCII)
185+
icd_value = int(operational.txt_record['ICD'])
186+
asserts.assert_true(icd_value == 0 or icd_value == 1, "ICD value is different than 0 or 1 (ASCII).")
187+
else:
188+
logging.info(f"supports_lit is false, verify that the ICD key is NOT present in the TXT record.")
189+
asserts.assert_not_in('ICD', operational.txt_record, "ICD key is present in the TXT record.")
190+
191+
# SII TXT KEY
192+
if supports_icd and not supports_lit:
193+
sit_mode = True
194+
195+
if supports_lit and supports_lit:
196+
if icd_value == 0:
197+
sit_mode = True
198+
else:
199+
sit_mode = False
200+
201+
if not supports_icd:
202+
sit_mode = False
203+
204+
if sit_mode:
205+
logging.info(f"sit_mode is True, verify the SII key IS present.")
206+
asserts.assert_in('SII', operational.txt_record, "SII key is NOT present in the TXT record.")
207+
208+
logging.info(f"Verify SII value is a decimal with no leading zeros and is less than or equal to 3600000 (1h in ms).")
209+
sii_value = operational.txt_record['SII']
210+
result, message = self.verify_decimal_value(sii_value, self.ONE_HOUR_IN_MS)
211+
asserts.assert_true(result, message)
212+
213+
# SAI TXT KEY
214+
if supports_icd:
215+
logging.info(f"supports_icd is True, verify the SAI key IS present.")
216+
asserts.assert_in('SAI', operational.txt_record, "SAI key is NOT present in the TXT record.")
217+
218+
logging.info(f"Verify SAI value is a decimal with no leading zeros and is less than or equal to 3600000 (1h in ms).")
219+
sai_value = operational.txt_record['SAI']
220+
result, message = self.verify_decimal_value(sai_value, self.ONE_HOUR_IN_MS)
221+
asserts.assert_true(result, message)
222+
223+
# SAT TXT KEY
224+
if 'SAT' in operational.txt_record:
225+
logging.info(f"SAT key is present in TXT record, verify that it is a decimal value with no leading zeros and is less than or equal to 65535.")
226+
sat_value = operational.txt_record['SAT']
227+
result, message = self.verify_decimal_value(sat_value, self.MAX_SAT_VALUE)
228+
asserts.assert_true(result, message)
229+
230+
if supports_icd:
231+
logging.info(f"supports_icd is True, verify the SAT value is equal to active_mode_threshold.")
232+
asserts.assert_equal(int(sat_value), active_mode_threshold_ms)
233+
234+
# # T TXT KEY
235+
# if 'T' in operational.txt_record:
236+
# logging.info(f"T key is present in TXT record, verify if that it is a decimal value with no leading zeros and is less than or equal to 6. Convert the value to a bitmap and verify bit 0 is clear.")
237+
# t_value = operational.txt_record['T']
238+
# result, message = self.verify_t_value(t_value)
239+
# asserts.assert_true(result, message)
240+
241+
# AAAA
242+
logging.info(f"Verify the AAAA record contains at least one IPv6 address")
243+
result, message = self.contains_ipv6_address(operational.addresses)
244+
asserts.assert_true(result, message)
245+
246+
# *** STEP 10 ***
247+
self.print_step("10", "Verify DUT returns a PTR record with DNS-SD instance name set instance_name.")
248+
service_types = await mdns.get_service_types(log_output=True)
249+
op_sub_type = self.get_operational_subtype()
250+
asserts.assert_in(op_sub_type, service_types, f"No PTR record with DNS-SD instance name '{op_sub_type}'")
251+
252+
print(f"\n"*10)
36253

37254

38255
if __name__ == "__main__":

0 commit comments

Comments
 (0)