LLM-Based Cognitive Planning in Humanoid Robotics
Large Language Models (LLMs) provide powerful cognitive planning capabilities for humanoid robots, enabling them to understand complex natural-language commands and generate detailed action plans. This section explores the implementation of LLM-based cognitive planning in humanoid robotics, with a focus on safety, interpretability, and human-centered design.
Architecture of LLM-Based Cognitive Planning
The cognitive planning system follows this architecture:
Natural Language Command → LLM Interpretation → Action Decomposition → Safety Verification → Execution Planning
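Each stage of this pipeline maps onto the ROS 2 components implemented in the rest of this section. As a warm-up, here is a minimal, self-contained sketch of the five stages as plain functions; every name and body below is an illustrative placeholder, not the real implementation:

#!/usr/bin/env python3
import json

def llm_interpret(command_text):
    # LLM Interpretation (stubbed): free text -> structured intent
    return {'intent': 'fetch', 'object': command_text}

def decompose_into_actions(intent):
    # Action Decomposition (stubbed): intent -> ordered steps
    return [{'id': 1, 'type': 'perception',
             'description': f"locate the {intent['object']}"}]

def verify_safety(steps):
    # Safety Verification (stubbed): reject steps naming prohibited actions
    return all('harm' not in step['description'] for step in steps)

def plan_from_command(command_text):
    # End-to-end flow: command -> interpretation -> steps -> verified plan
    steps = decompose_into_actions(llm_interpret(command_text))
    return json.dumps({'steps': steps}) if verify_safety(steps) else None

print(plan_from_command('red cup'))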
Planning Interface Implementation
#!/usr/bin/env python3
import json
import uuid

import rclpy
from rclpy.executors import MultiThreadedExecutor
from rclpy.node import Node
from std_msgs.msg import String

import openai  # Or another LLM provider's client library

# The safety validator is defined in the next listing; in a real package it
# would live in its own module (the module name here is an assumption).
from llm_planning_safety import LLMPlanningSafety


class LLMBasedPlanner(Node):
    def __init__(self):
        super().__init__('llm_planner')

        # Subscriber for natural language commands
        self.command_sub = self.create_subscription(
            String,
            '/natural_language_command',
            self.command_callback,
            10
        )

        # Publishers for plans and status
        self.plan_pub = self.create_publisher(String, '/action_plan', 10)
        self.status_pub = self.create_publisher(String, '/planning_status', 10)

        # Initialize LLM interface
        self.setup_llm_interface()

        # Initialize safety validator
        self.safety_validator = LLMPlanningSafety(self)

        # Store active plans
        self.active_plans = {}

        self.get_logger().info('LLM-Based Planner initialized')

    def setup_llm_interface(self):
        """Set up the LLM interface (OpenAI or another provider)."""
        # In a real implementation, you would configure credentials and a
        # client here. For example, with the OpenAI Python client (v1+):
        # self.llm_client = openai.OpenAI()  # reads OPENAI_API_KEY from the environment
        # For this example, we only record a placeholder model name.
        self.model_name = "gpt-4"  # Or another appropriate model
        self.get_logger().info(f'LLM interface configured with model: {self.model_name}')

    def command_callback(self, msg):
        """Process a natural language command and generate a plan."""
        try:
            command_text = msg.data

            # Validate command safety before processing
            if not self.safety_validator.validate_command_syntax(command_text):
                self.get_logger().error(f'Unsafe command syntax: {command_text}')
                return

            # Generate a cognitive plan using the LLM
            action_plan = self.generate_cognitive_plan(command_text)

            if action_plan:
                # Validate safety of the generated plan
                is_safe, safety_issues = self.safety_validator.validate_plan_safety(action_plan)

                if is_safe:
                    # Publish the safe action plan
                    plan_msg = String()
                    plan_msg.data = json.dumps(action_plan)
                    self.plan_pub.publish(plan_msg)
                    self.get_logger().info(f'Action plan published for: {command_text}')

                    # Store plan for execution tracking
                    plan_id = action_plan.get('plan_id', 'unknown')
                    self.active_plans[plan_id] = action_plan
                else:
                    self.get_logger().error(f'Safety issues in plan: {safety_issues}')
                    self.publish_error_status(f'Safety validation failed: {safety_issues}')
            else:
                self.get_logger().error(f'Failed to generate plan for: {command_text}')

        except Exception as e:
            self.get_logger().error(f'Error processing command: {e}')
            self.publish_error_status(f'Planning error: {str(e)}')

    def generate_cognitive_plan(self, command_text):
        """Generate a detailed action plan using the LLM."""
        try:
            # Construct the prompt for cognitive planning
            prompt = self.construct_planning_prompt(command_text)

            # In a real implementation, this would call the LLM. For example,
            # with the OpenAI Python client (v1+):
            # response = self.llm_client.chat.completions.create(
            #     model=self.model_name,
            #     messages=[{"role": "user", "content": prompt}],
            #     temperature=0.3,  # Lower temperature for more consistent planning
            #     max_tokens=1000,
            #     response_format={"type": "json_object"}  # Ensure JSON output
            # )
            # plan_text = response.choices[0].message.content

            # For this example, we simulate the LLM response
            plan_text = self.simulate_llm_planning_response(command_text)

            # Parse the LLM response into a structured plan
            plan = json.loads(plan_text)

            # Add metadata to the plan
            plan['plan_id'] = str(uuid.uuid4())
            plan['original_command'] = command_text
            plan['timestamp'] = self.get_clock().now().to_msg().sec

            return plan

        except json.JSONDecodeError as e:
            self.get_logger().error(f'LLM response not valid JSON: {e}')
            return None
        except Exception as e:
            self.get_logger().error(f'Error in LLM planning: {e}')
            return None

    def construct_planning_prompt(self, command_text):
        """Construct the prompt for LLM-based cognitive planning."""
        prompt = f"""
You are an advanced cognitive planner for a humanoid robot. Your task is to decompose complex natural language commands into detailed, executable action plans that are safe and appropriate for humanoid robot execution.

Command: "{command_text}"

Requirements for the plan:
1. Decompose the command into sequential, actionable steps
2. Include navigation, perception, and manipulation components as needed
3. Ensure each step is safe and executable by a humanoid robot
4. Include error handling and fallback procedures
5. Consider spatial relationships and environmental constraints
6. Output in strict JSON format with the following structure:
{{
  "plan_id": "unique identifier",
  "description": "Brief description of the plan",
  "original_command": "Original user command",
  "steps": [
    {{
      "id": 1,
      "description": "Detailed description of the step",
      "type": "navigation|perception|manipulation|interaction|other",
      "required_skills": ["list", "of", "required", "skills"],
      "estimated_duration": 10.0,
      "success_criteria": "How to verify success",
      "failure_recovery": "What to do if step fails"
    }}
  ],
  "resources_needed": ["list", "of", "required", "resources"],
  "safety_considerations": [
    "Consideration 1",
    "Consideration 2"
  ],
  "dependencies": [
    {{"step_id": 2, "depends_on": 1, "relationship": "requires_completion"}}
  ]
}}

IMPORTANT: Ensure all actions are safe for human environments and the robot has physical capabilities to perform them.
"""
        return prompt

    def simulate_llm_planning_response(self, command_text):
        """Simulate an LLM response for example purposes.

        NOTE: command_text is interpolated directly into the JSON string for
        brevity; real code should escape it (e.g. with json.dumps).
        """
        # Example responses based on command type
        if "bring me" in command_text.lower() or "get me" in command_text.lower():
            return f"""
            {{
              "description": "Retrieve object and bring to user",
              "original_command": "{command_text}",
              "steps": [
                {{
                  "id": 1,
                  "description": "Identify the location of the requested object",
                  "type": "perception",
                  "required_skills": ["object_detection", "spatial_reasoning"],
                  "estimated_duration": 5.0,
                  "success_criteria": "Target object location is known",
                  "failure_recovery": "Ask user for more information about object location"
                }},
                {{
                  "id": 2,
                  "description": "Navigate to the object's location",
                  "type": "navigation",
                  "required_skills": ["path_planning", "obstacle_avoidance"],
                  "estimated_duration": 15.0,
                  "success_criteria": "Robot is at object location",
                  "failure_recovery": "Find alternative route or ask for help"
                }},
                {{
                  "id": 3,
                  "description": "Approach and grasp the object",
                  "type": "manipulation",
                  "required_skills": ["grasping", "force_control"],
                  "estimated_duration": 10.0,
                  "success_criteria": "Object is securely grasped",
                  "failure_recovery": "Adjust grasp or report inability to grasp"
                }},
                {{
                  "id": 4,
                  "description": "Return to user location",
                  "type": "navigation",
                  "required_skills": ["path_planning", "obstacle_avoidance"],
                  "estimated_duration": 15.0,
                  "success_criteria": "Robot is at user's location",
                  "failure_recovery": "Find alternative route or ask for help"
                }},
                {{
                  "id": 5,
                  "description": "Present object to user",
                  "type": "manipulation",
                  "required_skills": ["object_presentation", "human_interaction"],
                  "estimated_duration": 5.0,
                  "success_criteria": "Object is presented to user",
                  "failure_recovery": "Place object nearby and alert user"
                }}
              ],
              "resources_needed": ["object_detector", "navigation_system", "manipulator_arms"],
              "safety_considerations": [
                "Maintain safe distances from humans during navigation",
                "Use appropriate grasping force to avoid damaging object",
                "Be aware of surroundings when manipulating objects"
              ],
              "dependencies": [
                {{"step_id": 2, "depends_on": 1, "relationship": "requires_location"}},
                {{"step_id": 3, "depends_on": 2, "relationship": "requires_navigation"}},
                {{"step_id": 4, "depends_on": 3, "relationship": "requires_grasp"}},
                {{"step_id": 5, "depends_on": 4, "relationship": "requires_return"}}
              ]
            }}
            """
        elif "go to" in command_text.lower() or "navigate to" in command_text.lower():
            return f"""
            {{
              "description": "Navigate to specified location",
              "original_command": "{command_text}",
              "steps": [
                {{
                  "id": 1,
                  "description": "Identify target location in map",
                  "type": "navigation",
                  "required_skills": ["localization", "map_access"],
                  "estimated_duration": 2.0,
                  "success_criteria": "Target coordinates are determined",
                  "failure_recovery": "Ask user for more specific location details"
                }},
                {{
                  "id": 2,
                  "description": "Plan safe path to target location",
                  "type": "navigation",
                  "required_skills": ["path_planning", "costmap_access"],
                  "estimated_duration": 3.0,
                  "success_criteria": "Valid path is computed",
                  "failure_recovery": "Find alternative destination or report impassable"
                }},
                {{
                  "id": 3,
                  "description": "Execute navigation to target",
                  "type": "navigation",
                  "required_skills": ["path_following", "obstacle_avoidance"],
                  "estimated_duration": 20.0,
                  "success_criteria": "Robot reaches target location",
                  "failure_recovery": "Stop safely and request assistance"
                }}
              ],
              "resources_needed": ["navigation_system", "lidar", "map"],
              "safety_considerations": [
                "Maintain safe distances from humans during movement",
                "Stop immediately if path becomes blocked",
                "Avoid areas with known hazards"
              ],
              "dependencies": [
                {{"step_id": 2, "depends_on": 1, "relationship": "requires_target"}},
                {{"step_id": 3, "depends_on": 2, "relationship": "requires_path"}}
              ]
            }}
            """
        else:
            # Default response for other commands
            return f"""
            {{
              "description": "Execute general command",
              "original_command": "{command_text}",
              "steps": [
                {{
                  "id": 1,
                  "description": "Analyze command and determine appropriate action sequence",
                  "type": "other",
                  "required_skills": ["command_analysis"],
                  "estimated_duration": 3.0,
                  "success_criteria": "Action sequence is determined",
                  "failure_recovery": "Request clarification from user"
                }}
              ],
              "resources_needed": ["cognitive_planner"],
              "safety_considerations": [
                "Verify all actions are safe before execution",
                "Maintain ability to abort if unsafe conditions arise"
              ],
              "dependencies": []
            }}
            """

    def publish_error_status(self, error_message):
        """Publish an error status message."""
        status_msg = String()
        status_msg.data = json.dumps({
            'status': 'error',
            'message': error_message,
            'timestamp': self.get_clock().now().to_msg().sec
        })
        self.status_pub.publish(status_msg)


def main(args=None):
    rclpy.init(args=args)
    planner = LLMBasedPlanner()
    # The safety validator is a node in its own right, so spin both
    # on a single executor to service their callbacks
    executor = MultiThreadedExecutor()
    executor.add_node(planner)
    executor.add_node(planner.safety_validator)
    try:
        executor.spin()
    finally:
        rclpy.shutdown()


if __name__ == '__main__':
    main()
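To exercise the planner end to end, publish a command on /natural_language_command and watch /action_plan. A minimal test client is sketched below; the node name and the one-shot-timer pattern are illustrative choices, while the topic names match the planner above:

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from std_msgs.msg import String

class PlannerTestClient(Node):
    def __init__(self):
        super().__init__('planner_test_client')
        self.cmd_pub = self.create_publisher(String, '/natural_language_command', 10)
        self.plan_sub = self.create_subscription(String, '/action_plan', self.on_plan, 10)
        # One-shot timer so the publisher has time to connect before sending
        self.timer = self.create_timer(1.0, self.send_command)

    def send_command(self):
        self.timer.cancel()
        msg = String()
        msg.data = 'bring me the water bottle'
        self.cmd_pub.publish(msg)

    def on_plan(self, msg):
        self.get_logger().info(f'Received plan: {msg.data}')

def main():
    rclpy.init()
    rclpy.spin(PlannerTestClient())

if __name__ == '__main__':
    main()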
Safety Verification in Cognitive Planning
#!/usr/bin/env python3
import json
import re

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import LaserScan


class LLMPlanningSafety(Node):
    def __init__(self, parent_node):
        super().__init__('llm_planning_safety')

        # Store a reference to the parent planner node
        self.parent_node = parent_node

        # Subscribe to sensor data for environmental awareness
        self.scan_sub = self.create_subscription(
            LaserScan,
            '/scan',
            self.scan_callback,
            10
        )

        # A robot state subscription would be created here as well. In a
        # real implementation this would use a proper robot state message,
        # for example:
        # self.robot_state_sub = self.create_subscription(
        #     JointState, '/joint_states', self.robot_state_callback, 10)

        # Initialize safety parameters
        self.safety_violations = []
        self.current_environment = {}
        self.prohibited_actions = [
            'harm humans',
            'damage property',
            'enter restricted areas',
            'violate privacy',
            'act unsafely'
        ]

        self.get_logger().info('LLM Planning Safety Validator initialized')

    def validate_command_syntax(self, command_text):
        """Validate the syntax and content of a command for safety."""
        # Check for prohibited phrases
        command_lower = command_text.lower()
        for prohibited in self.prohibited_actions:
            if prohibited in command_lower:
                self.get_logger().warn(f'Prohibited action in command: {prohibited}')
                return False

        # Check for potentially unsafe language patterns
        unsafe_patterns = [
            r'\b(dangerous|unsafe|destroy|break|harm|injure|attack)\b',
            r'\b(human|person|people|child|elderly)\s+(in\s+danger|at\s+risk|unsafe)',
            r'(go\s+into\s+restricted|enter\s+forbidden)'
        ]
        for pattern in unsafe_patterns:
            if re.search(pattern, command_lower):
                self.get_logger().warn(f'Unsafe pattern detected in command: {pattern}')
                return False

        # No safety concerns found
        return True

    def validate_plan_safety(self, action_plan):
        """Validate the safety of an entire action plan."""
        violations = []

        # Check each step in the plan
        for step in action_plan.get('steps', []):
            step_violations = self.validate_step_safety(step)
            violations.extend(step_violations)

        # Check resource requirements
        resource_violations = self.validate_resources_safety(
            action_plan.get('resources_needed', []))
        violations.extend(resource_violations)

        # Check that safety considerations are present
        if 'safety_considerations' not in action_plan:
            violations.append('Missing required safety considerations')

        is_safe = len(violations) == 0
        return is_safe, violations

    def validate_step_safety(self, step):
        """Validate the safety of a single plan step."""
        violations = []
        step_type = step.get('type', 'other').lower()
        description = step.get('description', '').lower()

        # Check for unsafe navigation
        if step_type == 'navigation':
            if 'narrow' in description or 'tight' in description:
                # Check whether the robot can safely navigate
                if not self.can_safely_navigate(description):
                    violations.append(f'Navigation step may be unsafe: {description}')

        # Check for unsafe manipulation
        elif step_type == 'manipulation':
            if 'fragile' in description or 'delicate' in description:
                # Verify the robot can handle the object appropriately
                if not self.can_handle_safely(description):
                    violations.append(f'Manipulation step may be unsafe: {description}')

        # Check for prohibited actions in general
        for prohibited in self.prohibited_actions:
            if prohibited in description:
                violations.append(f'Prohibited action in step: {prohibited}')

        # Check for environmental safety
        if 'near humans' in description or 'close to people' in description:
            if not self.environment_allows_safe_human_proximity():
                violations.append('Step involves unsafe proximity to humans')

        return violations

    def validate_resources_safety(self, resources):
        """Validate that required resources are safe to use."""
        violations = []

        # Check for potentially unsafe resource requirements
        for resource in resources:
            if 'high_power' in resource or 'strong_force' in resource:
                # Verify appropriate safeguards are in place
                if not self.safeguards_in_place(resource):
                    violations.append(f'Unsafe resource without safeguards: {resource}')

        return violations

    def can_safely_navigate(self, description):
        """Check whether navigation is safe given the environment and description."""
        # In a real implementation, this would check:
        # - Current environment map
        # - Known obstacle locations
        # - Safety zones
        # For this example, we return True
        return True

    def can_handle_safely(self, description):
        """Check whether manipulation is safe given robot capabilities."""
        # In a real implementation, this would check:
        # - Object properties (weight, fragility)
        # - Robot manipulator capabilities
        # - Environmental constraints
        # For this example, we return True
        return True

    def environment_allows_safe_human_proximity(self):
        """Check whether the environment allows safe proximity to humans."""
        # In a real implementation, this would check sensor data
        # and maintain safe distances
        # For this example, we return True
        return True

    def safeguards_in_place(self, resource):
        """Check whether appropriate safeguards are in place for a resource."""
        # In a real implementation, this would check:
        # - Safety system status
        # - Emergency stop availability
        # - Operator presence
        # For this example, we return True
        return True

    def scan_callback(self, msg):
        """Update environmental data from the laser scanner."""
        # Process scan data to update current environment awareness and
        # store it for safety validation. (Real code should also filter
        # inf/NaN range readings.)
        self.current_environment['laser_data'] = {
            'min_distance': min(msg.ranges) if msg.ranges else float('inf'),
            'timestamp': msg.header.stamp.sec
        }
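The validator's checks can also be exercised offline, without the full planner running. A quick sketch, assuming the class above is importable from a module named llm_planning_safety (an assumed module name):

#!/usr/bin/env python3
import rclpy
from llm_planning_safety import LLMPlanningSafety  # assumed module name

rclpy.init()
validator = LLMPlanningSafety(parent_node=None)  # parent is only stored, not used here

# A command containing a prohibited verb is rejected up front
print(validator.validate_command_syntax('go destroy the table'))  # False

# A well-formed plan dict passes the structural and per-step checks
sample_plan = {
    'steps': [{'id': 1, 'type': 'navigation',
               'description': 'navigate to the kitchen'}],
    'resources_needed': ['navigation_system'],
    'safety_considerations': ['Maintain safe distances from humans'],
}
is_safe, issues = validator.validate_plan_safety(sample_plan)
print(is_safe, issues)  # True []

rclpy.shutdown()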
Execution Monitoring and Adaptation
#!/usr/bin/env python3
import json

import rclpy
from rclpy.node import Node
from std_msgs.msg import String


class PlanExecutionMonitor(Node):
    def __init__(self):
        super().__init__('plan_execution_monitor')

        # Subscribers for plans and execution status
        self.plan_sub = self.create_subscription(
            String,
            '/action_plan',
            self.plan_received_callback,
            10
        )
        self.execution_status_sub = self.create_subscription(
            String,
            '/execution_status',
            self.execution_status_callback,
            10
        )

        # Publishers for plan adaptation
        self.adapted_plan_pub = self.create_publisher(String, '/adapted_action_plan', 10)
        self.intervention_pub = self.create_publisher(String, '/intervention_request', 10)

        # Store active plans and their execution status
        self.active_plans = {}
        self.execution_status = {}

        # Timer for monitoring plan execution
        self.monitor_timer = self.create_timer(1.0, self.monitor_execution)

        self.get_logger().info('Plan Execution Monitor initialized')

    def plan_received_callback(self, msg):
        """Handle a new action plan."""
        try:
            plan = json.loads(msg.data)
            plan_id = plan.get('plan_id', 'unknown')

            # Store the plan
            self.active_plans[plan_id] = {
                'plan': plan,
                'start_time': self.get_clock().now().to_msg().sec,
                'current_step': 0,
                'status': 'waiting',  # waiting, executing, paused, completed, failed
                'step_start_time': 0,
                'intervention_needed': False
            }
            self.get_logger().info(f'New plan stored: {plan_id}')

        except json.JSONDecodeError:
            self.get_logger().error(f'Invalid JSON plan: {msg.data}')

    def execution_status_callback(self, msg):
        """Handle execution status updates from action executors."""
        try:
            status_data = json.loads(msg.data)
            plan_id = status_data.get('plan_id', 'unknown')
            step_id = status_data.get('step_id', -1)
            status = status_data.get('status', 'unknown')

            # Update (or initialize) status tracking for this plan
            self.execution_status.setdefault(plan_id, {})[step_id] = {
                'status': status,
                'timestamp': status_data.get('timestamp', 0),
                'details': status_data.get('details', '')
            }
            self.get_logger().info(
                f'Execution status updated: {plan_id}, step {step_id}, status: {status}')

        except json.JSONDecodeError:
            self.get_logger().error(f'Invalid JSON status: {msg.data}')

    def monitor_execution(self):
        """Monitor execution of all active plans."""
        current_time = self.get_clock().now().to_msg().sec

        for plan_id, plan_info in self.active_plans.items():
            if plan_info['status'] == 'executing':
                # Check whether the current step is taking too long
                self.check_step_timeout(plan_id, plan_info, current_time)

                # Check for safety issues
                if self.has_safety_concerns(plan_id):
                    self.intervene_plan(plan_id, 'Safety concern detected')

                # Check for execution failures
                if self.has_execution_failed(plan_id):
                    self.intervene_plan(plan_id, 'Execution failure detected')

    def check_step_timeout(self, plan_id, plan_info, current_time):
        """Check whether the current step has taken too long."""
        plan = plan_info['plan']
        current_step_idx = plan_info['current_step']

        if current_step_idx < len(plan.get('steps', [])):
            step = plan['steps'][current_step_idx]
            estimated_duration = step.get('estimated_duration', 30.0)  # Default: 30 seconds

            # If the step has been executing significantly longer than estimated
            if current_time - plan_info['step_start_time'] > estimated_duration * 2:
                self.intervene_plan(plan_id, f'Step {current_step_idx} taking too long')

    def has_safety_concerns(self, plan_id):
        """Check for safety concerns with plan execution."""
        # In a real implementation, this would check sensor data,
        # human proximity, etc. For this example, we return False.
        return False

    def has_execution_failed(self, plan_id):
        """Check whether plan execution has failed."""
        # Check the status of the current step.
        # Note: plan steps use 1-based "id" values while current_step is a
        # 0-based index; a real implementation must map between them.
        if plan_id in self.execution_status:
            plan_info = self.active_plans[plan_id]
            current_step_idx = plan_info['current_step']

            if current_step_idx in self.execution_status[plan_id]:
                step_status = self.execution_status[plan_id][current_step_idx]['status']
                if step_status == 'failed':
                    return True
        return False

    def intervene_plan(self, plan_id, reason):
        """Intervene in plan execution."""
        self.get_logger().warn(f'Intervening in plan {plan_id}: {reason}')

        # Update plan status
        if plan_id in self.active_plans:
            self.active_plans[plan_id]['status'] = 'paused'
            self.active_plans[plan_id]['intervention_needed'] = True

        # Publish intervention request
        intervention_msg = String()
        intervention_msg.data = json.dumps({
            'plan_id': plan_id,
            'reason': reason,
            'timestamp': self.get_clock().now().to_msg().sec,
            'type': 'pause_request'
        })
        self.intervention_pub.publish(intervention_msg)

    def adapt_plan(self, plan_id, modification_request):
        """Adapt an existing plan based on changing conditions."""
        if plan_id not in self.active_plans:
            self.get_logger().error(f'Plan {plan_id} not found for adaptation')
            return

        original_plan = self.active_plans[plan_id]['plan']

        # In a real implementation, this would use the LLM to adapt the plan
        # based on the modification request
        adapted_plan = self.create_adapted_plan(original_plan, modification_request)

        if adapted_plan:
            # Publish the adapted plan
            adapted_msg = String()
            adapted_msg.data = json.dumps(adapted_plan)
            self.adapted_plan_pub.publish(adapted_msg)
            self.get_logger().info(f'Adapted plan published for {plan_id}')

            # Update the active plan
            self.active_plans[plan_id]['plan'] = adapted_plan
            self.active_plans[plan_id]['current_step'] = 0  # Reset to start of new plan
            self.active_plans[plan_id]['status'] = 'waiting'  # Wait for new execution

    def create_adapted_plan(self, original_plan, modification_request):
        """Create an adapted version of the original plan."""
        # For this example, we just return a copy of the original plan with a
        # note attached. In a real implementation, an LLM would modify the plan.
        adapted_plan = original_plan.copy()
        if 'adaptations' not in adapted_plan:
            adapted_plan['adaptations'] = []
        adapted_plan['adaptations'].append(modification_request)
        return adapted_plan


def main(args=None):
    rclpy.init(args=args)
    node = PlanExecutionMonitor()
    try:
        rclpy.spin(node)
    finally:
        rclpy.shutdown()


if __name__ == '__main__':
    main()
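The monitor consumes /execution_status updates whose fields match what execution_status_callback parses above (plan_id, step_id, status, timestamp, details). A hypothetical step executor would report progress like this; the node name and plan id are placeholders:

#!/usr/bin/env python3
import json
import rclpy
from rclpy.node import Node
from std_msgs.msg import String

rclpy.init()
node = Node('demo_step_executor')  # hypothetical executor node
pub = node.create_publisher(String, '/execution_status', 10)

msg = String()
msg.data = json.dumps({
    'plan_id': 'demo-plan-001',  # id of the plan being executed
    'step_id': 1,                # the step whose status changed
    'status': 'failed',          # 'failed' is what has_execution_failed() looks for
    'timestamp': 0,
    'details': 'grasp attempt unsuccessful'
})
pub.publish(msg)
rclpy.shutdown()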
Human-in-the-Loop Refinement
#!/usr/bin/env python3
import json

import rclpy
from rclpy.node import Node
from std_msgs.msg import String


class HumanInLoopPlanner(Node):
    def __init__(self):
        super().__init__('human_in_loop_planner')

        # Subscriber for human input
        self.human_input_sub = self.create_subscription(
            String,
            '/human_planning_input',
            self.human_input_callback,
            10
        )

        # Publishers for human interaction
        self.query_pub = self.create_publisher(String, '/planning_query', 10)
        self.status_pub = self.create_publisher(String, '/hilo_status', 10)

        # Store current plan context
        self.current_plan_context = {}
        self.pending_human_input = {}

        self.get_logger().info('Human-in-the-Loop Planner initialized')

    def human_input_callback(self, msg):
        """Handle human input for plan refinement."""
        try:
            input_data = json.loads(msg.data)
            interaction_type = input_data.get('type', 'feedback')
            plan_id = input_data.get('plan_id', 'unknown')
            content = input_data.get('content', '')

            if interaction_type == 'feedback':
                # Process feedback for plan refinement
                self.process_feedback(plan_id, content)
            elif interaction_type == 'clarification':
                # Process clarification request
                self.process_clarification(plan_id, content)
            elif interaction_type == 'correction':
                # Process plan correction
                self.process_correction(plan_id, content)

            self.get_logger().info(f'Processed human input: {interaction_type} for plan {plan_id}')

        except json.JSONDecodeError:
            self.get_logger().error(f'Invalid JSON human input: {msg.data}')

    def process_feedback(self, plan_id, feedback_text):
        """Process human feedback to refine a plan."""
        # In a real implementation, this would send the feedback to the LLM
        # for plan refinement
        self.get_logger().info(f'Processing feedback for plan {plan_id}: {feedback_text}')

        # Publish status update
        status_msg = String()
        status_msg.data = json.dumps({
            'plan_id': plan_id,
            'status': 'feedback_received',
            'feedback': feedback_text,
            'timestamp': self.get_clock().now().to_msg().sec
        })
        self.status_pub.publish(status_msg)

    def process_clarification(self, plan_id, clarification_request):
        """Process a request for clarification."""
        # This would typically ask the human for more information
        self.get_logger().info(f'Clarification needed for plan {plan_id}: {clarification_request}')

        # Prepare query for the human
        query_msg = String()
        query_msg.data = json.dumps({
            'plan_id': plan_id,
            'type': 'clarification_query',
            'query': clarification_request,
            'timestamp': self.get_clock().now().to_msg().sec
        })
        self.query_pub.publish(query_msg)

    def process_correction(self, plan_id, correction):
        """Process a plan correction from the human."""
        self.get_logger().info(f'Applying correction to plan {plan_id}: {correction}')

        # In a real implementation, this would modify the plan based on human
        # input and potentially send it back to the LLM for integration

        # Publish status update
        status_msg = String()
        status_msg.data = json.dumps({
            'plan_id': plan_id,
            'status': 'correction_applied',
            'correction': correction,
            'timestamp': self.get_clock().now().to_msg().sec
        })
        self.status_pub.publish(status_msg)

    def request_human_input(self, plan_id, request_type, request_text):
        """Request input from the human operator."""
        query_msg = String()
        query_msg.data = json.dumps({
            'plan_id': plan_id,
            'type': request_type,
            'request': request_text,
            'timestamp': self.get_clock().now().to_msg().sec
        })
        self.query_pub.publish(query_msg)

        # Store the pending request
        self.pending_human_input[plan_id] = {
            'type': request_type,
            'request': request_text,
            'timestamp': self.get_clock().now().to_msg().sec
        }
        self.get_logger().info(f'Requested human input: {request_type} for plan {plan_id}')


def main(args=None):
    rclpy.init(args=args)
    node = HumanInLoopPlanner()
    try:
        rclpy.spin(node)
    finally:
        rclpy.shutdown()


if __name__ == '__main__':
    main()
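Human input arrives as JSON on /human_planning_input carrying the type, plan_id, and content fields parsed in human_input_callback above. Example payloads, with a placeholder plan id:

import json

feedback = json.dumps({
    'type': 'feedback',
    'plan_id': 'demo-plan-001',
    'content': 'Approach the table more slowly next time.'
})

correction = json.dumps({
    'type': 'correction',
    'plan_id': 'demo-plan-001',
    'content': 'Skip the second step; the door is already open.'
})
# Each string would be published as a std_msgs/String message
# on /human_planning_input.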
LLM-based cognitive planning enables humanoid robots to understand and execute complex natural-language commands by breaking them down into detailed, executable action plans. Through safety verification, execution monitoring, and human-in-the-loop refinement, these systems can operate safely in dynamic human environments while adapting to changing conditions and user preferences. The integration of LLMs with robotic systems represents a significant advance in making robots more accessible and useful in everyday human environments.