Skip to content

Commit 7ada478

Browse files
tehampsonrestyled-commitsandy31415saurabhst
authored andcommitted
Add DeviceSubscriptionManager to manage subscription of fabric-admin (project-chip#35305)
--------- Co-authored-by: Restyled.io <commits@restyled.io> Co-authored-by: Andrei Litvin <andy314@gmail.com> Co-authored-by: saurabhst <s.kumar9@samsung.com>
1 parent 867ba6c commit 7ada478

8 files changed

+272
-28
lines changed

examples/fabric-admin/BUILD.gn

+2
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ static_library("fabric-admin-utils") {
8484
"device_manager/DeviceManager.h",
8585
"device_manager/DeviceSubscription.cpp",
8686
"device_manager/DeviceSubscription.h",
87+
"device_manager/DeviceSubscriptionManager.cpp",
88+
"device_manager/DeviceSubscriptionManager.h",
8789
"device_manager/DeviceSynchronization.cpp",
8890
"device_manager/DeviceSynchronization.h",
8991
]

examples/fabric-admin/commands/pairing/PairingCommand.cpp

+5-1
Original file line numberDiff line numberDiff line change
@@ -423,7 +423,9 @@ void PairingCommand::OnCommissioningComplete(NodeId nodeId, CHIP_ERROR err)
423423
{
424424
// print to console
425425
fprintf(stderr, "New device with Node ID: 0x%lx has been successfully added.\n", nodeId);
426-
DeviceSynchronizer::Instance().StartDeviceSynchronization(CurrentCommissioner(), mNodeId, mDeviceIsICD);
426+
// CurrentCommissioner() has a lifetime that is the entire life of the application itself
427+
// so it is safe to provide to StartDeviceSynchronization.
428+
DeviceSynchronizer::Instance().StartDeviceSynchronization(&CurrentCommissioner(), mNodeId, mDeviceIsICD);
427429
}
428430
else
429431
{
@@ -564,6 +566,8 @@ void PairingCommand::OnCurrentFabricRemove(void * context, NodeId nodeId, CHIP_E
564566
fprintf(stderr, "Device with Node ID: 0x%lx has been successfully removed.\n", nodeId);
565567

566568
#if defined(PW_RPC_ENABLED)
569+
chip::app::InteractionModelEngine::GetInstance()->ShutdownSubscriptions(command->CurrentCommissioner().GetFabricIndex(),
570+
nodeId);
567571
RemoveSynchronizedDevice(nodeId);
568572
#endif
569573
}

examples/fabric-admin/device_manager/DeviceSubscription.cpp

+91-9
Original file line numberDiff line numberDiff line change
@@ -100,15 +100,18 @@ void DeviceSubscription::OnReportEnd()
100100
#if defined(PW_RPC_ENABLED)
101101
AdminCommissioningAttributeChanged(mCurrentAdministratorCommissioningAttributes);
102102
#else
103-
ChipLogError(NotSpecified, "Cannot synchronize device with fabric bridge: RPC not enabled");
103+
ChipLogError(NotSpecified, "Cannot forward Administrator Commissioning Attribute to fabric bridge: RPC not enabled");
104104
#endif
105105
mChangeDetected = false;
106106
}
107107
}
108108

109109
void DeviceSubscription::OnDone(ReadClient * apReadClient)
110110
{
111-
// TODO(#35077) In follow up PR we will indicate to a manager DeviceSubscription is terminal.
111+
// After calling mOnDoneCallback we are indicating that `this` is deleted and we shouldn't do anything else with
112+
// DeviceSubscription.
113+
MoveToState(State::AwaitingDestruction);
114+
mOnDoneCallback(mCurrentAdministratorCommissioningAttributes.node_id);
112115
}
113116

114117
void DeviceSubscription::OnError(CHIP_ERROR error)
@@ -118,6 +121,15 @@ void DeviceSubscription::OnError(CHIP_ERROR error)
118121

