@@ -104,6 +104,7 @@ def __init__(
104
104
self .thread_credentials_set : bool = False
105
105
self .compressed_fabric_id : int | None = None
106
106
self ._node_lock : dict [int , asyncio .Lock ] = {}
107
+ self ._node_setup_lock : asyncio .Lock = asyncio .Lock ()
107
108
self ._aiobrowser : AsyncServiceBrowser | None = None
108
109
self ._aiozc : AsyncZeroconf | None = None
109
110
@@ -1042,35 +1043,38 @@ async def _setup_node(self, node_id: int) -> None:
1042
1043
self ._nodes_in_setup .add (node_id )
1043
1044
# pre-cache ip-addresses
1044
1045
await self .get_node_ip_addresses (node_id )
1045
- try :
1046
- # (re)interview node (only) if needed
1047
- node_data = self ._nodes [node_id ]
1048
- if (
1049
- # re-interview if we dont have any node attributes (empty node)
1050
- not node_data .attributes
1051
- # re-interview if the data model schema has changed
1052
- or node_data .interview_version != DATA_MODEL_SCHEMA_VERSION
1053
- ):
1046
+ # we use a lock for the node setup process to process nodes sequentially
1047
+ # to prevent a flood of the (thread) network when there are many nodes being setup.
1048
+ async with self ._node_setup_lock :
1049
+ try :
1050
+ # (re)interview node (only) if needed
1051
+ node_data = self ._nodes [node_id ]
1052
+ if (
1053
+ # re-interview if we dont have any node attributes (empty node)
1054
+ not node_data .attributes
1055
+ # re-interview if the data model schema has changed
1056
+ or node_data .interview_version != DATA_MODEL_SCHEMA_VERSION
1057
+ ):
1058
+ try :
1059
+ await self .interview_node (node_id )
1060
+ except (NodeNotResolving , NodeInterviewFailed ) as err :
1061
+ LOGGER .warning ("Unable to interview Node %s" , exc_info = err )
1062
+ # NOTE: the node will be picked up by mdns discovery automatically
1063
+ # when it comes available again.
1064
+ return
1065
+
1066
+ # setup subscriptions for the node
1054
1067
try :
1055
- await self .interview_node (node_id )
1056
- except (NodeNotResolving , NodeInterviewFailed ) as err :
1057
- LOGGER .warning ("Unable to interview Node %s" , exc_info = err )
1068
+ await self ._subscribe_node (node_id )
1069
+ except NodeNotResolving :
1070
+ LOGGER .warning (
1071
+ "Unable to subscribe to Node %s as it is unavailable" ,
1072
+ node_id ,
1073
+ )
1058
1074
# NOTE: the node will be picked up by mdns discovery automatically
1059
- # when it comes available again.
1060
- return
1061
-
1062
- # setup subscriptions for the node
1063
- try :
1064
- await self ._subscribe_node (node_id )
1065
- except NodeNotResolving :
1066
- LOGGER .warning (
1067
- "Unable to subscribe to Node %s as it is unavailable" ,
1068
- node_id ,
1069
- )
1070
- # NOTE: the node will be picked up by mdns discovery automatically
1071
- # when it becomes available again.
1072
- finally :
1073
- self ._nodes_in_setup .discard (node_id )
1075
+ # when it becomes available again.
1076
+ finally :
1077
+ self ._nodes_in_setup .discard (node_id )
1074
1078
1075
1079
async def _resolve_node (
1076
1080
self , node_id : int , retries : int = 2 , attempt : int = 1
0 commit comments