Aduc-sdr-2_5s / services /vince_pool_manager.py
euIaxs22's picture
Update services/vince_pool_manager.py
ae54e7e verified
raw
history blame
9.76 kB
import os
import sys
import gc
import subprocess
import threading
from pathlib import Path
from typing import List, Optional
import torch
from huggingface_hub import snapshot_download
from omegaconf import OmegaConf, open_dict
# --- Global configuration ---
# Directories default to locations under /data for persistence, mirroring SeedVR.
# Each value is read from the environment variable named in the lookup and falls
# back to the literal default next to it.
VINCIE_DIR = Path(os.environ.get("VINCIE_DIR", "/data/VINCIE"))
VINCE_GIT_URL = os.environ.get(
    "VINCE_GIT_URL", "https://github.com/ByteDance-Seed/VINCIE"
)
VINCE_REPO_ID = os.environ.get("VINCE_REPO_ID", "ByteDance-Seed/VINCIE-3B")
VINCE_CKPT = Path(os.environ.get("VINCE_CKPT", "/data/ckpt/VINCIE-3B"))
# Note: the Hugging Face cache location is taken from HF_HOME.
HF_HOME_CACHE = os.environ.get("HF_HOME", "/data/.cache/huggingface")
# --- Worker class (manages a single GPU in isolation) ---
class VinceWorker:
    """Manage one VINCIE pipeline instance bound to a specific GPU device.

    The worker loads the VINCIE generator once (``load_model_to_gpu``) and then
    serves inference requests (``infer``) on the device named by ``device_id``.

    NOTE(review): loading and inference temporarily ``os.chdir`` into
    ``VINCIE_DIR``. The working directory is process-wide, so concurrent
    workers share it; they all switch to the same directory, but the restore
    step of one thread can race with another thread that is still running.
    """

    def __init__(self, device_id: str, config_path: str):
        """Record the target device; no model is loaded yet.

        Args:
            device_id: Torch device string such as ``"cuda:0"``.
            config_path: Path of the VINCIE generation config passed to
                ``load_config`` when the model is loaded.
        """
        self.device_id_str = device_id
        # Text after the last ':' ("cuda:1" -> "1"); only used in log messages.
        self.gpu_index = self.device_id_str.split(':')[-1]
        self.config_path = config_path
        self.gen = None
        self.config = None
        self.device = torch.device(self.device_id_str)
        print(f"[VinceWorker-{self.device_id_str}] Inicializado. Mapeado para o índice de GPU {self.gpu_index}.")

    def _execute_in_isolated_env(self, function_to_run, *args, **kwargs):
        """Run ``function_to_run`` with this worker's GPU as the current device.

        The previous implementation rewrote ``CUDA_VISIBLE_DEVICES`` around the
        call. That variable lives in the process-wide ``os.environ``, so
        concurrent workers raced on saving/restoring it, and CUDA only reads
        it when it initialises, so changing it afterwards does not remap
        ``cuda:0``. ``torch.cuda.device`` switches the current device for the
        calling thread only and restores the previous one on exit.

        Returns:
            Whatever ``function_to_run(*args, **kwargs)`` returns.
        """
        if not torch.cuda.is_available():
            # Same as before on CUDA-less hosts: just run the callable.
            return function_to_run(*args, **kwargs)
        with torch.cuda.device(self.device):
            return function_to_run(*args, **kwargs)

    @staticmethod
    def _normalize_image_paths(image_paths) -> List[str]:
        """Return ``image_paths`` as a list of strings.

        Lists and tuples are converted element-wise, ``None`` becomes an empty
        list, and any other single value (str, Path, ...) becomes a
        one-element list.
        """
        if image_paths is None:
            return []
        if isinstance(image_paths, (list, tuple)):
            return [str(p) for p in image_paths]
        return [str(image_paths)]

    def _load_model_task(self):
        """Build the VINCIE generator and move it to this worker's device.

        ``self.gen`` and ``self.config`` are only assigned once every step has
        succeeded, so a failed load leaves the worker in the "not loaded"
        state instead of exposing a half-configured generator.
        """
        print(f"[VinceWorker-{self.device_id_str}] Carregando modelo para a VRAM (GPU física visível: {self.gpu_index})...")
        original_cwd = Path.cwd()
        try:
            # The VINCIE code is run from inside its repository directory.
            os.chdir(str(VINCIE_DIR))
            if str(VINCIE_DIR) not in sys.path:
                sys.path.insert(0, str(VINCIE_DIR))
            # Imported lazily: the package only exists once the repo is cloned.
            from common.config import load_config, create_object

            # NOTE(review): assumes the VINCIE config honours this `device`
            # override — confirm against the VINCIE repository.
            cfg = load_config(self.config_path, [f"device='{self.device_id_str}'"])
            gen = create_object(cfg)
            for name in ("configure_persistence", "configure_models", "configure_diffusion"):
                getattr(gen, name)()
            gen.to(self.device)

            # Publish only after a fully successful load.
            self.config = cfg
            self.gen = gen
            print(f"[VinceWorker-{self.device_id_str}] ✅ Modelo VINCIE está 'quente' e pronto na GPU física {self.gpu_index}.")
        finally:
            os.chdir(original_cwd)

    def load_model_to_gpu(self):
        """Load the model on this worker's device unless it is already loaded."""
        if self.gen is None:
            self._execute_in_isolated_env(self._load_model_task)

    def _infer_task(self, **kwargs) -> Path:
        """Write the request into the generator config and run inference.

        Keyword Args:
            output_dir: Directory for the outputs (required).
            image_path: One path or a list/tuple of paths; defaults to none.
            prompts: Optional iterable of prompts.
            cfg_scale: Optional CFG scale; ignored when ``None``.

        Returns:
            ``Path(output_dir)``.

        Raises:
            KeyError: If ``output_dir`` is missing.
        """
        original_cwd = Path.cwd()
        try:
            os.chdir(str(VINCIE_DIR))
            # open_dict lifts the config's struct restriction for these writes.
            with open_dict(self.gen.config):
                self.gen.config.generation.output.dir = str(kwargs["output_dir"])
                self.gen.config.generation.positive_prompt.image_path = self._normalize_image_paths(
                    kwargs.get("image_path", [])
                )
                if "prompts" in kwargs:
                    self.gen.config.generation.positive_prompt.prompts = list(kwargs["prompts"])
                if kwargs.get("cfg_scale") is not None:
                    self.gen.config.diffusion.cfg.scale = float(kwargs["cfg_scale"])
            self.gen.inference_loop()
            return Path(kwargs["output_dir"])
        finally:
            os.chdir(original_cwd)
            # Release Python garbage and cached CUDA blocks after every run.
            gc.collect()
            torch.cuda.empty_cache()

    def infer(self, **kwargs) -> Path:
        """Run one inference request on this worker.

        Returns:
            The output directory as a ``Path``.

        Raises:
            RuntimeError: If the model has not been loaded.
        """
        if self.gen is None:
            raise RuntimeError(f"Modelo no worker {self.device_id_str} não foi carregado.")
        return self._execute_in_isolated_env(self._infer_task, **kwargs)
