Skip to content

Commit fce021b

Browse files
DensoADASJoshua HamppJoshua Hamppalsora
authored
Feature/available capacity of ipm (#2173)
* added available_capacity to get the lowest number of free capacity for intra-process communication for a publisher Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de> * added unit tests for available_capacity Signed-off-by: Joshua Hampp <j.hampp@eu.denso.com> Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de> * fixed typos in comments Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de> * Updated warning Co-authored-by: Alberto Soragna <alberto.soragna@gmail.com> Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de> * returning 0 if ipm is disabled in lowest_available_ipm_capacity Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de> * return 0 if no subscribers are present in lowest_available_capacity Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de> * updated unit test Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de> * update unit test Signed-off-by: Joshua Hampp <j.hampp@eu.denso.com> * moved available_capacity to a lambda function to be able to handle subscriptions which went out of scope Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de> * updated unit test to check subscriptions which went out of scope Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de> --------- Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de> Signed-off-by: Joshua Hampp <j.hampp@eu.denso.com> Co-authored-by: Joshua Hampp <j.hampp@denso-adas.de> Co-authored-by: Joshua Hampp <j.hampp@eu.denso.com> Co-authored-by: Alberto Soragna <alberto.soragna@gmail.com>
1 parent c4f57a7 commit fce021b

12 files changed

+337
-0
lines changed

rclcpp/include/rclcpp/experimental/buffers/buffer_implementation_base.hpp

+1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ class BufferImplementationBase
3333

3434
virtual void clear() = 0;
3535
virtual bool has_data() const = 0;
36+
virtual size_t available_capacity() const = 0;
3637
};
3738

3839
} // namespace buffers

rclcpp/include/rclcpp/experimental/buffers/intra_process_buffer.hpp

+6
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ class IntraProcessBufferBase
4444

4545
virtual bool has_data() const = 0;
4646
virtual bool use_take_shared_method() const = 0;
47+
virtual size_t available_capacity() const = 0;
4748
};
4849

4950
template<
@@ -143,6 +144,11 @@ class TypedIntraProcessBuffer : public IntraProcessBuffer<MessageT, Alloc, Messa
143144
return std::is_same<BufferT, MessageSharedPtr>::value;
144145
}
145146

147+
size_t available_capacity() const override
148+
{
149+
return buffer_->available_capacity();
150+
}
151+
146152
private:
147153
std::unique_ptr<BufferImplementationBase<BufferT>> buffer_;
148154

rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp

+23
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,18 @@ class RingBufferImplementation : public BufferImplementationBase<BufferT>
148148
return is_full_();
149149
}
150150

151+
/// Get the remaining capacity to store messages
152+
/**
153+
* This member function is thread-safe.
154+
*
155+
* \return the number of free capacity for new messages
156+
*/
157+
size_t available_capacity() const
158+
{
159+
std::lock_guard<std::mutex> lock(mutex_);
160+
return available_capacity_();
161+
}
162+
151163
void clear()
152164
{
153165
TRACEPOINT(rclcpp_ring_buffer_clear, static_cast<const void *>(this));
@@ -189,6 +201,17 @@ class RingBufferImplementation : public BufferImplementationBase<BufferT>
189201
return size_ == capacity_;
190202
}
191203

204+
/// Get the remaining capacity to store messages
205+
/**
206+
* This member function is not thread-safe.
207+
*
208+
* \return the number of free capacity for new messages
209+
*/
210+
inline size_t available_capacity_() const
211+
{
212+
return capacity_ - size_;
213+
}
214+
192215
size_t capacity_;
193216

194217
std::vector<BufferT> ring_buffer_;

rclcpp/include/rclcpp/experimental/intra_process_manager.hpp

+5
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,11 @@ class IntraProcessManager
306306
rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr
307307
get_subscription_intra_process(uint64_t intra_process_subscription_id);
308308

309+
/// Return the lowest available capacity for all subscription buffers for a publisher id.
310+
RCLCPP_PUBLIC
311+
size_t
312+
lowest_available_capacity(const uint64_t intra_process_publisher_id) const;
313+
309314
private:
310315
struct SplittedSubscriptions
311316
{

rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp

+5
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,11 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable
6262
void
6363
add_to_wait_set(rcl_wait_set_t * wait_set) override;
6464

65+
RCLCPP_PUBLIC
66+
virtual
67+
size_t
68+
available_capacity() const = 0;
69+
6570
bool
6671
is_ready(rcl_wait_set_t * wait_set) override = 0;
6772

rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp

+5
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,11 @@ class SubscriptionIntraProcessBuffer : public SubscriptionROSMsgIntraProcessBuff
169169
return buffer_->use_take_shared_method();
170170
}
171171

