Examples¶
The Python ROS Engine comes with several examples demonstrating its core functionality. These examples can be found in the examples/
directory of the repository.
System Architecture¶
The examples demonstrate various aspects of the Python ROS Engine architecture:
graph TD A[Examples System] --> B[Publisher Node] A --> C[Subscriber Node] A --> D[Service Node] A --> E[Client Node] A --> F[Bridge Node] A --> G[Launch System] B --> H[(Topics)] C --> H E --> I[(Services)] D --> I
Here's a sequence diagram showing the interaction between the publisher and subscriber nodes in the examples:
sequenceDiagram participant PublisherNode participant Topic participant SubscriberNode PublisherNode->>Topic: Publish message Topic->>SubscriberNode: Deliver message SubscriberNode->>SubscriberNode: Process message in callback
And here's a sequence diagram showing the service/client interaction:
sequenceDiagram participant ClientNode participant ServiceNode ClientNode->>ServiceNode: Service request ServiceNode->>ServiceNode: Process request in callback ServiceNode->>ClientNode: Return response ClientNode->>ClientNode: Handle response
Publisher Example¶
This example demonstrates how to create a node with a publisher that sends messages to a topic.
"""Example publisher node for the Python ROS engine."""
import time
from pyros2 import Node
from pyros2.message import String
class PublisherNode(Node):
"""Example publisher node."""
def __init__(self):
"""Initialize the publisher node."""
super().__init__("publisher_node")
self.publisher = self.create_publisher(String, "/example_topic")
self.counter = 0
def publish_message(self):
"""Publish a message to the topic."""
msg = String()
msg.data = f"Hello World! Counter: {self.counter}"
self.publisher.publish(msg)
self.get_logger().info(f"Published: {msg.data}")
self.counter += 1
def get_logger(self):
"""Get a simple logger for demonstration."""
class Logger:
def info(self, message):
print(f"[INFO] {message}")
return Logger()
def main():
"""Run the publisher node."""
node = PublisherNode()
try:
while True:
node.publish_message()
time.sleep(1)
except KeyboardInterrupt:
print("Shutting down publisher node...")
node.destroy_node()
if __name__ == "__main__":
main()
Subscriber Example¶
This example demonstrates how to create a node with a subscriber that listens to messages on a topic.
"""Example subscriber node for the Python ROS engine."""
from pyros2 import Node
from pyros2.message import String
class SubscriberNode(Node):
"""Example subscriber node."""
def __init__(self):
"""Initialize the subscriber node."""
super().__init__("subscriber_node")
self.subscription = self.create_subscription(
String, "/example_topic", self.message_callback
)
def message_callback(self, msg):
"""Handle received messages."""
self.get_logger().info(f"Received: {msg.data}")
def get_logger(self):
"""Create a simple logger for demonstration."""
class Logger:
def info(self, message):
print(f"[INFO] {message}")
return Logger()
def main():
"""Run the subscriber node."""
node = SubscriberNode()
try:
print("Subscriber node running... Press Ctrl+C to stop.")
node.spin()
except KeyboardInterrupt:
print("Shutting down subscriber node...")
node.destroy_node()
if __name__ == "__main__":
main()
Service Example¶
This example demonstrates how to create a node that provides a service.
"""Example service node for the Python ROS engine."""
from pyros2 import Node
class AddTwoIntsService:
"""Mock service type for addition."""
class Request:
"""Request containing two integers to add."""
def __init__(self, a=0, b=0):
"""Initialize with two integers."""
self.a = a
self.b = b
class Response:
"""Response containing the sum of two integers."""
def __init__(self, sum=0):
"""Initialize with sum value."""
self.sum = sum
class ServiceNode(Node):
"""Example service node."""
def __init__(self):
"""Initialize the service node."""
super().__init__("service_node")
self.service = self.create_service(
AddTwoIntsService, "/add_two_ints", self.add_two_ints_callback
)
def add_two_ints_callback(self, request):
"""Handle service requests for adding two integers."""
response = AddTwoIntsService.Response()
response.sum = request.a + request.b
self.get_logger().info(f"Adding {request.a} + {request.b} = {response.sum}")
return response
def get_logger(self):
"""Get a simple logger for demonstration."""
class Logger:
def info(self, message):
print(f"[INFO] {message}")
return Logger()
def main():
"""Run the service node."""
node = ServiceNode()
try:
print("Service node running... Press Ctrl+C to stop.")
node.spin()
except KeyboardInterrupt:
print("Shutting down service node...")
node.destroy_node()
if __name__ == "__main__":
main()
Client Example¶
This example demonstrates how to create a node that calls a service.
"""Example client node for the Python ROS engine."""
import time
from pyros2 import Node
class AddTwoIntsService:
"""Service definition for adding two integers."""
class Request:
"""Request for adding two integers."""
def __init__(self, a=0, b=0):
"""Initialize the request with two integers."""
self.a = a
self.b = b
class Response:
"""Response containing the sum of two integers."""
def __init__(self, sum=0):
"""Initialize the response with the sum."""
self.sum = sum
class ClientNode(Node):
"""Example client node."""
def __init__(self):
"""Initialize the client node."""
super().__init__("client_node")
self.client = self.create_client(AddTwoIntsService, "/add_two_ints")
def send_request(self, a, b):
"""Send a request to the service."""
request = AddTwoIntsService.Request()
request.a = a
request.b = b
try:
response = self.client.call(request)
self.get_logger().info(f"Result: {a} + {b} = {response.sum}")
return response
except Exception as e:
self.get_logger().error(f"Service call failed: {e}")
return None
def get_logger(self):
"""Get a simple logger for demonstration."""
class Logger:
def info(self, message):
print(f"[INFO] {message}")
def error(self, message):
print(f"[ERROR] {message}")
return Logger()
def main():
"""Run the client example."""
node = ClientNode()
try:
# Send a few requests
for i in range(5):
node.send_request(i, i + 1)
time.sleep(1)
except KeyboardInterrupt:
print("Shutting down client node...")
node.destroy_node()
if __name__ == "__main__":
main()
Bridge Example¶
This example demonstrates how to use the bridge functionality to discover native ROS nodes, topics, and services.
"""Example bridge usage for the Python ROS engine."""
from pyros2 import Bridge
from pyros2.exceptions import BridgeConnectionError
def main():
"""Run the bridge example."""
# Create bridge connection
bridge = Bridge()
try:
# Connect to ROS master
bridge.connect()
print("Connected to ROS master successfully!")
# Discover ROS nodes
nodes = bridge.discover_ros_nodes()
print(f"Discovered {len(nodes)} ROS nodes:")
for node in nodes:
print(f" - {node}")
# Discover ROS topics
topics = bridge.discover_ros_topics()
print(f"Discovered {len(topics)} ROS topics:")
for topic in topics:
print(f" - {topic['name']} (type: {topic['type']})")
# Discover ROS services
services = bridge.discover_ros_services()
print(f"Discovered {len(services)} ROS services:")
for service in services:
print(f" - {service['name']} (providers: {service['providers']})")
except BridgeConnectionError as e:
print(f"Failed to connect to ROS master: {e}")
print("Make sure ROS master is running on localhost:11311")
except Exception as e:
print(f"An error occurred: {e}")
if __name__ == "__main__":
main()
Launch System Examples¶
These examples demonstrate how to use the launch system to manage multiple nodes.
Launch Description Example¶
This example shows how to create a launch description programmatically:
"""Example showing how to use the launch system programmatically."""
import time
from pyros2 import LaunchDescription, LaunchSystem
from pyros2.message import String
class SimplePublisherNode:
"""A simple publisher node for demonstration."""
def __init__(self):
"""Initialize the publisher node."""
from pyros2 import Node
self.node = Node("simple_publisher")
self.publisher = self.node.create_publisher(String, "/simple_topic")
self.counter = 0
def publish_message(self):
"""Publish a message to the topic."""
msg = String()
msg.data = f"Simple message {self.counter}"
self.publisher.publish(msg)
self.counter += 1
class SimpleSubscriberNode:
"""A simple subscriber node for demonstration."""
def __init__(self):
"""Initialize the subscriber node."""
from pyros2 import Node
self.node = Node("simple_subscriber")
self.subscription = self.node.create_subscription(
String, "/simple_topic", self.message_callback
)
def message_callback(self, msg):
"""Handle received messages."""
print(f"Received message: {msg.data}")
def main():
"""Demonstrate launch system usage."""
# Method 1: Using LaunchSystem directly
print("Method 1: Using LaunchSystem directly")
launch_system = LaunchSystem()
# Add nodes
launch_system.add_node("publisher", SimplePublisherNode)
launch_system.add_node("subscriber", SimpleSubscriberNode)
# Print system status
launch_system.print_system_status()
# Start nodes (in a real implementation, this would run them)
print("Starting nodes...")
try:
# Run for a few seconds
start_time = time.time()
while time.time() - start_time < 5:
# Publish a message every second
publisher_node = launch_system.nodes["publisher"]
publisher_node.publish_message()
time.sleep(1)
except KeyboardInterrupt:
pass
finally:
launch_system.shutdown()
print("\n" + "=" * 50 + "\n")
# Method 2: Using LaunchDescription
print("Method 2: Using LaunchDescription")
launch_description = LaunchDescription()
# Add nodes
launch_description.add_node(SimplePublisherNode)
launch_description.add_node(SimpleSubscriberNode)
# Execute the launch description
launch_system = launch_description.execute()
# Print system status
launch_system.print_system_status()
if __name__ == "__main__":
main()
Launch File Example¶
This example shows how to create a launch file that can be executed from the command line:
"""Example launch file for the Python ROS engine."""
import os
import sys
from publisher_example import PublisherNode
from subscriber_example import SubscriberNode
from pyros2 import LaunchDescription
# Add the examples directory to the path so we can import the example nodes
examples_dir = os.path.join(os.path.dirname(__file__), "..", "..", "examples")
sys.path.insert(0, examples_dir)
def generate_launch_description():
"""
Generate a launch description with a publisher and subscriber node.
Returns:
LaunchDescription: The launch description
"""
launch_description = LaunchDescription()
# Add publisher node
launch_description.add_node(PublisherNode)
# Add subscriber node
launch_description.add_node(SubscriberNode)
return launch_description
# Alternative way to define a launch description
launch_description = LaunchDescription()
# Add publisher node
launch_description.add_node(PublisherNode)
# Add subscriber node
launch_description.add_node(SubscriberNode)
Complete Example Project¶
In addition to the basic examples, we've included a complete example project in the example_project/
directory that demonstrates how to build a robot system with multiple interconnected nodes.
The complete example project includes: - Configuration files using Hydra for flexible parameter management - Publisher, subscriber, service, and client nodes - A launch system to run all nodes together as a cohesive robot system
You can explore this complete example to understand how to structure a real-world application using the Python ROS Engine.
Running Examples¶
To run any of these examples, make sure you have installed the Python ROS Engine:
Then you can run the examples directly with Python:
python examples/publisher_example.py
python examples/subscriber_example.py
python examples/service_example.py
python examples/client_example.py
python examples/bridge_example.py
python examples/launch_system_example.py
To run the launch file example:
To just check the system status without starting the nodes:
For the complete example project:
python example_project/nodes/publisher_node.py
python example_project/nodes/subscriber_node.py
python example_project/nodes/service_node.py
python example_project/nodes/client_node.py
python example_project/launch/robot_system.py
Note: For the bridge example to work, you need to have a ROS master running on localhost:11311.