Examples

The Python ROS Engine comes with several examples demonstrating its core functionality. These examples can be found in the examples/ directory of the repository.

System Architecture

The examples demonstrate various aspects of the Python ROS Engine architecture:

graph TD
    A[Examples System] --> B[Publisher Node]
    A --> C[Subscriber Node]
    A --> D[Service Node]
    A --> E[Client Node]
    A --> F[Bridge Node]
    A --> G[Launch System]
    B --> H[(Topics)]
    C --> H
    E --> I[(Services)]
    D --> I

Here's a sequence diagram showing the interaction between the publisher and subscriber nodes in the examples:

sequenceDiagram
    participant PublisherNode
    participant Topic
    participant SubscriberNode
    PublisherNode->>Topic: Publish message
    Topic->>SubscriberNode: Deliver message
    SubscriberNode->>SubscriberNode: Process message in callback

And here's a sequence diagram showing the service/client interaction:

sequenceDiagram
    participant ClientNode
    participant ServiceNode
    ClientNode->>ServiceNode: Service request
    ServiceNode->>ServiceNode: Process request in callback
    ServiceNode->>ClientNode: Return response
    ClientNode->>ClientNode: Handle response

Publisher Example

This example demonstrates how to create a node with a publisher that sends messages to a topic.

"""Example publisher node for the Python ROS engine."""

import time

from pyros2 import Node
from pyros2.message import String


class PublisherNode(Node):
    """Example publisher node."""

    def __init__(self):
        """Initialize the publisher node."""
        super().__init__("publisher_node")
        self.publisher = self.create_publisher(String, "/example_topic")
        self.counter = 0

    def publish_message(self):
        """Publish a message to the topic."""
        msg = String()
        msg.data = f"Hello World! Counter: {self.counter}"
        self.publisher.publish(msg)
        self.get_logger().info(f"Published: {msg.data}")
        self.counter += 1

    def get_logger(self):
        """Get a simple logger for demonstration."""

        class Logger:
            def info(self, message):
                print(f"[INFO] {message}")

        return Logger()


def main():
    """Run the publisher node."""
    node = PublisherNode()

    try:
        while True:
            node.publish_message()
            time.sleep(1)
    except KeyboardInterrupt:
        print("Shutting down publisher node...")
        node.destroy_node()


if __name__ == "__main__":
    main()

Subscriber Example

This example demonstrates how to create a node with a subscriber that listens to messages on a topic.

"""Example subscriber node for the Python ROS engine."""

from pyros2 import Node
from pyros2.message import String


class SubscriberNode(Node):
    """Example subscriber node."""

    def __init__(self):
        """Initialize the subscriber node."""
        super().__init__("subscriber_node")
        self.subscription = self.create_subscription(
            String, "/example_topic", self.message_callback
        )

    def message_callback(self, msg):
        """Handle received messages."""
        self.get_logger().info(f"Received: {msg.data}")

    def get_logger(self):
        """Create a simple logger for demonstration."""

        class Logger:
            def info(self, message):
                print(f"[INFO] {message}")

        return Logger()


def main():
    """Run the subscriber node."""
    node = SubscriberNode()

    try:
        print("Subscriber node running... Press Ctrl+C to stop.")
        node.spin()
    except KeyboardInterrupt:
        print("Shutting down subscriber node...")
        node.destroy_node()


if __name__ == "__main__":
    main()

Service Example

This example demonstrates how to create a node that provides a service.

"""Example service node for the Python ROS engine."""

from pyros2 import Node


class AddTwoIntsService:
    """Mock service type for addition."""

    class Request:
        """Request containing two integers to add."""

        def __init__(self, a=0, b=0):
            """Initialize with two integers."""
            self.a = a
            self.b = b

    class Response:
        """Response containing the sum of two integers."""

        def __init__(self, sum=0):
            """Initialize with sum value."""
            self.sum = sum


