Spaces:
Paused
Paused
| # aduc_framework/tools/video_encode_tool.py | |
| # | |
| # Versão 1.4.0 (Conjunto de Ferramentas de Vídeo Finalizado) | |
| # Copyright (C) August 4, 2025 Carlos Rodrigues dos Santos | |
| # | |
| # Este módulo atua como o especialista central para todas as operações de | |
| # manipulação e codificação de vídeo. Ele abstrai as interações com | |
| # FFmpeg e imageio, fornecendo uma API limpa e robusta para o resto do framework. | |
| # - save_video_from_tensor: Converte um tensor de pixel em um arquivo .mp4. | |
| # - extract_..._frame: Extrai frames específicos de clipes de vídeo. | |
| # - concatenate_videos: Monta o filme final a partir dos clipes de cena. | |
| import os | |
| import subprocess | |
| import logging | |
| import random | |
| import time | |
| import shutil | |
| from typing import List, Optional, Tuple | |
| import imageio | |
| import numpy as np | |
| import torch | |
| logger = logging.getLogger(__name__) | |
| class VideoToolError(Exception): | |
| """Exceção personalizada para erros originados do VideoEncodeTool.""" | |
| pass | |
| class VideoEncodeTool: | |
| """ | |
| Um especialista para lidar com tarefas de codificação e manipulação de vídeo. | |
| """ | |
| def save_video_from_tensor(self, video_tensor: torch.Tensor, path: str, fps: int = 24): | |
| """ | |
| Salva um tensor de pixel como um arquivo de vídeo .mp4 usando parâmetros otimizados. | |
| Espera um tensor no formato (B, C, F, H, W) onde B=1. | |
| """ | |
| # Verificações de robustez para garantir que o tensor é válido | |
| if video_tensor is None or video_tensor.ndim != 5 or video_tensor.shape[0] != 1 or video_tensor.shape[2] == 0: | |
| logger.warning(f"Tensor de vídeo inválido ou vazio recebido. Shape: {video_tensor.shape if video_tensor is not None else 'None'}. Pulando salvamento de vídeo para '{path}'.") | |
| return | |
| logger.info(f"Salvando tensor de vídeo com shape {video_tensor.shape} para '{os.path.basename(path)}'...") | |
| try: | |
| # Squeeze: (1, C, F, H, W) -> (C, F, H, W) | |
| # Permute: (C, F, H, W) -> (F, H, W, C) - formato esperado por imageio | |
| video_tensor_permuted = video_tensor.squeeze(0).permute(1, 2, 3, 0) | |
| # Desnormaliza de [-1, 1] para [0, 1] | |
| video_tensor_normalized = (video_tensor_permuted.clamp(-1, 1) + 1) / 2.0 | |
| # Converte para [0, 255], move para CPU e converte para numpy uint8 | |
| video_np = (video_tensor_normalized.detach().cpu().float().numpy() * 255).astype(np.uint8) | |
| # Salva o vídeo com parâmetros de alta compatibilidade | |
| with imageio.get_writer( | |
| path, | |
| fps=fps, | |
| codec='libx264', | |
| quality=8, # Qualidade boa (0-10, onde 10 é a melhor) | |
| output_params=['-pix_fmt', 'yuv420p'] # Formato de pixel para compatibilidade máxima | |
| ) as writer: | |
| for frame in video_np: | |
| writer.append_data(frame) | |
| logger.info(f"Vídeo salvo com sucesso em: {path}") | |
| except Exception as e: | |
| logger.error(f"Falha ao salvar vídeo com imageio para '{path}': {e}", exc_info=True) | |
| raise VideoToolError(f"Não foi possível escrever o arquivo de vídeo: {e}") | |
| def extract_first_frame(self, video_path: str, output_image_path: str) -> str: | |
| """ | |
| Extrai o primeiro frame de um arquivo de vídeo e o salva como uma imagem. | |
| """ | |
| logger.info(f"Extraindo primeiro frame de '{os.path.basename(video_path)}'...") | |
| cmd = ['ffmpeg', '-y', '-v', 'error', '-i', video_path, '-vframes', '1', '-q:v', '2', output_image_path] | |
| try: | |
| subprocess.run(cmd, check=True, capture_output=True, text=True) | |
| return output_image_path | |
| except subprocess.CalledProcessError as e: | |
| logger.error(f"FFmpeg (extract_first_frame) falhou: {e.stderr}") | |
| raise VideoToolError(f"Falha ao extrair o primeiro frame de {video_path}") | |
| def extract_last_frame(self, video_path: str, output_image_path: str) -> str: | |
| """ | |
| Extrai o último frame de um arquivo de vídeo e o salva como uma imagem. | |
| """ | |
| logger.info(f"Extraindo último frame de '{os.path.basename(video_path)}'...") | |
| cmd = ['ffmpeg', '-y', '-v', 'error', '-sseof', '-0.1', '-i', video_path, '-vframes', '1', '-q:v', '2', output_image_path] | |
| try: | |
| subprocess.run(cmd, check=True, capture_output=True, text=True) | |
| return output_image_path | |
| except subprocess.CalledProcessError as e: | |
| logger.error(f"FFmpeg (extract_last_frame) falhou: {e.stderr}") | |
| raise VideoToolError(f"Falha ao extrair o último frame de {video_path}") | |
| def create_transition_bridge(self, start_image_path: str, end_image_path: str, | |
| duration: float, fps: int, target_resolution: Tuple[int, int], | |
| workspace_dir: str, effect: Optional[str] = None) -> str: | |
| """ | |
| Cria um clipe de vídeo curto que transiciona entre duas imagens estáticas. | |
| """ | |
| output_path = os.path.join(workspace_dir, f"bridge_{int(time.time())}_{random.randint(100, 999)}.mp4") | |
| width, height = target_resolution | |
| fade_effects = ["fade", "wipeleft", "wiperight", "wipeup", "wipedown", "dissolve", "fadeblack", "fadewhite", "radial", "rectcrop", "circleopen", "circleclose", "horzopen", "horzclose"] | |
| selected_effect = effect if effect and effect.strip() else random.choice(fade_effects) | |
| transition_duration = max(0.1, duration) | |
| cmd = (f"ffmpeg -y -v error -loop 1 -t {transition_duration} -i \"{start_image_path}\" -loop 1 -t {transition_duration} -i \"{end_image_path}\" " | |
| f"-filter_complex \"[0:v]scale={width}:{height},setsar=1[v0];[1:v]scale={width}:{height},setsar=1[v1];" | |
| f"[v0][v1]xfade=transition={selected_effect}:duration={transition_duration}:offset=0[out]\" " | |
| f"-map \"[out]\" -c:v libx264 -r {fps} -pix_fmt yuv420p \"{output_path}\"") | |
| logger.info(f"Criando ponte de transição com efeito '{selected_effect}'...") | |
| try: | |
| subprocess.run(cmd, shell=True, check=True, text=True) | |
| except subprocess.CalledProcessError as e: | |
| raise VideoToolError(f"Falha ao criar vídeo de transição: {e.stderr}") | |
| return output_path | |
| def concatenate_videos(self, video_paths: List[str], output_path: str, workspace_dir: str) -> str: | |
| """ | |
| Concatena múltiplos clipes de vídeo em um único arquivo sem re-codificar. | |
| """ | |
| if not video_paths: | |
| raise VideoToolError("Nenhum fragmento de vídeo fornecido para concatenação.") | |
| # Se houver apenas um clipe, apenas o copie para o destino final. | |
| if len(video_paths) == 1: | |
| shutil.copy(video_paths[0], output_path) | |
| logger.info(f"Apenas um clipe fornecido. Copiado para '{output_path}'.") | |
| return output_path | |
| list_file_path = os.path.join(workspace_dir, f"concat_list_{int(time.time())}.txt") | |
| try: | |
| with open(list_file_path, 'w', encoding='utf-8') as f: | |
| for path in video_paths: | |
| # Garante que o caminho seja absoluto para o ffmpeg encontrar | |
| f.write(f"file '{os.path.abspath(path)}'\n") | |
| cmd_list = ['ffmpeg', '-y', '-f', 'concat', '-safe', '0', '-i', list_file_path, '-c', 'copy', output_path] | |
| logger.info(f"Concatenando {len(video_paths)} clipes para '{os.path.basename(output_path)}'...") | |
| subprocess.run(cmd_list, check=True, capture_output=True, text=True) | |
| logger.info("Concatenação FFmpeg bem-sucedida.") | |
| return output_path | |
| except subprocess.CalledProcessError as e: | |
| logger.error(f"Falha ao montar o vídeo final com FFmpeg: {e.stderr}") | |
| raise VideoToolError(f"Falha ao montar o vídeo final com FFmpeg.") | |
| finally: | |
| if os.path.exists(list_file_path): | |
| os.remove(list_file_path) | |
| # --- Instância Singleton --- | |
| video_encode_tool_singleton = VideoEncodeTool() |