# managers/mmaudio_manager.py
#
# Copyright (C) 2025 Carlos Rodrigues dos Santos
#
# Version: 3.0.0 (GPU Pool Manager)
#
# This version refactors MMAudioManager into a pool-of-workers model,
# allowing multiple dedicated GPUs to be used for audio generation, with a
# round-robin rotation scheme for efficient VRAM management.

import torch
import logging
import subprocess
import os
import time
import yaml
import gc
import threading
from pathlib import Path
import gradio as gr
import sys

# Relative import for the hardware manager
from ..tools.hardware_manager import hardware_manager

logger = logging.getLogger(__name__)

# --- Dependency management ---
DEPS_DIR = Path("./deps")
MMAUDIO_REPO_DIR = DEPS_DIR / "MMAudio"
MMAUDIO_REPO_URL = "https://github.com/hkchengrex/MMAudio.git"

# Lazy-loaded imports (populated by MMAudioWorker._lazy_load_mmaudio_modules)
ModelConfig, all_model_cfg, mmaudio_generate, load_video, make_video = None, None, None, None, None
MMAudio, get_my_mmaudio = None, None
FeaturesUtils = None
SequenceConfig = None
FlowMatching = None

# Process-level guard: the checkpoint paths inside ``all_model_cfg`` must be
# prefixed with MMAUDIO_REPO_DIR exactly once per process.  The prefixing is
# NOT idempotent (MMAUDIO_REPO_DIR is relative, so applying it twice would
# double-prefix), and it must happen in EVERY process, not only the one that
# performed the first-time setup.
_mmaudio_paths_configured = False


