Skip to content

Commit c952a9a

Browse files
committed
Simplify RPC code by setting transparent subscribe
1 parent ff12ada commit c952a9a

File tree

6 files changed

+124
-100
lines changed

6 files changed

+124
-100
lines changed

examples/chef/common/chef-rpc-actions-worker.cpp

+69-17
Original file line numberDiff line numberDiff line change
@@ -36,48 +36,100 @@ using namespace chip::rpc;
3636

3737
static std::map<ClusterId, ActionsDelegate *> gActionsDelegateMap {};
3838

39+
ActionsDelegate * RpcFindActionsDelegate(ClusterId clusterId)
40+
{
41+
if (gActionsDelegateMap.find(clusterId) != gActionsDelegateMap.end()) {
42+
return gActionsDelegateMap[clusterId];
43+
}
44+
45+
return nullptr;
46+
}
47+
3948
static void RpcActionsTaskCallback(System::Layer * systemLayer, void * data)
4049
{
41-
//printf("\033[41m %s , %d, endpointId=%d, clusterId=%d \033[0m \n", __func__, __LINE__, queue->endpointId, queue->clusterId);
50+
ActionTask task = ChefRpcActionsWorker::Instance().PopActionQueue();
4251

43-
// struct ActionsDelegate * delegate = RpcFindActionsDelegate(queue->clusterId);
44-
// if ( nullptr == delegate ) {
45-
// TBD: Error cluster not registered
46-
// return;
47-
// }
52+
printf("\033[41m %s , %d, endpointId=%d, clusterId=%d \033[0m \n", __func__, __LINE__, task.endpointId, task.clusterId);
4853

54+
ActionsDelegate * delegate = RpcFindActionsDelegate(task.clusterId);
55+
if ( nullptr == delegate ) {
56+
// TBD: Error cluster not registered
57+
return;
58+
}
59+
60+
ActionType type = static_cast<ActionType>(task.type);
61+
62+
switch (type) {
63+
case ActionType::WRITE_ATTRIBUTE:
64+
{
65+
delegate->AttributeWriteHandler(static_cast<chip::AttributeId>(task.actionId), task.args);
66+
}
67+
break;
68+
case ActionType::RUN_COMMAND:
69+
{
70+
delegate->CommandHandler(static_cast<chip::CommandId>(task.actionId), task.args);
71+
}
72+
break;
73+
case ActionType::EMIT_EVENT:
74+
{
75+
delegate->EventHandler(static_cast<chip::EventId>(task.actionId), task.args);
76+
}
77+
break;
78+
default:
79+
break;
80+
}
4981
// TBD: insert the queue t ActionHandler's queue
5082
// delete queue;
5183
}
5284

53-
bool ChefRpcActionsWorker::publishAction(chip::rpc::ActionTask task)
85+
bool ChefRpcActionsCallback(EndpointId endpointId, ClusterId clusterId, uint8_t type, uint32_t delayMs, uint32_t actionId, std::vector<uint32_t> args)
5486
{
55-
bool kickTimer = queue.size() == 0;
87+
ActionTask task(endpointId, clusterId, static_cast<ActionType>(type), delayMs, actionId, args);
88+
// TBD: Stack lock
89+
return ChefRpcActionsWorker::Instance().EnqueueAction(task);
90+
}
5691

57-
queue.push(task);
92+
bool ChefRpcActionsWorker::EnqueueAction(ActionTask task)
93+
{
94+
bool kickTimer = false;
5895

96+
if (queue.empty()) {
97+
queue.push(task);
98+
kickTimer = true; // kick timer when the first task is adding to the queue
99+
}
59100
if (kickTimer) {
60101
(void) DeviceLayer::SystemLayer().StartTimer(System::Clock::Milliseconds32(10), RpcActionsTaskCallback, this);
61102
}
62103
return true;
63104
}
64105

