@@ -84,6 +84,7 @@ def __init__(
84
84
self ._interview_limit : asyncio .Semaphore = asyncio .Semaphore (
85
85
INTERVIEW_TASK_LIMIT
86
86
)
87
+ self ._resolve_lock : asyncio .Lock = asyncio .Lock ()
87
88
self ._node_lock : dict [int , asyncio .Lock ] = {}
88
89
89
90
async def initialize (self ) -> None :
@@ -583,9 +584,9 @@ async def _subscribe_node(self, node_id: int) -> None:
583
584
node_logger .debug ("Unsubscribing from existing subscription." )
584
585
await self ._call_sdk (prev_sub .Shutdown )
585
586
586
- node_logger .debug ("Setting up attributes and events subscription." )
587
587
self ._attr_subscriptions [node_id ] = attr_subscriptions
588
588
async with node_lock :
589
+ node_logger .debug ("Setting up attributes and events subscription." )
589
590
sub : Attribute .SubscriptionTransaction = await self .chip_controller .Read (
590
591
nodeid = node_id ,
591
592
# In order to prevent network congestion due to wildcard subscriptions on all nodes,
@@ -744,7 +745,7 @@ async def _call_sdk(self, func: Callable[..., _T], *args: Any, **kwargs: Any) ->
744
745
)
745
746
746
747
async def _check_interview_and_subscription (
747
- self , node_id : int , reschedule_interval : int = 300
748
+ self , node_id : int , reschedule_interval : int = 30
748
749
) -> None :
749
750
"""Handle interview (if needed) and subscription for known node."""
750
751
@@ -756,8 +757,8 @@ def reschedule() -> None:
756
757
asyncio .create_task ,
757
758
self ._check_interview_and_subscription (
758
759
node_id ,
759
- # increase interval at each attempt with maximum of 1 hour
760
- min (reschedule_interval + 300 , 3600 ),
760
+ # increase interval at each attempt with maximum of 10 minutes
761
+ min (reschedule_interval + 10 , 600 ),
761
762
),
762
763
)
763
764
@@ -837,37 +838,23 @@ def _parse_attributes_from_read_result(
837
838
result [attribute_path ] = attr_value
838
839
return result
839
840
840
- async def _resolve_node (
841
- self , node_id : int , retries : int = 3 , allow_pase : bool = False
842
- ) -> None :
841
+ async def _resolve_node (self , node_id : int , retries : int = 3 ) -> None :
843
842
"""Resolve a Node on the network."""
844
843
node_lock = self ._get_node_lock (node_id )
845
844
if self .chip_controller is None :
846
845
raise RuntimeError ("Device Controller not initialized." )
847
846
try :
848
- if allow_pase :
849
- # last attempt allows PASE connection (last resort)
850
- LOGGER .debug (
851
- "Attempting to resolve node %s (with PASE connection)" , node_id
847
+ async with node_lock , self ._resolve_lock :
848
+ LOGGER .info ("Attempting to resolve node %s..." , node_id )
849
+ await self ._call_sdk (
850
+ self .chip_controller .ResolveNode ,
851
+ nodeid = node_id ,
852
852
)
853
- async with node_lock :
854
- await self ._call_sdk (
855
- self .chip_controller .GetConnectedDeviceSync ,
856
- nodeid = node_id ,
857
- allowPASE = True ,
858
- timeoutMs = 30000 ,
859
- )
860
- return
861
- LOGGER .debug ("Resolving node %s" , node_id )
862
- await self ._call_sdk (self .chip_controller .ResolveNode , nodeid = node_id )
863
853
except (ChipStackError , TimeoutError ) as err :
864
- if not retries :
854
+ if retries <= 1 :
865
855
# when we're out of retries, raise NodeNotResolving
866
856
raise NodeNotResolving (f"Unable to resolve Node { node_id } " ) from err
867
- async with node_lock :
868
- await self ._resolve_node (
869
- node_id = node_id , retries = retries - 1 , allow_pase = retries - 1 == 0
870
- )
857
+ await self ._resolve_node (node_id = node_id , retries = retries - 1 )
871
858
await asyncio .sleep (2 )
872
859
873
860
def _handle_endpoints_removed (self , node_id : int , endpoints : Iterable [int ]) -> None :
0 commit comments