class MMAudioWorker:
    """A single MMAudio pipeline instance bound to one device.

    Models are loaded to CPU first and then moved to the worker's device;
    ``unload_models`` moves them back and frees VRAM so workers can be
    rotated by the pool manager.
    """

    def __init__(self, device_id: str):
        """Create a worker for ``device_id`` (e.g. ``"cuda:0"``).

        Runs the one-time global setup (repo clone / checkpoint download)
        and lazily imports the MMAudio modules.  Models themselves are NOT
        loaded here — call :meth:`initialize_models` for that.
        """
        self.device = torch.device(device_id)
        self.cpu_device = torch.device("cpu")
        # bfloat16 only on CUDA; CPU inference stays in float32.
        self.dtype = torch.bfloat16 if 'cuda' in self.device.type else torch.float32

        # Populated by initialize_models(); None while unloaded.
        self.net: 'MMAudio' = None
        self.feature_utils: 'FeaturesUtils' = None
        self.seq_cfg: 'SequenceConfig' = None
        self.model_config: 'ModelConfig' = None

        self._check_and_run_global_setup()
        self._lazy_load_mmaudio_modules()
        logger.info(f"MMAudio Worker inicializado para o dispositivo {self.device}.")

    def _lazy_load_mmaudio_modules(self):
        """Dynamically import the MMAudio modules into module globals.

        Deferred because the package only exists on ``sys.path`` after
        :meth:`_check_and_run_global_setup` has cloned the repository.
        """
        global ModelConfig, all_model_cfg, mmaudio_generate, load_video, make_video, MMAudio, get_my_mmaudio, FeaturesUtils, SequenceConfig, FlowMatching
        if MMAudio is not None:
            return  # already imported by a previous worker
        from mmaudio.eval_utils import ModelConfig, all_model_cfg, generate as mmaudio_generate, load_video, make_video
        from mmaudio.model.flow_matching import FlowMatching
        from mmaudio.model.networks import MMAudio, get_my_mmaudio
        from mmaudio.model.utils.features_utils import FeaturesUtils
        from mmaudio.model.sequence_config import SequenceConfig
        logger.info("Módulos do MMAudio foram carregados dinamicamente.")

    @staticmethod
    def _check_and_run_global_setup():
        """Clone the MMAudio repo and download checkpoints once; make the
        package importable and its checkpoint paths correct in EVERY process.

        BUGFIX: the original returned early when the setup flag existed, so
        on any process after the first, the repo was never put on ``sys.path``
        and the model-config paths were never re-prefixed — breaking both the
        lazy imports and ``initialize_models`` across restarts.  The path
        setup now always runs; only clone/download/flag are first-run-only.
        """
        global _mmaudio_paths_configured

        setup_flag = DEPS_DIR / "mmaudio.setup.complete"
        first_run = not setup_flag.exists()

        if first_run:
            logger.info("--- Iniciando Setup Global do MMAudio (primeira execução) ---")
            if not MMAUDIO_REPO_DIR.exists():
                DEPS_DIR.mkdir(parents=True, exist_ok=True)
                subprocess.run(["git", "clone", "--depth", "1", MMAUDIO_REPO_URL, str(MMAUDIO_REPO_DIR)], check=True)

        # Must happen before any `import mmaudio...`, on every process.
        repo_path = str(MMAUDIO_REPO_DIR.resolve())
        if repo_path not in sys.path:
            sys.path.insert(0, repo_path)

        if not _mmaudio_paths_configured:
            # Import after the path insertion above.
            from mmaudio.eval_utils import all_model_cfg as cfg

            # Prefix checkpoint paths with the repo dir (exactly once per
            # process) and download the weights on first run.
            for cfg_key in cfg:
                config = cfg[cfg_key]
                config.model_path = MMAUDIO_REPO_DIR / config.model_path
                config.vae_path = MMAUDIO_REPO_DIR / config.vae_path
                if config.bigvgan_16k_path:
                    config.bigvgan_16k_path = MMAUDIO_REPO_DIR / config.bigvgan_16k_path
                config.synchformer_ckpt = MMAUDIO_REPO_DIR / config.synchformer_ckpt
                if first_run:
                    config.download_if_needed()
            _mmaudio_paths_configured = True

        if first_run:
            setup_flag.touch()
            logger.info("--- Setup Global do MMAudio Concluído ---")
        return True

    def initialize_models(self):
        """Load the worker's models to CPU, then move them to its device.

        No-op if the models are already resident.
        """
        if self.net is not None:
            return
        self.model_config = all_model_cfg['large_44k_v2']
        self.seq_cfg = self.model_config.seq_cfg

        logger.info(f"Worker {self.device}: Carregando modelo MMAudio para a CPU...")
        self.net = get_my_mmaudio(self.model_config.model_name).eval()
        # weights_only=True: safe deserialization (no arbitrary pickle code).
        self.net.load_weights(torch.load(self.model_config.model_path, map_location=self.cpu_device, weights_only=True))
        self.feature_utils = FeaturesUtils(
            tod_vae_ckpt=self.model_config.vae_path,
            synchformer_ckpt=self.model_config.synchformer_ckpt,
            enable_conditions=True,
            mode=self.model_config.mode,
            bigvgan_vocoder_ckpt=self.model_config.bigvgan_16k_path,
            need_vae_encoder=False
        ).eval()

        self.net.to(self.device, self.dtype)
        self.feature_utils.to(self.device, self.dtype)
        logger.info(f"Worker {self.device}: Modelos MMAudio prontos na VRAM.")

    def unload_models(self):
        """Unload the models from VRAM, moving them back to the CPU."""
        if self.net is None:
            return
        logger.info(f"Worker {self.device}: Descarregando modelos MMAudio da VRAM...")
        self.net.to(self.cpu_device)
        self.feature_utils.to(self.cpu_device)
        del self.net, self.feature_utils, self.seq_cfg, self.model_config
        self.net, self.feature_utils, self.seq_cfg, self.model_config = None, None, None, None
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

    def generate_audio_internal(self, video_path: str, prompt: str, duration_seconds: float, output_path: str) -> str:
        """Generate audio for ``video_path`` on this worker's GPU and mux it
        into a new video at ``output_path``.

        Returns ``output_path``.  Requires :meth:`initialize_models` first.
        """
        negative_prompt = "human voice, speech, talking, singing, narration"
        # Time-based seed: intentionally non-deterministic across calls.
        rng = torch.Generator(device=self.device).manual_seed(int(time.time()))
        fm = FlowMatching(min_sigma=0, inference_mode='euler', num_steps=25)

        video_info = load_video(Path(video_path), duration_seconds)
        self.seq_cfg.duration = video_info.duration_sec
        self.net.update_seq_lengths(self.seq_cfg.latent_seq_len, self.seq_cfg.clip_seq_len, self.seq_cfg.sync_seq_len)

        with torch.no_grad():
            audios = mmaudio_generate(
                clip_video=video_info.clip_frames.unsqueeze(0).to(self.device, self.dtype),
                sync_video=video_info.sync_frames.unsqueeze(0).to(self.device, self.dtype),
                text=[prompt],
                negative_text=[negative_prompt],
                feature_utils=self.feature_utils,
                net=self.net,
                fm=fm,
                rng=rng,
                cfg_strength=4.5
            )
        audio_waveform = audios.float().cpu()[0]

        make_video(video_info, Path(output_path), audio_waveform, sampling_rate=self.seq_cfg.sampling_rate)
        return output_path


