Skip to content

Commit 371756b

Browse files
IM: Create ReadHandler after Session Establishment for Subscription Resumption (project-chip#30491)
* IM: Create ReadHandler after Session Establishment for Subscription Resumption * Restyled by clang-format * Make SubscriptionResumptionHelper inherits from SubscriptionInfo * review changes * Rename Helper to SessionEstablisher * Restyled by clang-format * RAII changes * Restyled by clang-format --------- Co-authored-by: Restyled.io <commits@restyled.io>
1 parent cb4777f commit 371756b

7 files changed

+252
-98
lines changed

src/app/BUILD.gn

+7
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,13 @@ static_library("app") {
232232
]
233233
}
234234

235+
if (chip_persist_subscriptions) {
236+
sources += [
237+
"SubscriptionResumptionSessionEstablisher.cpp",
238+
"SubscriptionResumptionSessionEstablisher.h",
239+
]
240+
}
241+
235242
if (chip_enable_read_client) {
236243
sources += [
237244
"BufferedReadCallback.cpp",

src/app/InteractionModelEngine.cpp

+20-15
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,18 @@
4141
namespace chip {
4242
namespace app {
4343

44+
class AutoReleaseSubscriptionInfoIterator
45+
{
46+
public:
47+
AutoReleaseSubscriptionInfoIterator(SubscriptionResumptionStorage::SubscriptionInfoIterator * iterator) : mIterator(iterator){};
48+
~AutoReleaseSubscriptionInfoIterator() { mIterator->Release(); }
49+
50+
SubscriptionResumptionStorage::SubscriptionInfoIterator * operator->() const { return mIterator; }
51+
52+
private:
53+
SubscriptionResumptionStorage::SubscriptionInfoIterator * mIterator;
54+
};
55+
4456
using Protocols::InteractionModel::Status;
4557

4658
Global<InteractionModelEngine> sInteractionModelEngine;
@@ -1872,7 +1884,7 @@ void InteractionModelEngine::ResumeSubscriptionsTimerCallback(System::Layer * ap
18721884
bool resumedSubscriptions = false;
18731885
#endif // CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
18741886
SubscriptionResumptionStorage::SubscriptionInfo subscriptionInfo;
1875-
auto * iterator = imEngine->mpSubscriptionResumptionStorage->IterateSubscriptions();
1887+
AutoReleaseSubscriptionInfoIterator iterator(imEngine->mpSubscriptionResumptionStorage->IterateSubscriptions());
18761888
while (iterator->Next(subscriptionInfo))
18771889
{
18781890
// If subscription happens between reboot and this timer callback, it's already live and should skip resumption
@@ -1890,31 +1902,24 @@ void InteractionModelEngine::ResumeSubscriptionsTimerCallback(System::Layer * ap
18901902
continue;
18911903
}
18921904

1893-
auto requestedAttributePathCount = subscriptionInfo.mAttributePaths.AllocatedSize();
1894-
auto requestedEventPathCount = subscriptionInfo.mEventPaths.AllocatedSize();
1895-
if (!imEngine->EnsureResourceForSubscription(subscriptionInfo.mFabricIndex, requestedAttributePathCount,
1896-
requestedEventPathCount))
1905+
auto subscriptionResumptionSessionEstablisher = Platform::MakeUnique<SubscriptionResumptionSessionEstablisher>();
1906+
if (subscriptionResumptionSessionEstablisher == nullptr)
18971907
{
1898-
ChipLogProgress(InteractionModel, "no resource for Subscription resumption");
1899-
iterator->Release();
1908+
ChipLogProgress(InteractionModel, "Failed to create SubscriptionResumptionSessionEstablisher");
19001909
return;
19011910
}
19021911

1903-
ReadHandler * handler = imEngine->mReadHandlers.CreateObject(*imEngine, imEngine->GetReportScheduler());
1904-
if (handler == nullptr)
1912+
if (subscriptionResumptionSessionEstablisher->ResumeSubscription(*imEngine->mpCASESessionMgr, subscriptionInfo) !=
1913+
CHIP_NO_ERROR)
19051914
{
1906-
ChipLogProgress(InteractionModel, "no resource for ReadHandler creation");
1907-
iterator->Release();
1915+
ChipLogProgress(InteractionModel, "Failed to ResumeSubscription 0x%" PRIx32, subscriptionInfo.mSubscriptionId);
19081916
return;
19091917
}
1910-
1911-
ChipLogProgress(InteractionModel, "Resuming subscriptionId %" PRIu32, subscriptionInfo.mSubscriptionId);
1912-
handler->ResumeSubscription(*imEngine->mpCASESessionMgr, subscriptionInfo);
1918+
subscriptionResumptionSessionEstablisher.release();
19131919
#if CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
19141920
resumedSubscriptions = true;
19151921
#endif // CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
19161922
}
1917-
iterator->Release();
19181923

19191924
#if CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
19201925
// If no persisted subscriptions needed resumption then all resumption retries are done

src/app/InteractionModelEngine.h

+2
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include <app/AppConfig.h>
3030
#include <app/MessageDef/AttributeReportIBs.h>
3131
#include <app/MessageDef/ReportDataMessage.h>
32+
#include <app/SubscriptionResumptionSessionEstablisher.h>
3233
#include <lib/core/CHIPCore.h>
3334
#include <lib/support/CodeUtils.h>
3435
#include <lib/support/DLLUtil.h>
@@ -380,6 +381,7 @@ class InteractionModelEngine : public Messaging::UnsolicitedMessageHandler,
380381
friend class reporting::Engine;
381382
friend class TestCommandInteraction;
382383
friend class TestInteractionModelEngine;
384+
friend class SubscriptionResumptionSessionEstablisher;
383385
using Status = Protocols::InteractionModel::Status;
384386

385387
void OnDone(CommandHandler & apCommandObj) override;

src/app/ReadHandler.cpp

+33-60
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,6 @@ ReadHandler::ReadHandler(ManagementCallback & apCallback, Messaging::ExchangeCon
5656
InteractionType aInteractionType, Observer * observer) :
5757
mExchangeCtx(*this),
5858
mManagementCallback(apCallback)
59-
#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
60-
,
61-
mOnConnectedCallback(HandleDeviceConnected, this), mOnConnectionFailureCallback(HandleDeviceConnectionFailure, this)
62-
#endif
6359
{
6460
VerifyOrDie(apExchangeContext != nullptr);
6561

@@ -84,8 +80,7 @@ ReadHandler::ReadHandler(ManagementCallback & apCallback, Messaging::ExchangeCon
8480

8581
#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
8682
ReadHandler::ReadHandler(ManagementCallback & apCallback, Observer * observer) :
87-
mExchangeCtx(*this), mManagementCallback(apCallback), mOnConnectedCallback(HandleDeviceConnected, this),
88-
mOnConnectionFailureCallback(HandleDeviceConnectionFailure, this)
83+
mExchangeCtx(*this), mManagementCallback(apCallback)
8984
{
9085
mInteractionType = InteractionType::Subscribe;
9186
mFlags.ClearAll();
@@ -94,41 +89,57 @@ ReadHandler::ReadHandler(ManagementCallback & apCallback, Observer * observer) :
9489
mObserver = observer;
9590
}
9691

97-
void ReadHandler::ResumeSubscription(CASESessionManager & caseSessionManager,
98-
SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo)
92+
void ReadHandler::OnSubscriptionResumed(const SessionHandle & sessionHandle,
93+
SubscriptionResumptionSessionEstablisher & resumptionSessionEstablisher)
9994
{
100-
mSubscriptionId = subscriptionInfo.mSubscriptionId;
101-
mMinIntervalFloorSeconds = subscriptionInfo.mMinInterval;
102-
mMaxInterval = subscriptionInfo.mMaxInterval;
103-
SetStateFlag(ReadHandlerFlags::FabricFiltered, subscriptionInfo.mFabricFiltered);
95+
mSubscriptionId = resumptionSessionEstablisher.mSubscriptionInfo.mSubscriptionId;
96+
mMinIntervalFloorSeconds = resumptionSessionEstablisher.mSubscriptionInfo.mMinInterval;
97+
mMaxInterval = resumptionSessionEstablisher.mSubscriptionInfo.mMaxInterval;
98+
SetStateFlag(ReadHandlerFlags::FabricFiltered, resumptionSessionEstablisher.mSubscriptionInfo.mFabricFiltered);
10499

105100
// Move dynamically allocated attributes and events from the SubscriptionInfo struct into
106101
// the object pool managed by the IM engine
107-
for (size_t i = 0; i < subscriptionInfo.mAttributePaths.AllocatedSize(); i++)
102+
for (size_t i = 0; i < resumptionSessionEstablisher.mSubscriptionInfo.mAttributePaths.AllocatedSize(); i++)
108103
{
109-
AttributePathParams attributePathParams = subscriptionInfo.mAttributePaths[i].GetParams();
110-
CHIP_ERROR err =
111-
InteractionModelEngine::GetInstance()->PushFrontAttributePathList(mpAttributePathList, attributePathParams);
104+
AttributePathParams params = resumptionSessionEstablisher.mSubscriptionInfo.mAttributePaths[i].GetParams();
105+
CHIP_ERROR err = InteractionModelEngine::GetInstance()->PushFrontAttributePathList(mpAttributePathList, params);
112106
if (err != CHIP_NO_ERROR)
113107
{
114108
Close();
115109
return;
116110
}
117111
}
118-
for (size_t i = 0; i < subscriptionInfo.mEventPaths.AllocatedSize(); i++)
112+
for (size_t i = 0; i < resumptionSessionEstablisher.mSubscriptionInfo.mEventPaths.AllocatedSize(); i++)
119113
{
120-
EventPathParams eventPathParams = subscriptionInfo.mEventPaths[i].GetParams();
121-
CHIP_ERROR err = InteractionModelEngine::GetInstance()->PushFrontEventPathParamsList(mpEventPathList, eventPathParams);
114+
EventPathParams params = resumptionSessionEstablisher.mSubscriptionInfo.mEventPaths[i].GetParams();
115+
CHIP_ERROR err = InteractionModelEngine::GetInstance()->PushFrontEventPathParamsList(mpEventPathList, params);
122116
if (err != CHIP_NO_ERROR)
123117
{
124118
Close();
125119
return;
126120
}
127121
}
128122

129-
// Ask IM engine to start CASE session with subscriber
130-
ScopedNodeId peerNode = ScopedNodeId(subscriptionInfo.mNodeId, subscriptionInfo.mFabricIndex);
131-
caseSessionManager.FindOrEstablishSession(peerNode, &mOnConnectedCallback, &mOnConnectionFailureCallback);
123+
mSessionHandle.Grab(sessionHandle);
124+
125+
SetStateFlag(ReadHandlerFlags::ActiveSubscription);
126+
127+
auto * appCallback = mManagementCallback.GetAppCallback();
128+
if (appCallback)
129+
{
130+
appCallback->OnSubscriptionEstablished(*this);
131+
}
132+
// Notify the observer that a subscription has been resumed
133+
mObserver->OnSubscriptionEstablished(this);
134+
135+
MoveToState(HandlerState::CanStartReporting);
136+
137+
ObjectList<AttributePathParams> * attributePath = mpAttributePathList;
138+
while (attributePath)
139+
{
140+
InteractionModelEngine::GetInstance()->GetReportingEngine().SetDirty(attributePath->mValue);
141+
attributePath = attributePath->mpNext;
142+
}
132143
}
133144

134145
#endif // CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
@@ -893,43 +904,5 @@ void ReadHandler::ClearStateFlag(ReadHandlerFlags aFlag)
893904
SetStateFlag(aFlag, false);
894905
}
895906

896-
void ReadHandler::HandleDeviceConnected(void * context, Messaging::ExchangeManager & exchangeMgr,
897-
const SessionHandle & sessionHandle)
898-
{
899-
ReadHandler * const _this = static_cast<ReadHandler *>(context);
900-
901-
_this->mSessionHandle.Grab(sessionHandle);
902-
903-
_this->SetStateFlag(ReadHandlerFlags::ActiveSubscription);
904-
905-
auto * appCallback = _this->mManagementCallback.GetAppCallback();
906-
if (appCallback)
907-
{
908-
appCallback->OnSubscriptionEstablished(*_this);
909-
}
910-
// Notify the observer that a subscription has been resumed
911-
_this->mObserver->OnSubscriptionEstablished(_this);
912-
913-
_this->MoveToState(HandlerState::CanStartReporting);
914-
915-
ObjectList<AttributePathParams> * attributePath = _this->mpAttributePathList;
916-
while (attributePath)
917-
{
918-
InteractionModelEngine::GetInstance()->GetReportingEngine().SetDirty(attributePath->mValue);
919-
attributePath = attributePath->mpNext;
920-
}
921-
}
922-
923-
void ReadHandler::HandleDeviceConnectionFailure(void * context, const ScopedNodeId & peerId, CHIP_ERROR err)
924-
{
925-
ReadHandler * const _this = static_cast<ReadHandler *>(context);
926-
VerifyOrDie(_this != nullptr);
927-
928-
// TODO: Have a retry mechanism tied to wake interval for IC devices
929-
ChipLogError(DataManagement, "Failed to establish CASE for subscription-resumption with error '%" CHIP_ERROR_FORMAT "'",
930-
err.Format());
931-
_this->Close();
932-
}
933-
934907
} // namespace app
935908
} // namespace chip

src/app/ReadHandler.h

+11-23
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
#include <app/MessageDef/EventPathIBs.h>
3939
#include <app/ObjectList.h>
4040
#include <app/OperationalSessionSetup.h>
41+
#include <app/SubscriptionResumptionSessionEstablisher.h>
4142
#include <app/SubscriptionResumptionStorage.h>
4243
#include <lib/core/CHIPCallback.h>
4344
#include <lib/core/CHIPCore.h>
@@ -253,6 +254,16 @@ class ReadHandler : public Messaging::ExchangeDelegate
253254
return CHIP_NO_ERROR;
254255
}
255256

257+
#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
258+
/**
259+
*
260+
* @brief Initialize a ReadHandler for a resumed subsciption
261+
*
262+
* Used after the SubscriptionResumptionSessionEstablisher establishs the CASE session
263+
*/
264+
void OnSubscriptionResumed(const SessionHandle & sessionHandle, SubscriptionResumptionSessionEstablisher & sessionEstablisher);
265+
#endif
266+
256267
private:
257268
PriorityLevel GetCurrentPriority() const { return mCurrentPriority; }
258269
EventNumber & GetEventMin() { return mEventMin; }
@@ -302,18 +313,6 @@ class ReadHandler : public Messaging::ExchangeDelegate
302313
*/
303314
void OnInitialRequest(System::PacketBufferHandle && aPayload);
304315

305-
#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
306-
/**
307-
*
308-
* @brief Resume a persisted subscription
309-
*
310-
* Used after ReadHandler(ManagementCallback & apCallback). This will start a CASE session
311-
* with the subscriber if one doesn't already exist, and send full priming report when connected.
312-
*/
313-
void ResumeSubscription(CASESessionManager & caseSessionManager,
314-
SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo);
315-
#endif
316-
317316
/**
318317
* Send ReportData to initiator
319318
*
@@ -485,11 +484,6 @@ class ReadHandler : public Messaging::ExchangeDelegate
485484
/// @param aFlag Flag to clear
486485
void ClearStateFlag(ReadHandlerFlags aFlag);
487486

488-
// Helpers for continuing the subscription resumption
489-
static void HandleDeviceConnected(void * context, Messaging::ExchangeManager & exchangeMgr,
490-
const SessionHandle & sessionHandle);
491-
static void HandleDeviceConnectionFailure(void * context, const ScopedNodeId & peerId, CHIP_ERROR error);
492-
493487
AttributePathExpandIterator mAttributePathExpandIterator = AttributePathExpandIterator(nullptr);
494488

495489
// The current generation of the reporting engine dirty set the last time we were notified that a path we're interested in was
@@ -571,12 +565,6 @@ class ReadHandler : public Messaging::ExchangeDelegate
571565

572566
// TODO (#27675): Merge all observers into one and that one will dispatch the callbacks to the right place.
573567
Observer * mObserver = nullptr;
574-
575-
#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
576-
// Callbacks to handle server-initiated session success/failure
577-
chip::Callback::Callback<OnDeviceConnected> mOnConnectedCallback;
578-
chip::Callback::Callback<OnDeviceConnectionFailure> mOnConnectionFailureCallback;
579-
#endif
580568
};
581569
} // namespace app
582570
} // namespace chip

0 commit comments

Comments
 (0)