# NOTE(review): removed web-UI scrape residue that preceded this file
# (author "Carlexxx", commit "feat: ✨ aBINC 2.2" / fb56537, "raw",
# "history blame", "10.6 kB") — it was not valid Python and broke the
# module at import time.
# managers/mmaudio_manager.py
#
# Copyright (C) 2025 Carlos Rodrigues dos Santos
#
# Version: 3.0.0 (GPU Pool Manager)
#
# This version refactors the MMAudioManager into a pool-of-workers model,
# enabling the use of multiple dedicated GPUs for audio generation with a
# round-robin rotation scheme for efficient VRAM management.
import torch
import logging
import subprocess
import os
import time
import yaml
import gc
import threading
from pathlib import Path
import gradio as gr
import sys
# Imports relativos para o hardware_manager
from ..tools.hardware_manager import hardware_manager
logger = logging.getLogger(__name__)
# --- Dependency management ---
# Local clone location and origin of the MMAudio repository (cloned on first run).
DEPS_DIR = Path("./deps")
MMAUDIO_REPO_DIR = DEPS_DIR / "MMAudio"
MMAUDIO_REPO_URL = "https://github.com/hkchengrex/MMAudio.git"
# Lazy-loaded imports
# Module-scope placeholders for the MMAudio symbols; they are populated by
# MMAudioWorker._lazy_load_mmaudio_modules() (via `global`) once the cloned
# repository has been added to sys.path.
ModelConfig, all_model_cfg, mmaudio_generate, load_video, make_video = None, None, None, None, None
MMAudio, get_my_mmaudio = None, None
FeaturesUtils = None
SequenceConfig = None
FlowMatching = None
class MMAudioWorker:
    """A single MMAudio pipeline instance bound to one torch device.

    On construction the worker runs the one-time global setup (repo clone +
    checkpoint download) and lazily imports the MMAudio modules. Models are
    NOT loaded yet; call ``initialize_models()`` to populate VRAM and
    ``unload_models()`` to release it, so a pool can rotate VRAM usage.
    """

    # Process-wide guard: the relative checkpoint paths inside all_model_cfg
    # must be re-anchored at MMAUDIO_REPO_DIR exactly once per process —
    # doing it twice would yield paths like deps/MMAudio/deps/MMAudio/...
    _paths_patched = False

    def __init__(self, device_id: str):
        """Prepare a worker for *device_id* (e.g. ``'cuda:0'``)."""
        self.device = torch.device(device_id)
        self.cpu_device = torch.device("cpu")
        # bfloat16 on CUDA saves VRAM/bandwidth; CPU execution stays float32.
        self.dtype = torch.bfloat16 if 'cuda' in self.device.type else torch.float32
        self.net: 'MMAudio' = None                  # network, set by initialize_models()
        self.feature_utils: 'FeaturesUtils' = None  # conditioning/vocoder utilities
        self.seq_cfg: 'SequenceConfig' = None       # sequence-length configuration
        self.model_config: 'ModelConfig' = None     # selected checkpoint config
        self._check_and_run_global_setup()
        self._lazy_load_mmaudio_modules()
        logger.info(f"MMAudio Worker inicializado para o dispositivo {self.device}.")

    def _lazy_load_mmaudio_modules(self):
        """Dynamically import the MMAudio modules into module-level globals.

        The ``global`` declaration makes the ``from ... import`` statements
        below bind at module scope, so every worker in the process shares a
        single set of loaded modules. No-op if already imported.
        """
        global ModelConfig, all_model_cfg, mmaudio_generate, load_video, make_video, MMAudio, get_my_mmaudio, FeaturesUtils, SequenceConfig, FlowMatching
        if MMAudio is not None:
            return
        from mmaudio.eval_utils import ModelConfig, all_model_cfg, generate as mmaudio_generate, load_video, make_video
        from mmaudio.model.flow_matching import FlowMatching
        from mmaudio.model.networks import MMAudio, get_my_mmaudio
        from mmaudio.model.utils.features_utils import FeaturesUtils
        from mmaudio.model.sequence_config import SequenceConfig
        logger.info("Módulos do MMAudio foram carregados dinamicamente.")

    @staticmethod
    def _check_and_run_global_setup():
        """Run the MMAudio global setup.

        Clone the repository and download the checkpoints once per machine
        (gated by a flag file), and patch ``sys.path`` plus the checkpoint
        paths once per process.

        BUGFIX: the original returned immediately when the setup flag file
        existed, which skipped the ``sys.path`` insertion and the
        checkpoint-path adjustment on every run after the first — breaking
        the subsequent ``import mmaudio`` and model loading. Those steps are
        now performed unconditionally; only clone/download is gated.
        """
        setup_flag = DEPS_DIR / "mmaudio.setup.complete"
        first_run = not setup_flag.exists()
        if first_run:
            logger.info("--- Iniciando Setup Global do MMAudio (primeira execução) ---")
            if not MMAUDIO_REPO_DIR.exists():
                DEPS_DIR.mkdir(exist_ok=True)
                subprocess.run(["git", "clone", "--depth", "1", MMAUDIO_REPO_URL, str(MMAUDIO_REPO_DIR)], check=True)
        # Needed on EVERY run so the lazy imports can resolve the package.
        if str(MMAUDIO_REPO_DIR.resolve()) not in sys.path:
            sys.path.insert(0, str(MMAUDIO_REPO_DIR.resolve()))
        # Import only after the repo is on sys.path.
        from mmaudio.eval_utils import all_model_cfg as cfg
        # Re-anchor the (relative) checkpoint paths at the cloned repo.
        # Guarded so multiple workers in one process don't prefix twice.
        if not MMAudioWorker._paths_patched:
            for cfg_key in cfg:
                config = cfg[cfg_key]
                config.model_path = MMAUDIO_REPO_DIR / config.model_path
                config.vae_path = MMAUDIO_REPO_DIR / config.vae_path
                if config.bigvgan_16k_path:
                    config.bigvgan_16k_path = MMAUDIO_REPO_DIR / config.bigvgan_16k_path
                config.synchformer_ckpt = MMAUDIO_REPO_DIR / config.synchformer_ckpt
            MMAudioWorker._paths_patched = True
        if first_run:
            for cfg_key in cfg:
                cfg[cfg_key].download_if_needed()
            setup_flag.touch()
            logger.info("--- Setup Global do MMAudio Concluído ---")
        return True

    def initialize_models(self):
        """Load the models onto CPU, then move them to this worker's device.

        Idempotent: returns immediately if the network is already loaded.
        """
        if self.net is not None:
            return
        self.model_config = all_model_cfg['large_44k_v2']
        self.seq_cfg = self.model_config.seq_cfg
        logger.info(f"Worker {self.device}: Carregando modelo MMAudio para a CPU...")
        self.net = get_my_mmaudio(self.model_config.model_name).eval()
        # weights_only=True keeps torch.load from unpickling arbitrary objects.
        self.net.load_weights(torch.load(self.model_config.model_path, map_location=self.cpu_device, weights_only=True))
        self.feature_utils = FeaturesUtils(
            tod_vae_ckpt=self.model_config.vae_path,
            synchformer_ckpt=self.model_config.synchformer_ckpt,
            enable_conditions=True, mode=self.model_config.mode,
            bigvgan_vocoder_ckpt=self.model_config.bigvgan_16k_path,
            need_vae_encoder=False
        ).eval()
        self.net.to(self.device, self.dtype)
        self.feature_utils.to(self.device, self.dtype)
        logger.info(f"Worker {self.device}: Modelos MMAudio prontos na VRAM.")

    def unload_models(self):
        """Release VRAM: move models back to CPU, drop all references, then
        force garbage collection and a CUDA cache flush. Idempotent."""
        if self.net is None:
            return
        logger.info(f"Worker {self.device}: Descarregando modelos MMAudio da VRAM...")
        self.net.to(self.cpu_device)
        self.feature_utils.to(self.cpu_device)
        del self.net, self.feature_utils, self.seq_cfg, self.model_config
        self.net, self.feature_utils, self.seq_cfg, self.model_config = None, None, None, None
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

    def generate_audio_internal(self, video_path: str, prompt: str, duration_seconds: float, output_path: str) -> str:
        """Generate audio for *video_path* on this worker's device and mux it
        into *output_path*.

        Requires ``initialize_models()`` to have been called. Returns
        *output_path*.
        """
        # Keep the soundtrack non-vocal regardless of the positive prompt.
        negative_prompt = "human voice, speech, talking, singing, narration"
        # Time-based seed: each call samples a different generation.
        rng = torch.Generator(device=self.device).manual_seed(int(time.time()))
        fm = FlowMatching(min_sigma=0, inference_mode='euler', num_steps=25)
        video_info = load_video(Path(video_path), duration_seconds)
        # Resize the network's sequence lengths to the actual clip duration.
        self.seq_cfg.duration = video_info.duration_sec
        self.net.update_seq_lengths(self.seq_cfg.latent_seq_len, self.seq_cfg.clip_seq_len, self.seq_cfg.sync_seq_len)
        with torch.no_grad():
            audios = mmaudio_generate(
                clip_video=video_info.clip_frames.unsqueeze(0).to(self.device, self.dtype),
                sync_video=video_info.sync_frames.unsqueeze(0).to(self.device, self.dtype),
                text=[prompt], negative_text=[negative_prompt],
                feature_utils=self.feature_utils, net=self.net, fm=fm, rng=rng, cfg_strength=4.5
            )
        audio_waveform = audios.float().cpu()[0]
        make_video(video_info, Path(output_path), audio_waveform, sampling_rate=self.seq_cfg.sampling_rate)
        return output_path
