Usage Guide¶
Installation¶
Install the package using pip:
Or install in development mode:
Node Lifecycle¶
The Python ROS Engine follows a node lifecycle similar to ROS2:
graph TD
    A[Node Creation] --> B[Configuration]
    B --> C[Activation]
    C --> D[Running]
    D --> E{Shutdown?}
    E -->|Yes| F[Cleanup]
    E -->|No| D
    F --> G[Node Destruction]
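The lifecycle in the diagram can be read as a small state machine. The following is a minimal sketch under that reading only: `State`, `TRANSITIONS`, and `LifecycleSketch` are hypothetical names invented for illustration and are not part of the `pyros2` API.

```python
from enum import Enum, auto


class State(Enum):
    """Hypothetical state names mirroring the boxes in the lifecycle diagram."""
    CREATED = auto()
    CONFIGURED = auto()
    ACTIVE = auto()
    RUNNING = auto()
    CLEANED_UP = auto()
    DESTROYED = auto()


# Allowed transitions, read directly off the diagram.
# RUNNING loops back to itself until shutdown is requested.
TRANSITIONS = {
    State.CREATED: {State.CONFIGURED},
    State.CONFIGURED: {State.ACTIVE},
    State.ACTIVE: {State.RUNNING},
    State.RUNNING: {State.RUNNING, State.CLEANED_UP},
    State.CLEANED_UP: {State.DESTROYED},
    State.DESTROYED: set(),
}


class LifecycleSketch:
    """Tracks the current state and rejects transitions the diagram does not allow."""

    def __init__(self):
        self.state = State.CREATED

    def advance(self, target):
        if target not in TRANSITIONS[self.state]:
            raise ValueError(
                f"illegal transition {self.state.name} -> {target.name}"
            )
        self.state = target


# Walk the full path from creation to destruction.
node = LifecycleSketch()
for step in (State.CONFIGURED, State.ACTIVE, State.RUNNING,
             State.CLEANED_UP, State.DESTROYED):
    node.advance(step)
```

Keeping the transitions in a table makes the legal paths easy to audit against the diagram; how the engine itself tracks these phases may differ.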
Publisher/Subscriber Communication¶
The publisher/subscriber pattern enables asynchronous one-to-many communication between nodes:
graph LR
    A[Publishers] -- Messages --> B[(Topic)]
    B -- Messages --> C[Subscribers]
    B -- Messages --> D[Subscribers]
Here's a sequence diagram showing the message flow between publishers and subscribers:
sequenceDiagram
    participant Publisher
    participant Topic
    participant Subscriber1
    participant Subscriber2
    Publisher->>Topic: Publish Message
    Topic->>Subscriber1: Deliver Message
    Topic->>Subscriber2: Deliver Message
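To make the fan-out concrete, here is a toy in-process model of a topic delivering one published message to every subscriber. It is a sketch for intuition only: `TopicBus` is a hypothetical class, delivery here is synchronous, and the engine's actual transport is not shown in this guide.

```python
from collections import defaultdict


class TopicBus:
    """Toy stand-in for topic routing (not the engine's transport)."""

    def __init__(self):
        # Maps a topic name to the callbacks subscribed to it.
        self._subscribers = defaultdict(list)

    def subscribe(self, topic, callback):
        self._subscribers[topic].append(callback)

    def publish(self, topic, message):
        # Deliver to every subscriber on this topic, in subscription order,
        # and report how many deliveries were made.
        callbacks = self._subscribers[topic]
        for callback in callbacks:
            callback(message)
        return len(callbacks)


bus = TopicBus()
received_1, received_2 = [], []
bus.subscribe("/my_topic", received_1.append)
bus.subscribe("/my_topic", received_2.append)
delivered = bus.publish("/my_topic", "Hello World!")
```

The publisher never references a subscriber directly; both sides only agree on the topic name, which is what lets subscribers come and go independently.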
Basic Usage¶
Creating a Node¶
from pyros2 import Node
node = Node("my_node")
# Add publishers, subscribers, services, etc.
node.spin()
Publisher¶
from pyros2 import Node
from pyros2.message import String
class MyNode(Node):
    def __init__(self):
        super().__init__("publisher_node")
        self.publisher = self.create_publisher(String, "/my_topic")
        self.timer = self.create_timer(1.0, self.publish_message)

    def publish_message(self):
        msg = String()
        msg.data = "Hello World!"
        self.publisher.publish(msg)

node = MyNode()
node.spin()
Subscriber¶
from pyros2 import Node
from pyros2.message import String
class MyNode(Node):
    def __init__(self):
        super().__init__("subscriber_node")
        self.subscription = self.create_subscription(
            String, "/my_topic", self.message_callback)

    def message_callback(self, msg):
        print(f"Received: {msg.data}")

node = MyNode()
node.spin()
Service¶
from pyros2 import Node
class AddTwoInts:
    class Request:
        def __init__(self, a=0, b=0):
            self.a = a
            self.b = b

    class Response:
        def __init__(self, sum=0):
            self.sum = sum

class MyNode(Node):
    def __init__(self):
        super().__init__("service_node")
        self.service = self.create_service(
            AddTwoInts, "/add_two_ints", self.add_two_ints_callback)

    def add_two_ints_callback(self, request):
        response = AddTwoInts.Response()
        response.sum = request.a + request.b
        return response

node = MyNode()
node.spin()
Client¶
from pyros2 import Node
import time
class AddTwoInts:
    class Request:
        def __init__(self, a=0, b=0):
            self.a = a
            self.b = b

    class Response:
        def __init__(self, sum=0):
            self.sum = sum

class MyNode(Node):
    def __init__(self):
        super().__init__("client_node")
        self.client = self.create_client(AddTwoInts, "/add_two_ints")
        # Wait for service to be available
        while not self.client.service_is_ready():
            print("Waiting for service...")
            time.sleep(1)

    def send_request(self, a, b):
        request = AddTwoInts.Request()
        request.a = a
        request.b = b
        response = self.client.call(request)
        return response

node = MyNode()
# Send requests as needed
result = node.send_request(3, 4)
print(f"Result: {result.sum}")
Here's a sequence diagram showing the service/client request-response pattern:
sequenceDiagram
    participant Client
    participant Service
    Client->>Service: Request (a=3, b=4)
    Service->>Client: Response (sum=7)
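The request-response exchange in the diagram can be mimicked in a single process to see the data flow without any transport. This is a sketch under that simplification: `ServiceRegistry` and its methods are hypothetical and do not describe how `pyros2` routes service calls.

```python
class AddTwoInts:
    """Same request/response shape as the service example in this guide."""

    class Request:
        def __init__(self, a=0, b=0):
            self.a = a
            self.b = b

    class Response:
        def __init__(self, sum=0):
            self.sum = sum


class ServiceRegistry:
    """Toy name-to-handler table standing in for service discovery."""

    def __init__(self):
        self._handlers = {}

    def register(self, name, handler):
        if name in self._handlers:
            raise ValueError(f"service {name!r} is already registered")
        self._handlers[name] = handler

    def is_ready(self, name):
        return name in self._handlers

    def call(self, name, request):
        # A call is one request in, exactly one response out.
        return self._handlers[name](request)


def add_two_ints(request):
    return AddTwoInts.Response(sum=request.a + request.b)


registry = ServiceRegistry()
ready_before = registry.is_ready("/add_two_ints")
registry.register("/add_two_ints", add_two_ints)
response = registry.call("/add_two_ints", AddTwoInts.Request(a=3, b=4))
```

Unlike a topic, a service name maps to a single handler, which is why the sketch rejects a second registration under the same name.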
Advanced Usage¶
Quality of Service (QoS) Profiles¶
The Python ROS Engine supports QoS profiles for publishers and subscribers:
graph TD
    A[QoS Profiles] --> B[Reliability]
    A --> C[Durability]
    A --> D[Depth]
    B --> B1[Reliable - Guaranteed delivery]
    B --> B2[Best Effort - No delivery guarantees]
    C --> C1[Transient Local - Replays last message]
    C --> C2[Volatile - No message replay]
    D --> D1[Queue Depth - Message history size]
from pyros2 import Node, QoSProfile
from pyros2.message import String
class MyNode(Node):
    def __init__(self):
        super().__init__("qos_example_node")
        # Create a custom QoS profile
        qos_profile = QoSProfile()
        qos_profile.reliability = "reliable"  # or "best_effort"
        qos_profile.durability = "transient_local"  # or "volatile"
        qos_profile.depth = 10
        # Use the QoS profile with publisher/subscriber
        self.publisher = self.create_publisher(String, "/my_topic", qos_profile)
        self.subscription = self.create_subscription(
            String, "/my_topic", self.callback, qos_profile)

    def callback(self, msg):
        print(f"Received: {msg.data}")
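To build intuition for how depth and durability interact, the toy model below keeps a bounded history and replays it to late subscribers only when durability is "transient_local". It is an illustrative sketch, not the engine's implementation: `HistoryTopic` is a hypothetical class, and replaying everything retained (rather than strictly the last message) is an assumption of this sketch. With `depth=1` the two behaviors coincide.

```python
from collections import deque


class HistoryTopic:
    """Toy model of queue depth and durability for a single topic."""

    def __init__(self, depth=10, durability="volatile"):
        self.durability = durability
        # A deque with maxlen drops the oldest entry once depth is exceeded.
        self._history = deque(maxlen=depth)
        self._subscribers = []

    def publish(self, message):
        self._history.append(message)
        for callback in self._subscribers:
            callback(message)

    def subscribe(self, callback):
        # Transient local: a late joiner first receives the retained messages.
        # Volatile: a late joiner only sees messages published from now on.
        if self.durability == "transient_local":
            for message in self._history:
                callback(message)
        self._subscribers.append(callback)


late_latched, late_volatile = [], []
latched = HistoryTopic(depth=2, durability="transient_local")
plain = HistoryTopic(depth=2, durability="volatile")

# Publish three messages before anyone subscribes; depth=2 retains the last two.
for topic in (latched, plain):
    for value in ("m1", "m2", "m3"):
        topic.publish(value)

latched.subscribe(late_latched.append)
plain.subscribe(late_volatile.append)
```

In this model the late subscriber on the transient-local topic receives the two retained messages, while the one on the volatile topic receives nothing until the next publish.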
Parameter Handling¶
Nodes can declare parameters and register a callback that runs whenever a parameter's value changes:
graph TD
    A[Parameter Declaration] --> B[Parameter Storage]
    B --> C[Parameter Access]
    C --> D[Parameter Modification]
    D --> E[Callback Trigger]
    E --> F[Execute Callback Function]
from pyros2 import Node
class MyNode(Node):
    def __init__(self):
        super().__init__("parameter_node")
        # Declare a parameter with a callback
        self.declare_parameter("my_param", "default_value", self.param_callback)

    def param_callback(self, param_name, old_value, new_value):
        print(f"Parameter {param_name} changed from {old_value} to {new_value}")
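The declare, store, modify, and callback steps can be sketched with a plain dictionary. `ParameterStore` and its `declare`, `get`, and `set` methods are hypothetical names used only for this illustration; the callback signature `(param_name, old_value, new_value)` is taken from the example above, while everything else is an assumption.

```python
class ParameterStore:
    """Toy parameter table that fires a callback when a value is modified."""

    def __init__(self):
        self._values = {}
        self._callbacks = {}

    def declare(self, name, default, callback=None):
        # Declaration stores the default and optionally remembers a callback.
        self._values[name] = default
        if callback is not None:
            self._callbacks[name] = callback

    def get(self, name):
        return self._values[name]

    def set(self, name, new_value):
        if name not in self._values:
            raise KeyError(f"parameter {name!r} was not declared")
        old_value = self._values[name]
        self._values[name] = new_value
        # Trigger the callback after the new value has been stored.
        callback = self._callbacks.get(name)
        if callback is not None:
            callback(name, old_value, new_value)


changes = []
store = ParameterStore()
store.declare(
    "my_param",
    "default_value",
    lambda name, old, new: changes.append((name, old, new)),
)
store.set("my_param", "updated_value")
```

Requiring a declaration before a value can be set is a design choice of this sketch; it catches misspelled parameter names early.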
Configuration with Hydra¶
Create a config.yaml file:
node:
  name: my_node
  namespace: /
publisher:
  topic: my_topic
  qos:
    reliability: reliable
    durability: volatile
    depth: 10
subscriber:
  topic: my_topic
  qos:
    reliability: reliable
    durability: volatile
    depth: 10
service:
  name: add_two_ints
  qos:
    reliability: reliable
    durability: volatile
    depth: 10
Then use it in your code:
from pyros2 import Node
from config.hydra_config import load_config
config = load_config("config.yaml")
node = Node(config.node.name)
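Because QoS values in a YAML file are free-form strings, it can help to validate them before building nodes. The sketch below checks a `qos` mapping against the option names mentioned in this guide. `QoSSettings` and `qos_from_mapping` are hypothetical helpers, the input is a plain dict rather than the object returned by `load_config`, and requiring a depth of at least 1 is an assumption.

```python
from dataclasses import dataclass

# Option names as they appear in this guide's QoS section.
RELIABILITY = {"reliable", "best_effort"}
DURABILITY = {"volatile", "transient_local"}


@dataclass(frozen=True)
class QoSSettings:
    """Validated copy of one `qos:` block from the config file."""
    reliability: str = "reliable"
    durability: str = "volatile"
    depth: int = 10

    def __post_init__(self):
        if self.reliability not in RELIABILITY:
            raise ValueError(f"unknown reliability: {self.reliability!r}")
        if self.durability not in DURABILITY:
            raise ValueError(f"unknown durability: {self.durability!r}")
        # Assumption: a usable queue holds at least one message.
        if not isinstance(self.depth, int) or self.depth < 1:
            raise ValueError(f"depth must be a positive integer: {self.depth!r}")


def qos_from_mapping(section):
    """Build QoSSettings from a section such as `publisher` or `subscriber`."""
    return QoSSettings(**section.get("qos", {}))


# Plain-dict equivalent of the `publisher` section in config.yaml.
publisher_section = {
    "topic": "my_topic",
    "qos": {"reliability": "reliable", "durability": "volatile", "depth": 10},
}
settings = qos_from_mapping(publisher_section)
```

Failing at load time with a clear message is usually easier to debug than a publisher and subscriber that silently disagree on their QoS settings.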
Complete Example Project¶
We've included a complete example project in the repository that demonstrates how to build a robot system with multiple nodes. The example project includes:
- Configuration files using Hydra
- Publisher, subscriber, service, and client nodes
- A launch system to run all nodes together
You can find the complete example in the example_project/ directory of the repository.
Running the Complete Example¶
To run the complete example project:
- Clone the repository:
- Install the package:
- Run individual nodes:
- Run the complete robot system:
Running Examples¶
To run the examples:
python examples/publisher_example.py
python examples/subscriber_example.py
python examples/service_example.py
python examples/client_example.py
python examples/bridge_example.py
Note: For the bridge example to work, you need to have a ROS master running on localhost:11311.
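Before starting the bridge example, it can be useful to confirm that something is listening on the ROS master port. The helper below is a small sketch using only the standard library; `master_reachable` is a hypothetical name, and a successful TCP connection only shows that the port is open, not that the listener is actually a ROS master.

```python
import socket


def master_reachable(host="localhost", port=11311, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and name resolution failures.
        return False


if __name__ == "__main__":
    if master_reachable():
        print("Port 11311 is open; the bridge example can be started.")
    else:
        print("Nothing is listening on localhost:11311; start a ROS master first.")
```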