Spaces:
Runtime error
Runtime error
| # gemini_helpers.py | |
| # Copyright (C) 4 de Agosto de 2025 Carlos Rodrigues dos Santos | |
| # | |
| # Este programa é software livre: você pode redistribuí-lo e/ou modificá-lo | |
| # sob os termos da Licença Pública Geral Affero GNU como publicada pela | |
| # Free Software Foundation, seja a versão 3 da Licença, ou | |
| # (a seu critério) qualquer versão posterior. | |
| # | |
| # AVISO DE PATENTE PENDENTE: O método e sistema ADUC implementado neste | |
| # software está em processo de patenteamento. Consulte NOTICE.md. | |
| import os | |
| import logging | |
| import json | |
| import gradio as gr | |
| from PIL import Image | |
| import google.generativeai as genai | |
| import re | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
| logger = logging.getLogger(__name__) | |
| def robust_json_parser(raw_text: str) -> dict: | |
| clean_text = raw_text.strip() | |
| try: | |
| # Tenta encontrar o JSON delimitado por ```json ... ``` | |
| match = re.search(r'```json\s*(\{.*?\})\s*```', clean_text, re.DOTALL) | |
| if match: | |
| json_str = match.group(1) | |
| return json.loads(json_str) | |
| # Se não encontrar, tenta encontrar o primeiro '{' e o último '}' | |
| start_index = clean_text.find('{') | |
| end_index = clean_text.rfind('}') | |
| if start_index != -1 and end_index != -1 and end_index > start_index: | |
| json_str = clean_text[start_index : end_index + 1] | |
| return json.loads(json_str) | |
| else: | |
| raise ValueError("Nenhum objeto JSON válido foi encontrado na resposta da IA.") | |
| except json.JSONDecodeError as e: | |
| logger.error(f"Falha ao decodificar JSON. A IA retornou o seguinte texto:\n---\n{raw_text}\n---") | |
| raise ValueError(f"A IA retornou um formato de JSON inválido: {e}") | |
| class GeminiSingleton: | |
| def __init__(self): | |
| self.api_key = os.environ.get("GEMINI_API_KEY") | |
| if self.api_key: | |
| genai.configure(api_key=self.api_key) | |
| # Modelo mais recente e capaz para tarefas complexas de visão e raciocínio. | |
| self.model = genai.GenerativeModel('gemini-2.5-flash') | |
| logger.info("Especialista Gemini (1.5 Pro) inicializado com sucesso.") | |
| else: | |
| self.model = None | |
| logger.warning("Chave da API Gemini não encontrada. Especialista desativado.") | |
| def _check_model(self): | |
| if not self.model: | |
| raise gr.Error("A chave da API do Google Gemini não está configurada (GEMINI_API_KEY).") | |
| def _read_prompt_template(self, filename: str) -> str: | |
| try: | |
| with open(os.path.join("prompts", filename), "r", encoding="utf-8") as f: | |
| return f.read() | |
| except FileNotFoundError: | |
| raise gr.Error(f"Arquivo de prompt não encontrado: prompts/{filename}") | |
| def generate_storyboard(self, prompt: str, num_keyframes: int, ref_image_paths: list[str]) -> list[str]: | |
| self._check_model() | |
| try: | |
| template = self._read_prompt_template("unified_storyboard_prompt.txt") | |
| storyboard_prompt = template.format(user_prompt=prompt, num_fragments=num_keyframes) | |
| model_contents = [storyboard_prompt] + [Image.open(p) for p in ref_image_paths] | |
| response = self.model.generate_content(model_contents) | |
| logger.info(f"--- RESPOSTA COMPLETA DO GEMINI (generate_storyboard) ---\n{response.text}\n--------------------") | |
| storyboard_data = robust_json_parser(response.text) | |
| storyboard = storyboard_data.get("scene_storyboard", []) | |
| if not storyboard or len(storyboard) != num_keyframes: raise ValueError(f"Número incorreto de cenas gerado.") | |
| return storyboard | |
| except Exception as e: | |
| raise gr.Error(f"O Roteirista (Gemini) falhou: {e}") | |
| def select_keyframes_from_pool(self, storyboard: list, base_image_paths: list[str], pool_image_paths: list[str]) -> list[str]: | |
| self._check_model() | |
| if not pool_image_paths: | |
| raise gr.Error("O 'banco de imagens' (Imagens Adicionais) está vazio.") | |
| try: | |
| template = self._read_prompt_template("keyframe_selection_prompt.txt") | |
| image_map = {f"IMG-{i+1}": path for i, path in enumerate(pool_image_paths)} | |
| base_image_map = {f"BASE-{i+1}": path for i, path in enumerate(base_image_paths)} | |
| model_contents = ["# Reference Images (Story Base)"] | |
| for identifier, path in base_image_map.items(): | |
| model_contents.extend([f"Identifier: {identifier}", Image.open(path)]) | |
| model_contents.append("\n# Image Pool (Scene Bank)") | |
| for identifier, path in image_map.items(): | |
| model_contents.extend([f"Identifier: {identifier}", Image.open(path)]) | |
| storyboard_str = "\n".join([f"- Scene {i+1}: {s}" for i, s in enumerate(storyboard)]) | |
| selection_prompt = template.format(storyboard_str=storyboard_str, image_identifiers=list(image_map.keys())) | |
| model_contents.append(selection_prompt) | |
| response = self.model.generate_content(model_contents) | |
| logger.info(f"--- RESPOSTA COMPLETA DO GEMINI (select_keyframes_from_pool) ---\n{response.text}\n--------------------") | |
| selection_data = robust_json_parser(response.text) | |
| selected_identifiers = selection_data.get("selected_image_identifiers", []) | |
| if len(selected_identifiers) != len(storyboard): | |
| raise ValueError("A IA não selecionou o número correto de imagens para as cenas.") | |
| selected_paths = [image_map[identifier] for identifier in selected_identifiers] | |
| return selected_paths | |
| except Exception as e: | |
| raise gr.Error(f"O Fotógrafo (Gemini) falhou ao selecionar as imagens: {e}") | |
| def get_anticipatory_keyframe_prompt(self, global_prompt: str, scene_history: str, current_scene_desc: str, future_scene_desc: str, last_image_path: str, fixed_ref_paths: list[str]) -> str: | |
| self._check_model() | |
| try: | |
| template = self._read_prompt_template("anticipatory_keyframe_prompt.txt") | |
| director_prompt = template.format( | |
| historico_prompt=scene_history, | |
| cena_atual=current_scene_desc, | |
| cena_futura=future_scene_desc | |
| ) | |
| model_contents = [ | |
| "# CONTEXTO:", | |
| f"- Global Story Goal: {global_prompt}", | |
| "# VISUAL ASSETS:", | |
| "Current Base Image [IMG-BASE]:", | |
| Image.open(last_image_path) | |
| ] | |
| ref_counter = 1 | |
| for path in fixed_ref_paths: | |
| if path != last_image_path: | |
| model_contents.extend([f"General Reference Image [IMG-REF-{ref_counter}]:", Image.open(path)]) | |
| ref_counter += 1 | |
| model_contents.append(director_prompt) | |
| response = self.model.generate_content(model_contents) | |
| logger.info(f"--- RESPOSTA COMPLETA DO GEMINI (get_anticipatory_keyframe_prompt) ---\n{response.text}\n--------------------") | |
| final_flux_prompt = response.text.strip() | |
| return final_flux_prompt | |
| except Exception as e: | |
| raise gr.Error(f"O Diretor de Arte (Gemini) falhou: {e}") | |
| def get_initial_motion_prompt(self, user_prompt: str, start_image_path: str, destination_image_path: str, dest_scene_desc: str) -> str: | |
| """Gera o prompt de movimento para a PRIMEIRA transição, que não tem um 'passado'.""" | |
| self._check_model() | |
| try: | |
| template = self._read_prompt_template("initial_motion_prompt.txt") | |
| prompt_text = template.format(user_prompt=user_prompt, destination_scene_description=dest_scene_desc) | |
| model_contents = [ | |
| prompt_text, | |
| "START Image:", | |
| Image.open(start_image_path), | |
| "DESTINATION Image:", | |
| Image.open(destination_image_path) | |
| ] | |
| response = self.model.generate_content(model_contents) | |
| logger.info(f"--- RESPOSTA COMPLETA DO GEMINI (get_initial_motion_prompt) ---\n{response.text}\n--------------------") | |
| return response.text.strip() | |
| except Exception as e: | |
| raise gr.Error(f"O Cineasta Inicial (Gemini) falhou: {e}") | |
| def get_cinematic_decision(self, global_prompt: str, story_history: str, | |
| past_keyframe_path: str, present_keyframe_path: str, future_keyframe_path: str, | |
| past_scene_desc: str, present_scene_desc: str, future_scene_desc: str) -> dict: | |
| """ | |
| Atua como um 'Cineasta', analisando passado, presente e futuro para tomar decisões | |
| de edição e gerar prompts de movimento detalhados. | |
| """ | |
| self._check_model() | |
| try: | |
| template = self._read_prompt_template("cinematic_director_prompt.txt") | |
| prompt_text = template.format( | |
| global_prompt=global_prompt, | |
| story_history=story_history, | |
| past_scene_desc=past_scene_desc, | |
| present_scene_desc=present_scene_desc, | |
| future_scene_desc=future_scene_desc | |
| ) | |
| model_contents = [ | |
| prompt_text, | |
| "[PAST_IMAGE]:", Image.open(past_keyframe_path), | |
| "[PRESENT_IMAGE]:", Image.open(present_keyframe_path), | |
| "[FUTURE_IMAGE]:", Image.open(future_keyframe_path) | |
| ] | |
| response = self.model.generate_content(model_contents) | |
| logger.info(f"--- RESPOSTA COMPLETA DO GEMINI (get_cinematic_decision) ---\n{response.text}\n--------------------") | |
| decision_data = robust_json_parser(response.text) | |
| if "transition_type" not in decision_data or "motion_prompt" not in decision_data: | |
| raise ValueError("Resposta da IA (Cineasta) está mal formatada. Faltam 'transition_type' ou 'motion_prompt'.") | |
| return decision_data | |
| except Exception as e: | |
| # Fallback para uma decisão segura em caso de erro | |
| logger.error(f"O Diretor de Cinema (Gemini) falhou: {e}. Usando fallback para 'continuous'.") | |
| return { | |
| "transition_type": "continuous", | |
| "motion_prompt": f"A smooth, continuous cinematic transition from '{present_scene_desc}' to '{future_scene_desc}'." | |
| } | |
| def get_sound_director_prompt(self, audio_history: str, | |
| past_keyframe_path: str, present_keyframe_path: str, future_keyframe_path: str, | |
| present_scene_desc: str, motion_prompt: str, future_scene_desc: str) -> str: | |
| """ | |
| Atua como um 'Diretor de Som', analisando o contexto completo para criar um prompt | |
| de áudio imersivo e contínuo para a cena atual. | |
| """ | |
| self._check_model() | |
| try: | |
| template = self._read_prompt_template("sound_director_prompt.txt") | |
| prompt_text = template.format( | |
| audio_history=audio_history, | |
| present_scene_desc=present_scene_desc, | |
| motion_prompt=motion_prompt, | |
| future_scene_desc=future_scene_desc | |
| ) | |
| model_contents = [ | |
| prompt_text, | |
| "[PAST_IMAGE]:", Image.open(past_keyframe_path), | |
| "[PRESENT_IMAGE]:", Image.open(present_keyframe_path), | |
| "[FUTURE_IMAGE]:", Image.open(future_keyframe_path) | |
| ] | |
| response = self.model.generate_content(model_contents) | |
| logger.info(f"--- RESPOSTA COMPLETA DO GEMINI (get_sound_director_prompt) ---\n{response.text}\n--------------------") | |
| return response.text.strip() | |
| except Exception as e: | |
| logger.error(f"O Diretor de Som (Gemini) falhou: {e}. Usando fallback.") | |
| return f"Sound effects matching the scene: {present_scene_desc}" | |
| gemini_singleton = GeminiSingleton() |