class MMAudioPoolManager:
    """Round-robin pool of MMAudioWorkers across dedicated GPUs.

    Each call picks the next worker, loads its models, and asynchronously
    unloads the previously used worker's models in a background thread so
    VRAM is rotated without blocking generation.
    """

    def __init__(self, device_ids: list[str], workspace_dir: str):
        """Create one worker per device id. Raises ValueError if no
        dedicated GPU was allocated (this pool never runs on CPU)."""
        logger.info(f"MMAUDIO POOL MANAGER: Criando workers para os dispositivos: {device_ids}")
        self.workspace_dir = workspace_dir
        if not device_ids or 'cpu' in device_ids:
            raise ValueError("MMAudioPoolManager requer GPUs dedicadas.")
        self.workers = [MMAudioWorker(device_id) for device_id in device_ids]
        self.current_worker_index = 0
        self.lock = threading.Lock()          # protects rotation state below
        self.last_cleanup_thread = None       # last background unload, joined before reuse

    def _cleanup_worker_thread(self, worker: MMAudioWorker):
        # Runs in a background thread: frees the previous worker's VRAM.
        logger.info(f"MMAUDIO CLEANUP THREAD: Iniciando limpeza de {worker.device} em background...")
        worker.unload_models()

    def generate_audio_for_video(self, video_path: str, prompt: str, duration_seconds: float, output_path_override: str = None) -> str:
        """Generate an audio track for *video_path* and return the path of
        the muxed output video.

        Videos shorter than 1 second are skipped and returned unchanged.
        Raises gr.Error on any generation failure.
        """
        if duration_seconds < 1:
            logger.warning(f"Vídeo muito curto ({duration_seconds:.2f}s). Pulando geração de áudio.")
            return video_path
        worker_to_use = None
        try:
            with self.lock:
                # Wait for any in-flight unload before touching workers.
                if self.last_cleanup_thread and self.last_cleanup_thread.is_alive():
                    self.last_cleanup_thread.join()
                worker_to_use = self.workers[self.current_worker_index]
                previous_worker_index = (self.current_worker_index - 1 + len(self.workers)) % len(self.workers)
                worker_to_cleanup = self.workers[previous_worker_index]
                # BUGFIX: with a single worker, the "previous" worker IS the
                # current one; the original unconditionally spawned a cleanup
                # thread that unloaded the very models initialize_models()
                # was loading — a race. Only clean up a *different* worker.
                if worker_to_cleanup is not worker_to_use:
                    cleanup_thread = threading.Thread(target=self._cleanup_worker_thread, args=(worker_to_cleanup,))
                    cleanup_thread.start()
                    self.last_cleanup_thread = cleanup_thread
                worker_to_use.initialize_models()
                self.current_worker_index = (self.current_worker_index + 1) % len(self.workers)
            logger.info(f"MMAUDIO POOL MANAGER: Gerando áudio em {worker_to_use.device}...")
            output_path = output_path_override or os.path.join(self.workspace_dir, f"{Path(video_path).stem}_with_audio.mp4")
            return worker_to_use.generate_audio_internal(
                video_path=video_path, prompt=prompt, duration_seconds=duration_seconds, output_path=output_path
            )
        except Exception as e:
            logger.error(f"MMAUDIO POOL MANAGER: Erro durante a geração de áudio: {e}", exc_info=True)
            raise gr.Error(f"Falha na geração de áudio: {e}")
