Skip to content

Commit 459b203

Browse files
authored
Refactor the interview logic a small bit (#538)
1 parent 415fe93 commit 459b203

File tree

2 files changed

+20
-54
lines changed

2 files changed

+20
-54
lines changed

matter_server/server/const.py

+6
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
11
"""Server-only constants for the Python Matter Server."""
2+
23
import pathlib
34
from typing import Final
45

56
# The minimum schema version (of a client) the server can support
67
MIN_SCHEMA_VERSION = 5
78

9+
# schema version of our data model
10+
# only bump if the format of the data in MatterNodeData changed
11+
# and a full re-interview is mandatory
12+
DATA_MODEL_SCHEMA_VERSION = 6
13+
814
# the paa-root-certs path is hardcoded in the sdk at this time
915
# and always uses the development subfolder
1016
# regardless of anything you pass into instantiating the controller

matter_server/server/device_controller.py

+14-54
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
MatterNodeEvent,
4848
NodePingResult,
4949
)
50-
from .const import PAA_ROOT_CERTS_DIR
50+
from .const import DATA_MODEL_SCHEMA_VERSION, PAA_ROOT_CERTS_DIR
5151
from .helpers.paa_certificates import fetch_certificates
5252

5353
if TYPE_CHECKING:
@@ -501,7 +501,7 @@ async def interview_node(self, node_id: int) -> None:
501501
existing_info.date_commissioned if existing_info else datetime.utcnow()
502502
),
503503
last_interview=datetime.utcnow(),
504-
interview_version=SCHEMA_VERSION,
504+
interview_version=DATA_MODEL_SCHEMA_VERSION,
505505
available=True,
506506
attributes=parse_attributes_from_read_result(read_response.tlvAttributes),
507507
)
@@ -1050,48 +1050,28 @@ async def _call_sdk(self, func: Callable[..., _T], *args: Any, **kwargs: Any) ->
10501050
),
10511051
)
10521052

1053-
async def _check_interview_and_subscription(
1054-
self, node_id: int, reschedule_interval: int = 30
1055-
) -> None:
1056-
"""Handle interview (if needed) and subscription for known node."""
1057-
1053+
async def _setup_node(self, node_id: int) -> None:
1054+
"""Handle set-up of subscriptions and interview (if needed) for known/discovered node."""
10581055
if node_id not in self._nodes:
10591056
raise NodeNotExists(f"Node {node_id} does not exist.")
10601057

10611058
# (re)interview node (only) if needed
1062-
node_data = self._nodes.get(node_id)
1059+
node_data = self._nodes[node_id]
10631060
if (
1064-
node_data is None
1065-
# re-interview if the schema has changed
1066-
or node_data.interview_version < SCHEMA_VERSION
1061+
# re-interview if we dont have any node attributes (empty node)
1062+
not node_data.attributes
1063+
# re-interview if the data model schema has changed
1064+
or node_data.interview_version != DATA_MODEL_SCHEMA_VERSION
10671065
):
10681066
try:
10691067
await self.interview_node(node_id)
1070-
except NodeNotResolving:
1071-
LOGGER.warning(
1072-
"Unable to interview Node %s as it is unavailable",
1073-
node_id,
1074-
)
1075-
# NOTE: the node will be picked up by mdns discovery automatically
1076-
# when it becomes available again.
1077-
except NodeInterviewFailed:
1078-
LOGGER.warning(
1079-
"Unable to interview Node %s, will retry later in the background.",
1080-
node_id,
1081-
)
1082-
# reschedule interview on error
1083-
# increase interval at each attempt with maximum of
1084-
# MAX_POLL_INTERVAL seconds (= 10 minutes)
1085-
self._schedule_interview(
1086-
node_id,
1087-
min(reschedule_interval + 10, MAX_POLL_INTERVAL),
1088-
)
1068+
except (NodeNotResolving, NodeInterviewFailed) as err:
1069+
LOGGER.warning("Unable to interview Node %s", exc_info=err)
1070+
# NOTE: the node will be picked up by mdns discovery automatically
1071+
# when it comes available again.
10891072
return
10901073

10911074
# setup subscriptions for the node
1092-
if node_id in self._subscriptions:
1093-
return
1094-
10951075
try:
10961076
await self._subscribe_node(node_id)
10971077
except NodeNotResolving:
@@ -1102,26 +1082,6 @@ async def _check_interview_and_subscription(
11021082
# NOTE: the node will be picked up by mdns discovery automatically
11031083
# when it becomes available again.
11041084

1105-
def _schedule_interview(self, node_id: int, delay: int) -> None:
1106-
"""(Re)Schedule interview and/or initial subscription for a node."""
1107-
assert self.server.loop is not None
1108-
# cancel any existing (re)schedule timer
1109-
if existing := self._sub_retry_timer.pop(node_id, None):
1110-
existing.cancel()
1111-
1112-
def create_interview_task() -> None:
1113-
asyncio.create_task(
1114-
self._check_interview_and_subscription(
1115-
node_id,
1116-
)
1117-
)
1118-
# the handle to the timer can now be removed
1119-
self._sub_retry_timer.pop(node_id, None)
1120-
1121-
self._sub_retry_timer[node_id] = self.server.loop.call_later(
1122-
delay, create_interview_task
1123-
)
1124-
11251085
async def _resolve_node(
11261086
self, node_id: int, retries: int = 2, attempt: int = 1
11271087
) -> DeviceProxyWrapper:
@@ -1230,7 +1190,7 @@ async def _process_mdns_queue(
12301190
continue # node is already set-up, no action needed
12311191
LOGGER.info("Node %s discovered on MDNS", node_id)
12321192
# setup the node
1233-
await self._check_interview_and_subscription(node_id)
1193+
await self._setup_node(node_id)
12341194
elif state_change == ServiceStateChange.Removed:
12351195
if not node.available:
12361196
continue # node is already offline, nothing to do

0 commit comments

Comments
 (0)