Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ICD]Trigger resubscription when receiving check-in and subscription has not yet become abnormal #37831

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
6 changes: 4 additions & 2 deletions src/app/InteractionModelEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1069,14 +1069,16 @@ void InteractionModelEngine::OnResponseTimeout(Messaging::ExchangeContext * ec)
}

#if CHIP_CONFIG_ENABLE_READ_CLIENT
void InteractionModelEngine::OnActiveModeNotification(ScopedNodeId aPeer)
void InteractionModelEngine::OnActiveModeNotification(ScopedNodeId aPeer, uint64_t aMonitoredSubject)
{
for (ReadClient * pListItem = mpActiveReadClientList; pListItem != nullptr;)
{
auto pNextItem = pListItem->GetNextClient();
// It is possible that pListItem is destroyed by the app in OnActiveModeNotification.
// Get the next item before invoking `OnActiveModeNotification`.
if (ScopedNodeId(pListItem->GetPeerNodeId(), pListItem->GetFabricIndex()) == aPeer)
CATValues cats;
mpFabricTable->FetchCATs(pListItem->GetFabricIndex(), cats);
if (ScopedNodeId(pListItem->GetPeerNodeId(), pListItem->GetFabricIndex()) == aPeer && cats.CheckSubjectAgainstCATs(aMonitoredSubject))
{
pListItem->OnActiveModeNotification();
}
Expand Down
7 changes: 3 additions & 4 deletions src/app/InteractionModelEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -236,16 +236,15 @@ class InteractionModelEngine : public Messaging::UnsolicitedMessageHandler,
/**
* Activate the idle subscriptions.
*
* When subscribing to ICD and liveness timeout reached, the read client will move to `InactiveICDSubscription` state and
* resubscription can be triggered via OnActiveModeNotification().
* See ReadClient::OnActiveModeNotification
*/
void OnActiveModeNotification(ScopedNodeId aPeer);
void OnActiveModeNotification(ScopedNodeId aPeer, uint64_t aMonitoredSubject);

/**
* Used to notify when a peer becomes LIT ICD or vice versa.
*
* ReadClient will call this function when it finds any updates of the OperatingMode attribute from ICD management
* cluster. The application doesn't need to call this function, usually.
* cluster. The application doesn't need to call this function, usually.
*/
void OnPeerTypeChange(ScopedNodeId aPeer, ReadClient::PeerType aType);

Expand Down
23 changes: 20 additions & 3 deletions src/app/ReadClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,11 @@ CHIP_ERROR ReadClient::GenerateDataVersionFilterList(DataVersionFilterIBs::Build
void ReadClient::OnActiveModeNotification()
{
VerifyOrDie(mpImEngine->InActiveReadClientList(this));

// Note: this API only works when issuing subscription via SendAutoResubscribeRequest. When SendAutoResubscribeRequest is
// called, either mEventPathParamsListSize or mAttributePathParamsListSize is not 0.
VerifyOrReturn(mReadPrepareParams.mEventPathParamsListSize != 0 || mReadPrepareParams.mAttributePathParamsListSize != 0);

// When we reach here, the subscription definitely exceeded the liveness timeout. Just continue the unfinished resubscription
// logic in `OnLivenessTimeoutCallback`.
if (IsInactiveICDSubscription())
Expand All @@ -493,6 +498,18 @@ void ReadClient::OnActiveModeNotification()
return;
}

// When receiving check-in message, it means all tracked subscriptions for this node has gone in server side, if there is a related active subscription
// and subscription has not yet rescheduled in client side, we should forcibly timeout the current subscription, and schedule a new one.
if (!mIsResubscriptionScheduled)
{
// Closing will ultimately trigger ScheduleResubscription with the aReestablishCASE argument set to true, effectively
// rendering the session defunct.
Close(CHIP_ERROR_TIMEOUT);
return;
}

// If the server sends a check-in message and a subscription is already scheduled, it indicates a client-side subscription
// timeout or failure. Cancel the scheduled subscription and initiate a new one immediately.
TriggerResubscribeIfScheduled("check-in message");
}

Expand All @@ -504,10 +521,10 @@ void ReadClient::OnPeerTypeChange(PeerType aType)

ChipLogProgress(DataManagement, "Peer is now %s LIT ICD.", mIsPeerLIT ? "a" : "not a");

// If the peer is no longer LIT, try to wake up the subscription and do resubscribe when necessary.
if (!mIsPeerLIT)
// If the peer is no longer LIT and in inactive ICD subscription status, try to wake up the subscription and do resubscribe.
if (!mIsPeerLIT && IsInactiveICDSubscription())
{
OnActiveModeNotification();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I am trying to understand why it's OK to remove this. The reason this seems to be here is that it's envisioning us having two subscriptions, one of which (call it A) is watching for peer type change and one of which (call it B) is IsInactiveICDSubscription(). When the transition to SIT is observed, B will no longer get any OnActiveModeNotifications, so how will it get out of the "inactive ICD subscription" state?

Of course if A does not exist and we just have B, and it's in the "inactive ICD subscription" state and then the server changes to A SIT, I think we just get stuck; that seems to be a hole in the whole logic of this stuff, including the spec?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, the current change does not work on the below scenario

So I am trying to understand why it's OK to remove this. The reason this seems to be here is that it's envisioning us having two subscriptions, one of which (call it A) is watching for peer type change and one of which (call it B) is IsInactiveICDSubscription(). When the transition to SIT is observed, B will no longer get any OnActiveModeNotifications, so how will it get out of the "inactive ICD subscription" state?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated the code.

  1. Update PeerTypeChange and call TriggerResubscriptionForLivenessTimeout when peer type is changed to LIT and state is inactive icd subscription, instead of OnActiveModeNotification to help situation 1 in your question.
  2. If icd transition from lit to sit, it would send check-in message, which would bring up the in active subscription.

TriggerResubscriptionForLivenessTimeout(CHIP_ERROR_TIMEOUT);
}
}

Expand Down
12 changes: 9 additions & 3 deletions src/app/ReadClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -351,11 +351,17 @@ class ReadClient : public Messaging::ExchangeDelegate
* Re-activate an inactive subscription.
*
* When subscribing to LIT-ICD and liveness timeout reached and OnResubscriptionNeeded returns
* CHIP_ERROR_LIT_SUBSCRIBE_INACTIVE_TIMEOUT, the read client will move to the InactiveICDSubscription state and
* resubscription can be triggered via OnActiveModeNotification().
* CHIP_ERROR_LIT_SUBSCRIBE_INACTIVE_TIMEOUT, the read client will move to the InactiveICDSubscription state and
* resubscription can be triggered via OnActiveModeNotification().
*
* If the subscription is not in the `InactiveICDSubscription` state, this function will do nothing. So it is always safe to
* call this function when a check-in message is received.
* call this function when a check-in message is received.
*
* If the server sends out check-in message, and there is a active tracked active subscription in client side at the same time,
* it means current client does not realize this tracked subscription has gone, and we should forcibly timeout current
* subscription, and schedule a new one.
Comment on lines +360 to +362
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment does not match what the API actually does, really.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rewrite the comments, thanks

*
* This API only works when issuing subscription via SendAutoResubscribeRequest.
*/
void OnActiveModeNotification();

Expand Down
2 changes: 1 addition & 1 deletion src/app/icd/client/CheckInHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ CHIP_ERROR CheckInHandler::OnMessageReceived(Messaging::ExchangeContext * ec, co
mpICDClientStorage->StoreEntry(clientInfo);
mpCheckInDelegate->OnCheckInComplete(clientInfo);
#if CHIP_CONFIG_ENABLE_READ_CLIENT
mpImEngine->OnActiveModeNotification(clientInfo.peer_node);
mpImEngine->OnActiveModeNotification(clientInfo.peer_node, clientInfo.monitored_subject);
#endif // CHIP_CONFIG_ENABLE_READ_CLIENT
}

Expand Down
2 changes: 1 addition & 1 deletion src/app/icd/client/RefreshKeySender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ CHIP_ERROR RefreshKeySender::RegisterClientWithNewKey(Messaging::ExchangeManager

mpCheckInDelegate->OnCheckInComplete(mICDClientInfo);
#if CHIP_CONFIG_ENABLE_READ_CLIENT
mpImEngine->OnActiveModeNotification(mICDClientInfo.peer_node);
mpImEngine->OnActiveModeNotification(mICDClientInfo.peer_node,mICDClientInfo.monitored_subject);
#endif // CHIP_CONFIG_ENABLE_READ_CLIENT
mpCheckInDelegate->OnKeyRefreshDone(this, CHIP_NO_ERROR);
};
Expand Down
136 changes: 134 additions & 2 deletions src/controller/tests/data_model/TestRead.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2330,7 +2330,7 @@ TEST_F(TestRead, TestSubscribe_OnActiveModeNotification)
GetLoopback().mNumMessagesToDrop = 0;
callback.ClearCounters();
InteractionModelEngine::GetInstance()->OnActiveModeNotification(
ScopedNodeId(readClient.GetPeerNodeId(), readClient.GetFabricIndex()));
ScopedNodeId(readClient.GetPeerNodeId(), readClient.GetFabricIndex()), static_cast<uint64_t>(0));
EXPECT_EQ(callback.mOnResubscriptionsAttempted, 1);
EXPECT_EQ(callback.mLastError, CHIP_ERROR_TIMEOUT);

