
Voice-to-Action Implementation with Whisper Integration

This section details a voice-to-action pipeline for humanoid robots built around Whisper speech recognition: audio is transcribed to text, the text is mapped to robot commands, a safety layer can override unsafe motion, and a bridge routes parsed intents to higher-level navigation and planning systems.

Whisper Integration for Speech Recognition

Audio Processing Pipeline

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from audio_common_msgs.msg import AudioData
from std_msgs.msg import String
import numpy as np
import threading
import queue
import time

class WhisperAudioProcessor(Node):
    def __init__(self):
        super().__init__('whisper_audio_processor')
        
        # Subscriber for raw audio data
        self.audio_sub = self.create_subscription(
            AudioData,
            '/microphone/audio_raw',
            self.audio_callback,
            10
        )
        
        # Publisher for recognized text
        self.text_pub = self.create_publisher(String, '/recognized_text', 10)
        
        # Audio processing parameters
        self.sample_rate = 16000  # Whisper models expect 16 kHz input
        self.chunk_size = self.sample_rate * 2  # Buffer ~2 s of audio; Whisper needs sentence-length segments, not tiny chunks
        self.audio_buffer = queue.Queue(maxsize=100)  # Bounded so stale audio is dropped when processing falls behind

        # Initialize the Whisper model before starting the worker thread that uses it
        self.setup_whisper_model()

        self.processing_thread = threading.Thread(target=self.process_audio_loop)
        self.processing_thread.daemon = True
        self.processing_thread.start()
        
        self.get_logger().info('Whisper Audio Processor initialized')

    def setup_whisper_model(self):
        """Initialize Whisper model for speech recognition"""
        try:
            import whisper
            # Load a small model for real-time processing
            self.model = whisper.load_model("base.en")  # Change to "base" for multiple languages
            self.get_logger().info('Whisper model loaded successfully')
        except ImportError:
            self.get_logger().error('Whisper library not found. Install with: pip install openai-whisper')
            self.model = None
        except Exception as e:
            self.get_logger().error(f'Failed to load Whisper model: {e}')
            self.model = None

    def audio_callback(self, msg):
        """Receive audio data from microphone"""
        # Add audio data to the processing queue
        if not self.audio_buffer.full():
            self.audio_buffer.put(msg.data)

    def process_audio_loop(self):
        """Continuous loop for audio processing"""
        audio_chunk = b""
        
        while rclpy.ok():
            try:
                # Get audio data from queue
                if not self.audio_buffer.empty():
                    audio_data = self.audio_buffer.get_nowait()
                    audio_chunk += audio_data
                    
                    # Process when we have enough data
                    if len(audio_chunk) >= self.chunk_size * 2:  # 2 bytes per sample for 16-bit
                        # Convert bytes to numpy array
                        audio_np = np.frombuffer(audio_chunk, dtype=np.int16)
                        audio_np = audio_np.astype(np.float32) / 32768.0  # Normalize to [-1, 1]
                        
                        # Process with Whisper if model is available
                        if self.model:
                            recognized_text = self.process_with_whisper(audio_np)
                            if recognized_text.strip():
                                # Publish recognized text
                                text_msg = String()
                                text_msg.data = recognized_text
                                self.text_pub.publish(text_msg)
                                self.get_logger().info(f'Recognized: {recognized_text}')
                        
                        # Keep remaining audio data
                        samples_processed = min(len(audio_chunk), self.chunk_size * 2)
                        audio_chunk = audio_chunk[samples_processed:]
                else:
                    # Small delay to prevent busy-waiting
                    time.sleep(0.01)
            except queue.Empty:
                # No data in queue, continue loop
                time.sleep(0.01)
            except Exception as e:
                self.get_logger().error(f'Error in audio processing: {e}')

    def process_with_whisper(self, audio_data):
        """Process audio with Whisper model"""
        try:
            # Transcribe the buffered audio; fp16=False avoids a half-precision warning on CPU-only machines
            result = self.model.transcribe(audio_data, fp16=False)
            return result["text"]
        except Exception as e:
            self.get_logger().error(f'Whisper processing error: {e}')
            return ""

Voice Command Recognition

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from geometry_msgs.msg import Twist
from sensor_msgs.msg import LaserScan
import json

class VoiceCommandRecognizer(Node):
    def __init__(self):
        super().__init__('voice_command_recognizer')
        
        # Subscribe to recognized text
        self.text_sub = self.create_subscription(
            String,
            '/recognized_text',
            self.text_callback,
            10
        )
        
        # Publishers for robot commands
        self.cmd_pub = self.create_publisher(Twist, '/cmd_vel', 10)
        self.action_pub = self.create_publisher(String, '/robot_actions', 10)
        
        # Initialize command vocabulary
        self.setup_command_vocabulary()
        
        self.get_logger().info('Voice Command Recognizer initialized')

    def setup_command_vocabulary(self):
        """Setup recognized voice commands and their mappings"""
        self.command_mapping = {
            # Movement commands
            'forward': 'move_forward',
            'backward': 'move_backward',
            'left': 'turn_left',
            'right': 'turn_right',
            'stop': 'stop_robot',
            'go forward': 'move_forward',
            'go backward': 'move_backward',
            'turn left': 'turn_left',
            'turn right': 'turn_right',
            'halt': 'stop_robot',
            
            # Navigation commands
            'go to kitchen': 'navigate_kitchen',
            'go to living room': 'navigate_living_room',
            'go to bedroom': 'navigate_bedroom',
            'go home': 'return_home',
            
            # Action commands
            'pick up object': 'grasp_object',
            'wave': 'wave_hand',
            'greet': 'greet_person',
            'dance': 'perform_dance',
            'introduce yourself': 'self_introduction'
        }

    def text_callback(self, msg):
        """Process recognized text for commands"""
        text = msg.data.lower().strip()
        
        # Check if text contains a valid command
        command = self.recognize_command(text)
        
        if command:
            self.get_logger().info(f'Command recognized: {command}')
            
            # Execute the command
            self.execute_command(command, text)
        else:
            # Check if text contains keywords that might indicate intent
            potential_commands = self.extract_potential_commands(text)
            if potential_commands:
                self.get_logger().info(f'Potential commands found: {potential_commands}')
                for cmd in potential_commands:
                    self.execute_command(cmd, text)
            else:
                self.get_logger().info(f'No recognized command in: "{text}"')

    def recognize_command(self, text):
        """Recognize command from text using exact matches and fuzzy matching"""
        # Check phrase matches, longest phrase first, so 'turn left' is
        # preferred over the bare keyword 'left'
        for cmd_text in sorted(self.command_mapping, key=len, reverse=True):
            if cmd_text in text:
                return self.command_mapping[cmd_text]
        
        # If no exact match, try to find the closest match
        import difflib
        possible_matches = difflib.get_close_matches(
            text, 
            self.command_mapping.keys(), 
            n=1, 
            cutoff=0.6  # 60% similarity required
        )
        
        if possible_matches:
            return self.command_mapping[possible_matches[0]]
        
        return None

    def extract_potential_commands(self, text):
        """Extract potential commands using keyword matching"""
        potential = []
        
        # Check for movement-related keywords
        movement_keywords = ['forward', 'back', 'left', 'right', 'stop', 'move', 'go', 'turn']
        for keyword in movement_keywords:
            if keyword in text:
                if keyword == 'forward':
                    potential.append('move_forward')
                elif keyword == 'back':
                    potential.append('move_backward')
                elif keyword == 'left':
                    potential.append('turn_left')
                elif keyword == 'right':
                    potential.append('turn_right')
                elif keyword == 'stop':
                    potential.append('stop_robot')
        
        return list(set(potential))  # Remove duplicates

    def execute_command(self, command, original_text):
        """Execute the recognized command"""
        try:
            # Validate command for safety before execution
            if not self.is_command_safe(command, original_text):
                self.get_logger().error(f'Command unsafe, not executing: {command}')
                return
            
            # Execute the appropriate function based on the command
            if command == 'move_forward':
                self.move_forward()
            elif command == 'move_backward':
                self.move_backward()
            elif command == 'turn_left':
                self.turn_left()
            elif command == 'turn_right':
                self.turn_right()
            elif command == 'stop_robot':
                self.stop_robot()
            elif command == 'navigate_kitchen':
                self.navigate_to_location('kitchen')
            elif command == 'navigate_living_room':
                self.navigate_to_location('living_room')
            elif command == 'navigate_bedroom':
                self.navigate_to_location('bedroom')
            elif command == 'return_home':
                self.return_to_home()
            else:
                # For other commands, send to action planner
                action_msg = String()
                action_msg.data = json.dumps({
                    'command': command,
                    'original_text': original_text,
                    'timestamp': self.get_clock().now().to_msg().sec
                })
                self.action_pub.publish(action_msg)

        except Exception as e:
            self.get_logger().error(f'Error executing command {command}: {e}')

    def is_command_safe(self, command, original_text):
        """Check if a command is safe to execute"""
        # A real implementation would consult current sensor data (e.g. the
        # latest laser scan) and fail closed when that data is unavailable;
        # for this example we assume a safe environment.
        return True

    def move_forward(self):
        """Move robot forward"""
        cmd = Twist()
        cmd.linear.x = 0.2  # m/s
        cmd.angular.z = 0.0
        self.cmd_pub.publish(cmd)

    def move_backward(self):
        """Move robot backward"""
        cmd = Twist()
        cmd.linear.x = -0.2  # m/s
        cmd.angular.z = 0.0
        self.cmd_pub.publish(cmd)

    def turn_left(self):
        """Turn robot left"""
        cmd = Twist()
        cmd.linear.x = 0.0
        cmd.angular.z = 0.3  # rad/s
        self.cmd_pub.publish(cmd)

    def turn_right(self):
        """Turn robot right"""
        cmd = Twist()
        cmd.linear.x = 0.0
        cmd.angular.z = -0.3  # rad/s
        self.cmd_pub.publish(cmd)

    def stop_robot(self):
        """Stop robot movement"""
        cmd = Twist()
        cmd.linear.x = 0.0
        cmd.angular.z = 0.0
        self.cmd_pub.publish(cmd)

    def navigate_to_location(self, location):
        """Navigate to named location"""
        # In a real implementation, this would interface with navigation system
        self.get_logger().info(f'Navigating to {location} (not implemented in this example)')
        
        # Publish action for higher-level navigation system
        action_msg = String()
        action_msg.data = json.dumps({
            'action': 'navigate',
            'location': location,
            'timestamp': self.get_clock().now().to_msg().sec
        })
        self.action_pub.publish(action_msg)

    def return_to_home(self):
        """Return to home position"""
        self.navigate_to_location('home_base')
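
Before wiring in a live microphone, the recognizer can be exercised by publishing test strings on /recognized_text, either from the command line (ros2 topic pub /recognized_text std_msgs/msg/String "{data: 'go to kitchen'}") or with a small one-shot publisher. A minimal sketch, using the topic name from the node above:

#!/usr/bin/env python3
import time
import rclpy
from rclpy.node import Node
from std_msgs.msg import String

def main():
    rclpy.init()
    node = Node('voice_command_test_publisher')
    pub = node.create_publisher(String, '/recognized_text', 10)
    time.sleep(0.5)  # Give discovery a moment so the subscriber is matched
    msg = String()
    msg.data = 'go to kitchen'
    pub.publish(msg)
    node.get_logger().info(f'Published test command: "{msg.data}"')
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()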

Safety Override Integration

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from std_msgs.msg import String, Bool
from geometry_msgs.msg import Twist
from sensor_msgs.msg import LaserScan, Image
from builtin_interfaces.msg import Duration
import numpy as np

class SafetyOverrideSystem(Node):
    def __init__(self):
        super().__init__('safety_override_system')
        
        # Publishers
        self.safety_pub = self.create_publisher(Bool, '/safety_status', 10)
        self.emergency_stop_pub = self.create_publisher(Bool, '/emergency_stop', 10)
        self.cmd_override_pub = self.create_publisher(Twist, '/cmd_vel_override', 10)
        
        # Subscribers
        self.voice_cmd_sub = self.create_subscription(
            String,
            '/recognized_text',
            self.voice_command_callback,
            10
        )
        
        self.laser_sub = self.create_subscription(
            LaserScan,
            '/scan',
            self.scan_callback,
            10
        )
        
        self.cmd_sub = self.create_subscription(
            Twist,
            '/cmd_vel',
            self.cmd_callback,
            10
        )
        
        # Safety parameters
        self.safety_distance = 0.5  # meters
        self.enable_safety_overrides = True
        self.last_command_time = self.get_clock().now()
        self.command_timeout = Duration(sec=5, nanosec=0)  # 5 seconds
        self.emergency_stop_active = False
        
        # Timer for safety monitoring
        self.safety_timer = self.create_timer(0.1, self.safety_check)
        
        self.get_logger().info('Safety Override System initialized')

    def voice_command_callback(self, msg):
        """Handle incoming voice commands with safety validation"""
        # Update command time for timeout monitoring
        self.last_command_time = self.get_clock().now()
        
        # Check if command should be blocked for safety
        if self.emergency_stop_active:
            self.get_logger().warn('Voice command blocked: Emergency stop active')
            return
        
        # Additional safety checks can be added here
        command_text = msg.data.lower()
        
        # Check for dangerous commands
        if self.is_dangerous_command(command_text):
            self.get_logger().error(f'Dangerous command blocked: {command_text}')
            self.trigger_emergency_stop(f'Dangerous command: {command_text}')
            return
    
    def is_dangerous_command(self, text):
        """Check if the command is potentially dangerous"""
        dangerous_keywords = [
            'fire', 'explosion', 'harm', 'hurt', 'hit', 'break', 
            'destroy', 'damage', 'unsafe', 'dangerous'
        ]
        
        for keyword in dangerous_keywords:
            if keyword in text:
                return True
        return False

    def scan_callback(self, msg):
        """Process laser scan for safety"""
        if not self.enable_safety_overrides:
            return
            
        # Check for obstacles in path
        ranges = np.array(msg.ranges)
        valid_ranges = ranges[np.isfinite(ranges)]
        
        if len(valid_ranges) == 0:
            return
            
        min_distance = np.min(valid_ranges)
        
        # If too close to obstacle, trigger safety response
        if min_distance < self.safety_distance and not self.emergency_stop_active:
            self.get_logger().warn(f'Obstacle detected at {min_distance:.2f}m, triggering safety response')
            self.trigger_safety_stop()
    
    def cmd_callback(self, msg):
        """Monitor motion commands for safety"""
        if not self.enable_safety_overrides:
            return
            
        # Check if the command is safe based on current sensor data
        if abs(msg.linear.x) > 0.5 or abs(msg.angular.z) > 0.8:  # Too fast
            # Check if environment allows for such speed
            self.check_motion_safety(msg)

    def check_motion_safety(self, cmd_msg):
        """Check if motion command is safe given current environment"""
        # In a real implementation, this would analyze sensor data
        # to determine if motion is safe
        pass

    def safety_check(self):
        """Regular safety checks"""
        current_time = self.get_clock().now()
        
        # Check for command timeout
        if (current_time - self.last_command_time).nanoseconds / 1e9 > self.command_timeout.sec:
            self.get_logger().warn('No commands received within timeout, checking safety')
            # Could implement safety behavior here
        
        # Check safety status
        safety_ok = not self.emergency_stop_active
        safety_msg = Bool()
        safety_msg.data = safety_ok
        self.safety_pub.publish(safety_msg)
        
    def trigger_safety_stop(self):
        """Trigger safety stop without full emergency stop"""
        stop_cmd = Twist()
        self.cmd_override_pub.publish(stop_cmd)
        self.get_logger().warn('Safety stop triggered')

    def trigger_emergency_stop(self, reason="Safety violation"):
        """Trigger full emergency stop"""
        self.emergency_stop_active = True
        
        # Publish emergency stop command
        stop_msg = Bool()
        stop_msg.data = True
        self.emergency_stop_pub.publish(stop_msg)
        
        # Stop all motion
        stop_cmd = Twist()
        self.cmd_override_pub.publish(stop_cmd)
        
        self.get_logger().error(f'EMERGENCY STOP: {reason}')
        
        # Auto-reset safety after a delay if needed
        # In a real implementation, this would require manual reset
        # Or a more sophisticated recovery procedure
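
The comments above note that a real deployment should require a manual reset rather than clearing the emergency stop automatically. One way to sketch that uses the standard std_srvs Trigger service; this addition is illustrative and not part of the node above:

from std_srvs.srv import Trigger  # Empty request; response carries success/message

# In SafetyOverrideSystem.__init__:
#     self.reset_srv = self.create_service(
#         Trigger, '/reset_emergency_stop', self.reset_emergency_callback)

def reset_emergency_callback(self, request, response):
    """Clear the emergency stop after an operator has inspected the robot"""
    self.emergency_stop_active = False
    stop_msg = Bool()
    stop_msg.data = False
    self.emergency_stop_pub.publish(stop_msg)  # Announce that the stop is lifted
    self.get_logger().info('Emergency stop manually reset')
    response.success = True
    response.message = 'Emergency stop cleared'
    return response

An operator would then clear the fault explicitly with ros2 service call /reset_emergency_stop std_srvs/srv/Trigger.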

Integration with Higher-Level Planning

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from geometry_msgs.msg import PoseStamped
import json

class VoiceToActionBridge(Node):
    def __init__(self):
        super().__init__('voice_to_action_bridge')
        
        # Subscribers
        self.text_sub = self.create_subscription(
            String,
            '/recognized_text',
            self.process_text_command,
            10
        )
        
        self.parsed_command_sub = self.create_subscription(
            String,
            '/parsed_command',
            self.process_parsed_command,
            10
        )
        
        # Publishers for different system components
        self.navigation_pub = self.create_publisher(PoseStamped, '/goal_pose', 10)  # Nav2's simple goal topic (ROS 1's /move_base_simple/goal)
        self.manipulation_pub = self.create_publisher(String, '/manipulation_command', 10)
        self.action_plan_pub = self.create_publisher(String, '/action_plan', 10)
        
        # Initialize components (CommandParser, SafetyValidator and the
        # ParsedCommand structure are defined in the previous section)
        self.command_parser = CommandParser()
        self.safety_validator = SafetyValidator(self)
        
        self.get_logger().info('Voice-to-Action Bridge initialized')

    def process_text_command(self, msg):
        """Process raw text command and convert to robot actions"""
        try:
            # Parse the natural language command
            parsed_command = self.command_parser.parse(msg.data)
            
            if parsed_command and parsed_command.confidence > 0.6:  # Confidence threshold
                # Validate safety
                is_safe, reason = self.safety_validator.validate_command(parsed_command)
                
                if is_safe:
                    # Convert to appropriate robot command based on intent
                    self.route_command(parsed_command)
                else:
                    self.get_logger().error(f'Command blocked for safety: {reason}')
            else:
                self.get_logger().info(f'Command not understood or low confidence: {msg.data}')
                
        except Exception as e:
            self.get_logger().error(f'Error processing text command: {e}')

    def process_parsed_command(self, msg):
        """Process already-parsed command"""
        try:
            command_data = json.loads(msg.data)
            
            # Create ParsedCommand object from JSON
            parsed_cmd = ParsedCommand(
                intent=command_data.get('intent', 'unknown'),
                objects=command_data.get('objects', []),
                locations=command_data.get('locations', []),
                actions=command_data.get('actions', []),
                confidence=command_data.get('confidence', 0.0),
                raw_text=command_data.get('raw_text', '')
            )
            
            # Route the command appropriately
            self.route_command(parsed_cmd)
            
        except json.JSONDecodeError:
            self.get_logger().error(f'Invalid JSON in parsed command: {msg.data}')
        except Exception as e:
            self.get_logger().error(f'Error processing parsed command: {e}')

    def route_command(self, parsed_command):
        """Route the command to the appropriate system based on intent"""
        intent = parsed_command.intent.lower()
        
        if intent == 'navigation':
            self.handle_navigation_command(parsed_command)
        elif intent == 'manipulation':
            self.handle_manipulation_command(parsed_command)
        elif intent == 'interaction':
            self.handle_interaction_command(parsed_command)
        elif intent == 'information':
            self.handle_information_command(parsed_command)
        else:
            self.get_logger().info(f'Unknown intent: {intent}, routing to default handler')
            self.handle_default_command(parsed_command)

    def handle_navigation_command(self, parsed_command):
        """Handle navigation-related commands"""
        locations = parsed_command.locations
        
        if locations:
            # For this example, we'll use the first location found
            target_location = locations[0]
            
            # In a real system, this would map named locations to coordinates
            # Here we'll use a simple example mapping
            location_coords = {
                'kitchen': (3.0, 1.0, 0.0),  # x, y, theta
                'living room': (0.0, 0.0, 0.0),
                'bedroom': (-2.0, 1.5, 1.57),
                'home': (0.0, 0.0, 0.0)
            }
            
            if target_location.lower() in location_coords:
                x, y, theta = location_coords[target_location.lower()]
                
                # Create and publish navigation goal
                goal = self.create_pose_stamped(x, y, theta)
                self.navigation_pub.publish(goal)
                
                self.get_logger().info(f'Navigating to {target_location}: ({x}, {y})')
            else:
                self.get_logger().warn(f'Unknown location: {target_location}')
        else:
            self.get_logger().warn('Navigation command without target location')

    def handle_manipulation_command(self, parsed_command):
        """Handle manipulation-related commands"""
        objects = parsed_command.objects
        
        if objects:
            target_object = objects[0]  # Use first object found
            
            # Publish manipulation command
            cmd_msg = String()
            cmd_msg.data = json.dumps({
                'action': 'grasp',
                'object': target_object,
                'timestamp': self.get_clock().now().to_msg().sec
            })
            self.manipulation_pub.publish(cmd_msg)
            
            self.get_logger().info(f'Manipulation command: Grasp {target_object}')
        else:
            self.get_logger().warn('Manipulation command without object specification')

    def handle_interaction_command(self, parsed_command):
        """Handle interaction-related commands"""
        # Example: handle greeting, waving, etc.
        if 'wave' in parsed_command.raw_text.lower():
            cmd_msg = String()
            cmd_msg.data = json.dumps({
                'action': 'wave',
                'timestamp': self.get_clock().now().to_msg().sec
            })
            self.manipulation_pub.publish(cmd_msg)
        elif 'greet' in parsed_command.raw_text.lower() or 'hello' in parsed_command.raw_text.lower():
            cmd_msg = String()
            cmd_msg.data = json.dumps({
                'action': 'greet',
                'timestamp': self.get_clock().now().to_msg().sec
            })
            self.manipulation_pub.publish(cmd_msg)
            
        self.get_logger().info(f'Interaction command: {parsed_command.raw_text}')

    def handle_information_command(self, parsed_command):
        """Handle information-seeking commands"""
        # This would interface with perception systems to answer questions
        self.get_logger().info(f'Information command: {parsed_command.raw_text}')
        # Implementation would depend on perception and knowledge base systems

    def handle_default_command(self, parsed_command):
        """Handle commands that don't match specific intent categories"""
        # Use LLM planner for complex or unknown commands
        plan_msg = String()
        plan_msg.data = json.dumps({
            'intent': parsed_command.intent,
            'objects': parsed_command.objects,
            'locations': parsed_command.locations,
            'raw_command': parsed_command.raw_text,
            'confidence': parsed_command.confidence
        })
        self.action_plan_pub.publish(plan_msg)
        
        self.get_logger().info(f'Default handler for: {parsed_command.raw_text}')

    def create_pose_stamped(self, x, y, theta):
        """Create a PoseStamped message for navigation"""
        goal = PoseStamped()
        goal.header.stamp = self.get_clock().now().to_msg()
        goal.header.frame_id = 'map'
        goal.pose.position.x = float(x)
        goal.pose.position.y = float(y)
        goal.pose.position.z = 0.0  # Assuming flat navigation
        
        # Convert theta to quaternion
        import math
        goal.pose.orientation.z = math.sin(theta / 2.0)
        goal.pose.orientation.w = math.cos(theta / 2.0)
        
        return goal
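
To run the full pipeline, the four nodes can be started together. A sketch of a ROS 2 Python launch file, assuming the nodes are installed as executables in a hypothetical voice_to_action package:

from launch import LaunchDescription
from launch_ros.actions import Node

def generate_launch_description():
    # Package and executable names are assumptions for this example;
    # they would be declared as entry points in the package's setup.py
    return LaunchDescription([
        Node(package='voice_to_action', executable='whisper_audio_processor'),
        Node(package='voice_to_action', executable='voice_command_recognizer'),
        Node(package='voice_to_action', executable='safety_override_system'),
        Node(package='voice_to_action', executable='voice_to_action_bridge'),
    ])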

Together, these nodes form a complete voice-to-action pipeline: Whisper transcribes speech, the command recognizer maps text to robot actions, the safety system monitors sensors and blocks or overrides unsafe motion, and the bridge routes parsed intents to navigation, manipulation, and planning subsystems. Because the safety layer can intervene automatically when obstacles or dangerous commands are detected, voice control remains practical for human-robot collaboration without sacrificing safe operation.
