Spaces:
Paused
Paused
| import os | |
| import sys | |
| import gc | |
| import subprocess | |
| import threading | |
| from pathlib import Path | |
| from typing import List, Optional | |
| import torch | |
| from huggingface_hub import snapshot_download | |
| from omegaconf import OmegaConf, open_dict | |
| # --- Configurações Globais --- | |
| # Os diretórios são definidos para usar /data para persistência, espelhando o SeedVR. | |
| VINCIE_DIR = Path(os.getenv("VINCIE_DIR", "/data/VINCIE")) | |
| VINCE_GIT_URL = os.getenv("VINCE_GIT_URL", "https://github.com/ByteDance-Seed/VINCIE") | |
| VINCE_REPO_ID = os.getenv("VINCE_REPO_ID", "ByteDance-Seed/VINCIE-3B") | |
| VINCE_CKPT = Path(os.getenv("VINCE_CKPT", "/data/ckpt/VINCIE-3B")) | |
| HF_HOME_CACHE = os.getenv("HF_HOME", "/data/.cache/huggingface") | |
| # --- Classe Worker (Gerencia uma única GPU de forma isolada) --- | |
| class VinceWorker: | |
| """Gerencia uma única instância da pipeline VINCIE em um dispositivo GPU específico.""" | |
| def __init__(self, device_id: str, config_path: str): | |
| self.device_id_str = device_id | |
| self.gpu_index = self.device_id_str.split(':')[-1] | |
| self.config_path = config_path | |
| self.gen = None | |
| self.config = None | |
| self.device = torch.device(self.device_id_str) | |
| print(f"[VinceWorker-{self.device_id_str}] Inicializado. Mapeado para o índice de GPU {self.gpu_index}.") | |
| def _execute_in_isolated_env(self, function_to_run, *args, **kwargs): | |
| """Wrapper que define CUDA_VISIBLE_DEVICES para isolar a visibilidade da GPU.""" | |
| original_cuda_visible = os.environ.get('CUDA_VISIBLE_DEVICES') | |
| try: | |
| os.environ['CUDA_VISIBLE_DEVICES'] = self.gpu_index | |
| if torch.cuda.is_available(): | |
| torch.cuda.set_device(0) # 'cuda:0' agora é a nossa GPU alvo | |
| return function_to_run(*args, **kwargs) | |
| finally: | |
| if original_cuda_visible is not None: | |
| os.environ['CUDA_VISIBLE_DEVICES'] = original_cuda_visible | |
| elif 'CUDA_VISIBLE_DEVICES' in os.environ: | |
| del os.environ['CUDA_VISIBLE_DEVICES'] | |
| def _load_model_task(self): | |
| """Tarefa de carregamento do modelo, a ser executada no ambiente isolado.""" | |
| print(f"[VinceWorker-{self.device_id_str}] Carregando modelo para a VRAM (GPU física visível: {self.gpu_index})...") | |
| device_for_vincie = 'cuda:0' | |
| original_cwd = Path.cwd() | |
| try: | |
| os.chdir(str(VINCIE_DIR)) | |
| if str(VINCIE_DIR) not in sys.path: sys.path.insert(0, str(VINCIE_DIR)) | |
| from common.config import load_config, create_object | |
| cfg = load_config(self.config_path, [f"device='{device_for_vincie}'"]) | |
| self.gen = create_object(cfg) | |
| self.config = cfg | |
| for name in ("configure_persistence", "configure_models", "configure_diffusion"): | |
| getattr(self.gen, name)() | |
| self.gen.to(torch.device(device_for_vincie)) | |
| print(f"[VinceWorker-{self.device_id_str}] ✅ Modelo VINCIE está 'quente' e pronto na GPU física {self.gpu_index}.") | |
| finally: | |
| os.chdir(original_cwd) | |
| def load_model_to_gpu(self): | |
| if self.gen is None: | |
| self._execute_in_isolated_env(self._load_model_task) | |
| def _infer_task(self, **kwargs) -> Path: | |
| """Tarefa de inferência, a ser executada no ambiente isolado.""" | |
| original_cwd = Path.cwd() | |
| try: | |
| os.chdir(str(VINCIE_DIR)) | |
| with open_dict(self.gen.config): | |
| self.gen.config.generation.output.dir = str(kwargs["output_dir"]) | |
| image_paths = kwargs.get("image_path", []) | |
| self.gen.config.generation.positive_prompt.image_path = [str(p) for p in image_paths] if isinstance(image_paths, list) else [str(image_paths)] | |
| if "prompts" in kwargs: | |
| self.gen.config.generation.positive_prompt.prompts = list(kwargs["prompts"]) | |
| if "cfg_scale" in kwargs and kwargs["cfg_scale"] is not None: | |
| self.gen.config.diffusion.cfg.scale = float(kwargs["cfg_scale"]) | |
| self.gen.inference_loop() | |
| return Path(kwargs["output_dir"]) | |
| finally: | |
| os.chdir(original_cwd) | |
| gc.collect() | |
| torch.cuda.empty_cache() | |
| def infer(self, **kwargs) -> Path: | |
| if self.gen is None: | |
| raise RuntimeError(f"Modelo no worker {self.device_id_str} não foi carregado.") | |
| return self._execute_in_isolated_env(self._infer_task, **kwargs) | |
| # --- Classe Pool Manager (A Orquestradora Singleton) --- | |
| class VincePoolManager: | |
| _instance = None | |
| _lock = threading.Lock() | |
| def __new__(cls, *args, **kwargs): | |
| with cls._lock: | |
| if cls._instance is None: | |
| print("Criando a instância singleton do VincePoolManager...") | |
| cls._instance = super().__new__(cls) | |
| cls._instance._initialized = False | |
| return cls._instance | |
| def __init__(self, num_gpus: int = 4, output_root: str = "/app/outputs"): | |
| if self._initialized: return | |
| with self._lock: | |
| if self._initialized: return | |
| print("Inicializando o VincePoolManager (Padrão SeedVR)...") | |
| self.output_root = Path(output_root) | |
| self.output_root.mkdir(parents=True, exist_ok=True) | |
| self.worker_lock = threading.Lock() | |
| self.next_worker_idx = 0 | |
| self.setup_dependencies() | |
| if not torch.cuda.is_available() or torch.cuda.device_count() < num_gpus: | |
| raise RuntimeError(f"Erro: {num_gpus} GPUs são necessárias, mas {torch.cuda.device_count()} foram encontradas.") | |
| devices = [f'cuda:{i}' for i in range(num_gpus)] | |
| vincie_config_path = VINCIE_DIR / "configs/generate.yaml" | |
| self.workers = [VinceWorker(dev_id, str(vincie_config_path)) for dev_id in devices] | |
| print("Iniciando carregamento dos modelos em paralelo para todas as GPUs...") | |
| threads = [threading.Thread(target=worker.load_model_to_gpu) for worker in self.workers] | |
| for t in threads: t.start() | |
| for t in threads: t.join() | |
| self._initialized = True | |
| print(f"✅ VincePoolManager pronto com {len(self.workers)} workers 'quentes'.") | |
| def setup_dependencies(self): | |
| """Garante que o código e os modelos do VINCIE estejam disponíveis, usando cache.""" | |
| # 1. Código do Repositório (clona em /data/VINCIE) | |
| if not (VINCIE_DIR / ".git").exists(): | |
| print(f"Clonando repositório VINCIE para {VINCIE_DIR}...") | |
| VINCIE_DIR.parent.mkdir(parents=True, exist_ok=True) | |
| subprocess.run(["git", "clone", "--depth", "1", VINCE_GIT_URL, str(VINCIE_DIR)], check=True) | |
| else: | |
| print("Repositório VINCIE já existe em /data/VINCIE.") | |
| # 2. Modelos (com cache, baixando para /data/ckpt/VINCIE-3B) | |
| print(f"Verificando checkpoints VINCIE em {VINCE_CKPT} (usando cache em {HF_HOME_CACHE})...") | |
| try: | |
| snapshot_download( | |
| repo_id=VINCE_REPO_ID, | |
| local_dir=VINCE_CKPT, | |
| #local_dir_use_symlinks=False, | |
| cache_dir=HF_HOME_CACHE, | |
| force_download=False, | |
| token=os.getenv("HF_TOKEN") | |
| ) | |
| print("Checkpoints VINCIE prontos.") | |
| except Exception as e: | |
| print(f"ERRO durante o snapshot_download para VINCIE: {e}") | |
| raise | |
| # 3. Symlink para compatibilidade | |
| repo_ckpt_dir = VINCIE_DIR / "ckpt" | |
| repo_ckpt_dir.mkdir(parents=True, exist_ok=True) | |
| link = repo_ckpt_dir / "VINCIE-3B" | |
| if not link.exists(): | |
| link.symlink_to(VINCE_CKPT.resolve(), target_is_directory=True) | |
| print(f"Symlink de compatibilidade criado: {link} -> {VINCE_CKPT.resolve()}") | |
| else: | |
| print("Symlink de checkpoint já existe.") | |
| def _get_next_worker(self) -> VinceWorker: | |
| with self.worker_lock: | |
| worker = self.workers[self.next_worker_idx] | |
| self.next_worker_idx = (self.next_worker_idx + 1) % len(self.workers) | |
| print(f"Tarefa despachada para o worker: {worker.device_id_str}") | |
| return worker | |
| def generate_multi_turn(self, input_image: str, turns: List[str], **kwargs) -> Path: | |
| worker = self._get_next_worker() | |
| out_dir = self.output_root / f"multi_turn_{Path(input_image).stem}_{os.urandom(4).hex()}" | |
| out_dir.mkdir(parents=True) | |
| infer_kwargs = {"output_dir": out_dir, "image_path": input_image, "prompts": turns, **kwargs} | |
| return worker.infer(**infer_kwargs) | |
| def generate_multi_concept(self, concept_images: List[str], concept_prompts: List[str], final_prompt: str, **kwargs) -> Path: | |
| worker = self._get_next_worker() | |
| out_dir = self.output_root / f"multi_concept_{os.urandom(4).hex()}" | |
| out_dir.mkdir(parents=True) | |
| all_prompts = concept_prompts + [final_prompt] | |
| infer_kwargs = {"output_dir": out_dir, "image_path": concept_images, "prompts": all_prompts, **kwargs} | |
| return worker.infer(**infer_kwargs) | |
| # --- Instância Singleton Global --- | |
| try: | |
| NUM_GPUS_FOR_VINCE = int(os.getenv("VINCE_GPUS", "1")) | |
| # Passamos `output_root` lido da env var para o construtor. | |
| output_root_path = os.getenv("OUTPUT_ROOT", "/app/outputs") | |
| vince_pool_manager_singleton = VincePoolManager(num_gpus=NUM_GPUS_FOR_VINCE, output_root=output_root_path) | |
| except Exception as e: | |
| print(f"ERRO CRÍTICO ao inicializar o VincePoolManager: {e}", file=sys.stderr) | |
| vince_pool_manager_singleton = None |