invincible-jha
Fix agent initialization and simplify agent structure
9a39220
raw
history blame
4.54 kB
from typing import Dict, List
from crewai import Crew, Task
import logging
from utils.log_manager import LogManager
from agents.conversation_agent import ConversationAgent
from agents.assessment_agent import AssessmentAgent
from agents.mindfulness_agent import MindfulnessAgent
from agents.crisis_agent import CrisisAgent
class WellnessOrchestrator:
"""Orchestrates the coordination between different agents"""
def __init__(self, model_config: Dict):
self.model_config = model_config
self.log_manager = LogManager()
self.logger = self.log_manager.get_agent_logger("orchestrator")
# Initialize agents
self.initialize_agents()
# Initialize CrewAI
self.initialize_crew()
def initialize_agents(self):
"""Initialize all agents with their specific roles and tools"""
self.logger.info("Initializing agents")
try:
# Initialize each agent
self.conversation_agent = ConversationAgent(
model_config=self.model_config
)
self.assessment_agent = ConversationAgent( # Temporarily using ConversationAgent
model_config=self.model_config
)
self.mindfulness_agent = ConversationAgent( # Temporarily using ConversationAgent
model_config=self.model_config
)
self.crisis_agent = ConversationAgent( # Temporarily using ConversationAgent
model_config=self.model_config
)
self.logger.info("All agents initialized successfully")
except Exception as e:
self.logger.error(f"Error initializing agents: {str(e)}")
raise
def initialize_crew(self):
"""Initialize CrewAI with agents"""
self.logger.info("Initializing CrewAI")
try:
# Create the crew with all agent instances
self.crew = Crew(
agents=[
self.conversation_agent.agent,
self.assessment_agent.agent,
self.mindfulness_agent.agent,
self.crisis_agent.agent
]
)
self.logger.info("CrewAI initialized successfully")
except Exception as e:
self.logger.error(f"Error initializing CrewAI: {str(e)}")
raise
def process_message(self, message: str, context: Dict = None) -> Dict:
"""Process user message through appropriate agents"""
self.logger.info("Processing message through agents")
context = context or {}
try:
# First, check for crisis indicators
if self._is_crisis(message):
return self.crisis_agent.process_message(message, context)
# Check for specific intents
if "assess" in message.lower() or "evaluate" in message.lower():
return self.assessment_agent.process_message(message, context)
if "meditate" in message.lower() or "mindful" in message.lower():
return self.mindfulness_agent.process_message(message, context)
# Default to conversation agent
return self.conversation_agent.process_message(message, context)
except Exception as e:
self.logger.error(f"Error processing message: {str(e)}")
return {
"message": "I apologize, but I encountered an error. Please try again.",
"agent_type": "error",
"task_type": "error_handling"
}
def _is_crisis(self, message: str) -> bool:
"""Check if message indicates a crisis situation"""
crisis_indicators = [
"suicide", "kill myself", "end it all",
"hurt myself", "give up", "can't go on",
"emergency", "crisis", "urgent help"
]
return any(indicator in message.lower() for indicator in crisis_indicators)
def get_status(self) -> Dict:
"""Get status of all agents"""
return {
"conversation": self.conversation_agent.get_status(),
"assessment": self.assessment_agent.get_status(),
"mindfulness": self.mindfulness_agent.get_status(),
"crisis": self.crisis_agent.get_status()
}