class MMAudioPoolManager:
    """Round-robin pool of :class:`MMAudioWorker` instances.

    On each request, the previous worker's models are unloaded in a
    background thread while the current worker generates, keeping at most
    one worker's models in VRAM per rotation step.
    """

    def __init__(self, device_ids: list[str], workspace_dir: str):
        """Create one worker per device id.

        Raises:
            ValueError: if no device ids are given or 'cpu' is among them —
                this pool requires dedicated GPUs.
        """
        logger.info(f"MMAUDIO POOL MANAGER: Criando workers para os dispositivos: {device_ids}")
        self.workspace_dir = workspace_dir
        if not device_ids or 'cpu' in device_ids:
            raise ValueError("MMAudioPoolManager requer GPUs dedicadas.")
        self.workers = [MMAudioWorker(device_id) for device_id in device_ids]
        self.current_worker_index = 0
        self.lock = threading.Lock()
        self.last_cleanup_thread = None

    def _cleanup_worker_thread(self, worker: MMAudioWorker):
        """Background-thread target: unload ``worker``'s models from VRAM."""
        logger.info(f"MMAUDIO CLEANUP THREAD: Iniciando limpeza de {worker.device} em background...")
        worker.unload_models()

    def generate_audio_for_video(self, video_path: str, prompt: str, duration_seconds: float, output_path_override: str = None) -> str:
        """Generate audio for a video using the next worker in the rotation.

        Returns the path of the video with audio muxed in, or the original
        ``video_path`` unchanged if the clip is shorter than 1 second.

        Raises:
            gr.Error: wrapping any failure during generation.
        """
        if duration_seconds < 1:
            logger.warning(f"Vídeo muito curto ({duration_seconds:.2f}s). Pulando geração de áudio.")
            return video_path

        worker_to_use = None
        try:
            with self.lock:
                # Wait for any in-flight cleanup before touching workers.
                if self.last_cleanup_thread and self.last_cleanup_thread.is_alive():
                    self.last_cleanup_thread.join()

                worker_to_use = self.workers[self.current_worker_index]
                previous_worker_index = (self.current_worker_index - 1 + len(self.workers)) % len(self.workers)
                worker_to_cleanup = self.workers[previous_worker_index]

                # BUGFIX: with a single worker, "previous" IS the current
                # worker; unloading it in a background thread would race with
                # initialize_models() below.  Only clean up a distinct worker.
                if worker_to_cleanup is not worker_to_use:
                    cleanup_thread = threading.Thread(target=self._cleanup_worker_thread, args=(worker_to_cleanup,))
                    cleanup_thread.start()
                    self.last_cleanup_thread = cleanup_thread

                worker_to_use.initialize_models()
                self.current_worker_index = (self.current_worker_index + 1) % len(self.workers)

            logger.info(f"MMAUDIO POOL MANAGER: Gerando áudio em {worker_to_use.device}...")
            output_path = output_path_override or os.path.join(self.workspace_dir, f"{Path(video_path).stem}_with_audio.mp4")

            return worker_to_use.generate_audio_internal(
                video_path=video_path,
                prompt=prompt,
                duration_seconds=duration_seconds,
                output_path=output_path
            )
        except Exception as e:
            logger.error(f"MMAUDIO POOL MANAGER: Erro durante a geração de áudio: {e}", exc_info=True)
            raise gr.Error(f"Falha na geração de áudio: {e}")


# --- Singleton instantiation ---
class MMAudioPlaceholder:
    """No-op stand-in used when no GPU was allocated for MMAudio."""

    def generate_audio_for_video(self, video_path, *args, **kwargs):
        # Pass the video through untouched; the pipeline keeps working
        # without the audio stage.
        logger.error("MMAudio não foi inicializado pois nenhuma GPU foi alocada. Pulando etapa de áudio.")
        return video_path


try:
    with open("config.yaml", 'r') as f:
        config = yaml.safe_load(f)
    WORKSPACE_DIR = config['application']['workspace_dir']
    mmaudio_gpus_required = config['specialists'].get('mmaudio', {}).get('gpus_required', 0)
    mmaudio_device_ids = hardware_manager.allocate_gpus('MMAudio', mmaudio_gpus_required)

    if mmaudio_gpus_required > 0 and 'cpu' not in mmaudio_device_ids:
        mmaudio_manager_singleton = MMAudioPoolManager(device_ids=mmaudio_device_ids, workspace_dir=WORKSPACE_DIR)
        logger.info("Especialista de Áudio (MMAudio Pool) pronto.")
    else:
        mmaudio_manager_singleton = MMAudioPlaceholder()
        logger.warning("MMAudio Pool Manager não foi inicializado. Nenhuma GPU foi requisitada na config.yaml.")
except Exception as e:
    # Any failure (missing config, allocation error) degrades to the
    # placeholder so the rest of the application can still start.
    logger.critical(f"Falha CRÍTICA ao inicializar o MMAudioManager: {e}", exc_info=True)
    mmaudio_manager_singleton = MMAudioPlaceholder()