Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor the interview logic a small bit #538

Merged
merged 7 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions matter_server/server/const.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
"""Server-only constants for the Python Matter Server."""

import pathlib
from typing import Final

# The minimum schema version (of a client) the server can support
MIN_SCHEMA_VERSION = 5

# schema version of our data model
# only bump if the format of the data in MatterNodeData changed
# and a full re-interview is mandatory
DATA_MODEL_SCHEMA_VERSION = 6 # NOTE: next time bump to 8!

# the paa-root-certs path is hardcoded in the sdk at this time
# and always uses the development subfolder
# regardless of anything you pass into instantiating the controller
Expand Down
68 changes: 14 additions & 54 deletions matter_server/server/device_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
MatterNodeEvent,
NodePingResult,
)
from .const import PAA_ROOT_CERTS_DIR
from .const import DATA_MODEL_SCHEMA_VERSION, PAA_ROOT_CERTS_DIR
from .helpers.paa_certificates import fetch_certificates

if TYPE_CHECKING:
Expand Down Expand Up @@ -501,7 +501,7 @@ async def interview_node(self, node_id: int) -> None:
existing_info.date_commissioned if existing_info else datetime.utcnow()
),
last_interview=datetime.utcnow(),
interview_version=SCHEMA_VERSION,
interview_version=DATA_MODEL_SCHEMA_VERSION,
available=True,
attributes=parse_attributes_from_read_result(read_response.tlvAttributes),
)
Expand Down Expand Up @@ -1050,48 +1050,28 @@ async def _call_sdk(self, func: Callable[..., _T], *args: Any, **kwargs: Any) ->
),
)

async def _check_interview_and_subscription(
self, node_id: int, reschedule_interval: int = 30
) -> None:
"""Handle interview (if needed) and subscription for known node."""

async def _setup_node(self, node_id: int) -> None:
"""Handle set-up of subscriptions and interview (if needed) for known/discovered node."""
if node_id not in self._nodes:
raise NodeNotExists(f"Node {node_id} does not exist.")

# (re)interview node (only) if needed
node_data = self._nodes.get(node_id)
node_data = self._nodes[node_id]
if (
node_data is None
# re-interview if the schema has changed
or node_data.interview_version < SCHEMA_VERSION
# re-interview if we dont have any node attributes (empty node)
not node_data.attributes
# re-interview if the data model schema has changed
or node_data.interview_version != DATA_MODEL_SCHEMA_VERSION
):
try:
await self.interview_node(node_id)
except NodeNotResolving:
LOGGER.warning(
"Unable to interview Node %s as it is unavailable",
node_id,
)
# NOTE: the node will be picked up by mdns discovery automatically
# when it becomes available again.
except NodeInterviewFailed:
LOGGER.warning(
"Unable to interview Node %s, will retry later in the background.",
node_id,
)
# reschedule interview on error
# increase interval at each attempt with maximum of
# MAX_POLL_INTERVAL seconds (= 10 minutes)
self._schedule_interview(
node_id,
min(reschedule_interval + 10, MAX_POLL_INTERVAL),
)
except (NodeNotResolving, NodeInterviewFailed) as err:
LOGGER.warning("Unable to interview Node %s", exc_info=err)
# NOTE: the node will be picked up by mdns discovery automatically
# when it comes available again.
return

# setup subscriptions for the node
if node_id in self._subscriptions:
return

try:
await self._subscribe_node(node_id)
except NodeNotResolving:
Expand All @@ -1102,26 +1082,6 @@ async def _check_interview_and_subscription(
# NOTE: the node will be picked up by mdns discovery automatically
# when it becomes available again.

def _schedule_interview(self, node_id: int, delay: int) -> None:
"""(Re)Schedule interview and/or initial subscription for a node."""
assert self.server.loop is not None
# cancel any existing (re)schedule timer
if existing := self._sub_retry_timer.pop(node_id, None):
existing.cancel()

def create_interview_task() -> None:
asyncio.create_task(
self._check_interview_and_subscription(
node_id,
)
)
# the handle to the timer can now be removed
self._sub_retry_timer.pop(node_id, None)

self._sub_retry_timer[node_id] = self.server.loop.call_later(
delay, create_interview_task
)

async def _resolve_node(
self, node_id: int, retries: int = 2, attempt: int = 1
) -> DeviceProxyWrapper:
Expand Down Expand Up @@ -1230,7 +1190,7 @@ async def _process_mdns_queue(
continue # node is already set-up, no action needed
LOGGER.info("Node %s discovered on MDNS", node_id)
# setup the node
await self._check_interview_and_subscription(node_id)
await self._setup_node(node_id)
elif state_change == ServiceStateChange.Removed:
if not node.available:
continue # node is already offline, nothing to do
Expand Down