apollo_logo
3
0

Cyber python版本

我们知道cyber是apollo自动驾驶的中间件,是一个分布式的发布订阅系统,这里提供了一个轻量化的python版本,可以直接在本地安装,不需要启动容器和下载整个apollo代码仓库,就可以运行。

安装

通过pip就可以完成cyber的安装。

pip3 install pycyber

目前在以下环境中测试通过,暂时不支持windows和mac系统。

  • 操作系统:ubuntu 18.04
  • 硬件 : x86
  • python版本:3.6

安装过程如图:

pycyber安装过程

运行

pycyber目前集成了一个简单的测试用例和4个命令,下面我们分别开始介绍。

测试用例

目前pycyber集成了一个简单的测试用例,你可以在一个终端中输入listener,然后在打开一个终端窗口,输入talker,就可以看到2者之间的收发消息过程了,其中listener为接收端,talker为发布端。

运行的效果如下图所示。

listener和talker进行通信

命令行

同时pycyber还自带了4个命令,来支持查看和启动节点等。

  • cyber_node查看当前的节点信息
  • cyber_channel查看当前的通道信息
  • cyber_service查看当前的service信息
  • cyber_launch启动cyber组件

cyber_channel为例,在上述过程中启动该命令,可以查看当前的channel信息

cyber_channel命令输出

引用

除了直接使用命令行之外,还可以通过import pycyber来引用cyber的功能。

1. 发送和接收

通过引用pycyber创建node和reader

importsysfrompycyberimportcyberfrompycyber.examples.proto.examples_pb2importChatterdefcallback(data):"""
Reader message callback.
"""print("="*80)print("py:reader callback msg->:")print(data)print("="*80)deftest_listener_class():"""
Reader message.
"""print("="*120)test_node=cyber.Node("listener")test_node.create_reader("channel/chatter",Chatter,callback)test_node.spin()defmain(args=sys.argv):cyber.init()test_listener_class()cyber.shutdown()

通过引用pycyber创建node和writer

importsysimporttimefrompycyberimportcyberfrompycyber.examples.proto.examples_pb2importChatterdeftest_talker_class():"""
Test talker.
"""msg=Chatter()msg.content=str.encode("py:talker:send Alex!")msg.timestamp=9999msg.seq=0print(msg)test_node=cyber.Node("node_name1")g_count=1writer=test_node.create_writer("channel/chatter",Chatter,6)whilenotcyber.is_shutdown():time.sleep(1)g_count=g_count+1msg.seq=g_countmsg.content=str.encode("I am python talker.")print("="*80)print("write msg -> %s"%msg)writer.write(msg)defmain(args=sys.argv):cyber.init("talker_sample")test_talker_class()cyber.shutdown()

2. 定时器

通过引用pycyber创建定时器timer

importtimefrompycyberimportcyberfrompycyberimportcyber_timercount=0deffun():globalcountprint("cb fun is called:",count)count+=1deftest_timer():cyber.init()ct=cyber_timer.Timer(10,fun,0)# 10msct.start()time.sleep(1)# 1sct.stop()print("+"*80,"test set_option")ct2=cyber_timer.Timer()# 10msct2.set_option(10,fun,0)ct2.start()time.sleep(1)# 1sct2.stop()cyber.shutdown()if__name__=='__main__':test_timer()

更多例子可以参考:

https://github.com/daohu527/pycyber/tree/main/pycyber/examples​github.com/daohu527/pycyber/tree/main/pycyber/examples

总结

pycyber提供了cyber的python版本的功能,可以更加方便的安装和构建,帮助开发者在之上构建自己的功能。

来自专栏
Cyber入门与探索查看专栏 >
原创声明,本文由作者授权发布于Apollo开发者社区,未经许可,不得转载。
发表评论已发表 0 条评论
登录后可评论,请前往 登录
暂无评论~快去发表自己的独特见解吧!
目录
安装
运行
测试用例
命令行
引用
1. 发送和接收
2. 定时器
总结