# aduc_framework/engineers/composer.py # # Copyright (C) August 4, 2025 Carlos Rodrigues dos Santos # # Versão 4.1.0 (Hub Cognitivo Consolidado) # # - A arquitetura está estável e completa. O Composer atua como o hub # central de comunicação com os LLMs. # - O método `execute_plan` é usado pelo Planner2D para a criação do roteiro. # - O método `execute_cognitive_task` é uma ferramenta genérica usada por # outros especialistas (como Planner4D e Deformes3D) para solicitar # tarefas de raciocínio pontuais ao LLM, como decidir um movimento de # câmera ou um plano de composição de imagem. import logging import json import re import yaml from pathlib import Path from PIL import Image from typing import List, Dict, Any, Generator, Optional, Callable from .prompt_engine import prompt_engine_singleton from ..managers.llama_multimodal_manager import llama_multimodal_manager_singleton from ..managers.gemini_manager import gemini_manager_singleton logger = logging.getLogger(__name__) def robust_json_parser(raw_text: str) -> dict: """ Analisa um objeto JSON de uma string que pode conter texto extra. """ logger.debug(f"COMPOSER(JSON_PARSER): Tentando parsear JSON (primeiros 500 chars):\n---\n{raw_text[:500]}\n---") match = re.search(r'```json\s*(\{.*?\})\s*```', raw_text, re.DOTALL) if match: json_str = match.group(1); logger.debug("JSON explícito encontrado.") return json.loads(json_str) try: start_index = raw_text.find('{'); end_index = raw_text.rfind('}') if start_index != -1 and end_index != -1 and end_index > start_index: json_str = raw_text[start_index : end_index + 1]; logger.debug("JSON por delimitadores '{...}' encontrado.") return json.loads(json_str) except json.JSONDecodeError: pass logger.warning("Nenhum JSON válido encontrado nos métodos primários. Tentando parsear o texto inteiro.") return json.loads(raw_text) class Composer: """ O Composer é o hub central de comunicação com o Large Language Model (LLM). Ele executa tanto planos de trabalho de várias etapas quanto tarefas cognitivas únicas. """ def __init__(self): logger.info("COMPOSER: Lendo config.yaml para selecionar o LLM Engine...") with open("config.yaml", 'r') as f: config = yaml.safe_load(f) self.provider = config.get('specialists', {}).get('llm_engine', {}).get('provider', 'llama_multimodal') if self.provider == 'gemini': self.llm_manager = gemini_manager_singleton logger.info("COMPOSER: Motor de LLM configurado para usar 'Gemini'.") else: self.llm_manager = llama_multimodal_manager_singleton logger.info("COMPOSER: Motor de LLM configurado para usar 'Llama' (padrão).") prompt_engine_singleton.set_provider(self.provider) self.task_templates = self._load_task_templates() logger.info(f"Composer inicializado com {len(self.task_templates)} templates de tarefa.") def _load_task_templates(self) -> Dict[str, str]: templates = {} template_dir = Path(__file__).resolve().parent.parent / "prompts" / "task_templates" if not template_dir.is_dir(): raise FileNotFoundError(f"Diretório de templates de tarefa não encontrado: {template_dir}") for task_file in template_dir.glob("*.txt"): task_id = task_file.stem with open(task_file, 'r', encoding='utf-8') as f: templates[task_id] = f.read() return templates def _talk_to_llm(self, generic_prompt: str, images: Optional[List[Image.Image]] = None, expected_format="text") -> Any: final_model_prompt = prompt_engine_singleton.translate( generic_prompt_content=generic_prompt, has_image=bool(images) ) logger.info(f"COMPOSER: PROMPT FINAL SENDO ENVIADO para ({self.provider}):\n--- INÍCIO DO PROMPT ---\n{final_model_prompt}\n--- FIM DO PROMPT ---") response_raw = self.llm_manager.process_turn(prompt_text=final_model_prompt, image_list=images) logger.info(f"COMPOSER: RESPOSTA BRUTA RECEBIDA de ({self.provider}):\n--- INÍCIO DA RESPOSTA BRUTA ---\n{response_raw}\n--- FIM DA RESPOSTA BRUTA ---") if expected_format == "json": try: return robust_json_parser(response_raw) except (json.JSONDecodeError, ValueError) as e: raise ValueError(f"O LLM ({self.provider}) retornou um formato JSON inválido. Erro: {e}") return response_raw def execute_cognitive_task(self, task_id: str, template_data: Dict[str, Any], images: Optional[List[Image.Image]] = None) -> Any: """ Executa uma única tarefa cognitiva (de "pensamento") e retorna o resultado. """ logger.info(f"COMPOSER: Executando tarefa cognitiva: {task_id}") generic_template = self.task_templates.get(task_id) if not generic_template: raise ValueError(f"Template para a tarefa cognitiva '{task_id}' não foi encontrado.") prompt_content = generic_template for key, value in template_data.items(): prompt_content = prompt_content.replace(f"{{{key}}}", str(value)) expected_format = "json" if "JSON REQUIRED" in generic_template.upper() else "text" response = self._talk_to_llm(generic_prompt=prompt_content, images=images, expected_format=expected_format) if expected_format == "text": return response.strip().replace("\"", "") return response def execute_plan(self, execution_plan: List[Dict[str, Any]], initial_data: Dict[str, Any]) -> Generator[Dict[str, Any], None, None]: """ Executa um plano de trabalho de várias etapas para a criação do roteiro. """ dna = {"global_prompt": initial_data["global_prompt"], "initial_media_paths": initial_data["user_media_paths"], "continuous_story": "", "scenes": []} user_media = [Image.open(p) for p in initial_data["user_media_paths"]] for i, task in enumerate(execution_plan): try: task_id = task['task_id'] yield {"status": "progress", "message": task.get('description', '')} generic_template = self.task_templates.get(task_id) if not generic_template: raise ValueError(f"Template para a tarefa '{task_id}' não foi encontrado.") prompt_content = generic_template prompt_content = prompt_content.replace("{global_prompt}", str(dna.get("global_prompt", ""))) prompt_content = prompt_content.replace("{num_scenes}", str(task.get('inputs', {}).get("num_scenes", ""))) prompt_content = prompt_content.replace("{continuous_story}", str(dna.get("continuous_story", ""))) prompt_content = prompt_content.replace("{independent_scenes_json}", json.dumps({"scenes": dna.get("scenes", [])}, indent=2)) is_json_output = task_id in ["STEP_02_CREATE_INDEPENDENT_SCENES", "STEP_03_FRAGMENT_SCENES_INTO_ACTS", "STEP_04_FINAL_REVIEW"] expected_format = "json" if is_json_output else "text" response = self._talk_to_llm(prompt_content, user_media if i == 0 else None, expected_format) if task_id == "STEP_01_CREATE_CONTINUOUS_STORY": dna["continuous_story"] = response elif task_id == "STEP_02_CREATE_INDEPENDENT_SCENES": dna["scenes"] = response.get("scenes", []) elif task_id == "STEP_03_FRAGMENT_SCENES_INTO_ACTS": dna["scenes"] = response.get("scenes", []) except Exception as e: raise e yield {"status": "complete", "message": "Execução do Composer concluída.", "dna": dna} composer_singleton = Composer()