Skip to content

Commit a7e1ad0

Browse files
committed
add semaphore for concurrent node setups
1 parent 8214e08 commit a7e1ad0

File tree

1 file changed

+42
-38
lines changed

1 file changed

+42
-38
lines changed

matter_server/server/device_controller.py

+42-38
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ def __init__(
118118
self._sdk_executor = ThreadPoolExecutor(
119119
max_workers=1, thread_name_prefix="SDKExecutor"
120120
)
121+
self._node_setup_throttle = asyncio.Semaphore(10)
121122

122123
async def initialize(self) -> None:
123124
"""Async initialize of controller."""
@@ -1070,53 +1071,56 @@ async def _setup_node(self, node_id: int) -> None:
10701071
return
10711072
self._nodes_in_setup.add(node_id)
10721073
try:
1073-
# Ping the node to rule out stale mdns reports and to prevent that we
1074-
# send an unreachable node to the sdk which is very slow with resolving it.
1075-
# This will also precache the ip addresses of the node for later use.
1076-
ping_result = await self.ping_node(
1077-
node_id, attempts=3, allow_cached_ips=False
1078-
)
1079-
if not any(ping_result.values()):
1080-
LOGGER.warning(
1081-
"Skip set-up for node %s because it does not appear to be reachable...",
1082-
node_id,
1074+
async with self._node_setup_throttle:
1075+
# Ping the node to rule out stale mdns reports and to prevent that we
1076+
# send an unreachable node to the sdk which is very slow with resolving it.
1077+
# This will also precache the ip addresses of the node for later use.
1078+
ping_result = await self.ping_node(
1079+
node_id, attempts=3, allow_cached_ips=False
10831080
)
1084-
return
1085-
LOGGER.info("Setting-up node %s...", node_id)
1086-
# (re)interview node (only) if needed
1087-
node_data = self._nodes[node_id]
1088-
if (
1089-
# re-interview if we dont have any node attributes (empty node)
1090-
not node_data.attributes
1091-
# re-interview if the data model schema has changed
1092-
or node_data.interview_version != DATA_MODEL_SCHEMA_VERSION
1093-
):
1081+
if not any(ping_result.values()):
1082+
LOGGER.warning(
1083+
"Skip set-up for node %s because it does not appear to be reachable...",
1084+
node_id,
1085+
)
1086+
return
1087+
LOGGER.info("Setting-up node %s...", node_id)
1088+
# (re)interview node (only) if needed
1089+
node_data = self._nodes[node_id]
1090+
if (
1091+
# re-interview if we dont have any node attributes (empty node)
1092+
not node_data.attributes
1093+
# re-interview if the data model schema has changed
1094+
or node_data.interview_version != DATA_MODEL_SCHEMA_VERSION
1095+
):
1096+
try:
1097+
await self.interview_node(node_id)
1098+
except (NodeNotResolving, NodeInterviewFailed) as err:
1099+
LOGGER.warning(
1100+
"Unable to interview Node %s: %s",
1101+
node_id,
1102+
str(err) or err.__class__.__name__,
1103+
# log full stack trace if debug logging is enabled
1104+
exc_info=err
1105+
if LOGGER.isEnabledFor(logging.DEBUG)
1106+
else None,
1107+
)
1108+
# NOTE: the node will be picked up by mdns discovery automatically
1109+
# when it comes available again.
1110+
return
1111+
# setup subscriptions for the node
10941112
try:
1095-
await self.interview_node(node_id)
1096-
except (NodeNotResolving, NodeInterviewFailed) as err:
1113+
await self._subscribe_node(node_id)
1114+
except (NodeNotResolving, ChipStackError) as err:
10971115
LOGGER.warning(
1098-
"Unable to interview Node %s: %s",
1116+
"Unable to subscribe to Node %s: %s",
10991117
node_id,
11001118
str(err) or err.__class__.__name__,
11011119
# log full stack trace if debug logging is enabled
11021120
exc_info=err if LOGGER.isEnabledFor(logging.DEBUG) else None,
11031121
)
11041122
# NOTE: the node will be picked up by mdns discovery automatically
1105-
# when it comes available again.
1106-
return
1107-
# setup subscriptions for the node
1108-
try:
1109-
await self._subscribe_node(node_id)
1110-
except (NodeNotResolving, ChipStackError) as err:
1111-
LOGGER.warning(
1112-
"Unable to subscribe to Node %s: %s",
1113-
node_id,
1114-
str(err) or err.__class__.__name__,
1115-
# log full stack trace if debug logging is enabled
1116-
exc_info=err if LOGGER.isEnabledFor(logging.DEBUG) else None,
1117-
)
1118-
# NOTE: the node will be picked up by mdns discovery automatically
1119-
# when it becomes available again.
1123+
# when it becomes available again.
11201124
finally:
11211125
self._nodes_in_setup.discard(node_id)
11221126

0 commit comments

Comments
 (0)