Expand All @@ -2354,6 +2354,138 @@ TEST_F(TestRead, TestSubscribe_OnActiveModeNotification)
EXPECT_EQ(GetExchangeManager().GetNumActiveExchanges(), 0u);
}

/**
* When all tracked subscriptions go away in server, check-in message is received and OnActiveModeNotification is called in client
* side, the tracked subscription would be torn down and a new one would be rescheduled in client side.
*/
TEST_F(TestRead, TestSubscribe_SubGoAwayInserverOnActiveModeNotification)
{
auto sessionHandle = GetSessionBobToAlice();

SetMRPMode(MessagingContext::MRPMode::kResponsive);

{
TestResubscriptionCallback callback;
ReadClient readClient(InteractionModelEngine::GetInstance(), &GetExchangeManager(), callback,
ReadClient::InteractionType::Subscribe);

callback.mScheduleLITResubscribeImmediately = false;
callback.SetReadClient(&readClient);

ReadPrepareParams readPrepareParams(GetSessionBobToAlice());

// Read full wildcard paths, repeat twice to ensure chunking.
AttributePathParams attributePathParams[1];
readPrepareParams.mpAttributePathParamsList = attributePathParams;
readPrepareParams.mAttributePathParamsListSize = MATTER_ARRAY_SIZE(attributePathParams);
attributePathParams[0].mEndpointId = kTestEndpointId;
attributePathParams[0].mClusterId = Clusters::UnitTesting::Id;
attributePathParams[0].mAttributeId = Clusters::UnitTesting::Attributes::Boolean::Id;

constexpr uint16_t maxIntervalCeilingSeconds = 1;

readPrepareParams.mMaxIntervalCeilingSeconds = maxIntervalCeilingSeconds;
readPrepareParams.mIsPeerLIT = true;

auto err = readClient.SendAutoResubscribeRequest(std::move(readPrepareParams));
EXPECT_EQ(err, CHIP_NO_ERROR);

//
// Drive servicing IO till we have established a subscription.
//
GetIOContext().DriveIOUntil(System::Clock::Milliseconds32(2000),
[&]() { return callback.mOnSubscriptionEstablishedCount >= 1; });
EXPECT_EQ(callback.mOnSubscriptionEstablishedCount, 1);
EXPECT_EQ(callback.mOnError, 0);
EXPECT_EQ(callback.mOnResubscriptionsAttempted, 0);

GetLoopback().mNumMessagesToDrop = 0;
callback.ClearCounters();
InteractionModelEngine::GetInstance()->OnActiveModeNotification(
ScopedNodeId(readClient.GetPeerNodeId(), readClient.GetFabricIndex()), static_cast<uint64_t>(0));
EXPECT_EQ(callback.mOnResubscriptionsAttempted, 1);
EXPECT_EQ(callback.mLastError, CHIP_ERROR_TIMEOUT);

//
// Drive servicing IO till we have established a subscription.
//
GetIOContext().DriveIOUntil(System::Clock::Milliseconds32(2000),
[&]() { return callback.mOnSubscriptionEstablishedCount == 1; });
EXPECT_EQ(callback.mOnSubscriptionEstablishedCount, 1);

//
// With re-sub enabled, we shouldn't have encountered any errors
//
EXPECT_EQ(callback.mOnError, 0);
EXPECT_EQ(callback.mOnDone, 0);
}

SetMRPMode(MessagingContext::MRPMode::kDefault);

InteractionModelEngine::GetInstance()->ShutdownActiveReads();
EXPECT_EQ(GetExchangeManager().GetNumActiveExchanges(), 0u);
}

