|
38 | 38 | import threading
|
39 | 39 | import time
|
40 | 40 | import typing
|
41 |
| -from ctypes import (CDLL, CFUNCTYPE, POINTER, byref, c_bool, c_char, c_char_p, c_int, c_int32, c_size_t, c_uint8, c_uint16, |
| 41 | +from ctypes import (CDLL, CFUNCTYPE, POINTER, byref, c_bool, c_char, c_char_p, c_int, c_int32, c_size_t, c_ubyte, c_uint8, c_uint16, |
42 | 42 | c_uint32, c_uint64, c_void_p, create_string_buffer, pointer, py_object, resize, string_at)
|
43 | 43 | from dataclasses import dataclass
|
44 | 44 |
|
|
51 | 51 | from .clusters import ClusterObjects as ClusterObjects
|
52 | 52 | from .clusters import Command as ClusterCommand
|
53 | 53 | from .clusters import Objects as GeneratedObjects
|
| 54 | +from .clusters.Attribute import AttributeCache, AttributePath |
54 | 55 | from .clusters.CHIPClusters import ChipClusters
|
55 | 56 | from .crypto import p256keypair
|
56 | 57 | from .exceptions import UnknownAttribute, UnknownCommand
|
57 | 58 | from .interaction_model import InteractionModelError, SessionParameters, SessionParametersStruct
|
58 | 59 | from .interaction_model import delegate as im
|
59 | 60 | from .native import PyChipError
|
| 61 | +from .tlv import TLVReader |
60 | 62 |
|
61 | 63 | __all__ = ["ChipDeviceController", "CommissioningParameters"]
|
62 | 64 |
|
@@ -2025,3 +2027,58 @@ def __init__(self, operationalKey: p256keypair.P256Keypair, noc: bytes,
|
2025 | 2027 | self._set_dev_ctrl(devCtrl)
|
2026 | 2028 |
|
2027 | 2029 | self._finish_init()
|
| 2030 | + |
| 2031 | + |
| 2032 | +class TLVJsonConverter(): |
| 2033 | + ''' Converter class used to convert dumped Matter JsonTlv files into a cache. ''' |
| 2034 | + |
| 2035 | + def __init__(self, name: str = ''): |
| 2036 | + self._ChipStack = builtins.chipStack |
| 2037 | + self._dmLib = None |
| 2038 | + |
| 2039 | + self._InitLib() |
| 2040 | + |
| 2041 | + def _InitLib(self): |
| 2042 | + if self._dmLib is None: |
| 2043 | + self._dmLib = CDLL(self._ChipStack.LocateChipDLL()) |
| 2044 | + |
| 2045 | + self._dmLib.pychip_JsonToTlv.argtypes = [c_char_p, POINTER(c_ubyte), c_size_t] |
| 2046 | + self._dmLib.pychip_DeviceController_DeleteDeviceController.restype = c_size_t |
| 2047 | + |
| 2048 | + # PER ATTRIBUTE |
| 2049 | + def _attribute_to_tlv(self, json_string: str) -> bytearray: |
| 2050 | + ''' Converts the MatterJsonTlv for one attribute into TLV that can be parsed and put into the cache.''' |
| 2051 | + # We don't currently have a way to size this properly, but we know attributes need to fit into 1 MTU. |
| 2052 | + size = 1280 |
| 2053 | + buf = bytearray(size) |
| 2054 | + encoded_bytes = self._dmLib.pychip_JsonToTlv(json_string.encode("utf-8"), (ctypes.c_ubyte * size).from_buffer(buf), size) |
| 2055 | + return buf[:encoded_bytes] |
| 2056 | + |
| 2057 | + def convert_dump_to_cache(self, json_tlv: str) -> AttributeCache: |
| 2058 | + ''' Converts a string containing the MatterJsonTlv dump of an entire device into an AttributeCache object. |
| 2059 | + Input: |
| 2060 | + json_tlv: json string read from the dump file. |
| 2061 | + Returns: |
| 2062 | + AttributeCache with the data from the json_string |
| 2063 | + ''' |
| 2064 | + cache = AttributeCache() |
| 2065 | + for endpoint_id_str, endpoint in json_tlv.items(): |
| 2066 | + endpoint_id = int(endpoint_id_str, 0) |
| 2067 | + for cluster_id_str, cluster in endpoint.items(): |
| 2068 | + s = cluster_id_str.split(':') |
| 2069 | + cluster_id = int(s[0]) |
| 2070 | + for attribute_id_str, attribute in cluster.items(): |
| 2071 | + s = attribute_id_str.split(':') |
| 2072 | + attribute_id = int(s[0]) |
| 2073 | + json_str = json.dumps({attribute_id_str: attribute}, indent=2) |
| 2074 | + tmp = self._attribute_to_tlv(json_str) |
| 2075 | + path = AttributePath(EndpointId=endpoint_id, ClusterId=cluster_id, AttributeId=attribute_id) |
| 2076 | + # Each of these attributes contains only one item |
| 2077 | + try: |
| 2078 | + tlvData = next(iter(TLVReader(tmp).get().get("Any", {}).values())) |
| 2079 | + except StopIteration: |
| 2080 | + # no data, this is a value decode error |
| 2081 | + tlvData = ValueDecodeFailure() |
| 2082 | + cache.UpdateTLV(path=path, dataVersion=0, data=tlvData) |
| 2083 | + cache.UpdateCachedData(set([path])) |
| 2084 | + return cache |
0 commit comments