|
15 | 15 | # limitations under the License.
|
16 | 16 | #
|
17 | 17 |
|
18 |
| -import glob |
| 18 | +import importlib |
| 19 | +import importlib.resources as pkg_resources |
19 | 20 | import logging
|
20 |
| -import os |
21 | 21 | import typing
|
22 | 22 | import xml.etree.ElementTree as ElementTree
|
23 | 23 | from copy import deepcopy
|
24 | 24 | from dataclasses import dataclass
|
25 | 25 | from enum import Enum, auto
|
26 |
| -from typing import Callable, Optional |
| 26 | +from importlib.abc import Traversable |
| 27 | +from typing import Callable, Optional, Union |
27 | 28 |
|
28 | 29 | import chip.clusters as Clusters
|
29 | 30 | import chip.testing.conformance as conformance_support
|
@@ -512,56 +513,83 @@ class PrebuiltDataModelDirectory(Enum):
|
512 | 513 | k1_4 = auto()
|
513 | 514 | kMaster = auto()
|
514 | 515 |
|
515 |
| - |
516 |
| -class DataModelLevel(str, Enum): |
517 |
| - kCluster = 'clusters' |
518 |
| - kDeviceType = 'device_types' |
519 |
| - |
520 |
| - |
521 |
| -def _get_data_model_root() -> str: |
522 |
| - """Attempts to find ${CHIP_ROOT}/data_model or equivalent.""" |
523 |
| - |
524 |
| - # Since this class is generally in a module, we have to rely on being bootstrapped or |
525 |
| - # we use CWD if we cannot |
526 |
| - choices = [os.getcwd()] |
527 |
| - |
528 |
| - if 'PW_PROJECT_ROOT' in os.environ: |
529 |
| - choices.insert(0, os.environ['PW_PROJECT_ROOT']) |
530 |
| - |
531 |
| - for c in choices: |
532 |
| - data_model_path = os.path.join(c, 'data_model') |
533 |
| - if os.path.exists(os.path.join(data_model_path, 'master', 'scraper_version')): |
534 |
| - return data_model_path |
535 |
| - raise FileNotFoundError('Cannot find a CHIP_ROOT/data_model path. Tried %r as prefixes.' % choices) |
536 |
| - |
537 |
| - |
538 |
| -def get_data_model_directory(data_model_directory: typing.Union[PrebuiltDataModelDirectory, str], data_model_level: DataModelLevel) -> str: |
539 |
| - if data_model_directory == PrebuiltDataModelDirectory.k1_3: |
540 |
| - return os.path.join(_get_data_model_root(), '1.3', data_model_level) |
541 |
| - elif data_model_directory == PrebuiltDataModelDirectory.k1_4: |
542 |
| - return os.path.join(_get_data_model_root(), '1.4', data_model_level) |
543 |
| - elif data_model_directory == PrebuiltDataModelDirectory.kMaster: |
544 |
| - return os.path.join(_get_data_model_root(), 'master', data_model_level) |
| 516 | + @property |
| 517 | + def dirname(self): |
| 518 | + if self == PrebuiltDataModelDirectory.k1_3: |
| 519 | + return "1.3" |
| 520 | + if self == PrebuiltDataModelDirectory.k1_4: |
| 521 | + return "1.4" |
| 522 | + if self == PrebuiltDataModelDirectory.kMaster: |
| 523 | + return "master" |
| 524 | + raise KeyError("Invalid enum: %r" % self) |
| 525 | + |
| 526 | + |
| 527 | +class DataModelLevel(Enum): |
| 528 | + kCluster = auto() |
| 529 | + kDeviceType = auto() |
| 530 | + |
| 531 | + @property |
| 532 | + def dirname(self): |
| 533 | + if self == DataModelLevel.kCluster: |
| 534 | + return "clusters" |
| 535 | + if self == DataModelLevel.kDeviceType: |
| 536 | + return "device_types" |
| 537 | + raise KeyError("Invalid enum: %r" % self) |
| 538 | + |
| 539 | + |
| 540 | +def get_data_model_directory(data_model_directory: Union[PrebuiltDataModelDirectory, Traversable], data_model_level: DataModelLevel = DataModelLevel.kCluster) -> Traversable: |
| 541 | + """ |
| 542 | + Get the directory of the data model for a specific version and level from the installed package. |
| 543 | +
|
| 544 | +
|
| 545 | + `data_model_directory` given as a path MUST be of type Traversable (often `pathlib.Path(somepathstring)`). |
| 546 | + If `data_model_directory` is given as a Traversable, it is returned directly WITHOUT using the data_model_level at all. |
| 547 | + """ |
| 548 | + # If it's a prebuilt directory, build the path based on the version and data model level |
| 549 | + if isinstance(data_model_directory, PrebuiltDataModelDirectory): |
| 550 | + return pkg_resources.files(importlib.import_module('chip.testing')).joinpath( |
| 551 | + 'data_model').joinpath(data_model_directory.dirname).joinpath(data_model_level.dirname) |
545 | 552 | else:
|
546 | 553 | return data_model_directory
|
547 | 554 |
|
548 | 555 |
|
549 |
| -def build_xml_clusters(data_model_directory: typing.Union[PrebuiltDataModelDirectory, str] = PrebuiltDataModelDirectory.k1_4) -> tuple[dict[uint, XmlCluster], list[ProblemNotice]]: |
550 |
| - dir = get_data_model_directory(data_model_directory, DataModelLevel.kCluster) |
| 556 | +def build_xml_clusters(data_model_directory: Union[PrebuiltDataModelDirectory, Traversable] = PrebuiltDataModelDirectory.k1_4) -> typing.Tuple[dict[int, dict], list]: |
| 557 | + """ |
| 558 | + Build XML clusters from the specified data model directory. |
| 559 | + This function supports both pre-built locations and full paths. |
| 560 | +
|
| 561 | + `data_model_directory`` given as a path MUST be of type Traversable (often `pathlib.Path(somepathstring)`). |
| 562 | + If data_model_directory is a Travesable, it is assumed to already contain `clusters` (i.e. be a directory |
| 563 | + with all XML files in it) |
| 564 | + """ |
551 | 565 |
|
552 | 566 | clusters: dict[int, XmlCluster] = {}
|
553 | 567 | pure_base_clusters: dict[str, XmlCluster] = {}
|
554 | 568 | ids_by_name: dict[str, int] = {}
|
555 | 569 | problems: list[ProblemNotice] = []
|
556 |
| - files = glob.glob(f'{dir}/*.xml') |
557 |
| - if not files: |
558 |
| - raise SpecParsingException(f'No data model files found in specified directory {dir}') |
559 | 570 |
|
560 |
| - for xml in files: |
561 |
| - logging.info(f'Parsing file {xml}') |
562 |
| - tree = ElementTree.parse(f'{xml}') |
563 |
| - root = tree.getroot() |
564 |
| - add_cluster_data_from_xml(root, clusters, pure_base_clusters, ids_by_name, problems) |
| 571 | + top = get_data_model_directory(data_model_directory, DataModelLevel.kCluster) |
| 572 | + logging.info("Reading XML clusters from %r", top) |
| 573 | + |
| 574 | + found_xmls = 0 |
| 575 | + for f in top.iterdir(): |
| 576 | + if not f.name.endswith('.xml'): |
| 577 | + logging.info("Ignoring non-XML file %s", f.name) |
| 578 | + continue |
| 579 | + |
| 580 | + logging.info('Parsing file %s', f.name) |
| 581 | + found_xmls += 1 |
| 582 | + with f.open("r", encoding="utf8") as file: |
| 583 | + root = ElementTree.parse(file).getroot() |
| 584 | + add_cluster_data_from_xml(root, clusters, pure_base_clusters, ids_by_name, problems) |
| 585 | + |
| 586 | + # For now we assume even a single XML means the directory was probaly OK |
| 587 | + # we may increase this later as most our data model directories are larger |
| 588 | + # |
| 589 | + # Intent here is to make user aware of typos in paths instead of silently having |
| 590 | + # empty parsing |
| 591 | + if found_xmls < 1: |
| 592 | + raise SpecParsingException(f'No data model files found in specified directory {top:!r}') |
565 | 593 |
|
566 | 594 | # There are a few clusters where the conformance columns are listed as desc. These clusters need specific, targeted tests
|
567 | 595 | # to properly assess conformance. Here, we list them as Optional to allow these for the general test. Targeted tests are described below.
|
@@ -721,7 +749,7 @@ def combine_attributes(base: dict[uint, XmlAttribute], derived: dict[uint, XmlAt
|
721 | 749 | xml_clusters[id] = new
|
722 | 750 |
|
723 | 751 |
|
724 |
| -def parse_single_device_type(root: ElementTree.Element) -> tuple[list[ProblemNotice], dict[int, XmlDeviceType]]: |
| 752 | +def parse_single_device_type(root: ElementTree.Element) -> tuple[dict[int, XmlDeviceType], list[ProblemNotice]]: |
725 | 753 | problems: list[ProblemNotice] = []
|
726 | 754 | device_types: dict[int, XmlDeviceType] = {}
|
727 | 755 | device = root.iter('deviceType')
|
@@ -793,17 +821,26 @@ def parse_single_device_type(root: ElementTree.Element) -> tuple[list[ProblemNot
|
793 | 821 | return device_types, problems
|
794 | 822 |
|
795 | 823 |
|
796 |
| -def build_xml_device_types(data_model_directory: typing.Union[PrebuiltDataModelDirectory, str] = PrebuiltDataModelDirectory.k1_4) -> tuple[dict[int, XmlDeviceType], list[ProblemNotice]]: |
797 |
| - dir = get_data_model_directory(data_model_directory, DataModelLevel.kDeviceType) |
| 824 | +def build_xml_device_types(data_model_directory: typing.Union[PrebuiltDataModelDirectory, Traversable] = PrebuiltDataModelDirectory.k1_4) -> tuple[dict[int, XmlDeviceType], list[ProblemNotice]]: |
| 825 | + top = get_data_model_directory(data_model_directory, DataModelLevel.kDeviceType) |
798 | 826 | device_types: dict[int, XmlDeviceType] = {}
|
799 | 827 | problems = []
|
800 |
| - for xml in glob.glob(f"{dir}/*.xml"): |
801 |
| - logging.info(f'Parsing file {xml}') |
802 |
| - tree = ElementTree.parse(f'{xml}') |
803 |
| - root = tree.getroot() |
804 |
| - tmp_device_types, tmp_problems = parse_single_device_type(root) |
805 |
| - problems = problems + tmp_problems |
806 |
| - device_types.update(tmp_device_types) |
| 828 | + |
| 829 | + found_xmls = 0 |
| 830 | + |
| 831 | + for file in top.iterdir(): |
| 832 | + if not file.name.endswith('.xml'): |
| 833 | + continue |
| 834 | + logging.info('Parsing file %r / %s', top, file.name) |
| 835 | + found_xmls += 1 |
| 836 | + with file.open('r', encoding="utf8") as xml: |
| 837 | + root = ElementTree.parse(xml).getroot() |
| 838 | + tmp_device_types, tmp_problems = parse_single_device_type(root) |
| 839 | + problems = problems + tmp_problems |
| 840 | + device_types.update(tmp_device_types) |
| 841 | + |
| 842 | + if found_xmls < 1: |
| 843 | + logging.warning("No XML files found in the specified device type directory: %r", top) |
807 | 844 |
|
808 | 845 | if -1 not in device_types.keys():
|
809 | 846 | raise ConformanceException("Base device type not found in device type xml data")
|
|
0 commit comments