Skip to content

Commit 7565e09

Browse files
committed
Add the Python tools for updating attr to NVM
Add the python tools for updating attributes to NVM: project-chip#35785
1 parent 358c064 commit 7565e09

File tree

3 files changed

+320
-0
lines changed

3 files changed

+320
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import requests
2+
from bs4 import BeautifulSoup
3+
import sys
4+
5+
def is_attribute_non_volatile(source, attribute_id):
6+
"""
7+
Checks if the attribute with the given hexadecimal ID has 'nonVolatile' persistence in the XML.
8+
Also prints the attribute name if found.
9+
10+
Args:
11+
source: Either a URL (string) or a file path (string) to the XML file.
12+
attribute_id: The hexadecimal ID of the attribute to check (e.g., 0x0000).
13+
14+
Returns:
15+
True if the attribute has 'nonVolatile' persistence, False otherwise.
16+
"""
17+
18+
if source.startswith("http://") or source.startswith("https://"):
19+
response = requests.get(source)
20+
response.raise_for_status()
21+
content = response.content
22+
23+
# Debug: Check the response status code
24+
print("Response status code:", response.status_code)
25+
26+
else:
27+
with open(source, 'r') as file:
28+
content = file.read()
29+
30+
print(f"source = {source}")
31+
print(f"attribute_id = {attribute_id}")
32+
soup = BeautifulSoup(content, 'lxml-xml')
33+
34+
# Debug: Print the parsed XML structure (for a quick visual check)
35+
# print("Parsed XML:", soup.prettify())
36+
37+
# Find the attribute with the given ID (convert hex ID from XML to integer for comparison)
38+
for attribute in soup.find_all('attribute', {'id': lambda x: x is not None and int(x, 16) == attribute_id}):
39+
40+
mandatoryConform = attribute.find('mandatoryConform')
41+
42+
# Ignore conformance which is Zigbee
43+
if mandatoryConform:
44+
condition = mandatoryConform.find('condition')
45+
if condition and condition.get('name') == 'Zigbee':
46+
print(f"Ignore conformance which is Zigbee")
47+
continue
48+
49+
quality_node = attribute.find('quality')
50+
51+
# Debug: Check if the quality node was found
52+
print("Quality node found:", quality_node is not None)
53+
54+
if quality_node and quality_node.get('persistence') == 'nonVolatile':
55+
print(f"Attribute name: {attribute['name']}")
56+
return True
57+
58+
return False
59+
60+
if __name__ == "__main__":
61+
if len(sys.argv) != 3:
62+
print(f"Usage: python {sys.argv[0]} <source> <attribute_id>")
63+
print(f"e.g. python {sys.argv[0]} data_model/1.3/clusters/OnOff.xml 0x0000")
64+
sys.exit(1)
65+
66+
source = sys.argv[1]
67+
attribute_id_to_check = int(sys.argv[2], 16)
68+
69+
result = is_attribute_non_volatile(source, attribute_id_to_check)
70+
print(f"Attribute with ID '0x{attribute_id_to_check:04X}' is non-volatile: {result}")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import os
2+
import sys
3+
import xml.etree.ElementTree as ET
4+
5+
name2XmlMap = {}
6+
id2XmlMap = {}
7+
parentClusterMap = {}
8+
9+
def parse_xml_files_in_folder(folder_path):
10+
print(f"DEBUG: Starting to parse XML files in folder: {folder_path}")
11+
12+
for filename in os.listdir(folder_path):
13+
if filename.endswith('.xml'):
14+
full_path = os.path.join(folder_path, filename)
15+
print("==========================================================")
16+
print(f"DEBUG: Processing file: {filename}")
17+
18+
try:
19+
tree = ET.parse(full_path)
20+
root = tree.getroot()
21+
print(f"DEBUG: Successfully parsed {filename}")
22+
23+
# If it is a derived cluster
24+
classification = root.find('classification')
25+
if classification is not None:
26+
baseCluster = classification.get('baseCluster')
27+
else:
28+
baseCluster = None
29+
30+
31+
# Find the clusterIds
32+
clusterIds = root.find('clusterIds')
33+
if clusterIds:
34+
print(clusterIds)
35+
36+
clusterIdSet = clusterIds.findall('clusterId')
37+
print(clusterIdSet)
38+
39+
for clusterId in clusterIdSet:
40+
if baseCluster is not None:
41+
parentClusterMap[ int(clusterId.get('id'), 16) ] = {'name':clusterId.get('name'), 'file':full_path, 'baseCluster': baseCluster}
42+
print(f'Found cluster {clusterId} with parent cluster {baseCluster}')
43+
elif clusterId.get('id') is not None:
44+
id2XmlMap[ int(clusterId.get('id'), 16) ] = {'name':clusterId.get('name'), 'file':full_path}
45+
print(f'Found cluster with id {clusterId}')
46+
else:
47+
print(f'Found cluster without id {clusterId}')
48+
49+
name2XmlMap[str(clusterId.get('name'))] = {'file':full_path}
50+
51+
except ET.ParseError as e:
52+
print(f"ERROR: Error parsing {filename}: {e}")
53+
54+
print(name2XmlMap)
55+
56+
for key, value in parentClusterMap.items():
57+
print(f'Processing derived class: name: {value["name"]}, parent: {value["baseCluster"]}')
58+
id2XmlMap[key] = {'name': value['name'], 'file': name2XmlMap[value['baseCluster']]['file']}
59+
60+
61+
62+
if __name__ == "__main__":
63+
if len(sys.argv) < 2:
64+
print(f"Usage: python {sys.argv[0]} folder_path")
65+
print(f"e.g. python {sys.argv[0]} data_model/1.3/clusters")
66+
sys.exit(1)
67+
68+
folder_path = sys.argv[1]
69+
parse_xml_files_in_folder(folder_path)
70+
print(id2XmlMap)
+180
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
import argparse
2+
import requests
3+
import json
4+
import os
5+
import glob
6+
7+
from ParseXmlFolder import *
8+
from ClusterReader import *
9+
10+
def download_zap_file(url, file_name):
11+
print(f"[DEBUG] Attempting to download file from URL: {url}")
12+
try:
13+
response = requests.get(url)
14+
response.raise_for_status() # Ensure we got the file
15+
with open(file_name, 'wb') as file:
16+
file.write(response.content)
17+
print(f"[DEBUG] File downloaded successfully: {file_name}")
18+
return file_name
19+
except requests.exceptions.RequestException as e:
20+
print(f"[DEBUG] Failed to download file: {e}")
21+
return None
22+
23+
def parse_zap_file(file_name):
24+
print(f"[DEBUG] Attempting to parse file: {file_name}")
25+
try:
26+
with open(file_name, 'r') as file:
27+
data = json.load(file) # Assuming it's a JSON file
28+
print(f"[DEBUG] File parsed successfully. Data loaded.")
29+
return data
30+
except json.JSONDecodeError as e:
31+
print(f"[DEBUG] Failed to parse JSON: {e}")
32+
return None
33+
except Exception as e:
34+
print(f"[DEBUG] Error reading file: {e}")
35+
return None
36+
37+
def find_ram_attributes_and_replace(data, replace=False):
38+
print(f"[DEBUG] Searching for attributes with storageOption 'RAM'.")
39+
ram_attributes = []
40+
modified = False
41+
42+
# Traverse the JSON to find parent nodes and their attributes
43+
for endpointType in data.get('endpointTypes', []): # Get 'endpointType' is the parent node section
44+
endpoint_id = endpointType.get('id')
45+
# Filter rootnode endpoint
46+
if endpoint_id == 1:
47+
continue
48+
49+
for cluster in endpointType.get('clusters', []): # Get 'clusters' is the parent node section
50+
cluster_name = cluster.get('name')
51+
cluster_code = cluster.get('code')
52+
53+
if cluster_code > 0x7FFF: # Not standard cluster ID
54+
continue;
55+
56+
for attribute in cluster.get('attributes', []): # Iterate through the attributes
57+
attribute_code = attribute.get('code') # Get the attribute's code
58+
# Filter global element
59+
if attribute_code >= 0xF000: # Golbal attribute 0xF000 - 0xFFFE
60+
continue
61+
if attribute.get('storageOption') == 'RAM': # Check if the storageOption is 'RAM'
62+
63+
attribute_name = attribute.get('name') # Get the attribute's name
64+
65+
print(f"cluster_code = {cluster_code}, attribute_code={attribute_code}, attribute_name = {attribute_name}")
66+
spec_xml = id2XmlMap[cluster_code]['file']
67+
if not is_attribute_non_volatile(spec_xml, attribute_code):
68+
print(f"\033[41m Ignore cluster: {cluster_name}, name:{attribute_name} \033[0m")
69+
continue
70+
71+
print(f"\033[44m [DEBUG] Found RAM attribute: Parent Code: {cluster_code}, {cluster_name}, Attribute Code: {attribute_code}, Attribute Name: {attribute_name} \033[0m")
72+
ram_attributes.append({
73+
"cluster_code": cluster_code,
74+
"cluster_name" : cluster_name,
75+
"attribute_code": attribute_code,
76+
"attribute_name": attribute_name,
77+
"attribute": attribute
78+
})
79+
# Replace RAM to NVM
80+
if replace:
81+
attribute['storageOption'] = 'NVM'
82+
modified = True
83+
84+
print(f"[DEBUG] Found {len(ram_attributes)} attributes with storageOption 'RAM'.")
85+
for entry in ram_attributes:
86+
print(f"Parent Code: {entry['cluster_code']}, Attribute Code: {entry['attribute_code']}")
87+
print(json.dumps(entry['attribute'], indent=4))
88+
89+
return modified
90+
91+
def process_zap_file(input_file, in_place):
92+
# Check if it's a URL or a local file
93+
if input_file.startswith("http://") or input_file.startswith("https://"):
94+
print(f"[DEBUG] Detected URL input: {input_file}")
95+
local_file_name = "downloaded_zap_file.zap"
96+
input_file = download_zap_file(input_file, local_file_name)
97+
if not input_file:
98+
print(f"[DEBUG] Exiting due to failed file download.")
99+
return
100+
else:
101+
print(f"[DEBUG] Detected local file input: {input_file}")
102+
if not os.path.isfile(input_file):
103+
print(f"[DEBUG] Error: The file {input_file} does not exist.")
104+
return
105+
106+
# Parse the file and find RAM attributes
107+
parsed_data = parse_zap_file(input_file)
108+
if parsed_data:
109+
print(f"[DEBUG] Modifying storageOption from 'RAM' to 'NVM' in local file: {input_file}")
110+
modified = find_ram_attributes_and_replace(parsed_data, True)
111+
112+
if modified:
113+
# If it's a local file, modify the storageOption and save it
114+
if os.path.isfile(input_file):
115+
116+
if in_place:
117+
# Save the modified JSON back to the original file
118+
modified_file = input_file
119+
print(f"[DEBUG] Saving in place: {modified_file}")
120+
else:
121+
# Save the modified JSON back to the file (or to a new file)
122+
modified_file = input_file.replace(".zap", "_modified.zap")
123+
print(f"[DEBUG] Saving modified file as: {modified_file}")
124+
125+
with open(modified_file, 'w') as file:
126+
json.dump(parsed_data, file, indent=2)
127+
print(f"[DEBUG] File saved successfully.")
128+
else:
129+
# Handle case where it's a URL (output RAM attributes, don't modify)
130+
print(f"[DEBUG] Not local file, unable to modify")
131+
else:
132+
print(f"[DEBUG] No modifications were needed.")
133+
else:
134+
print(f"[DEBUG] Failed to parse the .zap file.")
135+
136+
def process_directory(directory, in_place):
137+
# Find all *.zap files in the directory
138+
print(f"[DEBUG] Processing all *.zap files in directory: {directory}")
139+
zap_files = glob.glob(os.path.join(directory, "*.zap"))
140+
if not zap_files:
141+
print(f"[DEBUG] No .zap files found in directory: {directory}")
142+
return
143+
144+
print(f"[DEBUG] Found {len(zap_files)} .zap files.")
145+
for zap_file in zap_files:
146+
print(f"[DEBUG] Processing file: {zap_file}")
147+
process_zap_file(zap_file, in_place)
148+
149+
150+
if __name__ == "__main__":
151+
# Command-line argument parsing
152+
parser = argparse.ArgumentParser(description="Parse a .zap file or a directory of .zap files and find/modify attributes with storageOption 'RAM'. ")
153+
154+
group = parser.add_mutually_exclusive_group(required=True)
155+
group.add_argument("-f", "--file", help="Process a single zap file (local or remote URL).")
156+
group.add_argument("-d", "--directory", help="Process all *.zap files in a specific directory.")
157+
158+
parser.add_argument("-s", "--spec", help="The folder path where spec xml files to be loaded", required=True)
159+
parser.add_argument("-i", "--in-place", action="store_true", help="Modify the files in place instead of creating a new file.", default=False)
160+
161+
try:
162+
args = parser.parse_args()
163+
except:
164+
parser.print_help()
165+
print(f"\n For example: \n\tpython sample_app_util/ParseZap.py -s ../../data_model/clusters/ -d devices/ -i \n")
166+
sys.exit(1)
167+
168+
parse_xml_files_in_folder(args.spec)
169+
print(f"id2XmlMap: {id2XmlMap}")
170+
171+
# Process the provided zap file or directory
172+
if args.file:
173+
print(f"[DEBUG] Starting process for file: {args.file}")
174+
process_zap_file(args.file, args.in_place)
175+
elif args.directory:
176+
print(f"[DEBUG] Starting process for directory: {args.directory}")
177+
process_directory(args.directory, args.in_place)
178+
179+
print(f"[DEBUG] Process complete.")
180+

0 commit comments

Comments
 (0)