65-
struct ActionsDelegate * RpcFindActionsDelegate(ClusterId clusterId)
106+
ActionTask ChefRpcActionsWorker::PopActionQueue()
66107
{
67-
if (gActionsDelegateMap.find(clusterId) != gActionsDelegateMap.end()) {
68-
return gActionsDelegateMap[clusterId];
69-
}
108+
// assert !queue.empty()
109+
ActionTask task = queue.front();
110+
queue.pop();
70111

71-
return nullptr;
112+
return task;
72113
}
73114

74-
75-
void ActionsDelegate::RegisterRpcActionsDelegate(ClusterId clusterId, ActionsDelegate * delegate)
115+
void ChefRpcActionsWorker::RegisterRpcActionsDelegate(ClusterId clusterId, ActionsDelegate * delegate)
76116
{
77117
if ( nullptr == RpcFindActionsDelegate(clusterId) ) {
78118
gActionsDelegateMap[clusterId] = delegate;
79119
return;
80120
}
81-
82121
// TBD: print already registered
83122
}
123+
124+
ChefRpcActionsWorker::ChefRpcActionsWorker()
125+
{
126+
chip::rpc::SubscribeActions(ChefRpcActionsCallback);
127+
}
128+
129+
static ChefRpcActionsWorker instance;
130+
131+
ChefRpcActionsWorker & ChefRpcActionsWorker::Instance()
132+
{
133+
return instance;
134+
}
135+

examples/chef/common/chef-rpc-actions-worker.h

+25-10
Original file line numberDiff line numberDiff line change
@@ -40,24 +40,39 @@ class ActionsDelegate
4040
virtual void CommandHandler(chip::CommandId commandId, std::vector<uint32_t>args) {};
4141
virtual void EventHandler(chip::EventId eventId, std::vector<uint32_t>args) {};
4242

43-
static void RegisterRpcActionsDelegate(ClusterId clusterId, ActionsDelegate * delegate);
44-
4543
protected:
4644
EndpointId mEndpointId;
4745
ClusterId mClusterId;
4846
};
4947

50-
} // namespace app
51-
} // namespace chip
48+
struct ActionTask {
49+
chip::EndpointId endpointId;
50+
chip::ClusterId clusterId;
51+
chip::rpc::ActionType type; // Aligned with Storage buf
52+
uint32_t delayMs;
53+
uint32_t actionId;
54+
std::vector<uint32_t> args;
55+
ActionTask(chip::EndpointId e, chip::ClusterId c,
56+
chip::rpc::ActionType t, uint32_t d, uint32_t i, std::vector<uint32_t> a): endpointId(e), clusterId(c), type(t), delayMs(d), actionId(i), args(a) {};
57+
~ActionTask() {};
58+
};
5259

53-
class ChefRpcActionsWorker: public chip::rpc::ActionsSubscriber
60+
class ChefRpcActionsWorker
5461
{
5562
public:
56-
ChefRpcActionsWorker() = default;
57-
~ChefRpcActionsWorker() override {};
58-
bool publishAction(chip::rpc::ActionTask task) override;
63+
static ChefRpcActionsWorker& Instance();
64+
65+
ChefRpcActionsWorker();
5966

60-
private:
61-
std::queue<chip::rpc::ActionTask> queue;
67+
bool EnqueueAction(ActionTask task);
68+
ActionTask PopActionQueue();
69+
void RegisterRpcActionsDelegate(ClusterId clusterId, ActionsDelegate * delegate);
6270

71+
private:
72+
std::queue<ActionTask> queue;
6373
};
74+
75+
76+
} // namespace app
77+
} // namespace chip
78+

examples/chef/common/clusters/switch/SwitchManager.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ void emberAfSwitchClusterInitCallback(EndpointId endpointId)
7171
{
7272
ChipLogProgress(Zcl, "Chef: emberAfSwitchClusterInitCallback");
7373
printf("\033[44m %s, %d, Switch::ID=%u \033[0m \n", __func__, __LINE__, Switch::Id);
74-
ActionsDelegate::RegisterRpcActionsDelegate(Clusters::Switch::Id, new SwitchActionsDelegate(endpointId, Clusters::Switch::Id, new SwitchEventHandler(endpointId)));
74+
ChefRpcActionsWorker::Instance().RegisterRpcActionsDelegate(Clusters::Switch::Id, new SwitchActionsDelegate(endpointId, Clusters::Switch::Id, new SwitchEventHandler(endpointId)));
7575
}
7676

