首页 > Python资料 博客日记

ros2 rclpy 详解 --创建 python类型节点

2025-01-03 03:00:04Python资料围观22

这篇文章介绍了ros2 rclpy 详解 --创建 python类型节点,分享给大家做个参考,收藏Python资料网收获更多编程知识

rclpy 是 ROS 2 (Robot Operating System 2) 中用于 Python 的客户端库。它提供了与 ROS 2 系统交互的 API,使开发者能够使用 Python 编写 ROS 2 节点、发布和订阅消息、调用服务、定时器等。rclpy 是 ROS 2 的核心库之一,为 Python 开发者提供了与 ROS 2 系统进行通信的能力。

  1. rclpy 的基本功能
    创建节点:提供创建和管理 ROS 2 节点的功能。
    发布/订阅消息:支持创建发布者和订阅者,用于在 ROS 2 系统中发送和接收消息。
    服务/客户端:支持创建服务和客户端,用于在节点间进行请求/响应式通信。
    定时器:提供定时器功能,用于定期执行某些任务。
    参数管理:支持管理和使用节点参数。
  2. 安装 rclpy
    rclpy 是 ROS 2 的一部分,可以通过 ROS 2 的安装程序自动安装。确保已经安装了 ROS 2 环境。然后可以在 Python 环境中使用 rclpy。

安装 ROS 2(以 Ubuntu 为例):

sudo apt update
sudo apt install ros-<ros-distro>-rclpy

替换 为你使用的 ROS 2 发行版(如 foxy、galactic、humble)。

  1. 创建和使用 ROS 2 节点
    3.1 创建节点
    创建一个简单的 ROS 2 节点的基本步骤如下:

导入 rclpy:

import rclpy
from rclpy.node import Node

定义节点类:

class MinimalNode(Node):
    def __init__(self):
        super().__init__('minimal_node')
        self.get_logger().info('Hello ROS 2 from Python node!')

初始化和运行节点:

def main(args=None):
    rclpy.init(args=args)
    node = MinimalNode()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

完整示例:

import rclpy
from rclpy.node import Node

class MinimalNode(Node):
    def __init__(self):
        super().__init__('minimal_node')
        self.get_logger().info('Hello ROS 2 from Python node!')

def main(args=None):
    rclpy.init(args=args)
    node = MinimalNode()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

  1. 发布和订阅消息
    4.1 创建发布者
from std_msgs.msg import String

class PublisherNode(Node):
    def __init__(self):
        super().__init__('publisher_node')
        self.publisher_ = self.create_publisher(String, 'topic', 10)
        timer_period = 1  # seconds
        self.timer = self.create_timer(timer_period, self.timer_callback)
        self.i = 0

    def timer_callback(self):
        msg = String()
        msg.data = 'Hello, world! %d' % self.i
        self.publisher_.publish(msg)
        self.get_logger().info('Publishing: "%s"' % msg.data)
        self.i += 1

4.2 创建订阅者

class SubscriberNode(Node):
    def __init__(self):
        super().__init__('subscriber_node')
        self.subscription = self.create_subscription(
            String,
            'topic',
            self.listener_callback,
            10)
        self.subscription  # prevent unused variable warning

    def listener_callback(self, msg):
        self.get_logger().info('Received: "%s"' % msg.data)

  1. 服务和客户端
    5.1 创建服务
from example_interfaces.srv import AddTwoInts

class ServiceNode(Node):
    def __init__(self):
        super().__init__('service_node')
        self.srv = self.create_service(AddTwoInts, 'add_two_ints', self.add_two_ints_callback)

    def add_two_ints_callback(self, request, response):
        response.sum = request.a + request.b
        return response

5.2 创建客户端

from example_interfaces.srv import AddTwoInts

class ClientNode(Node):
    def __init__(self):
        super().__init__('client_node')
        self.client = self.create_client(AddTwoInts, 'add_two_ints')
        while not self.client.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('Service not available, waiting again...')
        self.req = AddTwoInts.Request()
        self.req.a = 1
        self.req.b = 2
        self.send_request()

    def send_request(self):
        self.future = self.client.call_async(self.req)
        self.future.add_done_callback(self.callback)

    def callback(self, future):
        try:
            response = future.result()
        except Exception as e:
            self.get_logger().info('Service call failed %r' % (e,))
        else:
            self.get_logger().info('Result of add_two_ints: %d' % response.sum)

  1. 定时器
    使用定时器可以定期调用回调函数:
class TimerNode(Node):
    def __init__(self):
        super().__init__('timer_node')
        self.timer = self.create_timer(1.0, self.timer_callback)

    def timer_callback(self):
        self.get_logger().info('Timer callback triggered')

  1. 参数管理
    rclpy 支持节点参数的设置和获取:
class ParameterNode(Node):
    def __init__(self):
        super().__init__('parameter_node')
        self.declare_parameter('my_parameter', 'default_value')
        self.get_logger().info('Parameter value: %s' % self.get_parameter('my_parameter').get_parameter_value().string_value)

  1. 与 ROS 2 的集成
    rclpy 节点可以通过 ros2 run 命令来运行,确保将 Python 脚本的路径添加到 setup.py 文件中的 entry_points 部分。

示例 setup.py:

from setuptools import setup

package_name = 'my_package'

setup(
    name=package_name,
    version='0.0.1',
    packages=[package_name],
    py_modules=['my_module'],
    entry_points={
        'console_scripts': [
            'my_node = my_package.my_module:main',
        ],
    },
    install_requires=['setuptools'],
)

总结
rclpy 是 ROS 2 的 Python 客户端库,提供了创建节点、发布和订阅消息、调用服务、定时器等功能。它使得 Python 开发者能够使用熟悉的 Python 语言进行 ROS 2 开发。通过理解和使用 rclpy,可以构建功能强大的 ROS 2 应用程序,并与 ROS 2 系统进行高效的交互。


版权声明:本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:jacktools123@163.com进行投诉反馈,一经查实,立即删除!

标签:

相关文章

本站推荐