class ServiceNode(Node):
    """Example service node."""

    def __init__(self):
        """Initialize the service node."""
        super().__init__("service_node")
        self.service = self.create_service(
            AddTwoIntsService, "/add_two_ints", self.add_two_ints_callback
        )

    def add_two_ints_callback(self, request):
        """Handle service requests for adding two integers."""
        response = AddTwoIntsService.Response()
        response.sum = request.a + request.b
        self.get_logger().info(f"Adding {request.a} + {request.b} = {response.sum}")
        return response

    def get_logger(self):
        """Get a simple logger for demonstration."""

        class Logger:
            def info(self, message):
                print(f"[INFO] {message}")

        return Logger()


def main():
    """Run the service node."""
    node = ServiceNode()

    try:
        print("Service node running... Press Ctrl+C to stop.")
        node.spin()
    except KeyboardInterrupt:
        print("Shutting down service node...")
        node.destroy_node()


if __name__ == "__main__":
    main()

Client Example

This example demonstrates how to create a node that calls a service.

"""Example client node for the Python ROS engine."""

import time

from pyros2 import Node


class AddTwoIntsService:
    """Service definition for adding two integers."""

    class Request:
        """Request for adding two integers."""

        def __init__(self, a=0, b=0):
            """Initialize the request with two integers."""
            self.a = a
            self.b = b

    class Response:
        """Response containing the sum of two integers."""

        def __init__(self, sum=0):
            """Initialize the response with the sum."""
            self.sum = sum


class ClientNode(Node):
    """Example client node."""

    def __init__(self):
        """Initialize the client node."""
        super().__init__("client_node")
        self.client = self.create_client(AddTwoIntsService, "/add_two_ints")

    def send_request(self, a, b):
        """Send a request to the service."""
        request = AddTwoIntsService.Request()
        request.a = a
        request.b = b

        try:
            response = self.client.call(request)
            self.get_logger().info(f"Result: {a} + {b} = {response.sum}")
            return response
        except Exception as e:
            self.get_logger().error(f"Service call failed: {e}")
            return None

    def get_logger(self):
        """Get a simple logger for demonstration."""

        class Logger:
            def info(self, message):
                print(f"[INFO] {message}")

            def error(self, message):
                print(f"[ERROR] {message}")

        return Logger()


def main():
    """Run the client example."""
    node = ClientNode()

    try:
        # Send a few requests
        for i in range(5):
            node.send_request(i, i + 1)
            time.sleep(1)
    except KeyboardInterrupt:
        print("Shutting down client node...")
        node.destroy_node()


if __name__ == "__main__":
    main()

Bridge Example

This example demonstrates how to use the bridge functionality to discover native ROS nodes, topics, and services.

"""Example bridge usage for the Python ROS engine."""

from pyros2 import Bridge
from pyros2.exceptions import BridgeConnectionError


def main():
    """Run the bridge example."""
    # Create bridge connection
    bridge = Bridge()

    try:
        # Connect to ROS master
        bridge.connect()
        print("Connected to ROS master successfully!")

        # Discover ROS nodes
        nodes = bridge.discover_ros_nodes()
        print(f"Discovered {len(nodes)} ROS nodes:")
        for node in nodes:
            print(f"  - {node}")

        # Discover ROS topics
        topics = bridge.discover_ros_topics()
        print(f"Discovered {len(topics)} ROS topics:")
        for topic in topics:
            print(f"  - {topic['name']} (type: {topic['type']})")

        # Discover ROS services
        services = bridge.discover_ros_services()
        print(f"Discovered {len(services)} ROS services:")
        for service in services:
            print(f"  - {service['name']} (providers: {service['providers']})")

    except BridgeConnectionError as e:
        print(f"Failed to connect to ROS master: {e}")
        print("Make sure ROS master is running on localhost:11311")
    except Exception as e:
        print(f"An error occurred: {e}")


if __name__ == "__main__":
    main()

Launch System Examples

These examples demonstrate how to use the launch system to manage multiple nodes.

Launch Description Example

This example shows how to create a launch description programmatically:

"""Example showing how to use the launch system programmatically."""

import time

from pyros2 import LaunchDescription, LaunchSystem
from pyros2.message import String


