name: create-example description: Create new publish/subscribe examples for ROS2 message types. Use when the user wants to add a new example, create pub/sub for a new message type, or add trajectory/sensor/geometry message examples.
Create Example
This skill guides you through creating new publish/subscribe examples for the zenoh-ros2-sdk.
When to Use
- User wants to add a new example for a message type
- User wants to create publisher/subscriber for a new ROS2 message
- Adding support for trajectory, sensor, geometry, or other message types
Project Structure
Examples are in examples/ with numbered filenames:
- Odd numbers: publishers (e.g.,
11_publish_*.py) - Even numbers: subscribers (e.g.,
12_subscribe_*.py)
1. Determine Next Example Numbers
Check existing examples in examples/ directory. Use the next available odd number for publisher, next even for subscriber.
2. Look Up Message Definition
Find the ROS2 message definition. Key resources:
- https://docs.ros2.org/foxy/api/{package}/msg/{MessageName}.html
- Common packages:
std_msgs,geometry_msgs,sensor_msgs,trajectory_msgs,nav_msgs
3. Publisher Template
Create examples/{NN}_publish_{msg_name}.py:
#!/usr/bin/env python3
"""
{NN} - Publish {MessageName} Messages
Demonstrates how to publish {package}/msg/{MessageName} messages to a ROS2 topic.
{Brief description of what this message type is used for.}
"""
import time
import numpy as np
from zenoh_ros2_sdk import ROS2Publisher, get_message_class
def main():
print("{NN} - Publish {MessageName} Messages")
print("Publishing to /{topic_name} topic...\n")
# Get message classes for easy object creation
Header = get_message_class("std_msgs/msg/Header")
Time = get_message_class("builtin_interfaces/msg/Time")
{MessageName} = get_message_class("{package}/msg/{MessageName}")
# Add nested message classes as needed
if not all([Header, Time, {MessageName}]):
print("Error: Failed to get message classes")
return
# Create publisher
pub = ROS2Publisher(
topic="/{topic_name}",
msg_type="{package}/msg/{MessageName}"
)
try:
print("Publishing {MessageName} messages...")
print("Press Ctrl+C to stop\n")
counter = 0
while True:
# Get current time
now = time.time()
sec = int(now)
nanosec = int((now - sec) * 1e9)
# Create header with timestamp
header = Header(
stamp=Time(sec=sec, nanosec=nanosec),
frame_id="base_link"
)
# Create and populate message fields
# Note: Arrays need to be numpy arrays for proper serialization
# Publish message - pass fields as kwargs matching message structure
pub.publish(
header=header,
# ... other fields
)
print(f"Published {MessageName} {counter}")
counter += 1
time.sleep(0.1) # 10 Hz
except KeyboardInterrupt:
print("\nInterrupted by user")
finally:
pub.close()
print("Publisher closed")
if __name__ == "__main__":
main()
4. Subscriber Template
Create examples/{NN+1}_subscribe_{msg_name}.py:
#!/usr/bin/env python3
"""
{NN+1} - Subscribe to {MessageName} Messages
Demonstrates how to subscribe to a ROS2 topic and receive {package}/msg/{MessageName} messages.
{Brief description of what this message type is used for.}
"""
import time
from zenoh_ros2_sdk import ROS2Subscriber
def main():
print("{NN+1} - Subscribe to {MessageName} Messages")
print("Subscribing to /{topic_name} topic...\n")
message_count = [0] # Use list to allow modification in nested function
def on_message(msg):
"""Callback function called when a {MessageName} message is received."""
message_count[0] += 1
# Extract information from the message
timestamp = msg.header.stamp
frame_id = msg.header.frame_id
# ... extract other fields
# Display received data
print(f"\n--- {MessageName} Message #{message_count[0]} ---")
print(f"Timestamp: {timestamp.sec}.{timestamp.nanosec:09d}")
print(f"Frame ID: {frame_id}")
# ... print other fields
# Create subscriber
sub = ROS2Subscriber(
topic="/{topic_name}",
msg_type="{package}/msg/{MessageName}",
callback=on_message
)
try:
print("Waiting for {MessageName} messages... Press Ctrl+C to stop")
while True:
time.sleep(0.1)
except KeyboardInterrupt:
print("\nInterrupted by user")
finally:
sub.close()
print("Subscriber closed")
if __name__ == "__main__":
main()
5. Update README
Add entries to examples/README.md:
### [`{NN}_publish_{msg_name}.py`]({NN}_publish_{msg_name}.py)
Demonstrates how to publish `{package}/msg/{MessageName}` messages. {Description}.
**Usage:**
\```bash
python3 examples/{NN}_publish_{msg_name}.py
\```
### [`{NN+1}_subscribe_{msg_name}.py`]({NN+1}_subscribe_{msg_name}.py)
Demonstrates how to subscribe to `{package}/msg/{MessageName}` messages. {Description}.
**Usage:**
\```bash
python3 examples/{NN+1}_subscribe_{msg_name}.py
\```
6. Key Patterns
Arrays
Use numpy arrays for proper CDR serialization:
positions = np.array([0.0, 0.1, 0.2], dtype=np.float64)
Nested Messages
Get nested message classes and construct them:
Point = get_message_class("trajectory_msgs/msg/JointTrajectoryPoint")
Duration = get_message_class("builtin_interfaces/msg/Duration")
point = Point(
positions=np.array([0.0, 0.1], dtype=np.float64),
time_from_start=Duration(sec=1, nanosec=0)
)
Common Topic Names
/joint_states- JointState/cmd_vel- Twist/joint_trajectory- JointTrajectory/odom- Odometry/tf- TransformStamped