119122
void DeviceSubscription::OnDeviceConnected(Messaging::ExchangeManager & exchangeMgr, const SessionHandle & sessionHandle)
120123
{
124+
if (mState == State::Stopping)
125+
{
126+
// After calling mOnDoneCallback we are indicating that `this` is deleted and we shouldn't do anything else with
127+
// DeviceSubscription.
128+
MoveToState(State::AwaitingDestruction);
129+
mOnDoneCallback(mCurrentAdministratorCommissioningAttributes.node_id);
130+
return;
131+
}
132+
VerifyOrDie(mState == State::Connecting);
121133
mClient = std::make_unique<ReadClient>(app::InteractionModelEngine::GetInstance(), &exchangeMgr /* echangeMgr */,
122134
*this /* callback */, ReadClient::InteractionType::Subscribe);
123135
VerifyOrDie(mClient);
@@ -136,25 +148,95 @@ void DeviceSubscription::OnDeviceConnected(Messaging::ExchangeManager & exchange
136148
if (err != CHIP_NO_ERROR)
137149
{
138150
ChipLogError(NotSpecified, "Failed to issue subscription to AdministratorCommissioning data");
139-
// TODO(#35077) In follow up PR we will indicate to a manager DeviceSubscription is terminal.
151+
// After calling mOnDoneCallback we are indicating that `this` is deleted and we shouldn't do anything else with
152+
// DeviceSubscription.
153+
MoveToState(State::AwaitingDestruction);
154+
mOnDoneCallback(mCurrentAdministratorCommissioningAttributes.node_id);
155+
return;
140156
}
157+
MoveToState(State::SubscriptionStarted);
158+
}
159+
160+
void DeviceSubscription::MoveToState(const State aTargetState)
161+
{
162+
mState = aTargetState;
163+
ChipLogDetail(NotSpecified, "DeviceSubscription moving to [%10.10s]", GetStateStr());
164+
}
165+
166+
const char * DeviceSubscription::GetStateStr() const
167+
{
168+
switch (mState)
169+
{
170+
case State::Idle:
171+
return "Idle";
172+
173+
case State::Connecting:
174+
return "Connecting";
175+
176+
case State::Stopping:
177+
return "Stopping";
178+
179+
case State::SubscriptionStarted:
180+
return "SubscriptionStarted";
181+
182+
case State::AwaitingDestruction:
183+
return "AwaitingDestruction";
184+
}
185+
return "N/A";
141186
}
142187

143188
void DeviceSubscription::OnDeviceConnectionFailure(const ScopedNodeId & peerId, CHIP_ERROR error)
144189
{
145-
ChipLogError(NotSpecified, "Device Sync failed to connect to " ChipLogFormatX64, ChipLogValueX64(peerId.GetNodeId()));
146-
// TODO(#35077) In follow up PR we will indicate to a manager DeviceSubscription is terminal.
190+
VerifyOrDie(mState == State::Connecting || mState == State::Stopping);
191+
ChipLogError(NotSpecified, "DeviceSubscription failed to connect to " ChipLogFormatX64, ChipLogValueX64(peerId.GetNodeId()));
192+
// TODO(#35333) Figure out how we should recover if we fail to connect and mState == State::Connecting.
193+
194+
// After calling mOnDoneCallback we are indicating that `this` is deleted and we shouldn't do anything else with
195+
// DeviceSubscription.
196+
MoveToState(State::AwaitingDestruction);
197+
mOnDoneCallback(mCurrentAdministratorCommissioningAttributes.node_id);
147198
}
148199

149-
void DeviceSubscription::StartSubscription(Controller::DeviceController & controller, NodeId nodeId)
200+
CHIP_ERROR DeviceSubscription::StartSubscription(OnDoneCallback onDoneCallback, Controller::DeviceController & controller,
201+
NodeId nodeId)
150202
{
151-
VerifyOrDie(!mSubscriptionStarted);
203+
assertChipStackLockedByCurrentThread();
204+
VerifyOrDie(mState == State::Idle);
152205

153206
mCurrentAdministratorCommissioningAttributes = chip_rpc_AdministratorCommissioningChanged_init_default;
154207
mCurrentAdministratorCommissioningAttributes.node_id = nodeId;
155208
mCurrentAdministratorCommissioningAttributes.window_status =
156209
static_cast<uint32_t>(Clusters::AdministratorCommissioning::CommissioningWindowStatusEnum::kWindowNotOpen);
157-
mSubscriptionStarted = true;
210+
mState = State::Connecting;
211+
mOnDoneCallback = onDoneCallback;
212+
213+
return controller.GetConnectedDevice(nodeId, &mOnDeviceConnectedCallback, &mOnDeviceConnectionFailureCallback);
214+
}
215+
216+
void DeviceSubscription::StopSubscription()
217+
{
218+
assertChipStackLockedByCurrentThread();
219+
VerifyOrDie(mState != State::Idle);
220+
// Something is seriously wrong if we die on the line below
221+
VerifyOrDie(mState != State::AwaitingDestruction);
222+
223+
if (mState == State::Stopping)
224+
{
225+
// Stop is called again while we are still waiting on connected callbacks
226+
return;
227+
}
228+
229+
if (mState == State::Connecting)
230+
{
231+
MoveToState(State::Stopping);
232+
return;
233+
}
158234

159-
controller.GetConnectedDevice(nodeId, &mOnDeviceConnectedCallback, &mOnDeviceConnectionFailureCallback);
235+
// By calling reset on our ReadClient we terminate the subscription.
236+
VerifyOrDie(mClient);
237+
mClient.reset();
238+
// After calling mOnDoneCallback we are indicating that `this` is deleted and we shouldn't do anything else with
239+
// DeviceSubscription.
240+
MoveToState(State::AwaitingDestruction);
241+
mOnDoneCallback(mCurrentAdministratorCommissioningAttributes.node_id);
160242
}

examples/fabric-admin/device_manager/DeviceSubscription.h

+26-6
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
#include "fabric_bridge_service/fabric_bridge_service.pb.h"
2727
#include "fabric_bridge_service/fabric_bridge_service.rpc.pb.h"
2828

29+
class DeviceSubscriptionManager;
30+
2931
/// Attribute subscription to attributes that are important to keep track and send to fabric-bridge
3032
/// via RPC when change has been identified.
3133
///
@@ -35,11 +37,18 @@
3537
class DeviceSubscription : public chip::app::ReadClient::Callback
3638
{
3739
public:
40+
using OnDoneCallback = std::function<void(chip::NodeId)>;
41+
3842
DeviceSubscription();
3943

40-
/// Usually called after we have added a synchronized device to fabric-bridge to monitor
41-
/// for any changes that need to be propgated to fabric-bridge.
42-
void StartSubscription(chip::Controller::DeviceController & controller, chip::NodeId nodeId);
44+
CHIP_ERROR StartSubscription(OnDoneCallback onDoneCallback, chip::Controller::DeviceController & controller,
45+
chip::NodeId nodeId);
46+
47+
/// This will trigger stopping the subscription. Once subscription is stopped the OnDoneCallback
48+
/// provided in StartSubscription will be called to indicate that subscription have been terminated.
49+
///
50+
/// Must only be called after StartSubscription was successfully called.
51+
void StopSubscription();
4352

4453
///////////////////////////////////////////////////////////////
4554
// ReadClient::Callback implementation
@@ -57,14 +66,25 @@ class DeviceSubscription : public chip::app::ReadClient::Callback
5766
void OnDeviceConnectionFailure(const chip::ScopedNodeId & peerId, CHIP_ERROR error);
5867

5968
private:
69+
enum class State : uint8_t
70+
{
71+
Idle, ///< Default state that the object starts out in, where no work has commenced
72+
Connecting, ///< We are waiting for OnDeviceConnected or OnDeviceConnectionFailure callbacks to be called
73+
Stopping, ///< We are waiting for OnDeviceConnected or OnDeviceConnectionFailure callbacks so we can terminate
74+
SubscriptionStarted, ///< We have started a subscription.
75+
AwaitingDestruction, ///< The object has completed its work and is awaiting destruction.
76+
};
77+
78+
void MoveToState(const State aTargetState);
79+
const char * GetStateStr() const;
80+
81+
OnDoneCallback mOnDoneCallback;
6082
std::unique_ptr<chip::app::ReadClient> mClient;
6183

6284
chip::Callback::Callback<chip::OnDeviceConnected> mOnDeviceConnectedCallback;
6385
chip::Callback::Callback<chip::OnDeviceConnectionFailure> mOnDeviceConnectionFailureCallback;
6486

6587
chip_rpc_AdministratorCommissioningChanged mCurrentAdministratorCommissioningAttributes;
6688
bool mChangeDetected = false;
67-
// Ensures that DeviceSubscription starts a subscription only once. If instance of
68-
// DeviceSubscription can be reused, the class documentation should be updated accordingly.
69-
bool mSubscriptionStarted = false;
89+
State mState = State::Idle;
7090
};
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright (c) 2024 Project CHIP Authors
3+
* All rights reserved.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
#include "DeviceSubscriptionManager.h"
20+
#include "rpc/RpcClient.h"
21+
22+
#include <app/InteractionModelEngine.h>
23+
#include <app/server/Server.h>
24+
25+
#include <app-common/zap-generated/ids/Attributes.h>
26+
#include <app-common/zap-generated/ids/Clusters.h>
27+
#include <device_manager/DeviceManager.h>
28+
29+
using namespace ::chip;
30+
using namespace ::chip::app;
31+
32+
DeviceSubscriptionManager & DeviceSubscriptionManager::Instance()
33+
{
34+
static DeviceSubscriptionManager instance;
35+
return instance;
36+
}
37+
38+
CHIP_ERROR DeviceSubscriptionManager::StartSubscription(Controller::DeviceController & controller, NodeId nodeId)
39+
{
40+
assertChipStackLockedByCurrentThread();
41+
auto it = mDeviceSubscriptionMap.find(nodeId);
42+
VerifyOrReturnError((it == mDeviceSubscriptionMap.end()), CHIP_ERROR_INCORRECT_STATE);
43+
44+
auto deviceSubscription = std::make_unique<DeviceSubscription>();
45+
VerifyOrReturnError(deviceSubscription, CHIP_ERROR_NO_MEMORY);
46+
ReturnErrorOnFailure(deviceSubscription->StartSubscription(
47+
[this](NodeId aNodeId) { this->DeviceSubscriptionTerminated(aNodeId); }, controller, nodeId));
48+
49+
mDeviceSubscriptionMap[nodeId] = std::move(deviceSubscription);
50+
return CHIP_NO_ERROR;
51+
}
52+
53+
CHIP_ERROR DeviceSubscriptionManager::RemoveSubscription(chip::NodeId nodeId)
54+
{
55+
assertChipStackLockedByCurrentThread();
56+
auto it = mDeviceSubscriptionMap.find(nodeId);
57+
VerifyOrReturnError((it != mDeviceSubscriptionMap.end()), CHIP_ERROR_NOT_FOUND);
58+
// We cannot safely erase the DeviceSubscription from mDeviceSubscriptionMap.
59+
// After calling StopSubscription we expect DeviceSubscription to eventually
60+
// call the OnDoneCallback we provided in StartSubscription which will call
61+
// DeviceSubscriptionTerminated where it will be erased from the
62+
// mDeviceSubscriptionMap.
63+
it->second->StopSubscription();
64+
return CHIP_NO_ERROR;
65+
}
66+
67+
void DeviceSubscriptionManager::DeviceSubscriptionTerminated(NodeId nodeId)
68+
{
69+
assertChipStackLockedByCurrentThread();
70+
auto it = mDeviceSubscriptionMap.find(nodeId);
71+
// DeviceSubscriptionTerminated is a private method that is expected to only
72+
// be called by DeviceSubscription when it is terminal and is ready to be
73+
// cleaned up and removed. If it is not mapped that means something has gone
74+
// really wrong and there is likely a memory leak somewhere.
75+
VerifyOrDie(it != mDeviceSubscriptionMap.end());
76+
mDeviceSubscriptionMap.erase(nodeId);
77+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright (c) 2024 Project CHIP Authors
3+
* All rights reserved.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
#pragma once
19+
20+
#include "DeviceSubscription.h"
21+
22+
#include <app/ReadClient.h>
23+
#include <controller/CHIPDeviceController.h>
24+
#include <lib/core/DataModelTypes.h>
25+
26+
#include <memory>
27+
28+
class DeviceSubscriptionManager
29+
{
30+
public:
31+
static DeviceSubscriptionManager & Instance();
32+
33+
/// Usually called after we have added a synchronized device to fabric-bridge to monitor
34+
/// for any changes that need to be propagated to fabric-bridge.
35+
CHIP_ERROR StartSubscription(chip::Controller::DeviceController & controller, chip::NodeId nodeId);
36+
37+
CHIP_ERROR RemoveSubscription(chip::NodeId nodeId);
38+
39+
private:
40+
void DeviceSubscriptionTerminated(chip::NodeId nodeId);
41+
42+
std::unordered_map<chip::NodeId, std::unique_ptr<DeviceSubscription>> mDeviceSubscriptionMap;
43+
};

0 commit comments

Comments
 (0)