# managers/mmaudio_manager.py
#
# Copyright (C) 2025 Carlos Rodrigues dos Santos
#
# Version: 3.0.0 (GPU Pool Manager)
#
# This version refactors MMAudioManager into a pool-of-workers model,
# allowing multiple dedicated GPUs to be used for audio generation,
# with a round-robin rotation scheme for efficient VRAM management.
import torch
import logging
import subprocess
import os
import time
import yaml
import gc
import threading
from pathlib import Path
import gradio as gr
import sys

# Relative import for the hardware_manager
from ..tools.hardware_manager import hardware_manager

logger = logging.getLogger(__name__)

# --- Dependency management ---
DEPS_DIR = Path("./deps")
MMAUDIO_REPO_DIR = DEPS_DIR / "MMAudio"
MMAUDIO_REPO_URL = "https://github.com/hkchengrex/MMAudio.git"

# Lazy-loaded imports
ModelConfig, all_model_cfg, mmaudio_generate, load_video, make_video = None, None, None, None, None
MMAudio, get_my_mmaudio = None, None
FeaturesUtils = None
SequenceConfig = None
FlowMatching = None


class MMAudioWorker:
    """Represents a single instance of the MMAudio pipeline on one device."""

    def __init__(self, device_id: str):
        self.device = torch.device(device_id)
        self.cpu_device = torch.device("cpu")
        self.dtype = torch.bfloat16 if 'cuda' in self.device.type else torch.float32
        self.net: 'MMAudio' = None
        self.feature_utils: 'FeaturesUtils' = None
        self.seq_cfg: 'SequenceConfig' = None
        self.model_config: 'ModelConfig' = None
        self._check_and_run_global_setup()
        self._lazy_load_mmaudio_modules()
        logger.info(f"MMAudio Worker initialized for device {self.device}.")

    def _lazy_load_mmaudio_modules(self):
        """Dynamically imports the MMAudio modules."""
        global ModelConfig, all_model_cfg, mmaudio_generate, load_video, make_video, MMAudio, get_my_mmaudio, FeaturesUtils, SequenceConfig, FlowMatching
        if MMAudio is not None: return
        from mmaudio.eval_utils import ModelConfig, all_model_cfg, generate as mmaudio_generate, load_video, make_video
        from mmaudio.model.flow_matching import FlowMatching
        from mmaudio.model.networks import MMAudio, get_my_mmaudio
        from mmaudio.model.utils.features_utils import FeaturesUtils
        from mmaudio.model.sequence_config import SequenceConfig
        logger.info("MMAudio modules loaded dynamically.")

    def _check_and_run_global_setup(self):
        """Runs the one-time setup: clones the repository and downloads the models."""
        setup_flag = DEPS_DIR / "mmaudio.setup.complete"
        if setup_flag.exists():
            return True
        logger.info("--- Starting MMAudio global setup (first run) ---")
        if not MMAUDIO_REPO_DIR.exists():
            DEPS_DIR.mkdir(exist_ok=True)
            subprocess.run(["git", "clone", "--depth", "1", MMAUDIO_REPO_URL, str(MMAUDIO_REPO_DIR)], check=True)
        if str(MMAUDIO_REPO_DIR.resolve()) not in sys.path:
            sys.path.insert(0, str(MMAUDIO_REPO_DIR.resolve()))
        # Import only after the repository has been added to sys.path
        from mmaudio.eval_utils import all_model_cfg as cfg
        # Rewrite checkpoint paths relative to the cloned repo and download the models
        for cfg_key in cfg:
            config = cfg[cfg_key]
            config.model_path = MMAUDIO_REPO_DIR / config.model_path
            config.vae_path = MMAUDIO_REPO_DIR / config.vae_path
            if config.bigvgan_16k_path:
                config.bigvgan_16k_path = MMAUDIO_REPO_DIR / config.bigvgan_16k_path
            config.synchformer_ckpt = MMAUDIO_REPO_DIR / config.synchformer_ckpt
            config.download_if_needed()
        setup_flag.touch()
        logger.info("--- MMAudio global setup complete ---")
        return True

    def initialize_models(self):
        """Loads the worker's models onto the CPU and then onto its assigned GPU."""
        if self.net is not None: return
        self.model_config = all_model_cfg['large_44k_v2']
        self.seq_cfg = self.model_config.seq_cfg
        logger.info(f"Worker {self.device}: loading MMAudio model onto the CPU...")
        self.net = get_my_mmaudio(self.model_config.model_name).eval()
        self.net.load_weights(torch.load(self.model_config.model_path, map_location=self.cpu_device, weights_only=True))
        self.feature_utils = FeaturesUtils(
            tod_vae_ckpt=self.model_config.vae_path,
            synchformer_ckpt=self.model_config.synchformer_ckpt,
            enable_conditions=True, mode=self.model_config.mode,
            bigvgan_vocoder_ckpt=self.model_config.bigvgan_16k_path,
            need_vae_encoder=False
        ).eval()
        self.net.to(self.device, self.dtype)
        self.feature_utils.to(self.device, self.dtype)
        logger.info(f"Worker {self.device}: MMAudio models ready in VRAM.")

    def unload_models(self):
        """Unloads the models from VRAM by moving them back to the CPU."""
        if self.net is None: return
        logger.info(f"Worker {self.device}: unloading MMAudio models from VRAM...")
        self.net.to(self.cpu_device)
        self.feature_utils.to(self.cpu_device)
        del self.net, self.feature_utils, self.seq_cfg, self.model_config
        self.net, self.feature_utils, self.seq_cfg, self.model_config = None, None, None, None
        gc.collect()
        if torch.cuda.is_available(): torch.cuda.empty_cache()

    def generate_audio_internal(self, video_path: str, prompt: str, duration_seconds: float, output_path: str) -> str:
        """Audio-generation logic that runs on the worker's GPU."""
        negative_prompt = "human voice, speech, talking, singing, narration"
        rng = torch.Generator(device=self.device).manual_seed(int(time.time()))
        fm = FlowMatching(min_sigma=0, inference_mode='euler', num_steps=25)
        video_info = load_video(Path(video_path), duration_seconds)
        self.seq_cfg.duration = video_info.duration_sec
        self.net.update_seq_lengths(self.seq_cfg.latent_seq_len, self.seq_cfg.clip_seq_len, self.seq_cfg.sync_seq_len)
        with torch.no_grad():
            audios = mmaudio_generate(
                clip_video=video_info.clip_frames.unsqueeze(0).to(self.device, self.dtype),
                sync_video=video_info.sync_frames.unsqueeze(0).to(self.device, self.dtype),
                text=[prompt], negative_text=[negative_prompt],
                feature_utils=self.feature_utils, net=self.net, fm=fm, rng=rng, cfg_strength=4.5
            )
        audio_waveform = audios.float().cpu()[0]
        make_video(video_info, Path(output_path), audio_waveform, sampling_rate=self.seq_cfg.sampling_rate)
        return output_path


class MMAudioPoolManager:
    def __init__(self, device_ids: list[str], workspace_dir: str):
        logger.info(f"MMAUDIO POOL MANAGER: creating workers for devices: {device_ids}")
        self.workspace_dir = workspace_dir
        if not device_ids or 'cpu' in device_ids:
            raise ValueError("MMAudioPoolManager requires dedicated GPUs.")
        self.workers = [MMAudioWorker(device_id) for device_id in device_ids]
        self.current_worker_index = 0
        self.lock = threading.Lock()
        self.last_cleanup_thread = None

    def _cleanup_worker_thread(self, worker: MMAudioWorker):
        logger.info(f"MMAUDIO CLEANUP THREAD: starting cleanup of {worker.device} in the background...")
        worker.unload_models()

    def generate_audio_for_video(self, video_path: str, prompt: str, duration_seconds: float, output_path_override: str = None) -> str:
        if duration_seconds < 1:
            logger.warning(f"Video too short ({duration_seconds:.2f}s). Skipping audio generation.")
            return video_path
        worker_to_use = None
        try:
            with self.lock:
                # Wait for any in-flight cleanup before rotating to the next worker.
                if self.last_cleanup_thread and self.last_cleanup_thread.is_alive():
                    self.last_cleanup_thread.join()
                worker_to_use = self.workers[self.current_worker_index]
                # Unload the previously used worker in the background while this one runs.
                previous_worker_index = (self.current_worker_index - 1 + len(self.workers)) % len(self.workers)
                worker_to_cleanup = self.workers[previous_worker_index]
                cleanup_thread = threading.Thread(target=self._cleanup_worker_thread, args=(worker_to_cleanup,))
                cleanup_thread.start()
                self.last_cleanup_thread = cleanup_thread
                worker_to_use.initialize_models()
                self.current_worker_index = (self.current_worker_index + 1) % len(self.workers)
            logger.info(f"MMAUDIO POOL MANAGER: generating audio on {worker_to_use.device}...")
            output_path = output_path_override or os.path.join(self.workspace_dir, f"{Path(video_path).stem}_with_audio.mp4")
            return worker_to_use.generate_audio_internal(
                video_path=video_path, prompt=prompt, duration_seconds=duration_seconds, output_path=output_path
            )
        except Exception as e:
            logger.error(f"MMAUDIO POOL MANAGER: error during audio generation: {e}", exc_info=True)
            raise gr.Error(f"Audio generation failed: {e}")


# --- Singleton instantiation ---
class MMAudioPlaceholder:
    def generate_audio_for_video(self, video_path, *args, **kwargs):
        logger.error("MMAudio was not initialized because no GPU was allocated. Skipping the audio step.")
        return video_path
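
# A minimal config.yaml sketch matching the keys read by the loader below. The
# section and key names come from the code itself; the values shown are only
# illustrative assumptions, not the project's actual defaults:
#
#   application:
#     workspace_dir: "workspace"
#   specialists:
#     mmaudio:
#       gpus_required: 1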
try:
    with open("config.yaml", 'r') as f:
        config = yaml.safe_load(f)
    WORKSPACE_DIR = config['application']['workspace_dir']
    mmaudio_gpus_required = config['specialists'].get('mmaudio', {}).get('gpus_required', 0)
    mmaudio_device_ids = hardware_manager.allocate_gpus('MMAudio', mmaudio_gpus_required)
    if mmaudio_gpus_required > 0 and 'cpu' not in mmaudio_device_ids:
        mmaudio_manager_singleton = MMAudioPoolManager(device_ids=mmaudio_device_ids, workspace_dir=WORKSPACE_DIR)
        logger.info("Audio specialist (MMAudio Pool) ready.")
    else:
        mmaudio_manager_singleton = MMAudioPlaceholder()
        logger.warning("MMAudio Pool Manager was not initialized. No GPU was requested in config.yaml.")
except Exception as e:
    logger.critical(f"CRITICAL failure while initializing MMAudioManager: {e}", exc_info=True)
    mmaudio_manager_singleton = MMAudioPlaceholder()
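
# Minimal usage sketch (an illustrative assumption, not part of the pipeline): how a
# caller might drive the singleton defined above. The clip path and prompt are
# placeholders; running this requires invoking the module as part of the package
# (e.g. `python -m managers.mmaudio_manager`) with a valid config.yaml present.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    result_path = mmaudio_manager_singleton.generate_audio_for_video(
        video_path="example_clip.mp4",              # placeholder input video
        prompt="gentle rain and distant thunder",   # placeholder audio description
        duration_seconds=4.0,
    )
    print(f"Audio-dubbed video written to: {result_path}")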