Skip to content

Commit d651326

Browse files
committedFeb 29, 2024
Squashed commit of the following:
commit f2ff761 Author: Marcel van der Veldt <m.vanderveldt@outlook.com> Date: Thu Feb 29 20:28:55 2024 +0100 adjust logger commit e59c880 Author: Marcel van der Veldt <m.vanderveldt@outlook.com> Date: Thu Feb 29 20:03:15 2024 +0100 Add timeouts to sdk calls
1 parent bdaeb2c commit d651326

File tree

1 file changed

+53
-30
lines changed

1 file changed

+53
-30
lines changed
 

‎matter_server/server/device_controller.py

+53-30
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import time
1515
from typing import TYPE_CHECKING, Any, Callable, Iterable, TypeVar, cast
1616

17+
import async_timeout
1718
from chip.ChipDeviceCtrl import DeviceProxyWrapper
1819
from chip.clusters import Attribute, Objects as Clusters
1920
from chip.clusters.Attribute import ValueDecodeFailure
@@ -61,6 +62,8 @@
6162
DATA_KEY_NODES = "nodes"
6263
DATA_KEY_LAST_NODE_ID = "last_node_id"
6364

65+
DEFAULT_CALL_TIMEOUT = 300
66+
6467
LOGGER = logging.getLogger(__name__)
6568
MIN_NODE_SUBSCRIPTION_CEILING = 30
6669
MAX_NODE_SUBSCRIPTION_CEILING = 300
@@ -996,25 +999,28 @@ def resubscription_succeeded(
996999
self._last_subscription_attempt[node_id] = 0
9971000
future = loop.create_future()
9981001
device = await self._resolve_node(node_id)
999-
Attribute.Read(
1000-
future=future,
1001-
eventLoop=loop,
1002-
device=device.deviceProxy,
1003-
devCtrl=self.chip_controller,
1004-
attributes=[Attribute.AttributePath()], # wildcard
1005-
events=[
1006-
Attribute.EventPath(EndpointId=None, Cluster=None, Event=None, Urgent=1)
1007-
],
1008-
returnClusterObject=False,
1009-
subscriptionParameters=Attribute.SubscriptionParameters(
1010-
interval_floor, interval_ceiling
1011-
),
1012-
# Use fabricfiltered as False to detect changes made by other controllers
1013-
# and to be able to provide a list of all fabrics attached to the device
1014-
fabricFiltered=False,
1015-
autoResubscribe=True,
1016-
).raise_on_error()
1017-
sub: Attribute.SubscriptionTransaction = await future
1002+
async with async_timeout.timeout(DEFAULT_CALL_TIMEOUT):
1003+
Attribute.Read(
1004+
future=future,
1005+
eventLoop=loop,
1006+
device=device.deviceProxy,
1007+
devCtrl=self.chip_controller,
1008+
attributes=[Attribute.AttributePath()], # wildcard
1009+
events=[
1010+
Attribute.EventPath(
1011+
EndpointId=None, Cluster=None, Event=None, Urgent=1
1012+
)
1013+
],
1014+
returnClusterObject=False,
1015+
subscriptionParameters=Attribute.SubscriptionParameters(
1016+
interval_floor, interval_ceiling
1017+
),
1018+
# Use fabricfiltered as False to detect changes made by other controllers
1019+
# and to be able to provide a list of all fabrics attached to the device
1020+
fabricFiltered=False,
1021+
autoResubscribe=True,
1022+
).raise_on_error()
1023+
sub: Attribute.SubscriptionTransaction = await future
10181024

10191025
sub.SetAttributeUpdateCallback(attribute_updated_callback)
10201026
sub.SetEventUpdateCallback(event_callback)
@@ -1041,18 +1047,26 @@ def _get_next_node_id(self) -> int:
10411047
self.server.storage.set(DATA_KEY_LAST_NODE_ID, next_node_id, force=True)
10421048
return next_node_id
10431049

1044-
async def _call_sdk(self, func: Callable[..., _T], *args: Any, **kwargs: Any) -> _T:
1050+
async def _call_sdk(
1051+
self,
1052+
func: Callable[..., _T],
1053+
*args: Any,
1054+
call_timeout: int = DEFAULT_CALL_TIMEOUT,
1055+
**kwargs: Any,
1056+
) -> _T:
10451057
"""Call function on the SDK in executor and return result."""
10461058
if self.server.loop is None:
10471059
raise RuntimeError("Server not started.")
10481060

1049-
return cast(
1050-
_T,
1051-
await self.server.loop.run_in_executor(
1052-
self._sdk_executor,
1053-
partial(func, *args, **kwargs),
1054-
),
1055-
)
1061+
# prevent a single job in the executor blocking everything with a timeout.
1062+
async with async_timeout.timeout(call_timeout):
1063+
return cast(
1064+
_T,
1065+
await self.server.loop.run_in_executor(
1066+
self._sdk_executor,
1067+
partial(func, *args, **kwargs),
1068+
),
1069+
)
10561070

10571071
async def _setup_node(self, node_id: int) -> None:
10581072
"""Handle set-up of subscriptions and interview (if needed) for known/discovered node."""
@@ -1085,17 +1099,26 @@ async def _setup_node(self, node_id: int) -> None:
10851099
try:
10861100
await self.interview_node(node_id)
10871101
except (NodeNotResolving, NodeInterviewFailed) as err:
1088-
LOGGER.warning("Unable to interview Node %s", exc_info=err)
1102+
LOGGER.warning(
1103+
"Unable to interview Node %s: %s",
1104+
node_id,
1105+
str(err),
1106+
# log full stack trace if debug logging is enabled
1107+
exc_info=err if LOGGER.isEnabledFor(logging.DEBUG) else None,
1108+
)
10891109
# NOTE: the node will be picked up by mdns discovery automatically
10901110
# when it comes available again.
10911111
return
10921112
# setup subscriptions for the node
10931113
try:
10941114
await self._subscribe_node(node_id)
1095-
except NodeNotResolving:
1115+
except (NodeNotResolving, TimeoutError) as err:
10961116
LOGGER.warning(
1097-
"Unable to subscribe to Node %s as it is unavailable",
1117+
"Unable to subscribe to Node %s: %s",
10981118
node_id,
1119+
str(err),
1120+
# log full stack trace if debug logging is enabled
1121+
exc_info=err if LOGGER.isEnabledFor(logging.DEBUG) else None,
10991122
)
11001123
# NOTE: the node will be picked up by mdns discovery automatically
11011124
# when it becomes available again.

0 commit comments

Comments
 (0)