我们这一部分模拟一个低能的阶乘计算器,来比较Service和Action的差异。


碎碎念:Hello米娜桑,这里是英国留学中的杨丝儿。我的博客技术点集中在机器人、人工智能可解释性、数学、物理等等,感兴趣地点个关注吧,持续高质量输出中。
唠嗑QQ群兔叽的魔术工房 (942848525)
B站账号杨丝儿今天也在科学修仙(UP主跨站求个关注)


本系列用时9天,博主也是从零开始,尽力去写的,如果发现了错误一定要私信告诉我呀。这么努力的博主,关注一下吧。


:star:案例目标

说明以下问题

  • ROS中message,action,service的定义方式大同小异。
  • 在消息作为一等公民的ROS软件中,灵活使用面向对象思想。
  • 文件的命名问题,很多文件的命名是很有讲究的,节点名也是。
  • ROS使用大量多线程并行的思想,需要理解并行与并发、python的主锁以及阻塞和上锁的区别。

:star:准备工作

  • 基础步骤参见:【机器人】ROS程序框架:架构部分
  • 创建factorial_calculator包

:star:服务部分

:sparkles:定义文件

创建一个名为FactorialService.srv,定义输入和输出

1
2
3
uint32 num_in
---
uint32 num_out

取消package.xml中的注释

添加信息到CMakeLists.txt

1
2
3
4
5
find_package(catkin REQUIRED COMPONENTS
rospy
std_msgs # 新增
message_generation # 新增
)
1
2
3
4
5
# Generate services in the 'srv' folder
add_service_files(
FILES
FactorialService.srv # 新增
)

取消CMakeLists.txt中的注释

使用$catkin_make编译


:sparkles:使用服务

客户端factorial_service_client.py代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#!/usr/bin/env python3

import rospy
from factorial_calculator.srv import FactorialService

class FactorialServiceClient():
def __init__(self):
rospy.init_node('service_client_factorial')
rospy.wait_for_service('service/server/factorial/service')
self.factorise = rospy.ServiceProxy('service/server/factorial/service', FactorialService)

def call(self,num):
return self.factorise(num).num_out


if __name__ == '__main__':
client = FactorialServiceClient()
while input_num := input("Input a num: "):
if not input_num.isdigit():
continue
input_num = int(input_num)
result = client.call(input_num)
print(f"Result is: {result}")

服务端factorial_service_server.py代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#!/usr/bin/env python3

import rospy
from factorial_calculator.srv import FactorialService,FactorialServiceResponse

class FactorialServiceServer():
def __init__(self):
rospy.init_node('service_server_factorial')

def factorise(self,num):
result = 1
for i in range(1,num+1):
result *= i
rospy.sleep(0.2)
print(result)
return result

def run(self):
service = rospy.Service('service/server/factorial/service', FactorialService, lambda x : FactorialServiceResponse(self.factorise(x.num_in)))
rospy.spin()

if __name__ == '__main__':
server = FactorialServiceServer()
server.run()

:sparkles:效果


:star:动作部分

:sparkles:定义文件

创建一个名为FactorialAction.action

1
2
3
4
5
uint32 num_in
---
uint32 num_process
---
uint32 num_out

package.xml添加信息

1
2
<build_depend>actionlib_msgs</build_depend>
<exec_depend>actionlib_msgs</exec_depend>

CMakeLists.txt

1
2
3
4
5
# Generate actions in the 'action' folder
add_action_files(
FILES
FactorialAction.action # 新增
)

添加actionlib_msgs依赖

1
2
3
4
5
6
find_package(catkin REQUIRED COMPONENTS
rospy
std_msgs
actionlib_msgs # 新增
message_generation
)
1
2
3
4
5
generate_messages(
DEPENDENCIES
std_msgs # Or other packages containing msgs
actionlib_msgs # 新增
)
1
2
3
4
5
6
7
catkin_package(
# INCLUDE_DIRS include
# LIBRARIES factorial_calculator
# CATKIN_DEPENDS rospy
# DEPENDS system_lib
actionlib_msgs # 新增
)

$catkin_make


:sparkles:使用服务

客户端factorial_action_client.py代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#!/usr/bin/env python3

import rospy

import actionlib
from factorial_calculator.msg import FactorialActionAction,FactorialActionGoal,FactorialActionResult

class FactorialActionClient():
def __init__(self):
# 初始化节点
rospy.init_node('action_client_factorial')

# 定义动作客户端
self.client = actionlib.SimpleActionClient('action/server/factorial/action',
FactorialActionAction)
self.client.wait_for_server()

def feedback_callback(self,feedback):
if int(feedback)>100000:
self.client.cancel_goal()
print('Feedback:', feedback)

def call(self,num):
# 定义goal
goal = FactorialActionGoal()
goal.num_goal = num

# 发送goal
self.client.send_goal(goal, feedback_cb=lambda x : self.feedback_callback(x.num_feedback))

# 阻塞等待结果,一般是不会这样使用的
# 这是多线程中的阻塞步骤
self.client.wait_for_result()
return self.client.get_result().num_result


if __name__ == '__main__':
client = FactorialActionClient()

# 循环读取终端输入并上传动作服务器执行。
while input_num := input("Input a num: "):
if not input_num.isdigit():
continue
input_num = int(input_num)
result = client.call(input_num)
print(f"Result is: {result}")

服务端factorial_action_server.py代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#!/usr/bin/env python3

import rospy

import actionlib
from factorial_calculator.msg import FactorialActionAction,FactorialActionGoal,FactorialActionResult,FactorialActionFeedback

class FactorialActionServer():
def __init__(self):
rospy.init_node('action_server_factorial')
self.server = actionlib.SimpleActionServer('action/server/factorial/action',
FactorialActionAction,
lambda x : self.factorise(x.num_goal),
auto_start=False)

def factorise(self,num):
result = 1

if num > 14:
self.server.set_aborted(FactorialActionResult(result),'[aborted]')
return

for i in range(1,num+1):

if self.server.is_preempt_requested():
self.server.set_preempted(FactorialActionResult(result),'[preempted]')

result *= i
rospy.sleep(0.2)
self.server.publish_feedback(FactorialActionFeedback(result))

self.server.set_succeeded(FactorialActionResult(result))
return

def run(self):
self.server.start()
rospy.spin()

if __name__ == '__main__':
server = FactorialActionServer()
server.run()

:sparkles:效果