Spaces:
Runtime error
Runtime error
| from typing import Dict, List | |
| from crewai import Crew, Task | |
| import logging | |
| from utils.log_manager import LogManager | |
| from agents.conversation_agent import ConversationAgent | |
| from agents.assessment_agent import AssessmentAgent | |
| from agents.mindfulness_agent import MindfulnessAgent | |
| from agents.crisis_agent import CrisisAgent | |
class WellnessOrchestrator:
    """Orchestrates the coordination between different agents.

    The orchestrator owns four agent slots (conversation, assessment,
    mindfulness, crisis), registers their underlying ``.agent`` objects with
    a CrewAI ``Crew``, and routes each incoming user message to one slot
    based on simple keyword matching.
    """

    # Phrases that route a message to the crisis agent. Matching is a
    # case-insensitive substring test on the normalized message text.
    _CRISIS_INDICATORS = (
        "suicide", "kill myself", "end it all",
        "hurt myself", "give up", "can't go on",
        "emergency", "crisis", "urgent help",
    )

    def __init__(self, model_config: Dict):
        """Create the logger, then build the agents and the crew.

        Args:
            model_config: Configuration passed unchanged to every agent
                constructor.

        Raises:
            Exception: Whatever agent or crew construction raises is logged
                and re-raised.
        """
        self.model_config = model_config
        self.log_manager = LogManager()
        self.logger = self.log_manager.get_agent_logger("orchestrator")

        # Agents must exist before the crew, which reads their ``.agent``.
        self.initialize_agents()
        self.initialize_crew()

    def initialize_agents(self):
        """Initialize all agents with their specific roles and tools.

        Raises:
            Exception: Any construction error is logged and re-raised.
        """
        self.logger.info("Initializing agents")

        try:
            self.conversation_agent = ConversationAgent(
                model_config=self.model_config
            )
            # Temporarily using ConversationAgent for the specialised slots.
            self.assessment_agent = ConversationAgent(
                model_config=self.model_config
            )
            self.mindfulness_agent = ConversationAgent(
                model_config=self.model_config
            )
            self.crisis_agent = ConversationAgent(
                model_config=self.model_config
            )

            self.logger.info("All agents initialized successfully")

        except Exception as e:
            self.logger.error(f"Error initializing agents: {e}")
            raise

    def initialize_crew(self):
        """Initialize CrewAI with the ``.agent`` object of every agent slot.

        Raises:
            Exception: Any construction error is logged and re-raised.
        """
        self.logger.info("Initializing CrewAI")

        try:
            self.crew = Crew(
                agents=[
                    self.conversation_agent.agent,
                    self.assessment_agent.agent,
                    self.mindfulness_agent.agent,
                    self.crisis_agent.agent,
                ]
            )

            self.logger.info("CrewAI initialized successfully")

        except Exception as e:
            self.logger.error(f"Error initializing CrewAI: {e}")
            raise

    def process_message(self, message: str, context: Dict = None) -> Dict:
        """Process a user message through the appropriate agent.

        Args:
            message: Raw user text.
            context: Optional context dict handed to the chosen agent. When
                ``None`` a fresh empty dict is used; a dict supplied by the
                caller (even an empty one) is passed through as-is.

        Returns:
            Whatever the chosen agent's ``process_message`` returns, or a
            fallback error dict (``message``, ``agent_type``, ``task_type``)
            if routing or the agent raises.
        """
        self.logger.info("Processing message through agents")

        # Only substitute when nothing was passed, so a caller-owned empty
        # dict is not silently swapped for a private one.
        if context is None:
            context = {}

        try:
            agent = self._select_agent(message)
            return agent.process_message(message, context)

        except Exception as e:
            self.logger.error(f"Error processing message: {e}")
            return {
                "message": "I apologize, but I encountered an error. Please try again.",
                "agent_type": "error",
                "task_type": "error_handling"
            }

    def _select_agent(self, message: str):
        """Return the agent slot that should handle ``message``.

        Crisis indicators take priority, then assessment keywords, then
        mindfulness keywords; anything else goes to the conversation agent.
        """
        if self._is_crisis(message):
            return self.crisis_agent

        lowered = message.lower()
        if "assess" in lowered or "evaluate" in lowered:
            return self.assessment_agent
        if "meditate" in lowered or "mindful" in lowered:
            return self.mindfulness_agent
        return self.conversation_agent

    def _is_crisis(self, message: str) -> bool:
        """Check if the message indicates a crisis situation.

        Before matching, the text is lowercased, typographic apostrophes
        are replaced with ASCII ones, and runs of whitespace (including
        newlines) are collapsed to single spaces, so phrases such as
        "can’t go on" or "give\\nup" are still recognised.
        """
        normalized = " ".join(
            message.lower().replace("\u2019", "'").split()
        )
        return any(
            indicator in normalized for indicator in self._CRISIS_INDICATORS
        )

    def get_status(self) -> Dict:
        """Get the status of all agents, keyed by slot name."""
        return {
            "conversation": self.conversation_agent.get_status(),
            "assessment": self.assessment_agent.get_status(),
            "mindfulness": self.mindfulness_agent.get_status(),
            "crisis": self.crisis_agent.get_status()
        }