forked from project-chip/connectedhomeip
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathzap_cluster_list.py
executable file
·100 lines (72 loc) · 3.23 KB
/
zap_cluster_list.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
#!/usr/bin/env python3
"""Parses a ZAP input file and outputs directories to compile."""
import argparse
import json
import os
import pathlib
import sys
import typing
def get_cluster_sources(clusters: typing.Set[str],
source_map: typing.Dict[str,
typing.List[str]], side: str):
"""Returns a list of cluster source directories for the given clusters.
Returns:
The set of source directories to build.
"""
cluster_sources: typing.Set[str] = set()
for cluster in clusters:
if cluster in source_map:
cluster_sources.update(source_map[cluster])
return cluster_sources
def dump_zapfile_clusters(zap_file_path: pathlib.Path,
implementation_data_path: pathlib.Path):
"""Prints all of the source directories to build for a given ZAP file.
Arguments:
zap_file_path - Path to the ZAP input file.
"""
# List of directories in src/app/clusters to build for server clusters.
SERVER_CLUSTERS: typing.Dict[str, typing.List[str]] = {}
# List of directories in src/app/clusters to build for client clusters.
CLIENT_CLUSTERS: typing.Dict[str, typing.List[str]] = {}
with open(implementation_data_path, "r") as implementation_data_file:
implementation_data = json.load(implementation_data_file)
SERVER_CLUSTERS = implementation_data["ServerDirectories"]
CLIENT_CLUSTERS = implementation_data["ClientDirectories"]
client_clusters: typing.Set[str] = set()
server_clusters: typing.Set[str] = set()
with open(zap_file_path, "r") as zap_file:
zap_json = json.loads(zap_file.read())
for endpoint_type in zap_json.get('endpointTypes'):
for cluster in endpoint_type.get('clusters'):
side: str = cluster.get('side')
if side == 'client':
clusters_set = client_clusters
elif side == 'server':
clusters_set = server_clusters
else:
raise ValueError("Invalid side for cluster: %s" % side)
if cluster.get('enabled') == 1:
clusters_set.add(cluster.get('define'))
cluster_sources: typing.Set[str] = set()
cluster_sources.update(
get_cluster_sources(server_clusters, SERVER_CLUSTERS, 'server'))
cluster_sources.update(
get_cluster_sources(client_clusters, CLIENT_CLUSTERS, 'client'))
for cluster in sorted(cluster_sources):
print(cluster)
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--zap_file',
help='Path to .zap file',
required=True,
type=pathlib.Path)
parser.add_argument('--cluster-implementation-data',
help='Path to .json file which lists the directories cluster implementations live in',
required=False,
type=pathlib.Path,
default=os.path.join(os.path.dirname(__file__), "zap_cluster_list.json"))
args = parser.parse_args()
dump_zapfile_clusters(args.zap_file, args.cluster_implementation_data)
sys.exit(0)
if __name__ == '__main__':
main()