Skip to content

Commit 74b0ca3

Browse files
authored
Add timeouts to sdk calls and subscription set-up (#604)
1 parent f14f729 commit 74b0ca3

File tree

1 file changed

+53
-30
lines changed

1 file changed

+53
-30
lines changed

matter_server/server/device_controller.py

+53-30
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import time
1515
from typing import TYPE_CHECKING, Any, Callable, Iterable, TypeVar, cast
1616

17+
import async_timeout
1718
from chip.ChipDeviceCtrl import DeviceProxyWrapper
1819
from chip.clusters import Attribute, Objects as Clusters
1920
from chip.clusters.Attribute import ValueDecodeFailure
@@ -61,6 +62,8 @@
6162
DATA_KEY_NODES = "nodes"
6263
DATA_KEY_LAST_NODE_ID = "last_node_id"
6364

65+
DEFAULT_CALL_TIMEOUT = 300
66+
6467
LOGGER = logging.getLogger(__name__)
6568
MIN_NODE_SUBSCRIPTION_CEILING = 30
6669
MAX_NODE_SUBSCRIPTION_CEILING = 300
@@ -990,25 +993,28 @@ def resubscription_succeeded(
990993
self._last_subscription_attempt[node_id] = 0
991994
future = loop.create_future()
992995
device = await self._resolve_node(node_id)
993-
Attribute.Read(
994-
future=future,
995-
eventLoop=loop,
996-
device=device.deviceProxy,
997-
devCtrl=self.chip_controller,
998-
attributes=[Attribute.AttributePath()], # wildcard
999-
events=[
1000-
Attribute.EventPath(EndpointId=None, Cluster=None, Event=None, Urgent=1)
1001-
],
1002-
returnClusterObject=False,
1003-
subscriptionParameters=Attribute.SubscriptionParameters(
1004-
interval_floor, interval_ceiling
1005-
),
1006-
# Use fabricfiltered as False to detect changes made by other controllers
1007-
# and to be able to provide a list of all fabrics attached to the device
1008-
fabricFiltered=False,
1009-
autoResubscribe=True,
1010-
).raise_on_error()
1011-
sub: Attribute.SubscriptionTransaction = await future
996+
async with async_timeout.timeout(DEFAULT_CALL_TIMEOUT):
997+
Attribute.Read(
998+
future=future,
999+
eventLoop=loop,
1000+
device=device.deviceProxy,
1001+
devCtrl=self.chip_controller,
1002+
attributes=[Attribute.AttributePath()], # wildcard
1003+
events=[
1004+
Attribute.EventPath(
1005+
EndpointId=None, Cluster=None, Event=None, Urgent=1
1006+
)
1007+
],
1008+
returnClusterObject=False,
1009+
subscriptionParameters=Attribute.SubscriptionParameters(
1010+
interval_floor, interval_ceiling
1011+
),
1012+
# Use fabricfiltered as False to detect changes made by other controllers
1013+
# and to be able to provide a list of all fabrics attached to the device
1014+
fabricFiltered=False,
1015+
autoResubscribe=True,
1016+
).raise_on_error()
1017+
sub: Attribute.SubscriptionTransaction = await future
10121018

10131019
sub.SetAttributeUpdateCallback(attribute_updated_callback)
10141020
sub.SetEventUpdateCallback(event_callback)
@@ -1035,18 +1041,26 @@ def _get_next_node_id(self) -> int:
10351041
self.server.storage.set(DATA_KEY_LAST_NODE_ID, next_node_id, force=True)
10361042
return next_node_id
10371043

1038-
async def _call_sdk(self, func: Callable[..., _T], *args: Any, **kwargs: Any) -> _T:
1044+
async def _call_sdk(
1045+
self,
1046+
func: Callable[..., _T],
1047+
*args: Any,
1048+
call_timeout: int = DEFAULT_CALL_TIMEOUT,
1049+
**kwargs: Any,
1050+
) -> _T:
10391051
"""Call function on the SDK in executor and return result."""
10401052
if self.server.loop is None:
10411053
raise RuntimeError("Server not started.")
10421054

1043-
return cast(
1044-
_T,
1045-
await self.server.loop.run_in_executor(
1046-
self._sdk_executor,
1047-
partial(func, *args, **kwargs),
1048-
),
1049-
)
1055+
# prevent a single job in the executor blocking everything with a timeout.
1056+
async with async_timeout.timeout(call_timeout):
1057+
return cast(
1058+
_T,
1059+
await self.server.loop.run_in_executor(
1060+
self._sdk_executor,
1061+
partial(func, *args, **kwargs),
1062+
),
1063+
)
10501064

10511065
async def _setup_node(self, node_id: int) -> None:
10521066
"""Handle set-up of subscriptions and interview (if needed) for known/discovered node."""
@@ -1079,17 +1093,26 @@ async def _setup_node(self, node_id: int) -> None:
10791093
try:
10801094
await self.interview_node(node_id)
10811095
except (NodeNotResolving, NodeInterviewFailed) as err:
1082-
LOGGER.warning("Unable to interview Node %s", exc_info=err)
1096+
LOGGER.warning(
1097+
"Unable to interview Node %s: %s",
1098+
node_id,
1099+
str(err),
1100+
# log full stack trace if debug logging is enabled
1101+
exc_info=err if LOGGER.isEnabledFor(logging.DEBUG) else None,
1102+
)
10831103
# NOTE: the node will be picked up by mdns discovery automatically
10841104
# when it comes available again.
10851105
return
10861106
# setup subscriptions for the node
10871107
try:
10881108
await self._subscribe_node(node_id)
1089-
except NodeNotResolving:
1109+
except (NodeNotResolving, TimeoutError) as err:
10901110
LOGGER.warning(
1091-
"Unable to subscribe to Node %s as it is unavailable",
1111+
"Unable to subscribe to Node %s: %s",
10921112
node_id,
1113+
str(err),
1114+
# log full stack trace if debug logging is enabled
1115+
exc_info=err if LOGGER.isEnabledFor(logging.DEBUG) else None,
10931116
)
10941117
# NOTE: the node will be picked up by mdns discovery automatically
10951118
# when it becomes available again.

0 commit comments

Comments
 (0)