网站首页 > 技术文章 正文
ROS多线程
回调(callback)
主要有四种类型的回调:
订阅回调——ros::Subscriber
请求回调——ros::ServiceServer
行动回调——actionlib::SimpleActionServer
定时器回调——ros::Timer
Callback的特点:用户实现,ros调度。
Ros回调机制
ROS默认有维护一个全局回调队列(名为:Global Callback Queue),将已可用的回调插入Callback队列中。再通过Spinner线程获取并执行当前可用的回调。为了说明ROS回调机制,我引入两个ROS节点:一个使用定时器发布多个topic消息;另一个订阅这些topic消息。
发布节点
一个使用ROS定时器超时回调发布多个topic消息。
ros::Timer
来看一个ros::Timer的回调队列,代码如下:
//这段代码主要是实现定时向Topic发布消息 #include "ros/ros.h" #include <boost/thread.hpp> #include "myset_msg/weather_pub.h" #include "myset_msg/air_quality_pub.h" #include <sstream> int main(int argc, char **argv){ ros::init(argc, argv, "m_publisher"); ros::NodeHandle n; /*通知ROS master,节点要发布一个名为“topic”的话题(topic), 消息类型为my_msg::topic_pub,发送队列长度为48*/ ros::Publisher pub_weather = n.advertise<myset_msg::topic_pub>("topic", 48, true); /*通知ROS master,节点要发布一个名为“topicA”的话题(topic), 消息类型为my_msg::topic_pub,发送队列长度为48*/ ros::Publisher pub_topic_a = n.advertise<myset_msg::topic_pub>("topicA", 48, true); /*通知ROS master,节点要发布一个名为“lQuality”的话题(topic), 消息类型为my_msg::l_quality_pub,发送队列长度为48*/ ros::Publisher pub_l_quality = n.advertise<myset_msg::l_quality_pub>("lQuality", 48, true); int count = 0; //创建一个ros::Timer每0.2秒进行发布,回调函数采用lamda4方法的格式 ros::Timer timer = n.createTimer(ros::Duration(0.2), [&](const ros::TimerEvent&) { myset_msg::topic_pub msg; std::stringstream ss; ss << "find " << count; msg.weather= ss.str(); ROS_INFO_STREAM("Thread["<< boost::this_thread::get_id() <<"],topic:"<< msg.weather.c_str()); pub_weather.publish(msg); std::stringstream ss; ss << "find " << 20+count; msg.topic= ss.str(); ROS_INFO_STREAM("Thread["<< boost::this_thread::get_id() <<"],topic:"<< msg.topic.c_str()); pub_topic_a.publish(msg); myset_msg::l_quality_pub msg_l; msg_l.l_quality_index = 128+count; ROS_INFO_STREAM("Thread["<< boost::this_thread::get_id() <<"],l quality:"<< msg_l.l_quality_index); pub_l_quality.publish(msg_l); ++count; }); //确保定时器回调被调用 ros::spin(); return 0; } |
定时器启动后会生成一个Timer线程,根据定时器的参数,当定时器超时后将定时器的回调函数加入Callback队列中。然后再由用户调用的Spinner线程(ros::spin)从Callback队列中依次取出当前已可用的回调并执行。
ros::Timer的队列和回调示意图
ros::Publisher
在定时器回调函数中向topic进行发布,ros::Publisher将要发布的消息加入到Publisher队列中,再由专门的Publisher线程发布出去。注意这其中并不涉及Callback队列,这也解释了上篇中提到的:如果一个ROS节点仅进行topic发布是不需要调用spinner的。
ros::Pubblisher的队列和发送示意图
订阅节点
订阅上面发布的topic消息。根据不同情况,进行代码修改。
ros::Subscriber
先让ROS节点只订阅一个topic来说明订阅回调的过程。下面是代码:
//订阅一个topic的代码 #include "ros/ros.h" #include "myset_msg/topic_pub.h" //回调函数,注意参数是const类型的boost::shared_ptr指针 void topicCallback(const my_msg::topic_pubConstPtr& msg) { ROS_INFO("The 24 hours topic: [%s]", msg->topic.c_str()); } int main(int argc, char **argv){ ros::init(argc, argv, "subscriber"); ros::NodeHandle n; /*通知ROS master,节点要订阅名为“Weather”的话题(topic), 并指定回调函数topicCallback*/ ros::Subscriber sub = n.subscribe("topic", 48, topicCallback); ros::spin(); } |
订阅创建后,涉及到两个线程和两个队列:
- ROS提供的Receiver线程将收到的消息加入到Subscriber队列,并触发使订阅回调函数加入Callback队列。注意:每个ros::Subscriber有自己的Subscriber队列,而Callback队列默认是全局的。
- 用户调用的Spinner线程(也就是ros::spin)从Callback队列中取出已可用的回调并执行。
ros::Subscriber的队列和回调示意图
在实际项目中,如果订阅回调中有耗时操作,那么可以用户可以启用多个Spinner线程并发从Callback队列中取出已可用的回调并执行。这样可以加快Callback队列被执行的速度。
ROS Spinner
ROS提供两种单线程回调的spin方法和两种多线程回调的Spin类,分别是:
单线程回调spin方法:
- ros::spin()——相当于while(true)的大循环,不断遍历执行Callback队列中的可用回调
- ros::spinOne()——相当于马上执行一次Callback队列中的可用回调
多线程回调spin类:
- ros::MultiThreadedSpinner——相当于while(true)大循环,启动指定数量的Spinner线程并发执行Callback队列中的可用回调。可指定Callback队列。
- ros::AsyncSpinner——异步启停,开启指定数量的Spinner线程并发执行Callback队列中的可用回调。可指定Callback队列。
如果程序简单用ros::spin()就够了;如果程序复杂推荐使用ros::AsyncSpinner类。
下面这段代码展示如何使用ros::AsyncSpinner启用多个Spinner线程。
//一个topic多个线程来执行的代码 #include "ros/ros.h" #include "boost/thread.hpp" #include "myset_msg/topic_pub.h" //回调函数,注意参数是const类型的boost::shared_ptr指针 void topicCallback(const my_msg::topic_pubConstPtr& msg) { ROS_INFO_STREAM("Thread["<< boost::this_thread::get_id() <<"],after looping 24 hours topic:"<< msg->topic.c_str()); } int main(int argc, char **argv){ ros::init(argc, argv, "m_subscriber"); ros::NodeHandle n; /*通知ROS master,节点要订阅名为“Weather”的话题(topic), 并指定回调函数topicCallback*/ ros::Subscriber sub = n.subscribe("topic", 48, topicCallback); ROS_INFO_STREAM("Thread["<< boost::this_thread::get_id() <<"]This is main thread."); //声明spinner对象,参数2表示并发线程数,默认处理全局Callback队列 ros::AsyncSpinner spinner(2); //启动两个spinner线程并发执行可用回调 spinner.start(); ros::waitForShutdown(); } |
下图展示了相关的线程和队列处理过程 :
使用多线程Spinner同时并发处理回调示意图
实际项目中一个节点往往要订阅多个topic,在使用默认全局Callback队列时,如果某些topic发布频率高回调处理又耗时的话,容易影响其他topic消息的处理。下图中TopicB的消息居多可能影响TopicA的处理。
多个ros::Subscriber在全局回调队列中排队示意图
这种情况下,ROS提供了机制,可以为每个ros::Subscriber指定Callback队列,再分别指定Spinner线程仅处理指定Callback队列的回调。这样确保每个订阅回调相互独立不影响。下面的代码展示如何进行上述操作:
//为每个subscriber指定队列 #include "ros/ros.h" #include "boost/thread.hpp" #include "myset_msg/topic_pub.h" #include "myset_msg/l_quality_pub.h" #include <ros/callback_queue.h> //回调函数,注意参数是const类型的boost::shared_ptr指针 void topicCallback(const my_msg::topic_pubConstPtr& msg) { ROS_INFO_STREAM("Thread["<< boost::this_thread::get_id() <<"],before loop 24 hours topic:"<< msg->topic.c_str()); //死循环 while(true){} ROS_INFO_STREAM("Thread["<< boost::this_thread::get_id() <<"],24 hours topic:"<< msg->topic.c_str()); } void topicCallback_A(const my_msg::topic_pubConstPtr& msg) { ROS_INFO_STREAM("Thread["<< boost::this_thread::get_id() <<"],A 24 hours topic:"<< msg->topic.c_str()); } //回调函数,注意参数是const类型的boost::shared_ptr指针 void lQualityCallback(const my_msg::l_quality_pubConstPtr& msg) { ROS_INFO_STREAM("Thread["<< boost::this_thread::get_id() <<"],24 hours l quality:"<< msg->l_quality_index); } int main(int argc, char **argv){ ros::init(argc, argv, "m_subscriber"); ros::NodeHandle n; /*通知ROS master,节点要订阅名为“topic”的话题(topic), 并指定回调函数weatherCallback*/ ros::Subscriber sub = n.subscribe("topic", 48, topicCallback); ros::Subscriber sub_a = n.subscribe("topicA", 48, topicCallback_A); //需要单独声明一个ros::NodeHandle ros::NodeHandle n_1; //为这个ros::Nodehandle指定单独的Callback队列 ros::CallbackQueue my_queue; n_1.setCallbackQueue(&my_queue); /*通知ROS master,节点要订阅名为“lQuality”的话题(topic), 并指定回调函数airQualityCallback*/ ros::Subscriber air_sub = n_1.subscribe("lQuality", 48, airQualityCallback); ROS_INFO_STREAM("Thread["<< boost::this_thread::get_id()<<"]This is main topic."); //启动两个线程处理全局Callback队列 ros::AsyncSpinner spinner(2); spinner.start(); //启动一个线程处理AirQuality单独的队列 ros::AsyncSpinner spinner_1(1, &my_queue); spinner_1.start(); ros::waitForShutdown(); } |
从执行结果中可以看到,进程中包括四个线程:主线程、全局队列Spinner线程1、全局队列Spinner线程2,以及本地队列Spinner线程3。尽管Spinner线程1被回调函数中的死循环卡住,但并不影响其他topic的回调处理。
下图展示了相关的线程和队列处理过程 :
多个ros::Subscriber各自独立使用回调队列示意图
小结
在理解ROS的回调机制后,使用多个Callback队列和多个Spinner线程可以满足实际项目开发的需要。提醒大家在使用多线程时,记得对临界区域适当加锁,防止引入多线程问题。
猜你喜欢
- 2024-10-09 用最快速度,打造「最强 Webpack 前端工具链」,强势运行
- 2024-10-09 苹果iPhone 15 Pro系列手机静音拨片将改为Action按钮
- 2024-10-09 22个必须学习的Linux安全命令 简述linux的安全口令规则
- 2024-10-09 盘点|22个基本的Linux安全命令 linux教程安全性
- 2024-10-09 0207-使用Oozie API接口向Kerberos环境的CDH集群提交Shell作业
- 2024-10-09 什么是CSRF攻击? 简述csrf攻击的基本原理
- 2024-10-09 DELMIA软件物流仿真:操作对象显示与隐藏功能介绍与使用方法
你 发表评论:
欢迎- 最近发表
-
- 在 Spring Boot 项目中使用 activiti
- 开箱即用-activiti流程引擎(active 流程引擎)
- 在springBoot项目中整合使用activiti
- activiti中的网关是干什么的?(activiti包含网关)
- SpringBoot集成工作流Activiti(完整源码和配套文档)
- Activiti工作流介绍及使用(activiti工作流会签)
- SpringBoot集成工作流Activiti(实际项目演示)
- activiti工作流引擎(activiti工作流引擎怎么用)
- 工作流Activiti初体验及在数据库中生成的表
- Activiti工作流浅析(activiti6.0工作流引擎深度解析)
- 标签列表
-
- oraclesql优化 (66)
- 类的加载机制 (75)
- feignclient (62)
- 一致性hash算法 (71)
- dockfile (66)
- 锁机制 (57)
- javaresponse (60)
- 查看hive版本 (59)
- phpworkerman (57)
- spark算子 (58)
- vue双向绑定的原理 (68)
- springbootget请求 (58)
- docker网络三种模式 (67)
- spring控制反转 (71)
- data:image/jpeg (69)
- base64 (69)
- java分页 (64)
- kibanadocker (60)
- qabstracttablemodel (62)
- java生成pdf文件 (69)
- deletelater (62)
- com.aspose.words (58)
- android.mk (62)
- qopengl (73)
- epoch_millis (61)
本文暂时没有评论,来添加一个吧(●'◡'●)