172+
size_t available_capacity() const override
173+
{
174+
return buffer_->available_capacity();
175+
}
176+
172177
protected:
173178
void
174179
trigger_guard_condition() override

rclcpp/include/rclcpp/publisher_base.hpp

+11
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,17 @@ class PublisherBase : public std::enable_shared_from_this<PublisherBase>
215215
std::vector<rclcpp::NetworkFlowEndpoint>
216216
get_network_flow_endpoints() const;
217217

218+
/// Return the lowest available capacity for all subscription buffers.
219+
/**
220+
* For intraprocess communication return the lowest buffer capacity for all subscriptions.
221+
* If intraprocess is disabled or no intraprocess subscriptions present, return maximum of size_t.
222+
* On failure return 0.
223+
* \return lowest buffer capacity for all subscriptions
224+
*/
225+
RCLCPP_PUBLIC
226+
size_t
227+
lowest_available_ipm_capacity() const;
228+
218229
/// Wait until all published messages are acknowledged or until the specified timeout elapses.
219230
/**
220231
* This method waits until all published messages are acknowledged by all matching

rclcpp/src/rclcpp/intra_process_manager.cpp

+47
Original file line numberDiff line numberDiff line change
@@ -225,5 +225,52 @@ IntraProcessManager::can_communicate(
225225
return true;
226226
}
227227

228+
size_t
229+
IntraProcessManager::lowest_available_capacity(const uint64_t intra_process_publisher_id) const
230+
{
231+
size_t capacity = std::numeric_limits<size_t>::max();
232+
233+
auto publisher_it = pub_to_subs_.find(intra_process_publisher_id);
234+
if (publisher_it == pub_to_subs_.end()) {
235+
// Publisher is either invalid or no longer exists.
236+
RCLCPP_WARN(
237+
rclcpp::get_logger("rclcpp"),
238+
"Calling lowest_available_capacity for invalid or no longer existing publisher id");
239+
return 0u;
240+
}
241+
242+
if (publisher_it->second.take_shared_subscriptions.empty() &&
243+
publisher_it->second.take_ownership_subscriptions.empty())
244+
{
245+
// no subscriptions available
246+
return 0u;
247+
}
248+
249+
auto available_capacity = [this, &capacity](const uint64_t intra_process_subscription_id)
250+
{
251+
auto subscription_it = subscriptions_.find(intra_process_subscription_id);
252+
if (subscription_it != subscriptions_.end()) {
253+
auto subscription = subscription_it->second.lock();
254+
if (subscription) {
255+
capacity = std::min(capacity, subscription->available_capacity());
256+
}
257+
} else {
258+
// Subscription is either invalid or no longer exists.
259+
RCLCPP_WARN(
260+
rclcpp::get_logger("rclcpp"),
261+
"Calling available_capacity for invalid or no longer existing subscription id");
262+
}
263+
};
264+
265+
for (const auto sub_id : publisher_it->second.take_shared_subscriptions) {
266+
available_capacity(sub_id);
267+
}
268+
269+
for (const auto sub_id : publisher_it->second.take_ownership_subscriptions) {
270+
available_capacity(sub_id);
271+
}
272+
273+
return capacity;
274+
}
228275
} // namespace experimental
229276
} // namespace rclcpp

rclcpp/src/rclcpp/publisher_base.cpp

+19
Original file line numberDiff line numberDiff line change
@@ -384,3 +384,22 @@ std::vector<rclcpp::NetworkFlowEndpoint> PublisherBase::get_network_flow_endpoin
384384

385385
return network_flow_endpoint_vector;
386386
}
387+
388+
size_t PublisherBase::lowest_available_ipm_capacity() const
389+
{
390+
if (!intra_process_is_enabled_) {
391+
return 0u;
392+
}
393+
394+
auto ipm = weak_ipm_.lock();
395+
396+
if (!ipm) {
397+
// TODO(ivanpauno): should this raise an error?
398+
RCLCPP_WARN(
399+
rclcpp::get_logger("rclcpp"),
400+
"Intra process manager died for a publisher.");
401+
return 0u;
402+
}
403+
404+
return ipm->lowest_available_capacity(intra_process_publisher_id_);
405+
}

rclcpp/test/rclcpp/test_intra_process_buffer.cpp

+72
Original file line numberDiff line numberDiff line change
@@ -238,3 +238,75 @@ TEST(TestIntraProcessBuffer, unique_buffer_consume) {
238238
EXPECT_EQ(original_value, *popped_unique_msg);
239239
EXPECT_EQ(original_message_pointer, popped_message_pointer);
240240
}
241+
242+
/*
243+
Check the available buffer capacity while storing and consuming data from an intra-process
244+
buffer.
245+
The initial available buffer capacity should equal the buffer size.
246+
Inserting a message should decrease the available buffer capacity by 1.
247+
Consuming a message should increase the available buffer capacity by 1.
248+
*/
249+
TEST(TestIntraProcessBuffer, available_capacity) {
250+
using MessageT = char;
251+
using Alloc = std::allocator<void>;
252+
using Deleter = std::default_delete<MessageT>;
253+
using SharedMessageT = std::shared_ptr<const MessageT>;
254+
using UniqueMessageT = std::unique_ptr<MessageT, Deleter>;
255+
using UniqueIntraProcessBufferT = rclcpp::experimental::buffers::TypedIntraProcessBuffer<
256+
MessageT, Alloc, Deleter, UniqueMessageT>;
257+
258+
constexpr auto history_depth = 5u;
259+
260+
auto buffer_impl =
261+
std::make_unique<rclcpp::experimental::buffers::RingBufferImplementation<UniqueMessageT>>(
262+
history_depth);
263+
264+
UniqueIntraProcessBufferT intra_process_buffer(std::move(buffer_impl));
265+
266+
EXPECT_EQ(history_depth, intra_process_buffer.available_capacity());
267+
268+
auto original_unique_msg = std::make_unique<char>('a');
269+
auto original_message_pointer = reinterpret_cast<std::uintptr_t>(original_unique_msg.get());
270+
auto original_value = *original_unique_msg;
271+
272+
intra_process_buffer.add_unique(std::move(original_unique_msg));
273+
274+
EXPECT_EQ(history_depth - 1u, intra_process_buffer.available_capacity());
275+
276+
SharedMessageT popped_shared_msg;
277+
popped_shared_msg = intra_process_buffer.consume_shared();
278+
auto popped_message_pointer = reinterpret_cast<std::uintptr_t>(popped_shared_msg.get());
279+
280+
EXPECT_EQ(history_depth, intra_process_buffer.available_capacity());
281+
EXPECT_EQ(original_value, *popped_shared_msg);
282+
EXPECT_EQ(original_message_pointer, popped_message_pointer);
283+
284+
original_unique_msg = std::make_unique<char>('b');
285+
original_message_pointer = reinterpret_cast<std::uintptr_t>(original_unique_msg.get());
286+
original_value = *original_unique_msg;
287+
288+
intra_process_buffer.add_unique(std::move(original_unique_msg));
289+
290+
auto second_unique_msg = std::make_unique<char>('c');
291+
auto second_message_pointer = reinterpret_cast<std::uintptr_t>(second_unique_msg.get());
292+
auto second_value = *second_unique_msg;
293+
294+
intra_process_buffer.add_unique(std::move(second_unique_msg));
295+
296+
EXPECT_EQ(history_depth - 2u, intra_process_buffer.available_capacity());
297+
298+
UniqueMessageT popped_unique_msg;
299+
popped_unique_msg = intra_process_buffer.consume_unique();
300+
popped_message_pointer = reinterpret_cast<std::uintptr_t>(popped_unique_msg.get());
301+
302+
EXPECT_EQ(history_depth - 1u, intra_process_buffer.available_capacity());
303+
EXPECT_EQ(original_value, *popped_unique_msg);
304+
EXPECT_EQ(original_message_pointer, popped_message_pointer);
305+
306+
popped_unique_msg = intra_process_buffer.consume_unique();
307+
popped_message_pointer = reinterpret_cast<std::uintptr_t>(popped_unique_msg.get());
308+
309+
EXPECT_EQ(history_depth, intra_process_buffer.available_capacity());
310+
EXPECT_EQ(second_value, *popped_unique_msg);
311+
EXPECT_EQ(second_message_pointer, popped_message_pointer);
312+
}

0 commit comments

Comments
 (0)