class SimplePublisherNode:
    """A simple publisher node for demonstration."""

    def __init__(self):
        """Initialize the publisher node."""
        from pyros2 import Node

        self.node = Node("simple_publisher")
        self.publisher = self.node.create_publisher(String, "/simple_topic")
        self.counter = 0

    def publish_message(self):
        """Publish a message to the topic."""
        msg = String()
        msg.data = f"Simple message {self.counter}"
        self.publisher.publish(msg)
        self.counter += 1


class SimpleSubscriberNode:
    """A simple subscriber node for demonstration."""

    def __init__(self):
        """Initialize the subscriber node."""
        from pyros2 import Node

        self.node = Node("simple_subscriber")
        self.subscription = self.node.create_subscription(
            String, "/simple_topic", self.message_callback
        )

    def message_callback(self, msg):
        """Handle received messages."""
        print(f"Received message: {msg.data}")


def main():
    """Demonstrate launch system usage."""
    # Method 1: Using LaunchSystem directly
    print("Method 1: Using LaunchSystem directly")
    launch_system = LaunchSystem()

    # Add nodes
    launch_system.add_node("publisher", SimplePublisherNode)
    launch_system.add_node("subscriber", SimpleSubscriberNode)

    # Print system status
    launch_system.print_system_status()

    # Start nodes (in a real implementation, this would run them)
    print("Starting nodes...")
    try:
        # Run for a few seconds
        start_time = time.time()
        while time.time() - start_time < 5:
            # Publish a message every second
            publisher_node = launch_system.nodes["publisher"]
            publisher_node.publish_message()
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        launch_system.shutdown()

    print("\n" + "=" * 50 + "\n")

    # Method 2: Using LaunchDescription
    print("Method 2: Using LaunchDescription")
    launch_description = LaunchDescription()

    # Add nodes
    launch_description.add_node(SimplePublisherNode)
    launch_description.add_node(SimpleSubscriberNode)

    # Execute the launch description
    launch_system = launch_description.execute()

    # Print system status
    launch_system.print_system_status()


if __name__ == "__main__":
    main()

Launch File Example

This example shows how to create a launch file that can be executed from the command line:

"""Example launch file for the Python ROS engine."""

import os
import sys

from publisher_example import PublisherNode
from subscriber_example import SubscriberNode

from pyros2 import LaunchDescription

# Add the examples directory to the path so we can import the example nodes
examples_dir = os.path.join(os.path.dirname(__file__), "..", "..", "examples")
sys.path.insert(0, examples_dir)


def generate_launch_description():
    """
    Generate a launch description with a publisher and subscriber node.

    Returns:
        LaunchDescription: The launch description
    """
    launch_description = LaunchDescription()

    # Add publisher node
    launch_description.add_node(PublisherNode)

    # Add subscriber node
    launch_description.add_node(SubscriberNode)

    return launch_description


# Alternative way to define a launch description
launch_description = LaunchDescription()

# Add publisher node
launch_description.add_node(PublisherNode)

# Add subscriber node
launch_description.add_node(SubscriberNode)

Complete Example Project

In addition to the basic examples, we've included a complete example project in the example_project/ directory that demonstrates how to build a robot system with multiple interconnected nodes.

The complete example project includes: - Configuration files using Hydra for flexible parameter management - Publisher, subscriber, service, and client nodes - A launch system to run all nodes together as a cohesive robot system

You can explore this complete example to understand how to structure a real-world application using the Python ROS Engine.

Running Examples

To run any of these examples, make sure you have installed the Python ROS Engine:

pip install python-ros-engine

Then you can run the examples directly with Python:

python examples/publisher_example.py
python examples/subscriber_example.py
python examples/service_example.py
python examples/client_example.py
python examples/bridge_example.py
python examples/launch_system_example.py

To run the launch file example:

python -m pyros2.launch_cli examples/launch_example.py

To just check the system status without starting the nodes:

python -m pyros2.launch_cli examples/launch_example.py --status

For the complete example project:

python example_project/nodes/publisher_node.py
python example_project/nodes/subscriber_node.py
python example_project/nodes/service_node.py
python example_project/nodes/client_node.py
python example_project/launch/robot_system.py

Note: For the bridge example to work, you need to have a ROS master running on localhost:11311.