Skip to main content

Safety Principles in ROS 2 for Humanoid Robotics

Safety is the cornerstone of humanoid robotics development. In environments shared with humans, robots must operate with fail-safe mechanisms that prioritize human safety above all other considerations. This section details the safety principles and implementation strategies within ROS 2 for humanoid robotics applications.

Core Safety Principles

Safety-First Design

All implementations must prioritize human safety and environmental protection above all other considerations. Fail-safes must be implemented at every level, and any potential safety risk identified must halt autonomous operations until resolved.

Modular Safety Architecture

Safety systems must be designed as independent modules with well-defined interfaces, allowing for individual testing and validation while maintaining clear boundaries between functional domains.

Deterministic Safety Communication

All safety-related communication via ROS 2 topics/services must be designed with guaranteed message delivery. Communication protocols must include monitoring and logging capabilities to ensure safety-critical information is reliably transmitted.

Safety Implementation Patterns

Emergency Stop System

import rclpy
from rclpy.node import Node
from std_msgs.msg import Bool
from geometry_msgs.msg import Twist
from sensor_msgs.msg import LaserScan
import threading

class SafeHumanoidController(Node):
    def __init__(self):
        super().__init__('safe_humanoid_controller')
        
        # Initialize safety state
        self.emergency_stop_active = False
        self.safety_lock = threading.Lock()
        
        # Create emergency stop publisher and subscriber
        self.emergency_stop_pub = self.create_publisher(Bool, '/emergency_stop', 10)
        self.emergency_stop_sub = self.create_subscription(
            Bool, 
            '/emergency_stop', 
            self.emergency_stop_callback, 
            10
        )
        
        # Create sensor subscriber for collision detection
        self.laser_sub = self.create_subscription(
            LaserScan,
            '/laser_scan',
            self.laser_callback,
            10
        )
        
        # Create command publisher with safety check
        self.cmd_pub = self.create_publisher(Twist, '/cmd_vel', 10)
        
        # Create safety monitoring timer
        self.safety_timer = self.create_timer(0.1, self.safety_monitor)
        
        self.get_logger().info('Safe Humanoid Controller initialized')

    def emergency_stop_callback(self, msg):
        with self.safety_lock:
            self.emergency_stop_active = msg.data
            if self.emergency_stop_active:
                self.get_logger().error('EMERGENCY STOP ACTIVATED')
                # Immediately stop all movement
                self.publish_stop_command()
            else:
                self.get_logger().info('Emergency stop deactivated')

    def laser_callback(self, msg):
        # Check for obstacles in close proximity
        min_distance = float('inf')
        for range_val in msg.ranges:
            if range_val < min_distance and not float('inf'):
                min_distance = range_val
        
        # If obstacle is too close, activate safety stop
        if min_distance < 0.5 and not self.emergency_stop_active:
            with self.safety_lock:
                self.emergency_stop_active = True
                self.get_logger().warn(f'OBSTACLE TOO CLOSE: {min_distance}m')
                self.publish_stop_command()
                
                # Publish emergency stop signal
                stop_msg = Bool()
                stop_msg.data = True
                self.emergency_stop_pub.publish(stop_msg)

    def safety_monitor(self):
        # Regular safety checks
        with self.safety_lock:
            if not self.emergency_stop_active:
                # Additional safety checks can be added here
                pass
            else:
                # Continue to enforce stop state
                self.publish_stop_command()

    def publish_stop_command(self):
        # Publish zero velocity to stop all movement
        stop_cmd = Twist()
        stop_cmd.linear.x = 0.0
        stop_cmd.linear.y = 0.0
        stop_cmd.linear.z = 0.0
        stop_cmd.angular.x = 0.0
        stop_cmd.angular.y = 0.0
        stop_cmd.angular.z = 0.0
        self.cmd_pub.publish(stop_cmd)

    def safe_command(self, twist_cmd):
        with self.safety_lock:
            if not self.emergency_stop_active:
                # Apply velocity limits
                max_linear = 0.5  # m/s
                max_angular = 0.5  # rad/s
                
                twist_cmd.linear.x = max(min(twist_cmd.linear.x, max_linear), -max_linear)
                twist_cmd.angular.z = max(min(twist_cmd.angular.z, max_angular), -max_angular)
                
                self.cmd_pub.publish(twist_cmd)
            else:
                self.get_logger().warn('Command blocked - emergency stop active')

    def deactivate_emergency_stop(self):
        with self.safety_lock:
            self.emergency_stop_active = False
            stop_msg = Bool()
            stop_msg.data = False
            self.emergency_stop_pub.publish(stop_msg)

