Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Handling unloading of diagnostics by service #53

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions diagnostic_aggregator/include/diagnostic_aggregator/aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,11 @@ Input (status names):
/Robot/Sensors/Tilt Hokuyo/Frequency
/Robot/Sensors/Tilt Hokuyo/Connection
\endverbatim
* The analyzer should always output a DiagnosticStatus with the name of the
* The analyzer should always output a DiagnosticStatus with the name of the
* prefix. Any other data output is up to the analyzer developer.
*
*
* Analyzer's are loaded by specifying the private parameters of the
* aggregator.
* aggregator.
\verbatim
base_path: My Robot
pub_rate: 1.0
Expand All @@ -102,7 +102,7 @@ pub_rate: 1.0
* the aggregator will report the error and publish it in the aggregated output.
*/
class Aggregator
{
{
public:
/*!
*\brief Constructor initializes with main prefix (ex: '/Robot')
Expand All @@ -127,6 +127,11 @@ class Aggregator
double getPubRate() const { return pub_rate_; }

private:
typedef boost::shared_ptr<Analyzer> AnalyzerPtr;
typedef boost::shared_ptr<bond::Bond> BondPtr;
typedef std::pair<BondPtr, AnalyzerPtr> BondAnalyzerPair;
typedef std::vector<BondAnalyzerPair> BondAnalyzerPairs;

ros::NodeHandle n_;
ros::ServiceServer add_srv_; /**< AddDiagnostics, /diagnostics_agg/add_diagnostics */
ros::Subscriber diag_sub_; /**< DiagnosticArray, /diagnostics */
Expand All @@ -153,18 +158,16 @@ class Aggregator

OtherAnalyzer* other_analyzer_;

std::vector<boost::shared_ptr<bond::Bond> > bonds_; /**< \brief Contains all bonds for additional diagnostics. */
BondAnalyzerPairs bonds_; /**< \brief Contains all bonds for additional diagnostics. */

/*
*!\brief called when a bond between the aggregator and a node is broken
*
* Modifies the contents of added_analyzers_ and analyzer_group, removing the
* diagnostics that had been brought up by that bond.
*!\param bond_id The bond id (namespace) from which the analyzer was created
*!\param analyzer Shared pointer to the analyzer group that was added
*/
void bondBroken(std::string bond_id,
boost::shared_ptr<Analyzer> analyzer);
void bondBroken(std::string bond_id);

/*
*!\brief called when a bond is formed between the aggregator and a node.
Expand Down Expand Up @@ -193,7 +196,7 @@ class Aggregator
struct BondIDMatch
{
BondIDMatch(const std::string s) : s(s) {}
bool operator()(const boost::shared_ptr<bond::Bond>& b){ return s == b->getId(); }
bool operator()(const std::pair< boost::shared_ptr<bond::Bond>, boost::shared_ptr<Analyzer> > & p){ return s == p.first->getId(); }
const std::string s;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
*********************************************************************/

/**
* \author Kevin Watts
* \author Kevin Watts
*/

#ifndef DIAGNOSTIC_ANALYZER_GROUP_H
Expand All @@ -60,10 +60,10 @@ namespace diagnostic_aggregator {
*\brief Allows analyzers to be grouped together, or used as sub-analyzers
*
* The AnalyzerGroup is used by the diagnostic aggregator internally to
* load and handle analyzers. It can be used as a normal analyzer plugin to
* load and handle analyzers. It can be used as a normal analyzer plugin to
* allow analyzers to become "sub-analyzers", or move as a group.
*
* The "sub-analyzers" are initialized using parameters in the "~analyzers"
* The "sub-analyzers" are initialized using parameters in the "~analyzers"
* namespace of the AnalyzerGroup. The "type" parameters determines the analyzer type.
*
* Example initialization:
Expand Down Expand Up @@ -92,14 +92,14 @@ namespace diagnostic_aggregator {
* num_items: 3
*\endverbatim
*
* Each namespace below "analyzers" describes a new Analyzer that will be loaded as a
* Each namespace below "analyzers" describes a new Analyzer that will be loaded as a
* sub-analyzer. Any analyzer that fails to initialize or loads incorrectly will
* generate an error in the console output, and a special diagnostic item in the output
* of the AnalyzerGroup that describes the error.
* of the AnalyzerGroup that describes the error.
*
* In the above example, the AnalyzerGroup will have three sub-analyzers. The
* In the above example, the AnalyzerGroup will have three sub-analyzers. The
* AnalyzerGroup will report a DiagnosticStatus message in the processed output with
* the name "Sensors" (the top-level state). The "Sensors" message will have the
* the name "Sensors" (the top-level state). The "Sensors" message will have the
* level of the highest of the sub-analyzers, or the highest of "Sensors/Base Hokuyo",
* "Sensors/Tilt Hokuyo" and "Sensors/IMU". The state of any other items, like
* "Sensors/IMU/Connection" won't matter to the AnalyzerGroup.
Expand Down Expand Up @@ -136,11 +136,6 @@ class AnalyzerGroup : public Analyzer
*/
virtual bool match(const std::string name);

/*!
*\brief Clear match arrays. Used when analyzers are added or removed
*/
void resetMatches();

/*!
*\brief Analyze returns true if any sub-analyzers will analyze an item
*/
Expand All @@ -156,6 +151,11 @@ class AnalyzerGroup : public Analyzer
virtual std::string getName() const { return nice_name_; }

private:
/*!
*\brief Clear match arrays. Used when analyzers are added or removed
*/
void resetMatches();

std::string path_, nice_name_;

/*!
Expand Down
54 changes: 37 additions & 17 deletions diagnostic_aggregator/scripts/add_analyzers
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
#!/usr/bin/env python

NAME='add_analyzers'

import sys
import argparse
from bondpy import bondpy
Expand All @@ -12,13 +10,36 @@ import rospy
class AddAnalyzers:

def __init__(self, args):
rospy.on_shutdown(self.remove_group)
self.bond = None
self.add_diagnostics = None
self.name = rospy.get_name()
self.namespace = None
self.add_analyzers(args)
rospy.on_shutdown(self.remove_group)

def remove_group(self):
if self.bond:
self.bond.shutdown()
if self.add_diagnostics is None:
return;

# unload the analyzers
try:
# this will reverse the load if it is loaded on the other side
resp = self.add_diagnostics(load_namespace=self.namespace)
if resp.success:
rospy.loginfo('Add Analyzers: successfully removed analyzers from the diagnostic aggregator [{0}]'.format(self.name))
else:
rospy.logerr('Add Analyzers: failed to remove analyzers on the diagnostic aggregator [{0}][{1}]'.format(self.name, resp.message))
except rospy.service.ServiceException:
rospy.logerr('Add Analyzers: unloading service returned failure [{0}]'.format(self.name))
except rospy.ROSException:
rospy.logerr('Add Analyzers: add timed out while waiting for diagnostics_agg service, or ROS shutdown [{0}]'.format(self.name))

# Cannot rely on bonds to get a message across if parts of ros are shutting down
# https://github.com/ros/bond_core/issues/14
#
# if self.bond:
# self.bond.shutdown()


def add_analyzers(self, myargv):
usage = """
Expand All @@ -32,7 +53,7 @@ class AddAnalyzers:
parser.add_argument('-t', '--timeout', type=float, dest='timeout', default=None, help='time in seconds to wait for the diagnostic_agg service to come up before timing out. Default waits indefinitely')
args = parser.parse_args(myargv[1:])

namespace = rospy.resolve_name(rospy.get_name())
self.namespace = rospy.resolve_name(rospy.get_name())

if args.analyzer_yaml is None:
# nothing to do - it will assume parameters are already loaded on
Expand All @@ -43,27 +64,26 @@ class AddAnalyzers:
for params, ns in paramlist:
rosparam.upload_params(ns, params)

self.bond = bondpy.Bond("/diagnostics_agg/bond", namespace)
self.bond = bondpy.Bond("/diagnostics_agg/bond", self.namespace)

try:
rospy.wait_for_service('/diagnostics_agg/add_diagnostics', timeout=args.timeout)
self.bond.start()
add_diagnostics = rospy.ServiceProxy('/diagnostics_agg/add_diagnostics', AddDiagnostics)
resp = add_diagnostics(load_namespace=namespace)
self.add_diagnostics = rospy.ServiceProxy('/diagnostics_agg/add_diagnostics', AddDiagnostics)
resp = self.add_diagnostics(load_namespace=self.namespace)
if resp.success:
rospy.loginfo(NAME + ' successfully added analyzers to diagnostic aggregator')
rospy.loginfo('Add Analyzers: successfully added analyzers to diagnostic aggregator [{0}]'.format(self.name))
self.bond.start()
else:
rospy.logerr(NAME + ' did not add any analyzers to diagnostic aggregator: ' + resp.message)
rospy.logerr('Add Analyzers: did not add any analyzers to diagnostic aggregator [{0}][{1}]'.format(self.name, resp.message))
rospy.signal_shutdown('')
except rospy.service.ServiceException:
rospy.logerr(NAME + ' service returned failure - missing aggregator or failed init of analyzer group?')
rospy.logerr('Add Analyzers: service returned failure - missing aggregator or failed init of analyzer group? [{0}]'.format(self.name))
rospy.signal_shutdown('')
except rospy.ROSException:
rospy.logerr(NAME + ' add timed out while waiting for diagnostics_agg service, or ROS shutdown')
rospy.logerr('Add Analyzers: add timed out while waiting for diagnostics_agg service, or ROS shutdown [{0}]'.format(self.name))
rospy.signal_shutdown('')

rospy.spin()

if __name__ == '__main__':
rospy.init_node(NAME)
rospy.init_node('add_analyzers')
AddAnalyzers(rospy.myargv())
rospy.spin()
60 changes: 38 additions & 22 deletions diagnostic_aggregator/src/aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ void Aggregator::checkTimestamp(const diagnostic_msgs::DiagnosticArray::ConstPtr
stamp_warn += ", ";
stamp_warn += it->name;
}

if (!ros_warnings_.count(stamp_warn))
{
ROS_WARN("%s", stamp_warn.c_str());
Expand Down Expand Up @@ -119,64 +119,80 @@ Aggregator::~Aggregator()
}


void Aggregator::bondBroken(string bond_id, boost::shared_ptr<Analyzer> analyzer)
void Aggregator::bondBroken(string bond_id)
{
boost::mutex::scoped_lock lock(mutex_); // Possibility of multiple bonds breaking at once
ROS_DEBUG("Bond for namespace %s was broken", bond_id.c_str());
std::vector<boost::shared_ptr<bond::Bond> >::iterator elem;
BondAnalyzerPairs::iterator elem;
elem = std::find_if(bonds_.begin(), bonds_.end(), BondIDMatch(bond_id));
if (elem == bonds_.end()){
ROS_WARN("Broken bond tried to erase a bond which didn't exist.");
} else {
if (!analyzer_group_->removeAnalyzer(elem->second))
{
ROS_WARN("Broken bond tried to remove an analyzer which didn't exist.");
}
bonds_.erase(elem);
}
if (!analyzer_group_->removeAnalyzer(analyzer))
{
ROS_WARN("Broken bond tried to remove an analyzer which didn't exist.");
}

analyzer_group_->resetMatches();
}

void Aggregator::bondFormed(boost::shared_ptr<Analyzer> group){
ROS_DEBUG("Bond formed");
boost::mutex::scoped_lock lock(mutex_);
analyzer_group_->addAnalyzer(group);
analyzer_group_->resetMatches();
}

/*
* This will load diagnostics if they are not already loaded
* and reverse the operation (unload) them if they are.
*/
bool Aggregator::addDiagnostics(diagnostic_msgs::AddDiagnostics::Request &req,
diagnostic_msgs::AddDiagnostics::Response &res)
{
ROS_DEBUG("Got load request for namespace %s", req.load_namespace.c_str());
// Don't currently support relative or private namespace definitions
if (req.load_namespace[0] != '/')
{
res.message = "Requested load from non-global namespace. Private and relative namespaces are not supported.";
res.message = "Requested (un)load from non-global namespace. Private and relative namespaces are not supported.";
res.success = false;
return true;
}

boost::shared_ptr<Analyzer> group = boost::make_shared<AnalyzerGroup>();
{ // lock here ensures that bonds from the same namespace aren't added twice.
// Without it, possibility of two simultaneous calls adding two objects.
{
// Without lock, possibility of two simultaneous calls in this function at the same time
boost::mutex::scoped_lock lock(mutex_);
// rebuff attempts to add things from the same namespace twice
if (std::find_if(bonds_.begin(), bonds_.end(), BondIDMatch(req.load_namespace)) != bonds_.end())
// if adding to a namespace that already has it, remove it - toggle!
BondAnalyzerPairs::iterator existing_bond_analyzer_iter = std::find_if(bonds_.begin(), bonds_.end(), BondIDMatch(req.load_namespace));
if (existing_bond_analyzer_iter != bonds_.end())
{
res.message = "Requested load from namespace " + req.load_namespace + " which is already in use";
res.success = false;
ROS_DEBUG("Got unload request for namespace %s", req.load_namespace.c_str());
if (!analyzer_group_->removeAnalyzer(existing_bond_analyzer_iter->second))
{
ROS_WARN("Tried to remove an analyzer which didn't exist.");
}

// erase might delete the last bond reference triggering bondBroken(string bond_id)
// leading to a deadlock because of mutex_ lock, so create a temp reference
{
BondPtr temp_bond_reference = existing_bond_analyzer_iter->first;
bonds_.erase(existing_bond_analyzer_iter);
lock.unlock();
//destroy temp_bond_reference reference
}

res.message = "Unloaded from namespace '" + req.load_namespace + "'";
res.success = true;
return true;
}
ROS_DEBUG("Got load request for namespace %s", req.load_namespace.c_str());

boost::shared_ptr<bond::Bond> req_bond = boost::make_shared<bond::Bond>(
"/diagnostics_agg/bond", req.load_namespace,
boost::function<void(void)>(boost::bind(&Aggregator::bondBroken, this, req.load_namespace, group)),
boost::function<void(void)>(boost::bind(&Aggregator::bondBroken, this, req.load_namespace)),
boost::function<void(void)>(boost::bind(&Aggregator::bondFormed, this, group))
);
req_bond->start();

bonds_.push_back(req_bond); // bond formed, keep track of it
bonds_.push_back(std::make_pair(req_bond, group)); // bond formed, keep track of it with associated analyzer
}

if (group->init(base_path_, ros::NodeHandle(req.load_namespace)))
Expand All @@ -201,7 +217,7 @@ void Aggregator::publishData()
diag_toplevel_state.name = "toplevel_state";
diag_toplevel_state.level = -1;
int min_level = 255;

vector<boost::shared_ptr<diagnostic_msgs::DiagnosticStatus> > processed;
{
boost::mutex::scoped_lock lock(mutex_);
Expand Down
12 changes: 6 additions & 6 deletions diagnostic_aggregator/src/aggregator_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,16 @@ using namespace std;
int main(int argc, char **argv)
{
ros::init(argc, argv, "diagnostic_aggregator");

try
{
diagnostic_aggregator::Aggregator agg;

ros::Rate pub_rate(agg.getPubRate());
while (agg.ok())
while (ros::ok() && agg.ok())
{
ros::spinOnce();
agg.publishData();
ros::spinOnce();
pub_rate.sleep();
}
}
Expand All @@ -60,8 +60,8 @@ int main(int argc, char **argv)
ROS_FATAL("Diagnostic aggregator node caught exception. Aborting. %s", e.what());
ROS_BREAK();
}

exit(0);
return 0;
}

Loading