# --- Pool manager class (the singleton orchestrator) ---
class VincePoolManager:
_instance = None
_lock = threading.Lock()
def __new__(cls, *args, **kwargs):
with cls._lock:
if cls._instance is None:
print("Criando a instância singleton do VincePoolManager...")
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self, num_gpus: int = 4, output_root: str = "/app/outputs"):
if self._initialized: return
with self._lock:
if self._initialized: return
print("Inicializando o VincePoolManager (Padrão SeedVR)...")
self.output_root = Path(output_root)
self.output_root.mkdir(parents=True, exist_ok=True)
self.worker_lock = threading.Lock()
self.next_worker_idx = 0
self.setup_dependencies()
if not torch.cuda.is_available() or torch.cuda.device_count() < num_gpus:
raise RuntimeError(f"Erro: {num_gpus} GPUs são necessárias, mas {torch.cuda.device_count()} foram encontradas.")
devices = [f'cuda:{i}' for i in range(num_gpus)]
vincie_config_path = VINCIE_DIR / "configs/generate.yaml"
self.workers = [VinceWorker(dev_id, str(vincie_config_path)) for dev_id in devices]
print("Iniciando carregamento dos modelos em paralelo para todas as GPUs...")
threads = [threading.Thread(target=worker.load_model_to_gpu) for worker in self.workers]
for t in threads: t.start()
for t in threads: t.join()
self._initialized = True
print(f"✅ VincePoolManager pronto com {len(self.workers)} workers 'quentes'.")
def setup_dependencies(self):
"""Garante que o código e os modelos do VINCIE estejam disponíveis, usando cache."""
# 1. Código do Repositório (clona em /data/VINCIE)
if not (VINCIE_DIR / ".git").exists():
print(f"Clonando repositório VINCIE para {VINCIE_DIR}...")
VINCIE_DIR.parent.mkdir(parents=True, exist_ok=True)
subprocess.run(["git", "clone", "--depth", "1", VINCE_GIT_URL, str(VINCIE_DIR)], check=True)
else:
print("Repositório VINCIE já existe em /data/VINCIE.")
# 2. Modelos (com cache, baixando para /data/ckpt/VINCIE-3B)
print(f"Verificando checkpoints VINCIE em {VINCE_CKPT} (usando cache em {HF_HOME_CACHE})...")
try:
snapshot_download(
repo_id=VINCE_REPO_ID,
local_dir=VINCE_CKPT,
#local_dir_use_symlinks=False,
cache_dir=HF_HOME_CACHE,
force_download=False,
token=os.getenv("HF_TOKEN")
)
print("Checkpoints VINCIE prontos.")
except Exception as e:
print(f"ERRO durante o snapshot_download para VINCIE: {e}")
raise
# 3. Symlink para compatibilidade
repo_ckpt_dir = VINCIE_DIR / "ckpt"
repo_ckpt_dir.mkdir(parents=True, exist_ok=True)
link = repo_ckpt_dir / "VINCIE-3B"
if not link.exists():
link.symlink_to(VINCE_CKPT.resolve(), target_is_directory=True)
print(f"Symlink de compatibilidade criado: {link} -> {VINCE_CKPT.resolve()}")
else:
print("Symlink de checkpoint já existe.")
def _get_next_worker(self) -> VinceWorker:
with self.worker_lock:
worker = self.workers[self.next_worker_idx]
self.next_worker_idx = (self.next_worker_idx + 1) % len(self.workers)
print(f"Tarefa despachada para o worker: {worker.device_id_str}")
return worker
def generate_multi_turn(self, input_image: str, turns: List[str], **kwargs) -> Path:
worker = self._get_next_worker()
out_dir = self.output_root / f"multi_turn_{Path(input_image).stem}_{os.urandom(4).hex()}"
out_dir.mkdir(parents=True)
infer_kwargs = {"output_dir": out_dir, "image_path": input_image, "prompts": turns, **kwargs}
return worker.infer(**infer_kwargs)
def generate_multi_concept(self, concept_images: List[str], concept_prompts: List[str], final_prompt: str, **kwargs) -> Path:
worker = self._get_next_worker()
out_dir = self.output_root / f"multi_concept_{os.urandom(4).hex()}"
out_dir.mkdir(parents=True)
all_prompts = concept_prompts + [final_prompt]
infer_kwargs = {"output_dir": out_dir, "image_path": concept_images, "prompts": all_prompts, **kwargs}
return worker.infer(**infer_kwargs)
# --- Global singleton instance ---
# Built at import time on a best-effort basis: any failure is reported on
# stderr and leaves `vince_pool_manager_singleton` set to None, so importers
# must check for None before using it.
try:
    # Number of GPUs comes from VINCE_GPUS (default: a single GPU).
    NUM_GPUS_FOR_VINCE = int(os.environ.get("VINCE_GPUS", "1"))
    # `output_root` is read from the environment and handed to the constructor.
    output_root_path = os.environ.get("OUTPUT_ROOT", "/app/outputs")
    vince_pool_manager_singleton = VincePoolManager(
        num_gpus=NUM_GPUS_FOR_VINCE,
        output_root=output_root_path,
    )
except Exception as e:
    print(f"ERRO CRÍTICO ao inicializar o VincePoolManager: {e}", file=sys.stderr)
    vince_pool_manager_singleton = None