Safety-First Communication

from rclpy.qos import QoSProfile, ReliabilityPolicy, DurabilityPolicy

class SafetyCriticalCommunicator(Node):
    def __init__(self):
        super().__init__('safety_critical_communicator')
        
        # Define QoS profile for safety-critical messages
        safety_qos = QoSProfile(
            depth=1,  # Only keep the most recent message
            reliability=ReliabilityPolicy.RELIABLE,  # Ensure delivery
            durability=DurabilityPolicy.TRANSIENT_LOCAL  # Persist messages
        )
        
        # Create safety-critical publisher
        self.safety_pub = self.create_publisher(Bool, '/safety_status', safety_qos)
        
        # Create safety-critical subscriber
        self.safety_sub = self.create_subscription(
            Bool,
            '/safety_command',
            self.safety_command_callback,
            safety_qos
        )
        
        # Log safety status regularly
        self.safety_timer = self.create_timer(1.0, self.log_safety_status)
    
    def safety_command_callback(self, msg):
        # Handle safety commands with highest priority
        if msg.data:
            self.emergency_action()
        else:
            self.normal_operation()
    
    def log_safety_status(self):
        # Log safety status for auditability
        status_msg = Bool()
        status_msg.data = self.is_system_safe()
        self.safety_pub.publish(status_msg)
        self.get_logger().info(f'Safety Status: {"SAFE" if status_msg.data else "UNSAFE"}')

Safety Validation

Unit Testing Safety Functions

import unittest
from rclpy.time import Time
from std_msgs.msg import Bool
from geometry_msgs.msg import Twist

class TestSafetyFunctions(unittest.TestCase):
    def setUp(self):
        # Setup test environment
        self.controller = SafeHumanoidController()
    
    def test_emergency_stop_activation(self):
        # Test that emergency stop properly stops movement
        emergency_msg = Bool()
        emergency_msg.data = True
        
        # Simulate receiving emergency stop
        self.controller.emergency_stop_callback(emergency_msg)
        
        # Check that emergency stop is active
        self.assertTrue(self.controller.emergency_stop_active)
        
        # Try to send a movement command
        test_cmd = Twist()
        test_cmd.linear.x = 1.0
        self.controller.safe_command(test_cmd)
        
        # Verify the command was blocked
        # (Check that no movement occurred)
    
    def test_collision_detection_stop(self):
        # Create a LaserScan message with close obstacle
        scan_msg = LaserScan()
        scan_msg.ranges = [0.3] * 360  # All readings show obstacle at 0.3m
        
        # Process the scan (should trigger safety stop)
        self.controller.laser_callback(scan_msg)
        
        # Verify emergency stop is activated
        self.assertTrue(self.controller.emergency_stop_active)

Lifecycle Nodes for Safe Initialization

Lifecycle nodes provide a structured approach to initialization, activation, and shutdown that enhances safety:

  1. Unconfigured State: Node is created but not initialized
  2. Inactive State: Node is configured but not active
  3. Active State: Node is fully operational
  4. Finalized State: Node is shut down safely

This approach ensures that all safety checks pass before the robot becomes operational.

Quality of Service (QoS) Settings for Safety

Proper QoS settings ensure safety-critical messages are delivered reliably:

  • Reliability: Use RELIABLE for safety messages
  • Durability: Use TRANSIENT_LOCAL for safety parameters
  • Deadline: Set appropriate deadlines for safety-critical messages
  • Liveliness: Monitor that safety systems remain active

Conclusion

Safety implementation in ROS 2 requires a multi-layered approach combining emergency stop systems, collision detection, safe communication patterns, and proper validation. By following these principles and implementing the patterns shown above, developers can create humanoid robotics systems that operate safely in human environments while maintaining the functionality required for complex robotic behaviors.

All safety implementations should be thoroughly tested in simulation before deployment to physical systems, following the simulation-to-reality transfer principle that is central to safe humanoid robotics development.

💬