7777

examples/common/pigweed/rpc_services/Actions.h

+20-17
Original file line numberDiff line numberDiff line change
@@ -36,35 +36,38 @@ namespace rpc {
3636
class Actions final : public pw_rpc::nanopb::Actions::Service<Actions>
3737
{
3838
public:
39+
enum class Type: uint8_t {
40+
Attribute = 0,
41+
Command = 1,
42+
Event = 2,
43+
};
44+
3945
::pw::Status Set( const chip_rpc_ActionsWrite & request, ::pw_protobuf_Empty & response)
4046
{
4147
printf("\033[41m %s, %d, request.endpoint_id=%d, request.cluster_id=%d \033[0m \n", __func__, __LINE__, request.endpoint_id, request.cluster_id);
4248

43-
std::queue<Action> actionQueue;
44-
mActionsSubscriber(request.endpoint_id, request.cluster_id, actionQueue);
49+
Type type = Type::Attribute;
50+
uint32_t delayMs = 0;
51+
uint32_t actionId = 0;
52+
std::vector<uint32_t> args;
53+
mActionsSubscribeCallback(request.endpoint_id, request.cluster_id, static_cast<uint8_t>(type), delayMs, actionId, args);
4554

4655
return pw::OkStatus();
4756
}
4857

49-
enum class Type: uint8_t {
50-
Attribute = 0,
51-
Command = 1,
52-
Event = 2,
53-
};
54-
55-
struct Action {
56-
Type type;
57-
uint32_t delayMs;
58-
uint32_t actionId;
59-
std::vector<uint32_t> args;
60-
};
58+
// struct Action {
59+
// Type type;
60+
// uint32_t delayMs;
61+
// uint32_t actionId;
62+
// std::vector<uint32_t> args;
63+
// };
6164

62-
using RpcActionsSubscriber = void (*)(EndpointId endpointId, ClusterId clusterId, std::queue<Action>);
65+
using RpcActionsSubscribeCallback = bool (*)(EndpointId endpointId, ClusterId clusterId, uint8_t type, uint32_t delayMs, uint32_t actionId, std::vector<uint32_t> args);
6366

64-
void RegisterActionsSubscriber(RpcActionsSubscriber subscriber) { mActionsSubscriber = subscriber; };
67+
void SubscribeActions(RpcActionsSubscribeCallback subscriber) { mActionsSubscribeCallback = subscriber; };
6568

6669
private:
67-
RpcActionsSubscriber mActionsSubscriber;
70+
RpcActionsSubscribeCallback mActionsSubscribeCallback;
6871

6972
};
7073

examples/platform/linux/Rpc.cpp

+7-35
Original file line numberDiff line numberDiff line change
@@ -44,40 +44,6 @@
4444

4545
#if defined(PW_RPC_ACTIONS_SERVICE) && PW_RPC_ACTIONS_SERVICE
4646
#include "pigweed/rpc_services/Actions.h"
47-
namespace chip {
48-
namespace rpc {
49-
50-
ActionsSubscriber * gActionsSubscriber = nullptr;
51-
52-
void RegisterActionsSubscriber(ActionsSubscriber * subscriber)
53-
{
54-
gActionsSubscriber = subscriber;
55-
}
56-
57-
void RpcActionsDispatch(EndpointId endpointId, ClusterId clusterId, std::queue<Actions::Action> rpcActions)
58-
{
59-
ChipLogProgress(NotSpecified, "Receiving the Rpc Actions to be dispatched, endpointId=0x%x, clusterId=0x%x, rpcActions count=%lu", endpointId, clusterId, rpcActions.size());
60-
// std::queue<ActionTask> * queue = new std::queue<ActionTask>();
61-
printf("\033[41m %s , %d, rpcActions.type=%u, rpcAction.delayMs = %d \033[0m \n", __func__, __LINE__, to_underlying(rpcActions.front().type), rpcActions.front().delayMs);
62-
63-
for (; !rpcActions.empty();) {
64-
Actions::Action action = rpcActions.front();
65-
66-
// Since application cannot diretly include Actions.h, so the event is relayed by Rpc.cpp to the subscriber
67-
ActionTask task(endpointId, clusterId, static_cast<chip::rpc::ActionType>(to_underlying(action.type)), action.delayMs, action.actionId, action.args);
68-
69-
if (nullptr != gActionsSubscriber) {
70-
gActionsSubscriber->publishAction(task);
71-
}
72-
// TBD: insert to Device Queue
73-
74-
rpcActions.pop();
75-
}
76-
77-
}
78-
79-
} // rpc
80-
} // chip
8147
#endif // defined(PW_RPC_ACTIONS_SERVICE) && PW_RPC_ACTIONS_SERVICE
8248

8349
#if defined(PW_RPC_LIGHTING_SERVICE) && PW_RPC_LIGHTING_SERVICE
@@ -142,7 +108,6 @@ void RegisterServices(pw::rpc::Server & server)
142108
{
143109
#if defined(PW_RPC_ACTIONS_SERVICE) && PW_RPC_ACTIONS_SERVICE
144110
server.RegisterService(actions_service);
145-
actions_service.RegisterActionsSubscriber(RpcActionsDispatch);
146111
#endif // defined(PW_RPC_ACTIONS_SERVICE) && PW_RPC_ACTIONS_SERVICE
147112

148113
#if defined(PW_RPC_ATTRIBUTE_SERVICE) && PW_RPC_ATTRIBUTE_SERVICE
@@ -173,6 +138,13 @@ void RegisterServices(pw::rpc::Server & server)
173138

174139
} // namespace
175140

141+
#if defined(PW_RPC_ACTIONS_SERVICE) && PW_RPC_ACTIONS_SERVICE
142+
void SubscribeActions(RpcActionsSubscribeCallback subscriber)
143+
{
144+
actions_service.SubscribeActions(subscriber);
145+
}
146+
#endif // defined(PW_RPC_ACTIONS_SERVICE) && PW_RPC_ACTIONS_SERVICE
147+
176148
void RunRpcService()
177149
{
178150
pw::rpc::system_server::Init();

examples/platform/linux/Rpc.h

+2-20
Original file line numberDiff line numberDiff line change
@@ -35,27 +35,9 @@ enum class ActionType : uint8_t
3535
EMIT_EVENT = 0x02, // Emit a cluster Events
3636
};
3737

38-
struct ActionTask {
39-
chip::EndpointId endpointId;
40-
chip::ClusterId clusterId;
41-
enum ActionType type; // Aligned with Storage buf
42-
uint32_t delayMs;
43-
uint32_t actionId;
44-
std::vector<uint32_t> args;
45-
ActionTask(chip::EndpointId e, chip::ClusterId c,
46-
ActionType t, uint32_t d, uint32_t i, std::vector<uint32_t> a): endpointId(e), clusterId(c), type(t), delayMs(d), actionId(i), args(a) {};
47-
~ActionTask() {};
48-
};
49-
50-
class ActionsSubscriber {
51-
public:
52-
ActionsSubscriber() = default;
53-
virtual ~ActionsSubscriber() = default;
54-
virtual bool publishAction(ActionTask task) = 0;
55-
private:
56-
};
38+
using RpcActionsSubscribeCallback = bool (*)(EndpointId endpointId, ClusterId clusterId, uint8_t type, uint32_t delayMs, uint32_t actionId, std::vector<uint32_t> args);
5739

58-
void RegisterActionsSubscriber(ActionsSubscriber * subscriber);
40+
void SubscribeActions(RpcActionsSubscribeCallback subscriber);
5941

6042
int Init(uint16_t rpcServerPort);
6143

0 commit comments

Comments
 (0)