diff --git a/src/app/InteractionModelEngine.cpp b/src/app/InteractionModelEngine.cpp index 9a64332aea3fa9..5ce4c3771ad0f2 100644 --- a/src/app/InteractionModelEngine.cpp +++ b/src/app/InteractionModelEngine.cpp @@ -1069,14 +1069,16 @@ void InteractionModelEngine::OnResponseTimeout(Messaging::ExchangeContext * ec) } #if CHIP_CONFIG_ENABLE_READ_CLIENT -void InteractionModelEngine::OnActiveModeNotification(ScopedNodeId aPeer) +void InteractionModelEngine::OnActiveModeNotification(ScopedNodeId aPeer, uint64_t aMonitoredSubject) { for (ReadClient * pListItem = mpActiveReadClientList; pListItem != nullptr;) { auto pNextItem = pListItem->GetNextClient(); // It is possible that pListItem is destroyed by the app in OnActiveModeNotification. // Get the next item before invoking `OnActiveModeNotification`. - if (ScopedNodeId(pListItem->GetPeerNodeId(), pListItem->GetFabricIndex()) == aPeer) + CATValues cats; + mpFabricTable->FetchCATs(pListItem->GetFabricIndex(), cats); + if (ScopedNodeId(pListItem->GetPeerNodeId(), pListItem->GetFabricIndex()) == aPeer && cats.CheckSubjectAgainstCATs(aMonitoredSubject)) { pListItem->OnActiveModeNotification(); } diff --git a/src/app/InteractionModelEngine.h b/src/app/InteractionModelEngine.h index 1f286f7dce2974..7bb9e60b40d813 100644 --- a/src/app/InteractionModelEngine.h +++ b/src/app/InteractionModelEngine.h @@ -236,16 +236,15 @@ class InteractionModelEngine : public Messaging::UnsolicitedMessageHandler, /** * Activate the idle subscriptions. * - * When subscribing to ICD and liveness timeout reached, the read client will move to `InactiveICDSubscription` state and - * resubscription can be triggered via OnActiveModeNotification(). + * See ReadClient::OnActiveModeNotification */ - void OnActiveModeNotification(ScopedNodeId aPeer); + void OnActiveModeNotification(ScopedNodeId aPeer, uint64_t aMonitoredSubject); /** * Used to notify when a peer becomes LIT ICD or vice versa. * * ReadClient will call this function when it finds any updates of the OperatingMode attribute from ICD management - * cluster. The application doesn't need to call this function, usually. + * cluster. The application doesn't need to call this function, usually. */ void OnPeerTypeChange(ScopedNodeId aPeer, ReadClient::PeerType aType); diff --git a/src/app/ReadClient.cpp b/src/app/ReadClient.cpp index da127253cf7d12..cec540b097645b 100644 --- a/src/app/ReadClient.cpp +++ b/src/app/ReadClient.cpp @@ -485,6 +485,11 @@ CHIP_ERROR ReadClient::GenerateDataVersionFilterList(DataVersionFilterIBs::Build void ReadClient::OnActiveModeNotification() { VerifyOrDie(mpImEngine->InActiveReadClientList(this)); + + // Note: this API only works when issuing subscription via SendAutoResubscribeRequest. When SendAutoResubscribeRequest is + // called, either mEventPathParamsListSize or mAttributePathParamsListSize is not 0. + VerifyOrReturn(mReadPrepareParams.mEventPathParamsListSize != 0 || mReadPrepareParams.mAttributePathParamsListSize != 0); + // When we reach here, the subscription definitely exceeded the liveness timeout. Just continue the unfinished resubscription // logic in `OnLivenessTimeoutCallback`. if (IsInactiveICDSubscription()) @@ -493,6 +498,18 @@ void ReadClient::OnActiveModeNotification() return; } + // When receiving check-in message, it means all tracked subscriptions for this node has gone in server side, if there is a related active subscription + // and subscription has not yet rescheduled in client side, we should forcibly timeout the current subscription, and schedule a new one. + if (!mIsResubscriptionScheduled) + { + // Closing will ultimately trigger ScheduleResubscription with the aReestablishCASE argument set to true, effectively + // rendering the session defunct. + Close(CHIP_ERROR_TIMEOUT); + return; + } + + // If the server sends a check-in message and a subscription is already scheduled, it indicates a client-side subscription + // timeout or failure. Cancel the scheduled subscription and initiate a new one immediately. TriggerResubscribeIfScheduled("check-in message"); } @@ -504,10 +521,10 @@ void ReadClient::OnPeerTypeChange(PeerType aType) ChipLogProgress(DataManagement, "Peer is now %s LIT ICD.", mIsPeerLIT ? "a" : "not a"); - // If the peer is no longer LIT, try to wake up the subscription and do resubscribe when necessary. - if (!mIsPeerLIT) + // If the peer is no longer LIT and in inactive ICD subscription status, try to wake up the subscription and do resubscribe. + if (!mIsPeerLIT && IsInactiveICDSubscription()) { - OnActiveModeNotification(); + TriggerResubscriptionForLivenessTimeout(CHIP_ERROR_TIMEOUT); } } diff --git a/src/app/ReadClient.h b/src/app/ReadClient.h index d8b8a971db8c28..9cc62ab3d8177a 100644 --- a/src/app/ReadClient.h +++ b/src/app/ReadClient.h @@ -351,11 +351,17 @@ class ReadClient : public Messaging::ExchangeDelegate * Re-activate an inactive subscription. * * When subscribing to LIT-ICD and liveness timeout reached and OnResubscriptionNeeded returns - * CHIP_ERROR_LIT_SUBSCRIBE_INACTIVE_TIMEOUT, the read client will move to the InactiveICDSubscription state and - * resubscription can be triggered via OnActiveModeNotification(). + * CHIP_ERROR_LIT_SUBSCRIBE_INACTIVE_TIMEOUT, the read client will move to the InactiveICDSubscription state and + * resubscription can be triggered via OnActiveModeNotification(). * * If the subscription is not in the `InactiveICDSubscription` state, this function will do nothing. So it is always safe to - * call this function when a check-in message is received. + * call this function when a check-in message is received. + * + * If the server sends out check-in message, and there is a active tracked active subscription in client side at the same time, + * it means current client does not realize this tracked subscription has gone, and we should forcibly timeout current + * subscription, and schedule a new one. + * + * This API only works when issuing subscription via SendAutoResubscribeRequest. */ void OnActiveModeNotification(); diff --git a/src/app/icd/client/CheckInHandler.cpp b/src/app/icd/client/CheckInHandler.cpp index f4eaa48b114116..7b5d213f7cd7d6 100644 --- a/src/app/icd/client/CheckInHandler.cpp +++ b/src/app/icd/client/CheckInHandler.cpp @@ -138,7 +138,7 @@ CHIP_ERROR CheckInHandler::OnMessageReceived(Messaging::ExchangeContext * ec, co mpICDClientStorage->StoreEntry(clientInfo); mpCheckInDelegate->OnCheckInComplete(clientInfo); #if CHIP_CONFIG_ENABLE_READ_CLIENT - mpImEngine->OnActiveModeNotification(clientInfo.peer_node); + mpImEngine->OnActiveModeNotification(clientInfo.peer_node, clientInfo.monitored_subject); #endif // CHIP_CONFIG_ENABLE_READ_CLIENT } diff --git a/src/app/icd/client/RefreshKeySender.cpp b/src/app/icd/client/RefreshKeySender.cpp index de145b6182187e..2907d55792b0b4 100644 --- a/src/app/icd/client/RefreshKeySender.cpp +++ b/src/app/icd/client/RefreshKeySender.cpp @@ -69,7 +69,7 @@ CHIP_ERROR RefreshKeySender::RegisterClientWithNewKey(Messaging::ExchangeManager mpCheckInDelegate->OnCheckInComplete(mICDClientInfo); #if CHIP_CONFIG_ENABLE_READ_CLIENT - mpImEngine->OnActiveModeNotification(mICDClientInfo.peer_node); + mpImEngine->OnActiveModeNotification(mICDClientInfo.peer_node,mICDClientInfo.monitored_subject); #endif // CHIP_CONFIG_ENABLE_READ_CLIENT mpCheckInDelegate->OnKeyRefreshDone(this, CHIP_NO_ERROR); }; diff --git a/src/controller/tests/data_model/TestRead.cpp b/src/controller/tests/data_model/TestRead.cpp index 4cb6cd1acf5679..9aca9cd648e754 100644 --- a/src/controller/tests/data_model/TestRead.cpp +++ b/src/controller/tests/data_model/TestRead.cpp @@ -2330,7 +2330,7 @@ TEST_F(TestRead, TestSubscribe_OnActiveModeNotification) GetLoopback().mNumMessagesToDrop = 0; callback.ClearCounters(); InteractionModelEngine::GetInstance()->OnActiveModeNotification( - ScopedNodeId(readClient.GetPeerNodeId(), readClient.GetFabricIndex())); + ScopedNodeId(readClient.GetPeerNodeId(), readClient.GetFabricIndex()), static_cast(0)); EXPECT_EQ(callback.mOnResubscriptionsAttempted, 1); EXPECT_EQ(callback.mLastError, CHIP_ERROR_TIMEOUT); @@ -2354,6 +2354,138 @@ TEST_F(TestRead, TestSubscribe_OnActiveModeNotification) EXPECT_EQ(GetExchangeManager().GetNumActiveExchanges(), 0u); } +/** + * When all tracked subscriptions go away in server, check-in message is received and OnActiveModeNotification is called in client + * side, the tracked subscription would be torn down and a new one would be rescheduled in client side. + */ +TEST_F(TestRead, TestSubscribe_SubGoAwayInserverOnActiveModeNotification) +{ + auto sessionHandle = GetSessionBobToAlice(); + + SetMRPMode(MessagingContext::MRPMode::kResponsive); + + { + TestResubscriptionCallback callback; + ReadClient readClient(InteractionModelEngine::GetInstance(), &GetExchangeManager(), callback, + ReadClient::InteractionType::Subscribe); + + callback.mScheduleLITResubscribeImmediately = false; + callback.SetReadClient(&readClient); + + ReadPrepareParams readPrepareParams(GetSessionBobToAlice()); + + // Read full wildcard paths, repeat twice to ensure chunking. + AttributePathParams attributePathParams[1]; + readPrepareParams.mpAttributePathParamsList = attributePathParams; + readPrepareParams.mAttributePathParamsListSize = MATTER_ARRAY_SIZE(attributePathParams); + attributePathParams[0].mEndpointId = kTestEndpointId; + attributePathParams[0].mClusterId = Clusters::UnitTesting::Id; + attributePathParams[0].mAttributeId = Clusters::UnitTesting::Attributes::Boolean::Id; + + constexpr uint16_t maxIntervalCeilingSeconds = 1; + + readPrepareParams.mMaxIntervalCeilingSeconds = maxIntervalCeilingSeconds; + readPrepareParams.mIsPeerLIT = true; + + auto err = readClient.SendAutoResubscribeRequest(std::move(readPrepareParams)); + EXPECT_EQ(err, CHIP_NO_ERROR); + + // + // Drive servicing IO till we have established a subscription. + // + GetIOContext().DriveIOUntil(System::Clock::Milliseconds32(2000), + [&]() { return callback.mOnSubscriptionEstablishedCount >= 1; }); + EXPECT_EQ(callback.mOnSubscriptionEstablishedCount, 1); + EXPECT_EQ(callback.mOnError, 0); + EXPECT_EQ(callback.mOnResubscriptionsAttempted, 0); + + GetLoopback().mNumMessagesToDrop = 0; + callback.ClearCounters(); + InteractionModelEngine::GetInstance()->OnActiveModeNotification( + ScopedNodeId(readClient.GetPeerNodeId(), readClient.GetFabricIndex()), static_cast(0)); + EXPECT_EQ(callback.mOnResubscriptionsAttempted, 1); + EXPECT_EQ(callback.mLastError, CHIP_ERROR_TIMEOUT); + + // + // Drive servicing IO till we have established a subscription. + // + GetIOContext().DriveIOUntil(System::Clock::Milliseconds32(2000), + [&]() { return callback.mOnSubscriptionEstablishedCount == 1; }); + EXPECT_EQ(callback.mOnSubscriptionEstablishedCount, 1); + + // + // With re-sub enabled, we shouldn't have encountered any errors + // + EXPECT_EQ(callback.mOnError, 0); + EXPECT_EQ(callback.mOnDone, 0); + } + + SetMRPMode(MessagingContext::MRPMode::kDefault); + + InteractionModelEngine::GetInstance()->ShutdownActiveReads(); + EXPECT_EQ(GetExchangeManager().GetNumActiveExchanges(), 0u); +} + +/** + * When all tracked subscriptions go away in server, check-in message is received and OnActiveModeNotification is called in client + * side, the untracked subscription would be kept. + */ +TEST_F(TestRead, TestSubscribe_MismatchedSubGoAwayInserverOnActiveModeNotification) +{ + auto sessionHandle = GetSessionBobToAlice(); + + SetMRPMode(MessagingContext::MRPMode::kResponsive); + + { + TestResubscriptionCallback callback; + ReadClient readClient(InteractionModelEngine::GetInstance(), &GetExchangeManager(), callback, + ReadClient::InteractionType::Subscribe); + + callback.mScheduleLITResubscribeImmediately = false; + callback.SetReadClient(&readClient); + + ReadPrepareParams readPrepareParams(GetSessionBobToAlice()); + + // Read full wildcard paths, repeat twice to ensure chunking. + AttributePathParams attributePathParams[1]; + readPrepareParams.mpAttributePathParamsList = attributePathParams; + readPrepareParams.mAttributePathParamsListSize = MATTER_ARRAY_SIZE(attributePathParams); + attributePathParams[0].mEndpointId = kTestEndpointId; + attributePathParams[0].mClusterId = Clusters::UnitTesting::Id; + attributePathParams[0].mAttributeId = Clusters::UnitTesting::Attributes::Boolean::Id; + constexpr uint16_t maxIntervalCeilingSeconds = 1; + + readPrepareParams.mMaxIntervalCeilingSeconds = maxIntervalCeilingSeconds; + readPrepareParams.mIsPeerLIT = true; + + auto err = readClient.SendAutoResubscribeRequest(std::move(readPrepareParams)); + EXPECT_EQ(err, CHIP_NO_ERROR); + + // + // Drive servicing IO till we have established a subscription. + // + GetIOContext().DriveIOUntil(System::Clock::Milliseconds32(2000), + [&]() { return callback.mOnSubscriptionEstablishedCount >= 1; }); + EXPECT_EQ(callback.mOnSubscriptionEstablishedCount, 1); + EXPECT_EQ(callback.mOnError, 0); + EXPECT_EQ(callback.mOnResubscriptionsAttempted, 0); + + GetLoopback().mNumMessagesToDrop = 0; + callback.ClearCounters(); + InteractionModelEngine::GetInstance()->OnActiveModeNotification( + ScopedNodeId(readClient.GetPeerNodeId(), readClient.GetFabricIndex()), static_cast(0)); + EXPECT_EQ(callback.mOnResubscriptionsAttempted, 0); + EXPECT_EQ(callback.mLastError, CHIP_NO_ERROR); + EXPECT_EQ(callback.mOnError, 0); + EXPECT_EQ(callback.mOnDone, 0); + } + + SetMRPMode(MessagingContext::MRPMode::kDefault); + + InteractionModelEngine::GetInstance()->ShutdownActiveReads(); + EXPECT_EQ(GetExchangeManager().GetNumActiveExchanges(), 0u); +} + TEST_F(TestRead, TestSubscribeFailed_OnActiveModeNotification) { auto sessionHandle = GetSessionBobToAlice(); @@ -2393,7 +2525,7 @@ TEST_F(TestRead, TestSubscribeFailed_OnActiveModeNotification) GetLoopback().mNumMessagesToDrop = 0; callback.ClearCounters(); InteractionModelEngine::GetInstance()->OnActiveModeNotification( - ScopedNodeId(readClient.GetPeerNodeId(), readClient.GetFabricIndex())); + ScopedNodeId(readClient.GetPeerNodeId(), readClient.GetFabricIndex()), static_cast(0)); // // Drive servicing IO till we have established a subscription. //