# --- Singleton instantiation ---
class MMAudioPlaceholder:
    """No-op stand-in used when no GPU was allocated for MMAudio.

    Mirrors the pool manager's public entry point but performs no work:
    it logs an error and hands the original video path straight back.
    """

    def generate_audio_for_video(self, video_path, *args, **kwargs):
        """Skip audio generation and return *video_path* unchanged."""
        message = "MMAudio não foi inicializado pois nenhuma GPU foi alocada. Pulando etapa de áudio."
        logger.error(message)
        return video_path
# Read the application config, ask the hardware manager for GPUs, and build
# either the real pool manager or a no-op placeholder.
try:
    with open("config.yaml", 'r') as f:
        config = yaml.safe_load(f)
    WORKSPACE_DIR = config['application']['workspace_dir']
    mmaudio_gpus_required = config['specialists'].get('mmaudio', {}).get('gpus_required', 0)
    mmaudio_device_ids = hardware_manager.allocate_gpus('MMAudio', mmaudio_gpus_required)
    # The pool is only usable when GPUs were requested AND none fell back to CPU.
    pool_is_usable = mmaudio_gpus_required > 0 and 'cpu' not in mmaudio_device_ids
    if pool_is_usable:
        mmaudio_manager_singleton = MMAudioPoolManager(device_ids=mmaudio_device_ids, workspace_dir=WORKSPACE_DIR)
        logger.info("Especialista de Áudio (MMAudio Pool) pronto.")
    else:
        mmaudio_manager_singleton = MMAudioPlaceholder()
        logger.warning("MMAudio Pool Manager não foi inicializado. Nenhuma GPU foi requisitada na config.yaml.")
except Exception as e:
    # Any failure (missing config, allocation error, setup crash) degrades
    # gracefully to the placeholder so the rest of the app keeps running.
    logger.critical(f"Falha CRÍTICA ao inicializar o MMAudioManager: {e}", exc_info=True)
    mmaudio_manager_singleton = MMAudioPlaceholder()