|
63 | 63 | LOGGER = logging.getLogger(__name__)
|
64 | 64 | MAX_POLL_INTERVAL = 600
|
65 | 65 | MAX_COMMISSION_RETRIES = 3
|
| 66 | +NODE_RESUBSCRIBE_ATTEMPTS_UNAVAILABLE = 3 |
| 67 | +NODE_RESUBSCRIBE_TIMEOUT_OFFLINE = 30 * 60 * 1000 |
66 | 68 |
|
67 | 69 | MDNS_TYPE_OPERATIONAL_NODE = "_matter._tcp.local."
|
68 | 70 | MDNS_TYPE_COMMISSIONABLE_NODE = "_matterc._udp.local."
|
@@ -851,7 +853,7 @@ async def _subscribe_node(self, node_id: int) -> None:
|
851 | 853 | node_logger.debug("Re-using existing subscription.")
|
852 | 854 | return
|
853 | 855 | async with node_lock:
|
854 |
| - node_logger.debug("Unsubscribing from existing subscription.") |
| 856 | + node_logger.info("Unsubscribing from existing subscription.") |
855 | 857 | await self._call_sdk(prev_sub.Shutdown)
|
856 | 858 | del self._subscriptions[node_id]
|
857 | 859 |
|
@@ -976,14 +978,25 @@ def resubscription_attempted(
|
976 | 978 | terminationError,
|
977 | 979 | nextResubscribeIntervalMsec,
|
978 | 980 | )
|
979 |
| - # mark node as unavailable and signal consumers |
980 |
| - # we debounce it a bit so we only mark the node unavailable |
981 |
| - # at the second resubscription attempt |
982 |
| - if node.available and self._last_subscription_attempt[node_id] >= 1: |
983 |
| - # NOTE: if the node is (re)discovered by mdns, that callback will |
984 |
| - # take care of resubscribing to the node |
| 981 | + resubscription_attempt = self._last_subscription_attempt[node_id] + 1 |
| 982 | + self._last_subscription_attempt[node_id] = resubscription_attempt |
| 983 | + # Mark node as unavailable and signal consumers. |
| 984 | + # We debounce it a bit so we only mark the node unavailable |
| 985 | + # after some resubscription attempts and we shutdown the subscription |
| 986 | + # if the resubscription interval exceeds 30 minutes (TTL of mdns). |
| 987 | + # The node will be auto picked up by mdns if it's alive again. |
| 988 | + if ( |
| 989 | + node.available |
| 990 | + and resubscription_attempt >= NODE_RESUBSCRIBE_ATTEMPTS_UNAVAILABLE |
| 991 | + ): |
| 992 | + node.available = False |
| 993 | + self.server.signal_event(EventType.NODE_UPDATED, node) |
| 994 | + LOGGER.info("Marked node %s as unavailable", node_id) |
| 995 | + if ( |
| 996 | + not node.available |
| 997 | + and nextResubscribeIntervalMsec > NODE_RESUBSCRIBE_TIMEOUT_OFFLINE |
| 998 | + ): |
985 | 999 | asyncio.create_task(self._node_offline(node_id))
|
986 |
| - self._last_subscription_attempt[node_id] += 1 |
987 | 1000 |
|
988 | 1001 | def resubscription_succeeded(
|
989 | 1002 | transaction: Attribute.SubscriptionTransaction,
|
@@ -1249,7 +1262,7 @@ async def _node_offline(self, node_id: int) -> None:
|
1249 | 1262 | # shutdown existing subscriptions
|
1250 | 1263 | if sub := self._subscriptions.pop(node_id, None):
|
1251 | 1264 | await self._call_sdk(sub.Shutdown)
|
1252 |
| - # mark node as unavailable |
| 1265 | + # mark node as unavailable (if it wasn't already) |
1253 | 1266 | node = self._nodes[node_id]
|
1254 | 1267 | if not node.available:
|
1255 | 1268 | return # nothing to do to
|
|
0 commit comments