/**
* When all tracked subscriptions go away in server, check-in message is received and OnActiveModeNotification is called in client
* side, the untracked subscription would be kept.
*/
TEST_F(TestRead, TestSubscribe_MismatchedSubGoAwayInserverOnActiveModeNotification)
{
auto sessionHandle = GetSessionBobToAlice();

SetMRPMode(MessagingContext::MRPMode::kResponsive);

{
TestResubscriptionCallback callback;
ReadClient readClient(InteractionModelEngine::GetInstance(), &GetExchangeManager(), callback,
ReadClient::InteractionType::Subscribe);

callback.mScheduleLITResubscribeImmediately = false;
callback.SetReadClient(&readClient);

ReadPrepareParams readPrepareParams(GetSessionBobToAlice());

// Read full wildcard paths, repeat twice to ensure chunking.
AttributePathParams attributePathParams[1];
readPrepareParams.mpAttributePathParamsList = attributePathParams;
readPrepareParams.mAttributePathParamsListSize = MATTER_ARRAY_SIZE(attributePathParams);
attributePathParams[0].mEndpointId = kTestEndpointId;
attributePathParams[0].mClusterId = Clusters::UnitTesting::Id;
attributePathParams[0].mAttributeId = Clusters::UnitTesting::Attributes::Boolean::Id;
constexpr uint16_t maxIntervalCeilingSeconds = 1;

readPrepareParams.mMaxIntervalCeilingSeconds = maxIntervalCeilingSeconds;
readPrepareParams.mIsPeerLIT = true;

auto err = readClient.SendAutoResubscribeRequest(std::move(readPrepareParams));
EXPECT_EQ(err, CHIP_NO_ERROR);

//
// Drive servicing IO till we have established a subscription.
//
GetIOContext().DriveIOUntil(System::Clock::Milliseconds32(2000),
[&]() { return callback.mOnSubscriptionEstablishedCount >= 1; });
EXPECT_EQ(callback.mOnSubscriptionEstablishedCount, 1);
EXPECT_EQ(callback.mOnError, 0);
EXPECT_EQ(callback.mOnResubscriptionsAttempted, 0);

GetLoopback().mNumMessagesToDrop = 0;
callback.ClearCounters();
InteractionModelEngine::GetInstance()->OnActiveModeNotification(
ScopedNodeId(readClient.GetPeerNodeId(), readClient.GetFabricIndex()), static_cast<uint64_t>(0));
EXPECT_EQ(callback.mOnResubscriptionsAttempted, 0);
EXPECT_EQ(callback.mLastError, CHIP_NO_ERROR);
EXPECT_EQ(callback.mOnError, 0);
EXPECT_EQ(callback.mOnDone, 0);
}

SetMRPMode(MessagingContext::MRPMode::kDefault);

InteractionModelEngine::GetInstance()->ShutdownActiveReads();
EXPECT_EQ(GetExchangeManager().GetNumActiveExchanges(), 0u);
}

TEST_F(TestRead, TestSubscribeFailed_OnActiveModeNotification)
{
auto sessionHandle = GetSessionBobToAlice();
Expand Down Expand Up @@ -2393,7 +2525,7 @@ TEST_F(TestRead, TestSubscribeFailed_OnActiveModeNotification)
GetLoopback().mNumMessagesToDrop = 0;
callback.ClearCounters();
InteractionModelEngine::GetInstance()->OnActiveModeNotification(
ScopedNodeId(readClient.GetPeerNodeId(), readClient.GetFabricIndex()));
ScopedNodeId(readClient.GetPeerNodeId(), readClient.GetFabricIndex()), static_cast<uint64_t>(0));
//
// Drive servicing IO till we have established a subscription.
//
Expand Down
Loading