@@ -115,7 +115,7 @@ def __init__(
115
115
self ._fallback_node_scanner_task : asyncio .Task | None = None
116
116
self ._node_setup_throttle = asyncio .Semaphore (5 )
117
117
self ._mdns_event_timer : dict [str , asyncio .TimerHandle ] = {}
118
- self ._resolve_lock = asyncio .Lock ()
118
+ self ._node_lock : dict [ int , asyncio .Lock ] = {}
119
119
120
120
async def initialize (self ) -> None :
121
121
"""Async initialize of controller."""
@@ -434,14 +434,15 @@ async def open_commissioning_window(
434
434
if discriminator is None :
435
435
discriminator = randint (0 , 4095 ) # noqa: S311
436
436
437
- sdk_result = await self ._call_sdk (
438
- self .chip_controller .OpenCommissioningWindow ,
439
- nodeid = node_id ,
440
- timeout = timeout ,
441
- iteration = iteration ,
442
- discriminator = discriminator ,
443
- option = option ,
444
- )
437
+ async with self ._get_node_lock (node_id ):
438
+ sdk_result = await self ._call_sdk (
439
+ self .chip_controller .OpenCommissioningWindow ,
440
+ nodeid = node_id ,
441
+ timeout = timeout ,
442
+ iteration = iteration ,
443
+ discriminator = discriminator ,
444
+ option = option ,
445
+ )
445
446
self ._known_commissioning_params [node_id ] = params = CommissioningParameters (
446
447
setup_pin_code = sdk_result .setupPinCode ,
447
448
setup_manual_code = sdk_result .setupManualCode ,
@@ -501,13 +502,14 @@ async def interview_node(self, node_id: int) -> None:
501
502
502
503
try :
503
504
LOGGER .info ("Interviewing node: %s" , node_id )
504
- read_response : Attribute .AsyncReadTransaction .ReadResponse = (
505
- await self .chip_controller .Read (
506
- nodeid = node_id ,
507
- attributes = "*" ,
508
- fabricFiltered = False ,
505
+ async with self ._get_node_lock (node_id ):
506
+ read_response : Attribute .AsyncReadTransaction .ReadResponse = (
507
+ await self .chip_controller .Read (
508
+ nodeid = node_id ,
509
+ attributes = "*" ,
510
+ fabricFiltered = False ,
511
+ )
509
512
)
510
- )
511
513
except ChipStackError as err :
512
514
raise NodeInterviewFailed (f"Failed to interview node { node_id } " ) from err
513
515
@@ -563,14 +565,15 @@ async def send_device_command(
563
565
cluster_cls : Cluster = ALL_CLUSTERS [cluster_id ]
564
566
command_cls = getattr (cluster_cls .Commands , command_name )
565
567
command = dataclass_from_dict (command_cls , payload , allow_sdk_types = True )
566
- return await self .chip_controller .SendCommand (
567
- nodeid = node_id ,
568
- endpoint = endpoint_id ,
569
- payload = command ,
570
- responseType = response_type ,
571
- timedRequestTimeoutMs = timed_request_timeout_ms ,
572
- interactionTimeoutMs = interaction_timeout_ms ,
573
- )
568
+ async with self ._get_node_lock (node_id ):
569
+ return await self .chip_controller .SendCommand (
570
+ nodeid = node_id ,
571
+ endpoint = endpoint_id ,
572
+ payload = command ,
573
+ responseType = response_type ,
574
+ timedRequestTimeoutMs = timed_request_timeout_ms ,
575
+ interactionTimeoutMs = interaction_timeout_ms ,
576
+ )
574
577
575
578
@api_command (APICommand .READ_ATTRIBUTE )
576
579
async def read_attribute (
@@ -592,21 +595,22 @@ async def read_attribute(
592
595
593
596
future = self .server .loop .create_future ()
594
597
device = await self ._resolve_node (node_id )
595
- Attribute .Read (
596
- future = future ,
597
- eventLoop = self .server .loop ,
598
- device = device .deviceProxy ,
599
- devCtrl = self .chip_controller ,
600
- attributes = [
601
- Attribute .AttributePath (
602
- EndpointId = endpoint_id ,
603
- ClusterId = cluster_id ,
604
- AttributeId = attribute_id ,
605
- )
606
- ],
607
- fabricFiltered = fabric_filtered ,
608
- ).raise_on_error ()
609
- result : Attribute .AsyncReadTransaction .ReadResponse = await future
598
+ async with self ._get_node_lock (node_id ):
599
+ Attribute .Read (
600
+ future = future ,
601
+ eventLoop = self .server .loop ,
602
+ device = device .deviceProxy ,
603
+ devCtrl = self .chip_controller ,
604
+ attributes = [
605
+ Attribute .AttributePath (
606
+ EndpointId = endpoint_id ,
607
+ ClusterId = cluster_id ,
608
+ AttributeId = attribute_id ,
609
+ )
610
+ ],
611
+ fabricFiltered = fabric_filtered ,
612
+ ).raise_on_error ()
613
+ result : Attribute .AsyncReadTransaction .ReadResponse = await future
610
614
read_atributes = parse_attributes_from_read_result (result .tlvAttributes )
611
615
# update cached info in node attributes
612
616
self ._nodes [node_id ].attributes .update (read_atributes )
@@ -636,10 +640,11 @@ async def write_attribute(
636
640
value_type = attribute .attribute_type .Type ,
637
641
allow_sdk_types = True ,
638
642
)
639
- return await self .chip_controller .WriteAttribute (
640
- nodeid = node_id ,
641
- attributes = [(endpoint_id , attribute )],
642
- )
643
+ async with self ._get_node_lock (node_id ):
644
+ return await self .chip_controller .WriteAttribute (
645
+ nodeid = node_id ,
646
+ attributes = [(endpoint_id , attribute )],
647
+ )
643
648
644
649
@api_command (APICommand .REMOVE_NODE )
645
650
async def remove_node (self , node_id : int ) -> None :
@@ -998,16 +1003,17 @@ def resubscription_succeeded(
998
1003
else :
999
1004
interval_ceiling = NODE_SUBSCRIPTION_CEILING_THREAD
1000
1005
self ._last_subscription_attempt [node_id ] = 0
1001
- sub : Attribute .SubscriptionTransaction = await self .chip_controller .Read (
1002
- node_id ,
1003
- attributes = "*" ,
1004
- events = [("*" , 1 )],
1005
- returnClusterObject = False ,
1006
- reportInterval = (interval_floor , interval_ceiling ),
1007
- fabricFiltered = False ,
1008
- keepSubscriptions = True ,
1009
- autoResubscribe = True ,
1010
- )
1006
+ async with self ._get_node_lock (node_id ):
1007
+ sub : Attribute .SubscriptionTransaction = await self .chip_controller .Read (
1008
+ node_id ,
1009
+ attributes = "*" ,
1010
+ events = [("*" , 1 )],
1011
+ returnClusterObject = False ,
1012
+ reportInterval = (interval_floor , interval_ceiling ),
1013
+ fabricFiltered = False ,
1014
+ keepSubscriptions = True ,
1015
+ autoResubscribe = True ,
1016
+ )
1011
1017
1012
1018
sub .SetAttributeUpdateCallback (attribute_updated_callback )
1013
1019
sub .SetEventUpdateCallback (event_callback )
@@ -1133,7 +1139,7 @@ async def _resolve_node(
1133
1139
retries ,
1134
1140
)
1135
1141
time_start = time .time ()
1136
- async with self ._resolve_lock :
1142
+ async with self ._get_node_lock ( node_id ) :
1137
1143
return await self ._call_sdk (
1138
1144
self .chip_controller .GetConnectedDeviceSync ,
1139
1145
nodeid = node_id ,
@@ -1350,3 +1356,9 @@ def run_fallback_node_scanner() -> None:
1350
1356
self ._fallback_node_scanner_timer = self .server .loop .call_later (
1351
1357
FALLBACK_NODE_SCANNER_INTERVAL , run_fallback_node_scanner
1352
1358
)
1359
+
1360
+ def _get_node_lock (self , node_id : int ) -> asyncio .Lock :
1361
+ """Return lock for given node."""
1362
+ if node_id not in self ._node_lock :
1363
+ self ._node_lock [node_id ] = asyncio .Lock ()
1364
+ return self ._node_lock [node_id ]
0 commit comments