Skip to content

Commit 1d28b3a

Browse files
committed
Add thread-safe methods to trigger notifications and prepare work in executor
Signed-off-by: HarunTeper <harun.teper@tu-dortmund.de>
1 parent c5b5f2e commit 1d28b3a

File tree

3 files changed

+110
-10
lines changed

3 files changed

+110
-10
lines changed

rclcpp/include/rclcpp/executor.hpp

+37
Original file line numberDiff line numberDiff line change
@@ -455,6 +455,14 @@ class Executor
455455
void
456456
execute_any_executable(AnyExecutable & any_exec);
457457

458+
/// Trigger the notify guard condition to wake up the executor.
459+
/**
460+
* This function is thread safe.
461+
*/
462+
RCLCPP_PUBLIC
463+
void
464+
trigger_executor_notify();
465+
458466
/// Run subscription executable.
459467
/**
460468
* Do necessary setup and tear-down as well as executing the subscription.
@@ -508,6 +516,27 @@ class Executor
508516
void
509517
wait_for_work(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
510518

519+
/// Prepare the wait set to bee waited on.
520+
/**
521+
* Builds a set of waitable entities, which are passed to the middleware.
522+
* After building wait set, waits on middleware to notify.
523+
* \throws std::runtime_error if the wait set can be cleared
524+
*/
525+
RCLCPP_PUBLIC
526+
void
527+
prepare_work();
528+
529+
/// Block until more work becomes avilable or timeout is reached.
530+
/**
531+
* Builds a set of waitable entities, which are passed to the middleware.
532+
* After building wait set, waits on middleware to notify.
533+
* \param[in] timeout duration to wait for new work to become available.
534+
* \throws std::runtime_error if the wait set can be cleared
535+
*/
536+
RCLCPP_PUBLIC
537+
void
538+
wait_for_work_simple(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
539+
511540
/// Check for executable in ready state and populate union structure.
512541
/**
513542
* \param[out] any_executable populated union structure of ready executable
@@ -556,6 +585,10 @@ class Executor
556585

557586
mutable std::mutex mutex_;
558587

588+
mutable std::mutex notify_mutex_;
589+
590+
std::vector<rclcpp::CallbackGroup::SharedPtr> wait_for_work_cbgs_;
591+
559592
/// The context associated with this executor.
560593
std::shared_ptr<rclcpp::Context> context_;
561594

@@ -583,6 +616,10 @@ class Executor
583616
rclcpp::WaitSet wait_set_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
584617
std::optional<rclcpp::WaitResult<rclcpp::WaitSet>> wait_result_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
585618

619+
/// Previous WaitSet to be waited on.
620+
rclcpp::WaitSet previous_wait_set_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
621+
std::optional<rclcpp::WaitResult<rclcpp::WaitSet>> previous_wait_result_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
622+
586623
/// Hold the current state of the collection being waited on by the waitset
587624
rclcpp::executors::ExecutorEntitiesCollection current_collection_ RCPPUTILS_TSA_GUARDED_BY(
588625
mutex_);

rclcpp/src/rclcpp/executor.cpp

+65-2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include "rcl/allocator.h"
2727
#include "rcl/error_handling.h"
2828
#include "rclcpp/executors/executor_notify_waitable.hpp"
29+
#include "rclcpp/logging.hpp"
2930
#include "rclcpp/subscription_wait_set_mask.hpp"
3031
#include "rcpputils/scope_exit.hpp"
3132

@@ -480,9 +481,18 @@ Executor::execute_any_executable(AnyExecutable & any_exec)
480481
const std::shared_ptr<void> & const_data = any_exec.data;
481482
any_exec.waitable->execute(const_data);
482483
}
484+
}
483485

484-
// Reset the callback_group, regardless of type
485-
any_exec.callback_group->can_be_taken_from().store(true);
486+
void
487+
Executor::trigger_executor_notify()
488+
{
489+
try {
490+
interrupt_guard_condition_->trigger();
491+
} catch (const rclcpp::exceptions::RCLError & ex) {
492+
throw std::runtime_error(
493+
std::string(
494+
"Failed to trigger guard condition on callback group change: ") + ex.what());
495+
}
486496
}
487497

488498
template<typename Taker, typename Handler>
@@ -770,6 +780,53 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout)
770780
}
771781
}
772782

783+
void Executor::prepare_work()
784+
{
785+
{
786+
std::lock_guard<std::mutex> guard(mutex_);
787+
if (this->entities_need_rebuild_.exchange(false) || current_collection_.empty()) {
788+
RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "Rebuilding executor entities");
789+
this->collect_entities();
790+
}
791+
792+
auto callback_groups = this->collector_.get_all_callback_groups();
793+
wait_for_work_cbgs_.resize(callback_groups.size());
794+
for(const auto & w_ptr : callback_groups) {
795+
auto shr_ptr = w_ptr.lock();
796+
if(shr_ptr) {
797+
wait_for_work_cbgs_.push_back(std::move(shr_ptr));
798+
}
799+
}
800+
}
801+
}
802+
803+
void
804+
Executor::wait_for_work_simple(std::chrono::nanoseconds timeout)
805+
{
806+
TRACETOOLS_TRACEPOINT(rclcpp_executor_wait_for_work, timeout.count());
807+
808+
this->wait_result_.reset();
809+
this->wait_result_.emplace(wait_set_.wait(timeout));
810+
811+
// drop references to the callback groups, before trying to execute anything
812+
wait_for_work_cbgs_.clear();
813+
814+
if (!this->wait_result_ || this->wait_result_->kind() == WaitResultKind::Empty) {
815+
RCUTILS_LOG_WARN_NAMED(
816+
"rclcpp",
817+
"empty wait set received in wait(). This should never happen.");
818+
} else {
819+
if (this->wait_result_->kind() == WaitResultKind::Ready && current_notify_waitable_) {
820+
RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "Getting wait set");
821+
auto & rcl_wait_set = this->wait_result_->get_wait_set().get_rcl_wait_set();
822+
if (current_notify_waitable_->is_ready(rcl_wait_set)) {
823+
RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "Executing notify waitable");
824+
current_notify_waitable_->execute(current_notify_waitable_->take_data());
825+
}
826+
}
827+
}
828+
}
829+
773830
bool
774831
Executor::get_next_ready_executable(AnyExecutable & any_executable)
775832
{
@@ -896,20 +953,26 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable)
896953
bool
897954
Executor::get_next_executable(AnyExecutable & any_executable, std::chrono::nanoseconds timeout)
898955
{
956+
notify_mutex_.lock();
899957
bool success = false;
900958
// Check to see if there are any subscriptions or timers needing service
901959
// TODO(wjwwood): improve run to run efficiency of this function
902960
success = get_next_ready_executable(any_executable);
903961
// If there are none
904962
if (!success) {
905963
// Wait for subscriptions or timers to work on
964+
prepare_work();
965+
notify_mutex_.unlock();
906966
wait_for_work(timeout);
907967
if (!spinning.load()) {
908968
return false;
909969
}
910970
// Try again
911971
success = get_next_ready_executable(any_executable);
912972
}
973+
else {
974+
notify_mutex_.unlock();
975+
}
913976
return success;
914977
}
915978

rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp

+8-8
Original file line numberDiff line numberDiff line change
@@ -99,15 +99,15 @@ MultiThreadedExecutor::run(size_t this_thread_number)
9999

100100
execute_any_executable(any_exec);
101101

102-
if (any_exec.callback_group &&
103-
any_exec.callback_group->type() == CallbackGroupType::MutuallyExclusive)
104102
{
105-
try {
106-
interrupt_guard_condition_->trigger();
107-
} catch (const rclcpp::exceptions::RCLError & ex) {
108-
throw std::runtime_error(
109-
std::string(
110-
"Failed to trigger guard condition on callback group change: ") + ex.what());
103+
std::lock_guard wait_lock{notify_mutex_};
104+
105+
any_exec.callback_group->can_be_taken_from().store(true);
106+
107+
if (any_exec.callback_group &&
108+
any_exec.callback_group->type() == CallbackGroupType::MutuallyExclusive)
109+
{
110+
trigger_executor_notify();
111111
}
112112
}
113113

0 commit comments

Comments
 (0)