Skip to content

Commit c4f57a7

Browse files
authored
add mutex to protect events_executor current entity collection (#2187)
* add mutex to protect events_executor current entity collection and unit-test Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com> * be more precise with mutex locks; make stress test less stressfull Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com> * fix uncrustify error Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com> --------- Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com>
1 parent d7fdb61 commit c4f57a7

File tree

3 files changed

+93
-15
lines changed

3 files changed

+93
-15
lines changed

rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor.hpp

+4-1
Original file line numberDiff line numberDiff line change
@@ -274,9 +274,12 @@ class EventsExecutor : public rclcpp::Executor
274274
rclcpp::experimental::executors::EventsQueue::UniquePtr events_queue_;
275275

276276
std::shared_ptr<rclcpp::executors::ExecutorEntitiesCollector> entities_collector_;
277-
std::shared_ptr<rclcpp::executors::ExecutorEntitiesCollection> current_entities_collection_;
278277
std::shared_ptr<rclcpp::executors::ExecutorNotifyWaitable> notify_waitable_;
279278

279+
/// Mutex to protect the current_entities_collection_
280+
std::recursive_mutex collection_mutex_;
281+
std::shared_ptr<rclcpp::executors::ExecutorEntitiesCollection> current_entities_collection_;
282+
280283
/// Flag used to reduce the number of unnecessary waitable events
281284
std::atomic<bool> notify_waitable_event_pushed_ {false};
282285

rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp

+35-14
Original file line numberDiff line numberDiff line change
@@ -273,10 +273,13 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
273273
switch (event.type) {
274274
case ExecutorEventType::CLIENT_EVENT:
275275
{
276-
auto client = this->retrieve_entity(
277-
static_cast<const rcl_client_t *>(event.entity_key),
278-
current_entities_collection_->clients);
279-
276+
rclcpp::ClientBase::SharedPtr client;
277+
{
278+
std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
279+
client = this->retrieve_entity(
280+
static_cast<const rcl_client_t *>(event.entity_key),
281+
current_entities_collection_->clients);
282+
}
280283
if (client) {
281284
for (size_t i = 0; i < event.num_events; i++) {
282285
execute_client(client);
@@ -287,9 +290,13 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
287290
}
288291
case ExecutorEventType::SUBSCRIPTION_EVENT:
289292
{
290-
auto subscription = this->retrieve_entity(
291-
static_cast<const rcl_subscription_t *>(event.entity_key),
292-
current_entities_collection_->subscriptions);
293+
rclcpp::SubscriptionBase::SharedPtr subscription;
294+
{
295+
std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
296+
subscription = this->retrieve_entity(
297+
static_cast<const rcl_subscription_t *>(event.entity_key),
298+
current_entities_collection_->subscriptions);
299+
}
293300
if (subscription) {
294301
for (size_t i = 0; i < event.num_events; i++) {
295302
execute_subscription(subscription);
@@ -299,10 +306,13 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
299306
}
300307
case ExecutorEventType::SERVICE_EVENT:
301308
{
302-
auto service = this->retrieve_entity(
303-
static_cast<const rcl_service_t *>(event.entity_key),
304-
current_entities_collection_->services);
305-
309+
rclcpp::ServiceBase::SharedPtr service;
310+
{
311+
std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
312+
service = this->retrieve_entity(
313+
static_cast<const rcl_service_t *>(event.entity_key),
314+
current_entities_collection_->services);
315+
}
306316
if (service) {
307317
for (size_t i = 0; i < event.num_events; i++) {
308318
execute_service(service);
@@ -319,9 +329,13 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
319329
}
320330
case ExecutorEventType::WAITABLE_EVENT:
321331
{
322-
auto waitable = this->retrieve_entity(
323-
static_cast<const rclcpp::Waitable *>(event.entity_key),
324-
current_entities_collection_->waitables);
332+
rclcpp::Waitable::SharedPtr waitable;
333+
{
334+
std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
335+
waitable = this->retrieve_entity(
336+
static_cast<const rclcpp::Waitable *>(event.entity_key),
337+
current_entities_collection_->waitables);
338+
}
325339
if (waitable) {
326340
for (size_t i = 0; i < event.num_events; i++) {
327341
auto data = waitable->take_data_by_entity_id(event.waitable_data);
@@ -386,6 +400,7 @@ EventsExecutor::get_automatically_added_callback_groups_from_nodes()
386400
void
387401
EventsExecutor::refresh_current_collection_from_callback_groups()
388402
{
403+
// Build the new collection
389404
this->entities_collector_->update_collections();
390405
auto callback_groups = this->entities_collector_->get_all_callback_groups();
391406
rclcpp::executors::ExecutorEntitiesCollection new_collection;
@@ -400,6 +415,9 @@ EventsExecutor::refresh_current_collection_from_callback_groups()
400415
// To do it, we need to add the notify waitable as an entry in both the new and
401416
// current collections such that it's neither added or removed.
402417
this->add_notify_waitable_to_collection(new_collection.waitables);
418+
419+
// Acquire lock before modifying the current collection
420+
std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
403421
this->add_notify_waitable_to_collection(current_entities_collection_->waitables);
404422

405423
this->refresh_current_collection(new_collection);
@@ -409,6 +427,9 @@ void
409427
EventsExecutor::refresh_current_collection(
410428
const rclcpp::executors::ExecutorEntitiesCollection & new_collection)
411429
{
430+
// Acquire lock before modifying the current collection
431+
std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
432+
412433
current_entities_collection_->timers.update(
413434
new_collection.timers,
414435
[this](rclcpp::TimerBase::SharedPtr timer) {timers_manager_->add_timer(timer);},

rclcpp/test/rclcpp/executors/test_executors.cpp

+54
Original file line numberDiff line numberDiff line change
@@ -796,6 +796,60 @@ TYPED_TEST(TestExecutors, testRaceConditionAddNode)
796796
}
797797
}
798798

799+
// This test verifies the thread-safety of adding and removing a node
800+
// while the executor is spinning and events are ready.
801+
// This test does not contain expectations, but rather it verifies that
802+
// we can run a "stressful routine" without crashing.
803+
TYPED_TEST(TestExecutors, stressAddRemoveNode)
804+
{
805+
using ExecutorType = TypeParam;
806+
// rmw_connextdds doesn't support events-executor
807+
if (
808+
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
809+
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
810+
{
811+
GTEST_SKIP();
812+
}
813+
814+
ExecutorType executor;
815+
816+
// A timer that is "always" ready (the timer callback doesn't do anything)
817+
auto timer = this->node->create_wall_timer(std::chrono::nanoseconds(1), []() {});
818+
819+
// This thread spins the executor until it's cancelled
820+
std::thread spinner_thread([&]() {
821+
executor.spin();
822+
});
823+
824+
// This thread publishes data in a busy loop (the node has a subscription)
825+
std::thread publisher_thread1([&]() {
826+
for (size_t i = 0; i < 100000; i++) {
827+
this->publisher->publish(test_msgs::msg::Empty());
828+
}
829+
});
830+
std::thread publisher_thread2([&]() {
831+
for (size_t i = 0; i < 100000; i++) {
832+
this->publisher->publish(test_msgs::msg::Empty());
833+
}
834+
});
835+
836+
// This thread adds/remove the node that contains the entities in a busy loop
837+
std::thread add_remove_thread([&]() {
838+
for (size_t i = 0; i < 100000; i++) {
839+
executor.add_node(this->node);
840+
executor.remove_node(this->node);
841+
}
842+
});
843+
844+
// Wait for the threads that do real work to finish
845+
publisher_thread1.join();
846+
publisher_thread2.join();
847+
add_remove_thread.join();
848+
849+
executor.cancel();
850+
spinner_thread.join();
851+
}
852+
799853
// Check spin_until_future_complete with node base pointer (instantiates its own executor)
800854
TEST(TestExecutors, testSpinUntilFutureCompleteNodeBasePtr)
801855
{

0 commit comments

Comments
 (0)