| #!/usr/bin/env python3 | |
| """ | |
| LinguaCustodia Financial AI API - Clean Production Version | |
| Consolidated, production-ready API with proper architecture. | |
| Version: 24.1.0 - vLLM Backend with ModelInfo fixes | |
| """ | |
| import os | |
| import sys | |
| import uvicorn | |
| import json | |
| import time | |
| from fastapi import FastAPI, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import StreamingResponse | |
| from pydantic import BaseModel | |
| from typing import Optional, Dict, Any, AsyncIterator, List | |
| import logging | |
| import asyncio | |
| import threading | |
| # Fix OMP_NUM_THREADS warning | |
| os.environ["OMP_NUM_THREADS"] = "1" | |
| # Load environment variables | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # Inline Configuration Pattern for HuggingFace Spaces Deployment | |
| # This avoids module import issues in containerized environments | |
| ARCHITECTURE = "Inline Configuration (HF Optimized)" | |
| # Inline model configuration (synchronized with lingua_fin/config/) | |
| MODEL_CONFIG = { | |
| # LinguaCustodia Pro Finance Suite Models | |
| "pro-finance-large": { | |
| "model_id": "LinguaCustodia/Llama-Pro-Finance-Large", | |
| "display_name": "Llama Pro Finance Large", | |
| "architecture": "LlamaForCausalLM", | |
| "parameters": "70B", | |
| "memory_gb": 140, | |
| "vram_gb": 80, | |
| "eos_token_id": 128009, | |
| "bos_token_id": 128000, | |
| "vocab_size": 128000 | |
| }, | |
| "pro-finance-medium": { | |
| "model_id": "LinguaCustodia/LLM-Pro-Finance-Medium", | |
| "display_name": "LLM Pro Finance Medium", | |
| "architecture": "LlamaForCausalLM", | |
| "parameters": "32B", | |
| "memory_gb": 64, | |
| "vram_gb": 32, | |
| "eos_token_id": 128009, | |
| "bos_token_id": 128000, | |
| "vocab_size": 128000 | |
| }, | |
| "pro-finance-small": { | |
| "model_id": "LinguaCustodia/LLM-Pro-Finance-Small", | |
| "display_name": "LLM Pro Finance Small", | |
| "architecture": "LlamaForCausalLM", | |
| "parameters": "8B", | |
| "memory_gb": 16, | |
| "vram_gb": 8, | |
| "eos_token_id": 128009, | |
| "bos_token_id": 128000, | |
| "vocab_size": 128000 | |
| }, | |
| "pro-finance-mini": { | |
| "model_id": "LinguaCustodia/LLM-Pro-Finance-Mini", | |
| "display_name": "LLM Pro Finance Mini", | |
| "architecture": "LlamaForCausalLM", | |
| "parameters": "3B", | |
| "memory_gb": 6, | |
| "vram_gb": 3, | |
| "eos_token_id": 128009, | |
| "bos_token_id": 128000, | |
| "vocab_size": 128000 | |
| }, | |
| "llama-pro-finance-mini": { | |
| "model_id": "LinguaCustodia/Llama-Pro-Finance-Mini", | |
| "display_name": "Llama Pro Finance Mini", | |
| "architecture": "LlamaForCausalLM", | |
| "parameters": "1B", | |
| "memory_gb": 3, | |
| "vram_gb": 2, | |
| "eos_token_id": 128009, | |
| "bos_token_id": 128000, | |
| "vocab_size": 128000 | |
| }, | |
| "fin-pythia-1.4b": { | |
| "model_id": "LinguaCustodia/fin-pythia-1.4b", | |
| "display_name": "Fin-Pythia 1.4B Financial", | |
| "architecture": "GPTNeoXForCausalLM", | |
| "parameters": "1.4B", | |
| "memory_gb": 3, | |
| "vram_gb": 2, | |
| "eos_token_id": 0, | |
| "bos_token_id": 0, | |
| "vocab_size": 50304 | |
| }, | |
| # v1.0 Models (Latest Generation) | |
| "llama3.1-8b-v1.0": { | |
| "model_id": "LinguaCustodia/llama3.1-8b-fin-v1.0", | |
| "display_name": "Llama 3.1 8B Financial v1.0", | |
| "architecture": "LlamaForCausalLM", | |
| "parameters": "8B", | |
| "memory_gb": 16, | |
| "vram_gb": 8, | |
| "eos_token_id": 128009, | |
| "bos_token_id": 128000, | |
| "vocab_size": 128000 | |
| }, | |
| "qwen3-8b-v1.0": { | |
| "model_id": "LinguaCustodia/qwen3-8b-fin-v1.0", | |
| "display_name": "Qwen 3 8B Financial v1.0", | |
| "architecture": "Qwen3ForCausalLM", | |
| "parameters": "8B", | |
| "memory_gb": 16, | |
| "vram_gb": 8, | |
| "eos_token_id": 151645, | |
| "bos_token_id": None, | |
| "vocab_size": 151936 | |
| }, | |
| "qwen3-32b-v1.0": { | |
| "model_id": "LinguaCustodia/qwen3-32b-fin-v1.0", | |
| "display_name": "Qwen 3 32B Financial v1.0", | |
| "architecture": "Qwen3ForCausalLM", | |
| "parameters": "32B", | |
| "memory_gb": 64, | |
| "vram_gb": 32, | |
| "eos_token_id": 151645, | |
| "bos_token_id": None, | |
| "vocab_size": 151936 | |
| }, | |
| "llama3.1-70b-v1.0": { | |
| "model_id": "LinguaCustodia/llama3.1-70b-fin-v1.0", | |
| "display_name": "Llama 3.1 70B Financial v1.0", | |
| "architecture": "LlamaForCausalLM", | |
| "parameters": "70B", | |
| "memory_gb": 140, | |
| "vram_gb": 80, | |
| "eos_token_id": 128009, | |
| "bos_token_id": 128000, | |
| "vocab_size": 128000 | |
| }, | |
| "gemma3-12b-v1.0": { | |
| "model_id": "LinguaCustodia/gemma3-12b-fin-v1.0", | |
| "display_name": "Gemma 3 12B Financial v1.0", | |
| "architecture": "GemmaForCausalLM", | |
| "parameters": "12B", | |
| "memory_gb": 32, | |
| "vram_gb": 12, | |
| "eos_token_id": 1, | |
| "bos_token_id": 2, | |
| "vocab_size": 262144 | |
| } | |
| } | |
| # Inline generation configuration | |
| GENERATION_CONFIG = { | |
| "temperature": 0.6, | |
| "top_p": 0.9, | |
| "max_new_tokens": 150, | |
| "repetition_penalty": 1.05, | |
| "early_stopping": False, | |
| "min_length": 50 | |
| } | |
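| # Illustrative usage (sketch only, not part of the runtime path): a handler can look up a | |
| # model entry and merge per-request overrides on top of GENERATION_CONFIG, e.g. | |
| #   cfg = MODEL_CONFIG["qwen3-8b-v1.0"]  # -> {"model_id": "LinguaCustodia/qwen3-8b-fin-v1.0", ...} | |
| #   params = {**GENERATION_CONFIG, "temperature": 0.3, "max_new_tokens": 256} | |
| # The eos/bos token ids mirror each model's tokenizer config and feed the stop-token logic below. | |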
| # Initialize FastAPI app | |
| app = FastAPI( | |
| title="LinguaCustodia Financial AI API", | |
| description=f"Production-ready API with {ARCHITECTURE}", | |
| version="23.0.0", | |
| docs_url="/docs", | |
| redoc_url="/redoc" | |
| ) | |
| # Add CORS middleware | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # Pydantic models for API | |
| class InferenceRequest(BaseModel): | |
| prompt: str | |
| max_new_tokens: Optional[int] = 150 | |
| temperature: Optional[float] = 0.6 | |
| class InferenceResponse(BaseModel): | |
| response: str | |
| model_used: str | |
| success: bool | |
| tokens_generated: int | |
| generation_params: Dict[str, Any] | |
| class HealthResponse(BaseModel): | |
| status: str | |
| model_loaded: bool | |
| current_model: Optional[str] | |
| gpu_available: bool | |
| memory_usage: Optional[Dict[str, Any]] | |
| storage_info: Optional[Dict[str, Any]] | |
| architecture: str | |
| loading_status: Optional[Dict[str, Any]] = None | |
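| # Example request/response shapes for the /inference endpoint (illustrative values only): | |
| #   POST /inference | |
| #   {"prompt": "Summarize IFRS 9 impairment.", "max_new_tokens": 200, "temperature": 0.4} | |
| #   -> InferenceResponse | |
| #   {"response": "...", "model_used": "LinguaCustodia/qwen3-8b-fin-v1.0", "success": true, | |
| #    "tokens_generated": 180, "generation_params": {"temperature": 0.4, ...}} | |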
| # Global variables for inline configuration | |
| model = None | |
| tokenizer = None | |
| pipe = None | |
| model_loaded = False | |
| current_model_name = None | |
| storage_info = None | |
| # Platform-Specific vLLM Configurations | |
| def get_vllm_config_for_model(model_name: str, platform: str = "huggingface") -> dict: | |
| """Get vLLM configuration optimized for specific model and platform.""" | |
| base_config = { | |
| "tensor_parallel_size": 1, # Single GPU | |
| "pipeline_parallel_size": 1, # No pipeline parallelism | |
| "trust_remote_code": True, # Required for LinguaCustodia | |
| "dtype": "bfloat16", # L40 GPU optimization | |
| "enforce_eager": True, # Disable CUDA graphs (HF compatibility - conservative) | |
| "disable_custom_all_reduce": True, # Disable custom kernels (HF compatibility) | |
| "disable_log_stats": True, # Reduce logging overhead | |
| } | |
| # Model-specific context length configurations | |
| if "llama3.1-8b" in model_name: | |
| max_context = 128000 # Llama 3.1 8B supports 128K | |
| elif "qwen3-8b" in model_name: | |
| max_context = 32768 # Qwen 3 8B supports 32K | |
| elif "qwen3-32b" in model_name: | |
| max_context = 32768 # Qwen 3 32B supports 32K | |
| elif "llama3.1-70b" in model_name: | |
| max_context = 128000 # Llama 3.1 70B supports 128K | |
| elif "gemma3-12b" in model_name: | |
| max_context = 8192 # Gemma 3 12B supports 8K | |
| else: | |
| max_context = 32768 # Default fallback | |
| if platform == "huggingface": | |
| # Model-specific configurations for HF L40 (48GB VRAM) | |
| if "32b" in model_name.lower() or "70b" in model_name.lower(): | |
| # ⚠️ WARNING: 32B and 70B models are too large for L40 GPU (48GB VRAM) | |
| # These configurations are experimental and may not work | |
| return { | |
| **base_config, | |
| "gpu_memory_utilization": 0.50, # Extremely conservative for large models | |
| "max_model_len": min(max_context, 4096), # Use model's max or 4K for HF | |
| "max_num_batched_tokens": min(max_context, 4096), # Reduced batching | |
| } | |
| elif "12b" in model_name.lower(): | |
| # ⚠️ WARNING: Gemma 12B is too large for L40 GPU (48GB VRAM) | |
| # Model weights load fine (~22GB) but KV cache allocation fails | |
| return { | |
| **base_config, | |
| "gpu_memory_utilization": 0.50, # Conservative for 12B model | |
| "max_model_len": min(max_context, 2048), # Use model's max or 2K for HF | |
| "max_num_batched_tokens": min(max_context, 2048), # Reduced batching | |
| } | |
| else: | |
| # Default for 8B and smaller models | |
| return { | |
| **base_config, | |
| "gpu_memory_utilization": 0.75, # Standard for 8B models | |
| "max_model_len": max_context, # Use model's actual max context | |
| "max_num_batched_tokens": max_context, # Full batching | |
| } | |
| else: | |
| # Scaleway configuration (more aggressive) | |
| return { | |
| **base_config, | |
| "gpu_memory_utilization": 0.85, # Aggressive for Scaleway L40S | |
| "max_model_len": max_context, # Use model's actual max context | |
| "max_num_batched_tokens": max_context, # Full batching | |
| "enforce_eager": False, # Enable CUDA graphs for maximum performance | |
| "disable_custom_all_reduce": False, # Enable all optimizations | |
| } | |
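| # Example (illustrative): for an 8B model on HuggingFace the function above returns roughly | |
| #   get_vllm_config_for_model("LinguaCustodia/qwen3-8b-fin-v1.0", "huggingface") | |
| #   -> {**base_config, "gpu_memory_utilization": 0.75, "max_model_len": 32768, | |
| #       "max_num_batched_tokens": 32768} | |
| # Note: the Pro Finance Suite ids (e.g. "LinguaCustodia/LLM-Pro-Finance-Small") match none of | |
| # the substring checks above, so they fall back to the 32K default context and, unless | |
| # "32b"/"70b"/"12b" appears in the id, to the 8B memory settings. | |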
| VLLM_CONFIG_HF = { | |
| "gpu_memory_utilization": 0.75, # Standard for 8B models | |
| "max_model_len": 32768, # Default 32K context (Llama 3.1 8B can use 128K) | |
| "tensor_parallel_size": 1, # Single GPU | |
| "pipeline_parallel_size": 1, # No pipeline parallelism | |
| "trust_remote_code": True, # Required for LinguaCustodia | |
| "dtype": "bfloat16", # L40 GPU optimization | |
| "enforce_eager": True, # Disable CUDA graphs (HF compatibility - conservative) | |
| "disable_custom_all_reduce": True, # Disable custom kernels (HF compatibility) | |
| "disable_log_stats": True, # Reduce logging overhead | |
| "max_num_batched_tokens": 32768, # Default batching | |
| } | |
| VLLM_CONFIG_SCW = { | |
| "gpu_memory_utilization": 0.85, # Aggressive for Scaleway L40S (40.8GB of 48GB) | |
| "max_model_len": 32768, # Default 32K context (model-specific) | |
| "tensor_parallel_size": 1, # Single GPU | |
| "pipeline_parallel_size": 1, # No pipeline parallelism | |
| "trust_remote_code": True, # Required for LinguaCustodia | |
| "dtype": "bfloat16", # L40S GPU optimization | |
| "enforce_eager": False, # Use CUDA graphs for maximum speed | |
| "disable_custom_all_reduce": False, # Enable all optimizations | |
| } | |
| # Backend Abstraction Layer | |
| class InferenceBackend: | |
| """Unified interface for all inference backends.""" | |
| def __init__(self, backend_type: str, model_config: dict): | |
| self.backend_type = backend_type | |
| self.model_config = model_config | |
| self.engine = None | |
| def load_model(self, model_id: str) -> bool: | |
| """Load model with platform-specific optimizations.""" | |
| raise NotImplementedError | |
| def run_inference(self, prompt: str, **kwargs) -> dict: | |
| """Run inference with consistent response format.""" | |
| raise NotImplementedError | |
| def get_memory_info(self) -> dict: | |
| """Get memory usage information.""" | |
| raise NotImplementedError | |
| def sleep(self) -> bool: | |
| """Put backend into sleep mode (for HuggingFace Spaces).""" | |
| raise NotImplementedError | |
| def wake(self) -> bool: | |
| """Wake up backend from sleep mode.""" | |
| raise NotImplementedError | |
| def cleanup(self) -> None: | |
| """Clean up resources.""" | |
| raise NotImplementedError | |
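| # The concrete backends below (VLLMBackend, TransformersBackend) implement this interface. | |
| # Typical flow (sketch): | |
| #   backend = create_inference_backend()                    # factory defined further below | |
| #   backend.load_model("LinguaCustodia/qwen3-8b-fin-v1.0")  # example model id | |
| #   result = backend.run_inference("What is EBITDA?", max_new_tokens=128) | |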
| class VLLMBackend(InferenceBackend): | |
| """vLLM implementation with platform-specific optimizations.""" | |
| def __init__(self, model_config: dict, platform: str = "huggingface"): | |
| super().__init__("vllm", model_config) | |
| self.platform = platform | |
| # Get model-specific configuration | |
| model_name = getattr(model_config, 'model_id', 'default') | |
| self.config = get_vllm_config_for_model(model_name, platform) | |
| logger.info(f"🔧 Using {platform}-optimized vLLM config for {model_name}") | |
| logger.info(f"📊 vLLM Config: {self.config}") | |
| def load_model(self, model_id: str) -> bool: | |
| """Load model with vLLM engine.""" | |
| try: | |
| from vllm import LLM | |
| import os | |
| logger.info(f"🚀 Initializing vLLM engine for {model_id}") | |
| logger.info(f"📊 vLLM Config: {self.config}") | |
| # Set HF token for vLLM authentication - try both LinguaCustodia tokens | |
| hf_token_lc = os.getenv("HF_TOKEN_LC") | |
| hf_token_lc2 = os.getenv("HF_TOKEN_LC2") | |
| # Clean tokens and choose the right one | |
| if hf_token_lc: | |
| hf_token_lc = hf_token_lc.strip() | |
| if hf_token_lc2: | |
| hf_token_lc2 = hf_token_lc2.strip() | |
| # Use HF_TOKEN_LC2 if available, fallback to HF_TOKEN_LC | |
| working_token = hf_token_lc2 if hf_token_lc2 else hf_token_lc | |
| if working_token: | |
| # Clear HuggingFace cache to force fresh download with correct token | |
| import shutil | |
| hf_cache_dir = os.path.expanduser("~/.cache/huggingface") | |
| hf_home_cache = os.path.join(os.getenv('HF_HOME', '/data/.huggingface'), 'hub') | |
| # Clear model cache for this specific model to force re-download | |
| model_cache_dirs = [ | |
| os.path.join(hf_cache_dir, 'hub'), | |
| hf_home_cache | |
| ] | |
| for cache_dir in model_cache_dirs: | |
| if os.path.exists(cache_dir): | |
| # Look for this specific model in cache and remove it | |
| model_cache_name = model_id.replace('/', '--') | |
| for item in os.listdir(cache_dir): | |
| if model_cache_name in item: | |
| item_path = os.path.join(cache_dir, item) | |
| if os.path.isdir(item_path): | |
| logger.info(f"🗑️ Clearing cached model: {item_path}") | |
| shutil.rmtree(item_path, ignore_errors=True) | |
| # Set working LinguaCustodia token as primary token | |
| os.environ["HF_TOKEN"] = working_token | |
| os.environ["HUGGING_FACE_HUB_TOKEN"] = working_token | |
| token_name = "HF_TOKEN_LC2" if working_token == hf_token_lc2 else "HF_TOKEN_LC" | |
| logger.info(f"🔑 Using {token_name} for LinguaCustodia model authentication") | |
| # Force re-authentication with the working LinguaCustodia token | |
| try: | |
| from huggingface_hub import login | |
| login(token=working_token, add_to_git_credential=False) | |
| logger.info(f"✅ Successfully authenticated with {token_name}") | |
| except Exception as e: | |
| logger.warning(f"⚠️ {token_name} authentication failed: {e}") | |
| else: | |
| logger.warning("⚠️ No working LinguaCustodia token found - may fail for LinguaCustodia models") | |
| self.engine = LLM( | |
| model=model_id, | |
| **self.config | |
| ) | |
| logger.info("✅ vLLM engine initialized successfully") | |
| return True | |
| except Exception as e: | |
| logger.error(f"❌ vLLM model loading failed: {e}") | |
| return False | |
| def run_inference(self, prompt: str, **kwargs) -> dict: | |
| """Run inference with vLLM engine.""" | |
| if not self.engine: | |
| return {"error": "vLLM engine not loaded", "success": False} | |
| try: | |
| from vllm import SamplingParams | |
| # Get stop tokens from kwargs or use model-specific defaults | |
| stop_tokens = kwargs.get('stop') | |
| if not stop_tokens and hasattr(self, 'model_config'): | |
| model_name = getattr(self.model_config, 'model_id', '') | |
| stop_tokens = get_stop_tokens_for_model(model_name) | |
| sampling_params = SamplingParams( | |
| temperature=kwargs.get('temperature', 0.6), | |
| max_tokens=kwargs.get('max_new_tokens', 512), # Increased default | |
| top_p=kwargs.get('top_p', 0.9), | |
| repetition_penalty=kwargs.get('repetition_penalty', 1.1), # Increased from 1.05 | |
| stop=stop_tokens # Add stop tokens | |
| ) | |
| outputs = self.engine.generate([prompt], sampling_params) | |
| response = outputs[0].outputs[0].text | |
| return { | |
| "response": response, | |
| "model_used": getattr(self.model_config, 'model_id', 'unknown'), | |
| "success": True, | |
| "backend": "vLLM", | |
| "tokens_generated": len(response.split()), | |
| "generation_params": { | |
| "temperature": sampling_params.temperature, | |
| "max_tokens": sampling_params.max_tokens, | |
| "top_p": sampling_params.top_p | |
| } | |
| } | |
| except Exception as e: | |
| logger.error(f"vLLM inference error: {e}") | |
| return {"error": str(e), "success": False} | |
| def get_memory_info(self) -> dict: | |
| """Get vLLM memory information.""" | |
| try: | |
| import torch | |
| if torch.cuda.is_available(): | |
| return { | |
| "gpu_available": True, | |
| "gpu_memory_allocated": torch.cuda.memory_allocated(), | |
| "gpu_memory_reserved": torch.cuda.memory_reserved(), | |
| "backend": "vLLM" | |
| } | |
| except Exception as e: | |
| logger.error(f"Error getting vLLM memory info: {e}") | |
| return {"gpu_available": False, "backend": "vLLM"} | |
| def sleep(self) -> bool: | |
| """Put vLLM engine into sleep mode (for HuggingFace Spaces).""" | |
| try: | |
| if self.engine and hasattr(self.engine, 'sleep'): | |
| logger.info("😴 Putting vLLM engine to sleep...") | |
| self.engine.sleep() | |
| logger.info("✅ vLLM engine is now sleeping (GPU memory released)") | |
| return True | |
| else: | |
| logger.info("ℹ️ vLLM engine doesn't support sleep mode or not loaded") | |
| return False | |
| except Exception as e: | |
| logger.warning(f"⚠️ Error putting vLLM to sleep (non-critical): {e}") | |
| return False | |
| def wake(self) -> bool: | |
| """Wake up vLLM engine from sleep mode.""" | |
| try: | |
| if self.engine and hasattr(self.engine, 'wake'): | |
| logger.info("🌅 Waking up vLLM engine...") | |
| self.engine.wake() | |
| logger.info("✅ vLLM engine is now awake") | |
| return True | |
| else: | |
| logger.info("ℹ️ vLLM engine doesn't support wake mode or not loaded") | |
| return False | |
| except Exception as e: | |
| logger.warning(f"⚠️ Error waking up vLLM (non-critical): {e}") | |
| return False | |
| def cleanup(self) -> None: | |
| """Clean up vLLM resources gracefully.""" | |
| try: | |
| if self.engine: | |
| logger.info("🧹 Shutting down vLLM engine...") | |
| # vLLM engines don't have explicit shutdown methods, but we can clean up references | |
| del self.engine | |
| self.engine = None | |
| logger.info("✅ vLLM engine reference cleared") | |
| # Clear CUDA cache | |
| import torch | |
| if torch.cuda.is_available(): | |
| torch.cuda.empty_cache() | |
| logger.info("✅ CUDA cache cleared") | |
| # Force garbage collection | |
| import gc | |
| gc.collect() | |
| logger.info("✅ Garbage collection completed") | |
| except Exception as e: | |
| logger.error(f"❌ Error during vLLM cleanup: {e}") | |
| class TransformersBackend(InferenceBackend): | |
| """Current Transformers implementation (fallback).""" | |
| def __init__(self, model_config: dict): | |
| super().__init__("transformers", model_config) | |
| def load_model(self, model_id: str) -> bool: | |
| """Load model with Transformers (current implementation). | |
| Note: model selection comes from MODEL_NAME via get_app_settings(); the model_id | |
| argument is accepted for interface compatibility but is not used here.""" | |
| return load_linguacustodia_model() | |
| def run_inference(self, prompt: str, **kwargs) -> dict: | |
| """Run inference with Transformers pipeline.""" | |
| return run_inference(prompt, **kwargs) | |
| def get_memory_info(self) -> dict: | |
| """Get Transformers memory information.""" | |
| return get_gpu_memory_info() | |
| def sleep(self) -> bool: | |
| """Put Transformers backend into sleep mode.""" | |
| try: | |
| logger.info("😴 Transformers backend doesn't support sleep mode, cleaning up memory instead...") | |
| cleanup_model_memory() | |
| return True | |
| except Exception as e: | |
| logger.error(f"❌ Error during Transformers sleep: {e}") | |
| return False | |
| def wake(self) -> bool: | |
| """Wake up Transformers backend from sleep mode.""" | |
| try: | |
| logger.info("🌅 Transformers backend wake - no action needed") | |
| return True | |
| except Exception as e: | |
| logger.error(f"❌ Error during Transformers wake: {e}") | |
| return False | |
| def cleanup(self) -> None: | |
| """Clean up Transformers resources.""" | |
| cleanup_model_memory() | |
| # Inline configuration functions | |
| def get_app_settings(): | |
| """Get application settings from environment variables.""" | |
| # If MODEL_NAME is unset or not a known key, fall back to pro-finance-small (8B) | |
| model_name = os.getenv('MODEL_NAME') | |
| if not model_name or model_name not in MODEL_CONFIG: | |
| model_name = 'pro-finance-small' # Default to Pro Finance Small (8B) | |
| logger.info(f"Using default model: {model_name}") | |
| return type('Settings', (), { | |
| 'model_name': model_name, | |
| 'hf_token_lc': os.getenv('HF_TOKEN_LC'), | |
| 'hf_token': os.getenv('HF_TOKEN') | |
| })() | |
| def get_model_config(model_name: str): | |
| """Get model configuration.""" | |
| if model_name not in MODEL_CONFIG: | |
| raise ValueError(f"Model '{model_name}' not found") | |
| return type('ModelInfo', (), MODEL_CONFIG[model_name])() | |
| def get_linguacustodia_config(): | |
| """Get complete configuration.""" | |
| return type('Config', (), { | |
| 'models': MODEL_CONFIG, | |
| 'get_model_info': lambda name: type('ModelInfo', (), MODEL_CONFIG[name])(), | |
| 'list_models': lambda: MODEL_CONFIG | |
| })() | |
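| # Illustrative use of the inline config helpers (the type(...) calls return lightweight objects | |
| # whose attributes mirror the MODEL_CONFIG dictionaries): | |
| #   settings = get_app_settings() | |
| #   cfg = get_model_config(settings.model_name) | |
| #   cfg.model_id, cfg.display_name, cfg.vram_gb  # attribute access instead of dict lookups | |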
| def create_inference_backend() -> InferenceBackend: | |
| """Factory method for creating appropriate backend.""" | |
| # Environment detection | |
| deployment_env = os.getenv('DEPLOYMENT_ENV', 'huggingface') | |
| use_vllm = os.getenv('USE_VLLM', 'true').lower() == 'true' | |
| # Get model configuration | |
| settings = get_app_settings() | |
| model_config = get_model_config(settings.model_name) | |
| # Backend selection logic with platform-specific optimizations | |
| if use_vllm and deployment_env in ['huggingface', 'scaleway']: | |
| logger.info(f"🚀 Initializing vLLM backend for {deployment_env}") | |
| return VLLMBackend(model_config, platform=deployment_env) | |
| else: | |
| logger.info(f"🔄 Using Transformers backend for {deployment_env}") | |
| return TransformersBackend(model_config) | |
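| # Backend selection is driven by two environment variables (sketch of expected combinations): | |
| #   DEPLOYMENT_ENV=huggingface USE_VLLM=true      -> VLLMBackend with the conservative HF config | |
| #   DEPLOYMENT_ENV=scaleway    USE_VLLM=true      -> VLLMBackend with the aggressive Scaleway config | |
| #   USE_VLLM=false (or an unrecognized DEPLOYMENT_ENV) -> TransformersBackend fallback | |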
| # Global backend instance - will be initialized on startup | |
| inference_backend = None | |
| # Model loading state tracking | |
| model_loading_state = { | |
| "is_loading": False, | |
| "loading_model": None, | |
| "loading_progress": 0, | |
| "loading_status": "idle", | |
| "loading_start_time": None, | |
| "loading_error": None | |
| } | |
| def setup_storage(): | |
| """Setup storage configuration.""" | |
| hf_home = os.getenv('HF_HOME', '/data/.huggingface') | |
| os.environ['HF_HOME'] = hf_home | |
| # Report actual filesystem state instead of hardcoded values | |
| return { | |
| 'hf_home': hf_home, | |
| 'persistent_storage': os.path.isdir('/data'), | |
| 'cache_dir_exists': os.path.isdir(hf_home), | |
| 'cache_dir_writable': os.access(hf_home, os.W_OK) | |
| } | |
| def update_loading_state(status: str, progress: int = 0, error: str = None): | |
| """Update the global loading state.""" | |
| global model_loading_state | |
| model_loading_state.update({ | |
| "loading_status": status, | |
| "loading_progress": progress, | |
| "loading_error": error | |
| }) | |
| if error: | |
| model_loading_state["is_loading"] = False | |
| def save_model_preference(model_name: str) -> bool: | |
| """Save model preference to persistent storage for restart.""" | |
| try: | |
| preference_file = "/data/.model_preference" | |
| os.makedirs("/data", exist_ok=True) | |
| with open(preference_file, 'w') as f: | |
| f.write(model_name) | |
| logger.info(f"✅ Saved model preference: {model_name}") | |
| return True | |
| except Exception as e: | |
| logger.error(f"❌ Failed to save model preference: {e}") | |
| return False | |
| def load_model_preference() -> Optional[str]: | |
| """Load saved model preference from persistent storage.""" | |
| try: | |
| preference_file = "/data/.model_preference" | |
| if os.path.exists(preference_file): | |
| with open(preference_file, 'r') as f: | |
| model_name = f.read().strip() | |
| logger.info(f"✅ Loaded model preference: {model_name}") | |
| return model_name | |
| return None | |
| except Exception as e: | |
| logger.error(f"❌ Failed to load model preference: {e}") | |
| return None | |
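| # Restart-based model switching (sketch of the round trip): | |
| #   1. the load-model endpoint saves the requested key, e.g. save_model_preference("qwen3-8b-v1.0") | |
| #   2. trigger_service_restart() exits the process and the Space restarts automatically | |
| #   3. startup_event() reads load_model_preference() and loads that model instead of the default | |
| # The preference file lives at /data/.model_preference, so it survives restarts only when | |
| # persistent storage is mounted at /data. | |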
| async def trigger_service_restart(): | |
| """Trigger a graceful service restart for model switching.""" | |
| try: | |
| logger.info("🔄 Triggering graceful service restart for model switch...") | |
| # Give time for response to be sent | |
| await asyncio.sleep(2) | |
| # On HuggingFace Spaces, we can trigger a restart by exiting | |
| # The Space will automatically restart | |
| import sys | |
| sys.exit(0) | |
| except Exception as e: | |
| logger.error(f"❌ Error triggering restart: {e}") | |
| async def load_model_async(model_name: str, model_info: dict, new_model_config: dict): | |
| """ | |
| Model switching via service restart. | |
| vLLM doesn't support runtime model switching, so we save the preference | |
| and trigger a graceful restart. The new model will be loaded on startup. | |
| """ | |
| global model_loading_state | |
| try: | |
| # Update loading state | |
| model_loading_state.update({ | |
| "is_loading": True, | |
| "loading_model": model_name, | |
| "loading_progress": 10, | |
| "loading_status": "saving_preference", | |
| "loading_start_time": time.time(), | |
| "loading_error": None | |
| }) | |
| # Save the model preference to persistent storage | |
| logger.info(f"💾 Saving model preference: {model_name}") | |
| if not save_model_preference(model_name): | |
| update_loading_state("error", 0, "Failed to save model preference") | |
| return | |
| update_loading_state("preparing_restart", 50) | |
| logger.info(f"🔄 Model preference saved. Triggering service restart to load {model_info['display_name']}...") | |
| # Trigger graceful restart | |
| await trigger_service_restart() | |
| except Exception as e: | |
| logger.error(f"Error in model switching: {e}") | |
| update_loading_state("error", 0, str(e)) | |
| def load_linguacustodia_model(force_reload=False): | |
| """ | |
| Load the LinguaCustodia model with intelligent caching. | |
| Strategy: | |
| - If no model loaded: Load from cache if available, else download | |
| - If same model already loaded: Skip (use loaded model) | |
| - If different model requested: Clean memory, clean storage, then load new model | |
| """ | |
| global model, tokenizer, pipe, model_loaded, current_model_name | |
| try: | |
| from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline | |
| from huggingface_hub import login | |
| import torch | |
| settings = get_app_settings() | |
| model_config = get_model_config(settings.model_name) | |
| requested_model_id = model_config.model_id | |
| # Case 1: Same model already loaded in memory - reuse it | |
| if model_loaded and current_model_name == requested_model_id and not force_reload: | |
| logger.info(f"✅ Model {model_config.display_name} already loaded in memory, reusing") | |
| return True | |
| # Case 2: Different model requested - clean everything first | |
| if model_loaded and current_model_name != requested_model_id: | |
| logger.info(f"🔄 Model switch detected: {current_model_name} → {requested_model_id}") | |
| logger.info(f"🧹 Cleaning memory and storage for model switch...") | |
| cleanup_model_memory() | |
| # Note: HuggingFace will automatically use cached model files if available | |
| # We only clean GPU memory, not disk cache | |
| # Case 3: Force reload requested | |
| if force_reload and model_loaded: | |
| logger.info(f"🔄 Force reload requested for {requested_model_id}") | |
| cleanup_model_memory() | |
| # Authenticate with HuggingFace | |
| login(token=settings.hf_token_lc, add_to_git_credential=False) | |
| logger.info(f"✅ Authenticated with HuggingFace") | |
| # Load model (will use cached files if available) | |
| logger.info(f"🚀 Loading model: {model_config.display_name}") | |
| logger.info(f"📦 Model ID: {requested_model_id}") | |
| logger.info(f"💾 Will use cached files from {os.getenv('HF_HOME', '~/.cache/huggingface')} if available") | |
| # Load tokenizer from cache or download | |
| tokenizer = AutoTokenizer.from_pretrained( | |
| requested_model_id, | |
| token=settings.hf_token_lc, | |
| trust_remote_code=True | |
| ) | |
| logger.info(f"✅ Tokenizer loaded") | |
| # Load model from cache or download | |
| model = AutoModelForCausalLM.from_pretrained( | |
| requested_model_id, | |
| token=settings.hf_token_lc, | |
| dtype=torch.bfloat16, | |
| device_map="auto", | |
| trust_remote_code=True | |
| ) | |
| logger.info(f"✅ Model loaded") | |
| # Create inference pipeline | |
| pipe = pipeline( | |
| "text-generation", | |
| model=model, | |
| tokenizer=tokenizer, | |
| dtype=torch.bfloat16, | |
| device_map="auto" | |
| ) | |
| logger.info(f"✅ Pipeline created") | |
| # Update global state | |
| current_model_name = requested_model_id | |
| model_loaded = True | |
| logger.info(f"🎉 {model_config.display_name} ready for inference!") | |
| return True | |
| except Exception as e: | |
| logger.error(f"❌ Failed to load model: {e}") | |
| cleanup_model_memory() | |
| return False | |
| def cleanup_model_memory(): | |
| """ | |
| Clean up model memory before loading a new model. | |
| This clears GPU memory but keeps disk cache intact for faster reloading. | |
| """ | |
| global model, tokenizer, pipe, model_loaded | |
| try: | |
| import torch | |
| import gc | |
| logger.info("🧹 Starting memory cleanup...") | |
| # Delete model objects from memory | |
| if pipe is not None: | |
| del pipe | |
| pipe = None | |
| logger.info(" ✓ Pipeline removed") | |
| if model is not None: | |
| del model | |
| model = None | |
| logger.info(" ✓ Model removed") | |
| if tokenizer is not None: | |
| del tokenizer | |
| tokenizer = None | |
| logger.info(" ✓ Tokenizer removed") | |
| model_loaded = False | |
| # Clear GPU cache if available | |
| if torch.cuda.is_available(): | |
| allocated_before = torch.cuda.memory_allocated() / (1024**3) | |
| torch.cuda.empty_cache() | |
| torch.cuda.synchronize() | |
| allocated_after = torch.cuda.memory_allocated() / (1024**3) | |
| freed = allocated_before - allocated_after | |
| logger.info(f" ✓ GPU cache cleared (freed ~{freed:.2f}GB)") | |
| # Force garbage collection | |
| gc.collect() | |
| logger.info(" ✓ Garbage collection completed") | |
| logger.info("✅ Memory cleanup completed successfully") | |
| logger.info("💾 Disk cache preserved for faster model loading") | |
| except Exception as e: | |
| logger.warning(f"⚠️ Error during memory cleanup: {e}") | |
| def run_inference(prompt: str, max_new_tokens: int = 150, temperature: float = 0.6): | |
| """Run inference with the loaded model.""" | |
| global pipe, model, tokenizer, model_loaded, current_model_name | |
| if not model_loaded or pipe is None: | |
| return { | |
| "response": "", | |
| "model_used": current_model_name, | |
| "success": False, | |
| "tokens_generated": 0, | |
| "generation_params": {}, | |
| "error": "Model not loaded" | |
| } | |
| try: | |
| # Pass generation parameters to the pipeline call; assigning them as attributes | |
| # on the pipeline object has no effect on generation | |
| result = pipe( | |
| prompt, | |
| max_new_tokens=max_new_tokens, | |
| temperature=temperature, | |
| do_sample=True, | |
| return_full_text=True | |
| ) | |
| generated_text = result[0]['generated_text'] | |
| response_text = generated_text[len(prompt):].strip() | |
| tokens_generated = len(tokenizer.encode(response_text)) | |
| return { | |
| "response": response_text, | |
| "model_used": current_model_name, | |
| "success": True, | |
| "tokens_generated": tokens_generated, | |
| "generation_params": { | |
| "max_new_tokens": max_new_tokens, | |
| "temperature": temperature, | |
| **GENERATION_CONFIG | |
| } | |
| } | |
| except Exception as e: | |
| logger.error(f"Inference error: {e}") | |
| return { | |
| "response": "", | |
| "model_used": current_model_name, | |
| "success": False, | |
| "tokens_generated": 0, | |
| "generation_params": {}, | |
| "error": str(e) | |
| } | |
| def get_gpu_memory_info(): | |
| """Get GPU memory information.""" | |
| try: | |
| import torch | |
| if not torch.cuda.is_available(): | |
| return {"gpu_available": False} | |
| allocated = torch.cuda.memory_allocated() | |
| reserved = torch.cuda.memory_reserved() | |
| total = torch.cuda.get_device_properties(0).total_memory | |
| return { | |
| "gpu_available": True, | |
| "gpu_name": torch.cuda.get_device_name(0), | |
| "gpu_memory_allocated": f"{allocated / (1024**3):.2f}GB", | |
| "gpu_memory_reserved": f"{reserved / (1024**3):.2f}GB", | |
| "gpu_memory_total": f"{total / (1024**3):.2f}GB" | |
| } | |
| except Exception as e: | |
| return {"gpu_available": False, "error": str(e)} | |
| async def startup_event(): | |
| """Initialize the application on startup.""" | |
| global storage_info, inference_backend | |
| logger.info(f"🚀 Starting LinguaCustodia API - {ARCHITECTURE} v24.1.0 (vLLM Ready)...") | |
| # Setup storage first and store globally | |
| storage_info = setup_storage() | |
| logger.info(f"📊 Storage configuration: {storage_info}") | |
| # Initialize backend | |
| inference_backend = create_inference_backend() | |
| logger.info(f"🔧 Backend initialized: {inference_backend.backend_type}") | |
| # Check for saved model preference (from restart-based model switching) | |
| saved_preference = load_model_preference() | |
| if saved_preference: | |
| logger.info(f"🔄 Found saved model preference: {saved_preference}") | |
| model_name = saved_preference | |
| else: | |
| # Use default from environment or settings | |
| settings = get_app_settings() | |
| model_name = settings.model_name | |
| logger.info(f"📋 Using default model: {model_name}") | |
| # Load the selected model | |
| model_config = get_model_config(model_name) | |
| success = inference_backend.load_model(model_config.model_id) | |
| if success: | |
| logger.info(f"✅ Model loaded successfully on startup using {inference_backend.backend_type} backend") | |
| # For vLLM backend, check if we need to wake up from sleep | |
| if inference_backend.backend_type == "vllm": | |
| logger.info("🌅 Checking if vLLM needs to wake up from sleep...") | |
| try: | |
| wake_success = inference_backend.wake() | |
| if wake_success: | |
| logger.info("✅ vLLM wake-up successful") | |
| else: | |
| logger.info("ℹ️ vLLM wake-up not needed (fresh startup)") | |
| except Exception as e: | |
| logger.info(f"ℹ️ vLLM wake-up check completed (normal on fresh startup): {e}") | |
| else: | |
| logger.error("❌ Failed to load model on startup") | |
| async def shutdown_event(): | |
| """Gracefully shutdown the application.""" | |
| global inference_backend | |
| logger.info("🛑 Starting graceful shutdown...") | |
| try: | |
| if inference_backend: | |
| logger.info(f"🧹 Cleaning up {inference_backend.backend_type} backend...") | |
| inference_backend.cleanup() | |
| logger.info("✅ Backend cleanup completed") | |
| # Additional cleanup for global variables | |
| cleanup_model_memory() | |
| logger.info("✅ Global memory cleanup completed") | |
| logger.info("✅ Graceful shutdown completed successfully") | |
| except Exception as e: | |
| logger.error(f"❌ Error during shutdown: {e}") | |
| # Don't raise the exception to avoid preventing shutdown | |
| async def health_check(): | |
| """Health check endpoint.""" | |
| global storage_info, inference_backend, model_loading_state | |
| if inference_backend is None: | |
| return HealthResponse( | |
| status="starting", | |
| model_loaded=False, | |
| current_model="unknown", | |
| gpu_available=False, | |
| memory_usage=None, | |
| storage_info=storage_info, | |
| architecture=f"{ARCHITECTURE} + INITIALIZING", | |
| loading_status=model_loading_state | |
| ) | |
| memory_info = inference_backend.get_memory_info() | |
| return HealthResponse( | |
| status="healthy" if inference_backend.engine else "model_not_loaded", | |
| model_loaded=inference_backend.engine is not None, | |
| current_model=getattr(inference_backend.model_config, 'model_id', 'unknown'), | |
| gpu_available=memory_info.get("gpu_available", False), | |
| memory_usage=memory_info if memory_info.get("gpu_available") else None, | |
| storage_info=storage_info, | |
| architecture=f"{ARCHITECTURE} + {inference_backend.backend_type.upper()}", | |
| loading_status=model_loading_state | |
| ) | |
| async def test_model_configs(): | |
| """Test endpoint to verify actual model configurations from HuggingFace Hub.""" | |
| import requests | |
| import os | |
| models_to_test = [ | |
| "LinguaCustodia/Llama-Pro-Finance-Large", | |
| "LinguaCustodia/LLM-Pro-Finance-Medium", | |
| "LinguaCustodia/LLM-Pro-Finance-Small", | |
| "LinguaCustodia/LLM-Pro-Finance-Mini", | |
| "LinguaCustodia/Llama-Pro-Finance-Mini" | |
| ] | |
| results = {} | |
| # Get LinguaCustodia token for authentication | |
| hf_token_lc = os.getenv("HF_TOKEN_LC") | |
| headers = {} | |
| if hf_token_lc: | |
| headers["Authorization"] = f"Bearer {hf_token_lc}" | |
| # Debug: Check if LinguaCustodia token is available | |
| token_available = bool(hf_token_lc) | |
| token_length = len(hf_token_lc) if hf_token_lc else 0 | |
| for model_name in models_to_test: | |
| try: | |
| # Use HuggingFace Hub library to get model config | |
| from huggingface_hub import hf_hub_download | |
| import json | |
| # Download the config.json file using the LinguaCustodia token | |
| config_path = hf_hub_download( | |
| repo_id=model_name, | |
| filename="config.json", | |
| token=hf_token_lc if hf_token_lc else None | |
| ) | |
| # Read the config file | |
| with open(config_path, 'r') as f: | |
| config = json.load(f) | |
| # Extract context length | |
| context_length = None | |
| context_params = [ | |
| "max_position_embeddings", | |
| "n_positions", | |
| "max_sequence_length", | |
| "context_length", | |
| "max_context_length" | |
| ] | |
| for param in context_params: | |
| if param in config: | |
| value = config[param] | |
| if isinstance(value, dict) and "max_position_embeddings" in value: | |
| context_length = value["max_position_embeddings"] | |
| elif isinstance(value, int): | |
| context_length = value | |
| break | |
| results[model_name] = { | |
| "context_length": context_length, | |
| "model_type": config.get("model_type", "unknown"), | |
| "architectures": config.get("architectures", []), | |
| "config_available": True | |
| } | |
| except Exception as e: | |
| results[model_name] = { | |
| "context_length": None, | |
| "config_available": False, | |
| "error": str(e) | |
| } | |
| return { | |
| "test_results": results, | |
| "expected_contexts": { | |
| "LinguaCustodia/Llama-Pro-Finance-Large": "TBD", | |
| "LinguaCustodia/LLM-Pro-Finance-Medium": "TBD", | |
| "LinguaCustodia/LLM-Pro-Finance-Small": "TBD", | |
| "LinguaCustodia/LLM-Pro-Finance-Mini": "TBD", | |
| "LinguaCustodia/Llama-Pro-Finance-Mini": "TBD" | |
| }, | |
| "debug_info": { | |
| "token_available": token_available, | |
| "token_length": token_length, | |
| "headers_sent": bool(headers.get("Authorization")) | |
| } | |
| } | |
| async def debug_env(): | |
| """Debug endpoint to check environment variables.""" | |
| import os | |
| env_vars = {} | |
| for key, value in os.environ.items(): | |
| if "HF" in key or "TOKEN" in key: | |
| # Mask sensitive values | |
| if "TOKEN" in key or "SECRET" in key: | |
| env_vars[key] = f"{value[:10]}..." if value else "None" | |
| else: | |
| env_vars[key] = value | |
| return {"environment_variables": env_vars} | |
| async def debug_token_test(): | |
| """Test both HF_TOKEN_LC and HF_TOKEN_LC2 to see which can access v1.0 models.""" | |
| import requests | |
| import os | |
| # Get both tokens and clean them | |
| hf_token_lc = os.getenv("HF_TOKEN_LC") | |
| hf_token_lc2 = os.getenv("HF_TOKEN_LC2") | |
| # Clean tokens (remove whitespace/newlines) | |
| if hf_token_lc: | |
| hf_token_lc = hf_token_lc.strip() | |
| if hf_token_lc2: | |
| hf_token_lc2 = hf_token_lc2.strip() | |
| if not hf_token_lc and not hf_token_lc2: | |
| return {"error": "No HF_TOKEN_LC or HF_TOKEN_LC2 found"} | |
| test_model = "LinguaCustodia/qwen3-8b-fin-v1.0" | |
| results = {} | |
| # Test both tokens | |
| tokens_to_test = {} | |
| if hf_token_lc: | |
| tokens_to_test["HF_TOKEN_LC"] = hf_token_lc | |
| if hf_token_lc2: | |
| tokens_to_test["HF_TOKEN_LC2"] = hf_token_lc2 | |
| for token_name, token_value in tokens_to_test.items(): | |
| headers = {"Authorization": f"Bearer {token_value}"} | |
| token_results = {} | |
| try: | |
| # Test 1: HuggingFace Hub API | |
| api_url = f"https://huggingface.co/api/models/{test_model}" | |
| response1 = requests.get(api_url, headers=headers, timeout=30) | |
| token_results["hub_api"] = { | |
| "status_code": response1.status_code, | |
| "response": response1.text[:200] | |
| } | |
| # Test 2: Try the collection API | |
| collection_url = "https://huggingface.co/api/collections/LinguaCustodia/llm-pro-finance-suite-68de414b003d9078f76e43fc" | |
| response2 = requests.get(collection_url, headers=headers, timeout=30) | |
| token_results["collection_api"] = { | |
| "status_code": response2.status_code, | |
| "response": response2.text[:200] | |
| } | |
| # Test 3: Try raw config access | |
| config_url = f"https://huggingface.co/{test_model}/raw/main/config.json" | |
| response3 = requests.get(config_url, headers=headers, timeout=30) | |
| token_results["raw_config"] = { | |
| "status_code": response3.status_code, | |
| "response": response3.text[:200] | |
| } | |
| results[token_name] = { | |
| "token_length": len(token_value), | |
| "headers_sent": headers.get("Authorization", "")[:20] + "...", | |
| "test_results": token_results, | |
| "success": any(r["status_code"] == 200 for r in token_results.values()) | |
| } | |
| except Exception as e: | |
| results[token_name] = { | |
| "token_length": len(token_value), | |
| "error": str(e), | |
| "success": False | |
| } | |
| return { | |
| "model": test_model, | |
| "token_comparison": results, | |
| "working_token": next((name for name, data in results.items() if data.get("success")), None) | |
| } | |
| async def backend_info(): | |
| """Get backend information.""" | |
| global inference_backend | |
| if inference_backend is None: | |
| return { | |
| "backend_type": "initializing", | |
| "model_loaded": False, | |
| "current_model": "unknown", | |
| "vllm_config": None, | |
| "memory_info": {"gpu_available": False} | |
| } | |
| vllm_config = None | |
| if inference_backend.backend_type == "vllm": | |
| if hasattr(inference_backend, 'platform'): | |
| vllm_config = VLLM_CONFIG_HF if inference_backend.platform == "huggingface" else VLLM_CONFIG_SCW | |
| else: | |
| vllm_config = VLLM_CONFIG_HF # fallback | |
| return { | |
| "backend_type": inference_backend.backend_type, | |
| "model_loaded": inference_backend.engine is not None, | |
| "current_model": getattr(inference_backend.model_config, 'model_id', 'unknown'), | |
| "platform": getattr(inference_backend, 'platform', 'unknown'), | |
| "vllm_config": vllm_config, | |
| "memory_info": inference_backend.get_memory_info() | |
| } | |
| async def root(): | |
| """Root endpoint with API information.""" | |
| global storage_info | |
| try: | |
| settings = get_app_settings() | |
| model_config = get_model_config(settings.model_name) | |
| return { | |
| "message": f"LinguaCustodia Financial AI API - {ARCHITECTURE}", | |
| "version": "23.0.0", | |
| "status": "running", | |
| "model_loaded": model_loaded, | |
| "current_model": settings.model_name, | |
| "current_model_info": { | |
| "display_name": model_config.display_name, | |
| "model_id": model_config.model_id, | |
| "architecture": model_config.architecture, | |
| "parameters": model_config.parameters, | |
| "memory_gb": model_config.memory_gb, | |
| "vram_gb": model_config.vram_gb, | |
| "vocab_size": model_config.vocab_size, | |
| "eos_token_id": model_config.eos_token_id | |
| }, | |
| "endpoints": { | |
| "health": "/health", | |
| "inference": "/inference", | |
| "models": "/models", | |
| "load-model": "/load-model", | |
| "docs": "/docs", | |
| "diagnose": "/diagnose" | |
| }, | |
| "storage_info": storage_info, | |
| "architecture": ARCHITECTURE | |
| } | |
| except Exception as e: | |
| logger.error(f"Error in root endpoint: {e}") | |
| return { | |
| "message": f"LinguaCustodia Financial AI API - {ARCHITECTURE}", | |
| "version": "23.0.0", | |
| "status": "running", | |
| "model_loaded": model_loaded, | |
| "current_model": current_model_name, | |
| "error": str(e), | |
| "storage_info": storage_info, | |
| "architecture": ARCHITECTURE | |
| } | |
| async def list_models(): | |
| """List all available models and their configurations.""" | |
| try: | |
| settings = get_app_settings() | |
| model_config = get_model_config(settings.model_name) | |
| # Build simplified model info for all models | |
| all_models = {} | |
| for model_name, model_data in MODEL_CONFIG.items(): | |
| all_models[model_name] = { | |
| "display_name": model_data["display_name"], | |
| "model_id": model_data["model_id"], | |
| "architecture": model_data["architecture"], | |
| "parameters": model_data["parameters"], | |
| "memory_gb": model_data["memory_gb"], | |
| "vram_gb": model_data["vram_gb"] | |
| } | |
| return { | |
| "current_model": settings.model_name, | |
| "current_model_info": { | |
| "display_name": model_config.display_name, | |
| "model_id": model_config.model_id, | |
| "architecture": model_config.architecture, | |
| "parameters": model_config.parameters, | |
| "memory_gb": model_config.memory_gb, | |
| "vram_gb": model_config.vram_gb, | |
| "vocab_size": model_config.vocab_size, | |
| "eos_token_id": model_config.eos_token_id | |
| }, | |
| "available_models": all_models, | |
| "total_models": len(MODEL_CONFIG) | |
| } | |
| except Exception as e: | |
| logger.error(f"Error listing models: {e}") | |
| raise HTTPException(status_code=500, detail=f"Error listing models: {e}") | |
| async def inference(request: InferenceRequest): | |
| """Run inference with the loaded model using backend abstraction.""" | |
| global inference_backend | |
| if inference_backend is None: | |
| raise HTTPException(status_code=503, detail="Backend is still initializing. Please wait and try again.") | |
| try: | |
| # Use the global inference backend | |
| result = inference_backend.run_inference( | |
| prompt=request.prompt, | |
| max_new_tokens=request.max_new_tokens, | |
| temperature=request.temperature | |
| ) | |
| if not result["success"]: | |
| raise HTTPException(status_code=500, detail=result.get("error", "Inference failed")) | |
| return InferenceResponse( | |
| response=result["response"], | |
| model_used=result["model_used"], | |
| success=result["success"], | |
| tokens_generated=result.get("tokens_generated", 0), | |
| generation_params=result.get("generation_params", {}) | |
| ) | |
| except Exception as e: | |
| logger.error(f"Inference error: {e}") | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def load_model(model_name: str): | |
| """Load a specific model by name (async with progress tracking).""" | |
| global inference_backend, model_loading_state | |
| try: | |
| # Check if already loading | |
| if model_loading_state["is_loading"]: | |
| return { | |
| "message": f"Model loading already in progress: {model_loading_state['loading_model']}", | |
| "loading_status": model_loading_state["loading_status"], | |
| "loading_progress": model_loading_state["loading_progress"], | |
| "status": "loading" | |
| } | |
| # Validate model name | |
| if model_name not in MODEL_CONFIG: | |
| available_models = list(MODEL_CONFIG.keys()) | |
| raise HTTPException( | |
| status_code=400, | |
| detail=f"Model '{model_name}' not found. Available models: {available_models}" | |
| ) | |
| # Set the model name in environment | |
| os.environ['MODEL_NAME'] = model_name | |
| # Get new model configuration | |
| model_info = MODEL_CONFIG[model_name] | |
| new_model_config = get_model_config(model_name) | |
| # Start async model switching (via restart) | |
| asyncio.create_task(load_model_async(model_name, model_info, new_model_config)) | |
| return { | |
| "message": f"Model switch to '{model_info['display_name']}' initiated. Service will restart to load the new model.", | |
| "model_name": model_name, | |
| "model_id": model_info["model_id"], | |
| "display_name": model_info["display_name"], | |
| "backend_type": inference_backend.backend_type, | |
| "status": "restart_initiated", | |
| "loading_status": "saving_preference", | |
| "loading_progress": 10, | |
| "note": "vLLM doesn't support runtime model switching. The service will restart with the new model." | |
| } | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| logger.error(f"Error starting model loading: {e}") | |
| raise HTTPException(status_code=500, detail=f"Error starting model loading: {e}") | |
| async def get_loading_status(): | |
| """Get current model loading status and progress.""" | |
| global model_loading_state | |
| # Calculate elapsed time if loading | |
| elapsed_time = None | |
| if model_loading_state["loading_start_time"]: | |
| elapsed_time = time.time() - model_loading_state["loading_start_time"] | |
| return { | |
| "is_loading": model_loading_state["is_loading"], | |
| "loading_model": model_loading_state["loading_model"], | |
| "loading_progress": model_loading_state["loading_progress"], | |
| "loading_status": model_loading_state["loading_status"], | |
| "loading_error": model_loading_state["loading_error"], | |
| "elapsed_time_seconds": elapsed_time, | |
| "estimated_time_remaining": None # Could be calculated based on model size | |
| } | |
| async def cleanup_storage(): | |
| """Clean up persistent storage (admin endpoint).""" | |
| try: | |
| import shutil | |
| if os.path.exists('/data'): | |
| shutil.rmtree('/data') | |
| os.makedirs('/data', exist_ok=True) | |
| return {"message": "Storage cleaned successfully", "status": "success"} | |
| else: | |
| return {"message": "No persistent storage found", "status": "info"} | |
| except Exception as e: | |
| logger.error(f"Storage cleanup error: {e}") | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def put_to_sleep(): | |
| """Put the backend into sleep mode (for HuggingFace Spaces).""" | |
| global inference_backend | |
| if inference_backend is None: | |
| raise HTTPException(status_code=503, detail="Backend not initialized") | |
| try: | |
| success = inference_backend.sleep() | |
| if success: | |
| return { | |
| "message": "Backend put to sleep successfully", | |
| "status": "sleeping", | |
| "backend": inference_backend.backend_type, | |
| "note": "GPU memory released, ready for HuggingFace Space sleep" | |
| } | |
| else: | |
| return { | |
| "message": "Sleep mode not supported or failed", | |
| "status": "error", | |
| "backend": inference_backend.backend_type | |
| } | |
| except Exception as e: | |
| logger.error(f"Error putting backend to sleep: {e}") | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def wake_up(): | |
| """Wake up the backend from sleep mode.""" | |
| global inference_backend | |
| if inference_backend is None: | |
| raise HTTPException(status_code=503, detail="Backend not initialized") | |
| try: | |
| success = inference_backend.wake() | |
| if success: | |
| return { | |
| "message": "Backend woken up successfully", | |
| "status": "awake", | |
| "backend": inference_backend.backend_type, | |
| "note": "Ready for inference" | |
| } | |
| else: | |
| return { | |
| "message": "Wake mode not supported or failed", | |
| "status": "error", | |
| "backend": inference_backend.backend_type | |
| } | |
| except Exception as e: | |
| logger.error(f"Error waking up backend: {e}") | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def diagnose(): | |
| """Diagnose system status and configuration.""" | |
| global inference_backend | |
| if inference_backend is None: | |
| return { | |
| "python_version": sys.version, | |
| "architecture": ARCHITECTURE, | |
| "model_loaded": False, | |
| "current_model": "unknown", | |
| "backend_type": "initializing", | |
| "available_models": list(MODEL_CONFIG.keys()), | |
| "storage_info": storage_info, | |
| "gpu_info": {"gpu_available": False} | |
| } | |
| return { | |
| "python_version": sys.version, | |
| "architecture": ARCHITECTURE, | |
| "model_loaded": inference_backend.engine is not None, | |
| "current_model": getattr(inference_backend.model_config, 'model_id', 'unknown'), | |
| "backend_type": inference_backend.backend_type, | |
| "available_models": list(MODEL_CONFIG.keys()), | |
| "storage_info": storage_info, | |
| "gpu_info": inference_backend.get_memory_info() | |
| } | |
| # OpenAI-Compatible Endpoints - Helper Functions | |
| def get_stop_tokens_for_model(model_name: str) -> List[str]: | |
| """Get model-specific stop tokens to prevent hallucinations.""" | |
| model_stops = { | |
| "llama3.1-8b": ["<|end_of_text|>", "<|eot_id|>", "<|endoftext|>", "\nUser:", "\nAssistant:", "\nSystem:"], | |
| "qwen": ["<|im_end|>", "<|endoftext|>", "</s>", "\nUser:", "\nAssistant:", "\nSystem:"], | |
| "gemma": ["<end_of_turn>", "<eos>", "</s>", "\nUser:", "\nAssistant:", "\nSystem:"], | |
| } | |
| model_lower = model_name.lower() | |
| for key in model_stops: | |
| if key in model_lower: | |
| return model_stops[key] | |
| # Default comprehensive stop list | |
| return ["<|endoftext|>", "</s>", "<eos>", "\nUser:", "\nAssistant:", "\nSystem:"] | |
| def count_tokens_in_messages(messages: List[Dict[str, str]], model_name: str) -> int: | |
| """Count total tokens in a list of messages.""" | |
| try: | |
| from transformers import AutoTokenizer | |
| # Note: this reloads a tokenizer on every call and assumes the model lives under the | |
| # LinguaCustodia org; if the repo cannot be resolved, the rough 4-chars-per-token | |
| # fallback below is used instead. | |
| tokenizer = AutoTokenizer.from_pretrained(f"LinguaCustodia/{model_name}") | |
| total_tokens = 0 | |
| for message in messages: | |
| content = message.get('content', '') | |
| total_tokens += len(tokenizer.encode(content)) | |
| return total_tokens | |
| except Exception: | |
| # Fallback: rough estimation (4 chars per token) | |
| total_chars = sum(len(msg.get('content', '')) for msg in messages) | |
| return total_chars // 4 | |
| def manage_chat_context(messages: List[Dict[str, str]], model_name: str, max_context_tokens: int = 3800) -> List[Dict[str, str]]: | |
| """Manage chat context to stay within token limits.""" | |
| # Count total tokens | |
| total_tokens = count_tokens_in_messages(messages, model_name) | |
| # If under limit, return as-is (no truncation needed) | |
| if total_tokens <= max_context_tokens: | |
| return messages | |
| # Only truncate if we're significantly over the limit | |
| # This prevents unnecessary truncation for small overages | |
| if total_tokens <= max_context_tokens + 200: # Allow 200 token buffer | |
| return messages | |
| # Strategy: Keep system message + recent messages | |
| system_msg = messages[0] if messages and messages[0].get('role') == 'system' else None | |
| recent_messages = messages[1:] if system_msg else messages | |
| # Keep only recent messages that fit | |
| result = [] | |
| if system_msg: | |
| result.append(system_msg) | |
| current_tokens = count_tokens_in_messages([system_msg] if system_msg else [], model_name) | |
| for message in reversed(recent_messages): | |
| message_tokens = count_tokens_in_messages([message], model_name) | |
| if current_tokens + message_tokens > max_context_tokens: | |
| break | |
| result.insert(1 if system_msg else 0, message) | |
| current_tokens += message_tokens | |
| # Add context truncation notice if we had to truncate | |
| if len(result) < len(messages): | |
| truncation_notice = { | |
| "role": "system", | |
| "content": f"[Context truncated: {len(messages) - len(result)} messages removed to fit token limit]" | |
| } | |
| result.insert(1 if system_msg else 0, truncation_notice) | |
| return result | |
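| # Truncation behaviour (sketch): with max_context_tokens=3800 and a ~6000-token history, the | |
| # system message is kept, the oldest user/assistant turns are dropped until the remainder fits, | |
| # and a synthetic system notice ("[Context truncated: N messages removed ...]") is inserted so | |
| # the client can tell that earlier turns were dropped. | |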
| def format_chat_messages(messages: List[Dict[str, str]], model_name: str) -> str: | |
| """Format chat messages with proper template to prevent hallucinations.""" | |
| # Better prompt formatting for different models | |
| if "llama3.1" in model_name.lower(): | |
| # Llama 3.1 chat format | |
| prompt = "<|begin_of_text|>" | |
| for msg in messages: | |
| role = msg.get("role", "user") | |
| content = msg.get("content", "") | |
| if role == "system": | |
| prompt += f"<|start_header_id|>system<|end_header_id|>\n\n{content}<|eot_id|>" | |
| elif role == "user": | |
| prompt += f"<|start_header_id|>user<|end_header_id|>\n\n{content}<|eot_id|>" | |
| elif role == "assistant": | |
| prompt += f"<|start_header_id|>assistant<|end_header_id|>\n\n{content}<|eot_id|>" | |
| prompt += "<|start_header_id|>assistant<|end_header_id|>\n\n" | |
| return prompt | |
| elif "qwen" in model_name.lower(): | |
| # Qwen chat format | |
| prompt = "" | |
| for msg in messages: | |
| role = msg.get("role", "user") | |
| content = msg.get("content", "") | |
| if role == "system": | |
| prompt += f"<|im_start|>system\n{content}<|im_end|>\n" | |
| elif role == "user": | |
| prompt += f"<|im_start|>user\n{content}<|im_end|>\n" | |
| elif role == "assistant": | |
| prompt += f"<|im_start|>assistant\n{content}<|im_end|>\n" | |
| prompt += "<|im_start|>assistant\n" | |
| return prompt | |
| elif "gemma" in model_name.lower(): | |
| # Gemma chat format | |
| prompt = "<bos>" | |
| for msg in messages: | |
| role = msg.get("role", "user") | |
| content = msg.get("content", "") | |
| if role == "user": | |
| prompt += f"<start_of_turn>user\n{content}<end_of_turn>\n" | |
| elif role == "assistant": | |
| prompt += f"<start_of_turn>model\n{content}<end_of_turn>\n" | |
| prompt += "<start_of_turn>model\n" | |
| return prompt | |
| else: | |
| # Fallback: Simple format but with clear delimiters | |
| prompt = "" | |
| for msg in messages: | |
| role = msg.get("role", "user") | |
| content = msg.get("content", "") | |
| prompt += f"### {role.capitalize()}\n{content}\n\n" | |
| prompt += "### Assistant\n" | |
| return prompt | |
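| # Example (Llama 3.1): a single user message "Hi" is formatted as | |
| # "<|begin_of_text|><|start_header_id|>user<|end_header_id|>\n\nHi<|eot_id|><|start_header_id|>assistant<|end_header_id|>\n\n" | |
| # Note: the Gemma branch emits only user/model turns, since Gemma's chat format has no system role. | |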
| async def stream_chat_completion(prompt: str, model: str, temperature: float, max_tokens: int, request_id: str): | |
| """Generator for streaming chat completions with TRUE delta streaming.""" | |
| try: | |
| from vllm import SamplingParams | |
| # Get model-specific stop tokens | |
| stop_tokens = get_stop_tokens_for_model(model) | |
| # Create sampling params with stop tokens | |
| sampling_params = SamplingParams( | |
| temperature=temperature, | |
| max_tokens=max_tokens, | |
| top_p=0.9, | |
| repetition_penalty=1.1, # Increased from 1.05 to prevent repetition | |
| stop=stop_tokens # Add stop tokens to prevent hallucinations | |
| ) | |
| # Track previous text to send only deltas | |
| previous_text = "" | |
| # Stream from vLLM | |
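| # NOTE: if inference_backend.engine wraps the synchronous vllm.LLM API, generate() returns completed | |
| # RequestOutputs, so this loop typically yields one large delta per prompt; token-level streaming | |
| # would require vLLM's async engine. | |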
| for output in inference_backend.engine.generate([prompt], sampling_params, use_tqdm=False): | |
| if output.outputs: | |
| current_text = output.outputs[0].text | |
| # Calculate delta (only NEW text since last iteration) | |
| if len(current_text) > len(previous_text): | |
| new_text = current_text[len(previous_text):] | |
| # Format as OpenAI SSE chunk with TRUE delta | |
| chunk = { | |
| "id": request_id, | |
| "object": "chat.completion.chunk", | |
| "created": int(time.time()), | |
| "model": model, | |
| "choices": [{ | |
| "index": 0, | |
| "delta": {"content": new_text}, # Only send NEW text | |
| "finish_reason": None | |
| }] | |
| } | |
| yield f"data: {json.dumps(chunk)}\n\n" | |
| previous_text = current_text | |
| # Send final chunk | |
| final_chunk = { | |
| "id": request_id, | |
| "object": "chat.completion.chunk", | |
| "created": int(time.time()), | |
| "model": model, | |
| "choices": [{ | |
| "index": 0, | |
| "delta": {}, | |
| "finish_reason": "stop" | |
| }] | |
| } | |
| yield f"data: {json.dumps(final_chunk)}\n\n" | |
| yield "data: [DONE]\n\n" | |
| except Exception as e: | |
| logger.error(f"Streaming error: {e}") | |
| error_chunk = {"error": str(e)} | |
| yield f"data: {json.dumps(error_chunk)}\n\n" | |
| async def openai_chat_completions(request: dict): | |
| """OpenAI-compatible chat completions endpoint with streaming support.""" | |
| global inference_backend | |
| if inference_backend is None: | |
| raise HTTPException(status_code=503, detail="Backend is still initializing. Please wait and try again.") | |
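| # Request flow: trim the chat history to the context budget, apply the model's chat template, | |
| # then either stream SSE chunks (vLLM backend only) or return a single OpenAI-shaped completion. | |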
| try: | |
| # Extract messages and parameters | |
| messages = request.get("messages", []) | |
| model = request.get("model", "linguacustodia") | |
| temperature = request.get("temperature", 0.6) | |
| max_tokens = request.get("max_tokens", 512) # Increased from 150 for better responses | |
| stream = request.get("stream", False) | |
| # Manage chat context to stay within token limits | |
| managed_messages = manage_chat_context(messages, model, max_context_tokens=3800) | |
| # Convert messages to prompt using proper chat template | |
| prompt = format_chat_messages(managed_messages, model) | |
| # Generate request ID | |
| request_id = f"chatcmpl-{hash(prompt) % 10000000000}" | |
| # Handle streaming | |
| if stream and inference_backend.backend_type == "vllm": | |
| return StreamingResponse( | |
| stream_chat_completion(prompt, model, temperature, max_tokens, request_id), | |
| media_type="text/event-stream" | |
| ) | |
| # Non-streaming response | |
| stop_tokens = get_stop_tokens_for_model(model) | |
| result = inference_backend.run_inference( | |
| prompt=prompt, | |
| temperature=temperature, | |
| max_new_tokens=max_tokens, | |
| stop=stop_tokens, | |
| repetition_penalty=1.1 | |
| ) | |
| if not result["success"]: | |
| raise HTTPException(status_code=500, detail=result.get("error", "Inference failed")) | |
| # Format OpenAI response | |
| response = { | |
| "id": request_id, | |
| "object": "chat.completion", | |
| "created": int(time.time()), | |
| "model": model, | |
| "choices": [{ | |
| "index": 0, | |
| "message": { | |
| "role": "assistant", | |
| "content": result["response"] | |
| }, | |
| "finish_reason": "stop" | |
| }], | |
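| # NOTE: prompt_tokens below is a whitespace-split approximation, not an exact tokenizer count | |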
| "usage": { | |
| "prompt_tokens": len(prompt.split()), | |
| "completion_tokens": result.get("tokens_generated", 0), | |
| "total_tokens": len(prompt.split()) + result.get("tokens_generated", 0) | |
| } | |
| } | |
| return response | |
| except Exception as e: | |
| logger.error(f"OpenAI chat completions error: {e}") | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def openai_completions(request: dict): | |
| """OpenAI-compatible completions endpoint.""" | |
| global inference_backend | |
| if inference_backend is None: | |
| raise HTTPException(status_code=503, detail="Backend is still initializing. Please wait and try again.") | |
| try: | |
| # Extract parameters | |
| prompt = request.get("prompt", "") | |
| model = request.get("model", "linguacustodia") | |
| temperature = request.get("temperature", 0.6) | |
| max_tokens = request.get("max_tokens", 150) | |
| # Run inference | |
| result = inference_backend.run_inference( | |
| prompt=prompt, | |
| temperature=temperature, | |
| max_new_tokens=max_tokens | |
| ) | |
| if not result["success"]: | |
| raise HTTPException(status_code=500, detail=result.get("error", "Inference failed")) | |
| # Format OpenAI response | |
| response = { | |
| "id": f"cmpl-{hash(prompt) % 10000000000}", | |
| "object": "text_completion", | |
| "created": int(__import__("time").time()), | |
| "model": model, | |
| "choices": [{ | |
| "text": result["response"], | |
| "index": 0, | |
| "finish_reason": "stop" | |
| }], | |
| "usage": { | |
| "prompt_tokens": len(prompt.split()), | |
| "completion_tokens": result.get("tokens_generated", 0), | |
| "total_tokens": len(prompt.split()) + result.get("tokens_generated", 0) | |
| } | |
| } | |
| return response | |
| except Exception as e: | |
| logger.error(f"OpenAI completions error: {e}") | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def openai_models(): | |
| """OpenAI-compatible models endpoint.""" | |
| try: | |
| models = [] | |
| for model_name, config in MODEL_CONFIG.items(): | |
| models.append({ | |
| "id": config["model_id"], | |
| "object": "model", | |
| "created": int(time.time()), | |
| "owned_by": "linguacustodia", | |
| "permission": [], | |
| "root": config["model_id"], | |
| "parent": None | |
| }) | |
| return { | |
| "object": "list", | |
| "data": models | |
| } | |
| except Exception as e: | |
| logger.error(f"OpenAI models error: {e}") | |
| raise HTTPException(status_code=500, detail=str(e)) | |
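| # The /v1/models listing mirrors MODEL_CONFIG, so the ids returned are full HF repo ids | |
| # (e.g. LinguaCustodia/llama3.1-8b-fin-v1.0) rather than the short configuration keys. | |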
| # Analytics Endpoints | |
| async def analytics_performance(): | |
| """Get performance analytics for the inference backend.""" | |
| global inference_backend | |
| if inference_backend is None: | |
| raise HTTPException(status_code=503, detail="Backend not initialized") | |
| try: | |
| memory_info = inference_backend.get_memory_info() | |
| # GPU "utilization" here is memory-based (allocated / reserved), not SM/compute utilization | |
| if memory_info.get("gpu_available"): | |
| gpu_allocated = memory_info.get("gpu_memory_allocated", 0) | |
| gpu_reserved = memory_info.get("gpu_memory_reserved", 0) | |
| gpu_utilization = (gpu_allocated / gpu_reserved * 100) if gpu_reserved > 0 else 0 | |
| else: | |
| gpu_utilization = 0 | |
| return { | |
| "backend": inference_backend.backend_type, | |
| "model": getattr(inference_backend.model_config, 'model_id', 'unknown'), | |
| "gpu_utilization_percent": round(gpu_utilization, 2), | |
| "memory": { | |
| "gpu_allocated_gb": round(memory_info.get("gpu_memory_allocated", 0) / (1024**3), 2), | |
| "gpu_reserved_gb": round(memory_info.get("gpu_memory_reserved", 0) / (1024**3), 2), | |
| "gpu_available": memory_info.get("gpu_available", False) | |
| }, | |
| "platform": { | |
| "deployment": os.getenv('DEPLOYMENT_ENV', 'huggingface'), | |
| "hardware": "L40 GPU (48GB VRAM)" if memory_info.get("gpu_available") else "CPU" | |
| } | |
| } | |
| except Exception as e: | |
| logger.error(f"Performance analytics error: {e}") | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def analytics_costs(): | |
| """Get token cost analytics based on LinguaCustodia pricing.""" | |
| # LinguaCustodia token pricing (estimated based on model size and hardware) | |
| COST_PER_1K_INPUT_TOKENS = 0.0001 # $0.0001 per 1K input tokens | |
| COST_PER_1K_OUTPUT_TOKENS = 0.0003 # $0.0003 per 1K output tokens | |
| # Hardware costs | |
| L40_HOURLY_COST = 1.80 # $1.80/hour for L40 GPU on HuggingFace | |
| return { | |
| "pricing": { | |
| "model": "LinguaCustodia Financial Models", | |
| "input_tokens": { | |
| "cost_per_1k": COST_PER_1K_INPUT_TOKENS, | |
| "currency": "USD" | |
| }, | |
| "output_tokens": { | |
| "cost_per_1k": COST_PER_1K_OUTPUT_TOKENS, | |
| "currency": "USD" | |
| } | |
| }, | |
| "hardware": { | |
| "type": "L40 GPU (48GB VRAM)", | |
| "cost_per_hour": L40_HOURLY_COST, | |
| "cost_per_day": round(L40_HOURLY_COST * 24, 2), | |
| "cost_per_month": round(L40_HOURLY_COST * 24 * 30, 2), | |
| "currency": "USD" | |
| }, | |
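| # The 1M-token example below assumes an even 50/50 split between input and output tokens | |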
| "examples": { | |
| "100k_tokens_input": f"${round(COST_PER_1K_INPUT_TOKENS * 100, 4)}", | |
| "100k_tokens_output": f"${round(COST_PER_1K_OUTPUT_TOKENS * 100, 4)}", | |
| "1m_tokens_total": f"${round((COST_PER_1K_INPUT_TOKENS + COST_PER_1K_OUTPUT_TOKENS) * 500, 2)}" | |
| }, | |
| "note": "Costs are estimates. Actual costs may vary based on usage patterns and model selection." | |
| } | |
| async def analytics_usage(): | |
| """Get usage statistics for the API.""" | |
| global inference_backend | |
| if inference_backend is None: | |
| raise HTTPException(status_code=503, detail="Backend not initialized") | |
| try: | |
| memory_info = inference_backend.get_memory_info() | |
| # Get current model info | |
| model_config = inference_backend.model_config | |
| model_id = getattr(model_config, 'model_id', 'unknown') | |
| return { | |
| "current_session": { | |
| "model_loaded": inference_backend.engine is not None, | |
| "model_id": model_id, | |
| "backend": inference_backend.backend_type, | |
| "uptime_status": "running" | |
| }, | |
| "capabilities": { | |
| "streaming": inference_backend.backend_type == "vllm", | |
| "openai_compatible": True, | |
| "max_context_length": 2048 if inference_backend.backend_type == "vllm" else 4096, | |
| "supported_endpoints": [ | |
| "/v1/chat/completions", | |
| "/v1/completions", | |
| "/v1/models" | |
| ] | |
| }, | |
| "performance": { | |
| "gpu_available": memory_info.get("gpu_available", False), | |
| "backend_optimizations": "vLLM with eager mode" if inference_backend.backend_type == "vllm" else "Transformers" | |
| }, | |
| "note": "This API provides real-time access to LinguaCustodia financial AI models with OpenAI-compatible interface." | |
| } | |
| except Exception as e: | |
| logger.error(f"Usage analytics error: {e}") | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| if __name__ == "__main__": | |
| port = int(os.getenv("APP_PORT", 7860)) | |
| uvicorn.run(app, host="0.0.0.0", port=port) | |
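| # Example client call (a sketch, not part of the running service): it assumes the handlers above | |
| # are registered on `app` at the /v1/* routes listed in analytics_usage (route registration is not | |
| # shown in this section) and uses the third-party `requests` library purely for illustration. | |
| # | |
| #   import requests | |
| #   resp = requests.post( | |
| #       "http://localhost:7860/v1/chat/completions", | |
| #       json={ | |
| #           "model": "llama3.1-8b-v1.0", | |
| #           "messages": [{"role": "user", "content": "What is EBITDA?"}], | |
| #           "max_tokens": 256, | |
| #           "stream": False, | |
| #       }, | |
| #   ) | |
| #   print(resp.json()["choices"][0]["message"]["content"]) | |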