Skip to content

Commit dcd0a70

Browse files
authored
Process nodes asynchronously (#266)
1 parent c3ce71b commit dcd0a70

File tree

1 file changed

+61
-21
lines changed

1 file changed

+61
-21
lines changed

matter_server/server/device_controller.py

+61-21
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77
from datetime import datetime
88
from functools import partial
99
import logging
10-
from typing import TYPE_CHECKING, Any, Callable, Deque, Type, TypeVar, cast
10+
import time
11+
from typing import TYPE_CHECKING, Any, Callable, Coroutine, Deque, Type, TypeVar, cast
1112

1213
from chip.ChipDeviceCtrl import CommissionableNode
1314
from chip.clusters import Attribute, Objects as Clusters
@@ -42,6 +43,7 @@
4243
DATA_KEY_LAST_NODE_ID = "last_node_id"
4344

4445
LOGGER = logging.getLogger(__name__)
46+
INTERVIEW_TASK_LIMIT = 10
4547

4648

4749
class MatterDeviceController:
@@ -527,38 +529,75 @@ async def _call_sdk(self, func: Callable[..., _T], *args: Any, **kwargs: Any) ->
527529

528530
async def _check_subscriptions_and_interviews(self) -> None:
529531
"""Run subscriptions (and interviews) for known nodes."""
532+
# Set default resubscribe interval to 1 hour
533+
reschedule_interval = 3600
534+
start_time = time.time()
535+
tasks: list[Coroutine[Any, Any, None]] = []
536+
task_limit: asyncio.Semaphore = asyncio.Semaphore(INTERVIEW_TASK_LIMIT)
537+
530538
for node_id, node in self._nodes.items():
531539
# (re)interview node (only) if needed
532540
if (
533541
node is None
534542
or node.interview_version < SCHEMA_VERSION
535543
or (datetime.utcnow() - node.last_interview).days > 30
536544
):
545+
546+
async def _interview_node(node_id: int) -> None:
547+
"""Run interview for node."""
548+
try:
549+
await self.interview_node(node_id)
550+
except NodeInterviewFailed as err:
551+
LOGGER.warning(
552+
"Unable to interview Node %s, we will retry later in the background.",
553+
node_id,
554+
exc_info=err,
555+
)
556+
raise err
557+
558+
tasks.append(_interview_node(node_id))
559+
continue
560+
561+
# setup subscriptions for the node
562+
if node_id in self._subscriptions:
563+
continue
564+
565+
async def _subscribe_node(node_id: int) -> None:
566+
"""Subscribe to node events."""
537567
try:
538-
await self.interview_node(node_id)
539-
except NodeInterviewFailed as err:
568+
await self.subscribe_node(node_id)
569+
except NodeNotResolving as err:
540570
LOGGER.warning(
541-
"Unable to interview Node %s, we will retry later in the background.",
571+
"Unable to subscribe to Node %s, "
572+
"we will retry later in the background.",
542573
node_id,
543574
exc_info=err,
544575
)
545-
continue
576+
raise err
546577

547-
# setup subscriptions for the node
548-
if node_id in self._subscriptions:
549-
continue
550-
try:
551-
await self.subscribe_node(node_id)
552-
except NodeNotResolving as err:
553-
# If the node is unreachable on the network now,
554-
# it will throw a NodeNotResolving exception, catch this,
555-
# log this and just try to resolve this node in the next run.
556-
LOGGER.warning(
557-
"Unable to contact Node %s,"
558-
" we will retry later in the background.",
559-
node_id,
560-
exc_info=err,
561-
)
578+
tasks.append(_subscribe_node(node_id))
579+
580+
async def _run_task(task: Coroutine[Any, Any, None]) -> None:
581+
"""Run coroutine and release semaphore."""
582+
async with task_limit:
583+
await task
584+
585+
LOGGER.debug("Running %s tasks", len(tasks))
586+
# wait for all tasks to finish
587+
results: list[Exception | None] = await asyncio.gather(
588+
*(_run_task(task) for task in tasks), return_exceptions=True
589+
)
590+
LOGGER.debug(
591+
"Done running %s tasks in %s seconds",
592+
len(results),
593+
start_time - time.time(),
594+
)
595+
# check if any of the tasks failed
596+
for result in results:
597+
if isinstance(result, Exception):
598+
# if any of the tasks failed, reschedule in 5 minutes
599+
reschedule_interval = 300
600+
break
562601

563602
# reschedule self to run every hour
564603
def _schedule() -> None:
@@ -567,8 +606,9 @@ def _schedule() -> None:
567606
self._check_subscriptions_and_interviews()
568607
)
569608

609+
LOGGER.debug("Rescheduling interviews in %s seconds", reschedule_interval)
570610
loop = cast(asyncio.AbstractEventLoop, self.server.loop)
571-
loop.call_later(3600, _schedule)
611+
loop.call_later(reschedule_interval, _schedule)
572612

573613
@staticmethod
574614
def _parse_attributes_from_read_result(

0 commit comments

Comments
 (0)