#!/usr/bin/env python3
"""
LinguaCustodia Financial AI API - Clean Production Version
Consolidated, production-ready API with proper architecture.
Version: 24.1.0 - vLLM Backend with ModelInfo fixes
"""
import os
import sys
import uvicorn
import json
import time
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import Optional, Dict, Any, AsyncIterator, List
import logging
import asyncio
import threading
# Fix OMP_NUM_THREADS warning
os.environ["OMP_NUM_THREADS"] = "1"
# Load environment variables
from dotenv import load_dotenv
load_dotenv()
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Inline Configuration Pattern for HuggingFace Spaces Deployment
# This avoids module import issues in containerized environments
ARCHITECTURE = "Inline Configuration (HF Optimized)"
# Inline model configuration (synchronized with lingua_fin/config/)
MODEL_CONFIG = {
# LinguaCustodia Pro Finance Suite Models
"pro-finance-large": {
"model_id": "LinguaCustodia/Llama-Pro-Finance-Large",
"display_name": "Llama Pro Finance Large",
"architecture": "LlamaForCausalLM",
"parameters": "70B",
"memory_gb": 140,
"vram_gb": 80,
"eos_token_id": 128009,
"bos_token_id": 128000,
"vocab_size": 128000
},
"pro-finance-medium": {
"model_id": "LinguaCustodia/LLM-Pro-Finance-Medium",
"display_name": "LLM Pro Finance Medium",
"architecture": "LlamaForCausalLM",
"parameters": "32B",
"memory_gb": 64,
"vram_gb": 32,
"eos_token_id": 128009,
"bos_token_id": 128000,
"vocab_size": 128000
},
"pro-finance-small": {
"model_id": "LinguaCustodia/LLM-Pro-Finance-Small",
"display_name": "LLM Pro Finance Small",
"architecture": "LlamaForCausalLM",
"parameters": "8B",
"memory_gb": 16,
"vram_gb": 8,
"eos_token_id": 128009,
"bos_token_id": 128000,
"vocab_size": 128000
},
"pro-finance-mini": {
"model_id": "LinguaCustodia/LLM-Pro-Finance-Mini",
"display_name": "LLM Pro Finance Mini",
"architecture": "LlamaForCausalLM",
"parameters": "3B",
"memory_gb": 6,
"vram_gb": 3,
"eos_token_id": 128009,
"bos_token_id": 128000,
"vocab_size": 128000
},
"llama-pro-finance-mini": {
"model_id": "LinguaCustodia/Llama-Pro-Finance-Mini",
"display_name": "Llama Pro Finance Mini",
"architecture": "LlamaForCausalLM",
"parameters": "1B",
"memory_gb": 3,
"vram_gb": 2,
"eos_token_id": 128009,
"bos_token_id": 128000,
"vocab_size": 128000
},
"fin-pythia-1.4b": {
"model_id": "LinguaCustodia/fin-pythia-1.4b",
"display_name": "Fin-Pythia 1.4B Financial",
"architecture": "GPTNeoXForCausalLM",
"parameters": "1.4B",
"memory_gb": 3,
"vram_gb": 2,
"eos_token_id": 0,
"bos_token_id": 0,
"vocab_size": 50304
},
# v1.0 Models (Latest Generation)
"llama3.1-8b-v1.0": {
"model_id": "LinguaCustodia/llama3.1-8b-fin-v1.0",
"display_name": "Llama 3.1 8B Financial v1.0",
"architecture": "LlamaForCausalLM",
"parameters": "8B",
"memory_gb": 16,
"vram_gb": 8,
"eos_token_id": 128009,
"bos_token_id": 128000,
"vocab_size": 128000
},
"qwen3-8b-v1.0": {
"model_id": "LinguaCustodia/qwen3-8b-fin-v1.0",
"display_name": "Qwen 3 8B Financial v1.0",
"architecture": "Qwen3ForCausalLM",
"parameters": "8B",
"memory_gb": 16,
"vram_gb": 8,
"eos_token_id": 151645,
"bos_token_id": None,
"vocab_size": 151936
},
"qwen3-32b-v1.0": {
"model_id": "LinguaCustodia/qwen3-32b-fin-v1.0",
"display_name": "Qwen 3 32B Financial v1.0",
"architecture": "Qwen3ForCausalLM",
"parameters": "32B",
"memory_gb": 64,
"vram_gb": 32,
"eos_token_id": 151645,
"bos_token_id": None,
"vocab_size": 151936
},
"llama3.1-70b-v1.0": {
"model_id": "LinguaCustodia/llama3.1-70b-fin-v1.0",
"display_name": "Llama 3.1 70B Financial v1.0",
"architecture": "LlamaForCausalLM",
"parameters": "70B",
"memory_gb": 140,
"vram_gb": 80,
"eos_token_id": 128009,
"bos_token_id": 128000,
"vocab_size": 128000
},
"gemma3-12b-v1.0": {
"model_id": "LinguaCustodia/gemma3-12b-fin-v1.0",
"display_name": "Gemma 3 12B Financial v1.0",
"architecture": "GemmaForCausalLM",
"parameters": "12B",
"memory_gb": 32,
"vram_gb": 12,
"eos_token_id": 1,
"bos_token_id": 2,
"vocab_size": 262144
}
}
# Inline generation configuration
GENERATION_CONFIG = {
"temperature": 0.6,
"top_p": 0.9,
"max_new_tokens": 150,
"repetition_penalty": 1.05,
"early_stopping": False,
"min_length": 50
}
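# Illustrative only: per-request values (see InferenceRequest below) override the
# temperature and max_new_tokens defaults; e.g. a hypothetical request body
#   {"prompt": "Define EBITDA.", "max_new_tokens": 200, "temperature": 0.4}
# generates with those two values, and GENERATION_CONFIG is echoed back in the
# response's generation_params on the Transformers code path.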
# Initialize FastAPI app
app = FastAPI(
title="LinguaCustodia Financial AI API",
description=f"Production-ready API with {ARCHITECTURE}",
version="23.0.0",
docs_url="/docs",
redoc_url="/redoc"
)
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Pydantic models for API
class InferenceRequest(BaseModel):
prompt: str
max_new_tokens: Optional[int] = 150
temperature: Optional[float] = 0.6
class InferenceResponse(BaseModel):
response: str
model_used: str
success: bool
tokens_generated: int
generation_params: Dict[str, Any]
class HealthResponse(BaseModel):
status: str
model_loaded: bool
current_model: Optional[str]
gpu_available: bool
memory_usage: Optional[Dict[str, Any]]
storage_info: Optional[Dict[str, Any]]
architecture: str
loading_status: Optional[Dict[str, Any]] = None
# Global variables for inline configuration
model = None
tokenizer = None
pipe = None
model_loaded = False
current_model_name = None
storage_info = None
# Platform-Specific vLLM Configurations
def get_vllm_config_for_model(model_name: str, platform: str = "huggingface") -> dict:
"""Get vLLM configuration optimized for specific model and platform."""
base_config = {
"tensor_parallel_size": 1, # Single GPU
"pipeline_parallel_size": 1, # No pipeline parallelism
"trust_remote_code": True, # Required for LinguaCustodia
"dtype": "bfloat16", # L40 GPU optimization
"enforce_eager": True, # Disable CUDA graphs (HF compatibility - conservative)
"disable_custom_all_reduce": True, # Disable custom kernels (HF compatibility)
"disable_log_stats": True, # Reduce logging overhead
}
# Model-specific context length configurations
if "llama3.1-8b" in model_name:
max_context = 128000 # Llama 3.1 8B supports 128K
elif "qwen3-8b" in model_name:
max_context = 32768 # Qwen 3 8B supports 32K
elif "qwen3-32b" in model_name:
max_context = 32768 # Qwen 3 32B supports 32K
elif "llama3.1-70b" in model_name:
max_context = 128000 # Llama 3.1 70B supports 128K
elif "gemma3-12b" in model_name:
max_context = 8192 # Gemma 3 12B supports 8K
else:
max_context = 32768 # Default fallback
if platform == "huggingface":
# Model-specific configurations for HF L40 (48GB VRAM)
if "32b" in model_name.lower() or "70b" in model_name.lower():
# ⚠️ WARNING: 32B and 70B models are too large for L40 GPU (48GB VRAM)
# These configurations are experimental and may not work
return {
**base_config,
"gpu_memory_utilization": 0.50, # Extremely conservative for large models
"max_model_len": min(max_context, 4096), # Use model's max or 4K for HF
"max_num_batched_tokens": min(max_context, 4096), # Reduced batching
}
elif "12b" in model_name.lower():
# ⚠️ WARNING: Gemma 12B is too large for L40 GPU (48GB VRAM)
# Model weights load fine (~22GB) but KV cache allocation fails
return {
**base_config,
"gpu_memory_utilization": 0.50, # Conservative for 12B model
"max_model_len": min(max_context, 2048), # Use model's max or 2K for HF
"max_num_batched_tokens": min(max_context, 2048), # Reduced batching
}
else:
# Default for 8B and smaller models
return {
**base_config,
"gpu_memory_utilization": 0.75, # Standard for 8B models
"max_model_len": max_context, # Use model's actual max context
"max_num_batched_tokens": max_context, # Full batching
}
else:
# Scaleway configuration (more aggressive)
return {
**base_config,
"gpu_memory_utilization": 0.85, # Aggressive for Scaleway L40S
"max_model_len": max_context, # Use model's actual max context
"max_num_batched_tokens": max_context, # Full batching
"enforce_eager": False, # Enable CUDA graphs for maximum performance
"disable_custom_all_reduce": False, # Enable all optimizations
}
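# Illustrative only (not executed): the shape of the config returned by the helper
# above for an 8B model on the default HuggingFace platform, following the branches
# defined there.
#
#   get_vllm_config_for_model("llama3.1-8b-v1.0", "huggingface")
#   # -> {
#   #     "tensor_parallel_size": 1, "pipeline_parallel_size": 1,
#   #     "trust_remote_code": True, "dtype": "bfloat16",
#   #     "enforce_eager": True, "disable_custom_all_reduce": True,
#   #     "disable_log_stats": True,
#   #     "gpu_memory_utilization": 0.75,
#   #     "max_model_len": 128000, "max_num_batched_tokens": 128000,
#   # }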
VLLM_CONFIG_HF = {
"gpu_memory_utilization": 0.75, # Standard for 8B models
"max_model_len": 32768, # Default 32K context (Llama 3.1 8B can use 128K)
"tensor_parallel_size": 1, # Single GPU
"pipeline_parallel_size": 1, # No pipeline parallelism
"trust_remote_code": True, # Required for LinguaCustodia
"dtype": "bfloat16", # L40 GPU optimization
"enforce_eager": True, # Disable CUDA graphs (HF compatibility - conservative)
"disable_custom_all_reduce": True, # Disable custom kernels (HF compatibility)
"disable_log_stats": True, # Reduce logging overhead
"max_num_batched_tokens": 32768, # Default batching
}
VLLM_CONFIG_SCW = {
"gpu_memory_utilization": 0.85, # Aggressive for Scaleway L40S (40.8GB of 48GB)
"max_model_len": 32768, # Default 32K context (model-specific)
"tensor_parallel_size": 1, # Single GPU
"pipeline_parallel_size": 1, # No pipeline parallelism
"trust_remote_code": True, # Required for LinguaCustodia
"dtype": "bfloat16", # L40S GPU optimization
"enforce_eager": False, # Use CUDA graphs for maximum speed
"disable_custom_all_reduce": False, # Enable all optimizations
}
# Backend Abstraction Layer
class InferenceBackend:
"""Unified interface for all inference backends."""
def __init__(self, backend_type: str, model_config: dict):
self.backend_type = backend_type
self.model_config = model_config
self.engine = None
def load_model(self, model_id: str) -> bool:
"""Load model with platform-specific optimizations."""
raise NotImplementedError
def run_inference(self, prompt: str, **kwargs) -> dict:
"""Run inference with consistent response format."""
raise NotImplementedError
def get_memory_info(self) -> dict:
"""Get memory usage information."""
raise NotImplementedError
def sleep(self) -> bool:
"""Put backend into sleep mode (for HuggingFace Spaces)."""
raise NotImplementedError
def wake(self) -> bool:
"""Wake up backend from sleep mode."""
raise NotImplementedError
def cleanup(self) -> None:
"""Clean up resources."""
raise NotImplementedError
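# A hypothetical additional backend (for example one wrapping Text Generation
# Inference) would plug in by subclassing InferenceBackend, implementing the six
# methods above, and being returned from create_inference_backend() below.
# Sketch only; the class name and backend label are illustrative:
#
#   class TGIBackend(InferenceBackend):
#       def __init__(self, model_config: dict):
#           super().__init__("tgi", model_config)
#       def load_model(self, model_id: str) -> bool: ...
#       def run_inference(self, prompt: str, **kwargs) -> dict: ...
#       def get_memory_info(self) -> dict: ...
#       def sleep(self) -> bool: ...
#       def wake(self) -> bool: ...
#       def cleanup(self) -> None: ...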
class VLLMBackend(InferenceBackend):
"""vLLM implementation with platform-specific optimizations."""
def __init__(self, model_config: dict, platform: str = "huggingface"):
super().__init__("vllm", model_config)
self.platform = platform
# Get model-specific configuration
model_name = getattr(model_config, 'model_id', 'default')
self.config = get_vllm_config_for_model(model_name, platform)
logger.info(f"🔧 Using {platform}-optimized vLLM config for {model_name}")
logger.info(f"📊 vLLM Config: {self.config}")
def load_model(self, model_id: str) -> bool:
"""Load model with vLLM engine."""
try:
from vllm import LLM
import os
logger.info(f"🚀 Initializing vLLM engine for {model_id}")
logger.info(f"📊 vLLM Config: {self.config}")
# Set HF token for vLLM authentication - try both LinguaCustodia tokens
hf_token_lc = os.getenv("HF_TOKEN_LC")
hf_token_lc2 = os.getenv("HF_TOKEN_LC2")
# Clean tokens and choose the right one
if hf_token_lc:
hf_token_lc = hf_token_lc.strip()
if hf_token_lc2:
hf_token_lc2 = hf_token_lc2.strip()
# Use HF_TOKEN_LC2 if available, fallback to HF_TOKEN_LC
working_token = hf_token_lc2 if hf_token_lc2 else hf_token_lc
if working_token:
# Clear HuggingFace cache to force fresh download with correct token
import shutil
hf_cache_dir = os.path.expanduser("~/.cache/huggingface")
hf_home_cache = os.path.join(os.getenv('HF_HOME', '/data/.huggingface'), 'hub')
# Clear model cache for this specific model to force re-download
model_cache_dirs = [
os.path.join(hf_cache_dir, 'hub'),
hf_home_cache
]
for cache_dir in model_cache_dirs:
if os.path.exists(cache_dir):
# Look for this specific model in cache and remove it
model_cache_name = model_id.replace('/', '--')
for item in os.listdir(cache_dir):
if model_cache_name in item:
item_path = os.path.join(cache_dir, item)
if os.path.isdir(item_path):
logger.info(f"🗑️ Clearing cached model: {item_path}")
shutil.rmtree(item_path, ignore_errors=True)
# Set working LinguaCustodia token as primary token
os.environ["HF_TOKEN"] = working_token
os.environ["HUGGING_FACE_HUB_TOKEN"] = working_token
token_name = "HF_TOKEN_LC2" if working_token == hf_token_lc2 else "HF_TOKEN_LC"
logger.info(f"🔑 Using {token_name} for LinguaCustodia model authentication")
# Force re-authentication with the working LinguaCustodia token
try:
from huggingface_hub import login
login(token=working_token, add_to_git_credential=False)
logger.info(f"✅ Successfully authenticated with {token_name}")
except Exception as e:
logger.warning(f"⚠️ {token_name} authentication failed: {e}")
else:
logger.warning("⚠️ No working LinguaCustodia token found - may fail for LinguaCustodia models")
self.engine = LLM(
model=model_id,
**self.config
)
logger.info("✅ vLLM engine initialized successfully")
return True
except Exception as e:
logger.error(f"❌ vLLM model loading failed: {e}")
return False
def run_inference(self, prompt: str, **kwargs) -> dict:
"""Run inference with vLLM engine."""
if not self.engine:
return {"error": "vLLM engine not loaded", "success": False}
try:
from vllm import SamplingParams
# Get stop tokens from kwargs or use model-specific defaults
stop_tokens = kwargs.get('stop')
if not stop_tokens and hasattr(self, 'model_config'):
model_name = getattr(self.model_config, 'model_id', '')
stop_tokens = get_stop_tokens_for_model(model_name)
sampling_params = SamplingParams(
temperature=kwargs.get('temperature', 0.6),
max_tokens=kwargs.get('max_new_tokens', 512), # Increased default
top_p=kwargs.get('top_p', 0.9),
repetition_penalty=kwargs.get('repetition_penalty', 1.1), # Increased from 1.05
stop=stop_tokens # Add stop tokens
)
outputs = self.engine.generate([prompt], sampling_params)
response = outputs[0].outputs[0].text
return {
"response": response,
"model_used": getattr(self.model_config, 'model_id', 'unknown'),
"success": True,
"backend": "vLLM",
"tokens_generated": len(response.split()),
"generation_params": {
"temperature": sampling_params.temperature,
"max_tokens": sampling_params.max_tokens,
"top_p": sampling_params.top_p
}
}
except Exception as e:
logger.error(f"vLLM inference error: {e}")
return {"error": str(e), "success": False}
def get_memory_info(self) -> dict:
"""Get vLLM memory information."""
try:
import torch
if torch.cuda.is_available():
return {
"gpu_available": True,
"gpu_memory_allocated": torch.cuda.memory_allocated(),
"gpu_memory_reserved": torch.cuda.memory_reserved(),
"backend": "vLLM"
}
except Exception as e:
logger.error(f"Error getting vLLM memory info: {e}")
return {"gpu_available": False, "backend": "vLLM"}
def sleep(self) -> bool:
"""Put vLLM engine into sleep mode (for HuggingFace Spaces)."""
try:
if self.engine and hasattr(self.engine, 'sleep'):
logger.info("😴 Putting vLLM engine to sleep...")
self.engine.sleep()
logger.info("✅ vLLM engine is now sleeping (GPU memory released)")
return True
else:
logger.info("ℹ️ vLLM engine doesn't support sleep mode or not loaded")
return False
except Exception as e:
logger.warning(f"⚠️ Error putting vLLM to sleep (non-critical): {e}")
return False
def wake(self) -> bool:
"""Wake up vLLM engine from sleep mode."""
try:
if self.engine and hasattr(self.engine, 'wake'):
logger.info("🌅 Waking up vLLM engine...")
self.engine.wake()
logger.info("✅ vLLM engine is now awake")
return True
else:
logger.info("ℹ️ vLLM engine doesn't support wake mode or not loaded")
return False
except Exception as e:
logger.warning(f"⚠️ Error waking up vLLM (non-critical): {e}")
return False
def cleanup(self) -> None:
"""Clean up vLLM resources gracefully."""
try:
if self.engine:
logger.info("🧹 Shutting down vLLM engine...")
# vLLM engines don't have explicit shutdown methods, but we can clean up references
del self.engine
self.engine = None
logger.info("✅ vLLM engine reference cleared")
# Clear CUDA cache
import torch
if torch.cuda.is_available():
torch.cuda.empty_cache()
logger.info("✅ CUDA cache cleared")
# Force garbage collection
import gc
gc.collect()
logger.info("✅ Garbage collection completed")
except Exception as e:
logger.error(f"❌ Error during vLLM cleanup: {e}")
class TransformersBackend(InferenceBackend):
"""Current Transformers implementation (fallback)."""
def __init__(self, model_config: dict):
super().__init__("transformers", model_config)
def load_model(self, model_id: str) -> bool:
"""Load model with Transformers (current implementation)."""
return load_linguacustodia_model()
def run_inference(self, prompt: str, **kwargs) -> dict:
"""Run inference with Transformers pipeline."""
return run_inference(prompt, **kwargs)
def get_memory_info(self) -> dict:
"""Get Transformers memory information."""
return get_gpu_memory_info()
def sleep(self) -> bool:
"""Put Transformers backend into sleep mode."""
try:
logger.info("😴 Transformers backend doesn't support sleep mode, cleaning up memory instead...")
cleanup_model_memory()
return True
except Exception as e:
logger.error(f"❌ Error during Transformers sleep: {e}")
return False
def wake(self) -> bool:
"""Wake up Transformers backend from sleep mode."""
try:
logger.info("🌅 Transformers backend wake - no action needed")
return True
except Exception as e:
logger.error(f"❌ Error during Transformers wake: {e}")
return False
def cleanup(self) -> None:
"""Clean up Transformers resources."""
cleanup_model_memory()
# Inline configuration functions
def get_app_settings():
"""Get application settings from environment variables."""
# Check if MODEL_NAME is set and valid; otherwise fall back to the default below
model_name = os.getenv('MODEL_NAME')
if not model_name or model_name not in MODEL_CONFIG:
model_name = 'pro-finance-small' # Default to Pro Finance Small (8B)
logger.info(f"Using default model: {model_name}")
return type('Settings', (), {
'model_name': model_name,
'hf_token_lc': os.getenv('HF_TOKEN_LC'),
'hf_token': os.getenv('HF_TOKEN')
})()
def get_model_config(model_name: str):
"""Get model configuration."""
if model_name not in MODEL_CONFIG:
raise ValueError(f"Model '{model_name}' not found")
return type('ModelInfo', (), MODEL_CONFIG[model_name])()
def get_linguacustodia_config():
"""Get complete configuration."""
return type('Config', (), {
'models': MODEL_CONFIG,
'get_model_info': lambda name: type('ModelInfo', (), MODEL_CONFIG[name])(),
'list_models': lambda: MODEL_CONFIG
})()
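# Illustrative only: the inline type(...) pattern above exposes each MODEL_CONFIG
# entry as an attribute-style object, so callers can write e.g.
#   get_model_config("qwen3-8b-v1.0").model_id   # -> "LinguaCustodia/qwen3-8b-fin-v1.0"
# instead of indexing the MODEL_CONFIG dict directly.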
def create_inference_backend() -> InferenceBackend:
"""Factory method for creating appropriate backend."""
# Environment detection
deployment_env = os.getenv('DEPLOYMENT_ENV', 'huggingface')
use_vllm = os.getenv('USE_VLLM', 'true').lower() == 'true'
# Get model configuration
settings = get_app_settings()
model_config = get_model_config(settings.model_name)
# Backend selection logic with platform-specific optimizations
if use_vllm and deployment_env in ['huggingface', 'scaleway']:
logger.info(f"🚀 Initializing vLLM backend for {deployment_env}")
return VLLMBackend(model_config, platform=deployment_env)
else:
logger.info(f"🔄 Using Transformers backend for {deployment_env}")
return TransformersBackend(model_config)
# Global backend instance - will be initialized on startup
inference_backend = None
# Model loading state tracking
model_loading_state = {
"is_loading": False,
"loading_model": None,
"loading_progress": 0,
"loading_status": "idle",
"loading_start_time": None,
"loading_error": None
}
def setup_storage():
"""Setup storage configuration."""
hf_home = os.getenv('HF_HOME', '/data/.huggingface')
os.environ['HF_HOME'] = hf_home
return {
'hf_home': hf_home,
'persistent_storage': True,
'cache_dir_exists': True,
'cache_dir_writable': True
}
def update_loading_state(status: str, progress: int = 0, error: str = None):
"""Update the global loading state."""
global model_loading_state
model_loading_state.update({
"loading_status": status,
"loading_progress": progress,
"loading_error": error
})
if error:
model_loading_state["is_loading"] = False
def save_model_preference(model_name: str) -> bool:
"""Save model preference to persistent storage for restart."""
try:
preference_file = "/data/.model_preference"
os.makedirs("/data", exist_ok=True)
with open(preference_file, 'w') as f:
f.write(model_name)
logger.info(f"✅ Saved model preference: {model_name}")
return True
except Exception as e:
logger.error(f"❌ Failed to save model preference: {e}")
return False
def load_model_preference() -> Optional[str]:
"""Load saved model preference from persistent storage."""
try:
preference_file = "/data/.model_preference"
if os.path.exists(preference_file):
with open(preference_file, 'r') as f:
model_name = f.read().strip()
logger.info(f"✅ Loaded model preference: {model_name}")
return model_name
return None
except Exception as e:
logger.error(f"❌ Failed to load model preference: {e}")
return None
async def trigger_service_restart():
"""Trigger a graceful service restart for model switching."""
try:
logger.info("🔄 Triggering graceful service restart for model switch...")
# Give time for response to be sent
await asyncio.sleep(2)
# On HuggingFace Spaces, we can trigger a restart by exiting
# The Space will automatically restart
import sys
sys.exit(0)
except Exception as e:
logger.error(f"❌ Error triggering restart: {e}")
async def load_model_async(model_name: str, model_info: dict, new_model_config: dict):
"""
Model switching via service restart.
vLLM doesn't support runtime model switching, so we save the preference
and trigger a graceful restart. The new model will be loaded on startup.
"""
global model_loading_state
try:
# Update loading state
model_loading_state.update({
"is_loading": True,
"loading_model": model_name,
"loading_progress": 10,
"loading_status": "saving_preference",
"loading_start_time": time.time(),
"loading_error": None
})
# Save the model preference to persistent storage
logger.info(f"💾 Saving model preference: {model_name}")
if not save_model_preference(model_name):
update_loading_state("error", 0, "Failed to save model preference")
return
update_loading_state("preparing_restart", 50)
logger.info(f"🔄 Model preference saved. Triggering service restart to load {model_info['display_name']}...")
# Trigger graceful restart
await trigger_service_restart()
except Exception as e:
logger.error(f"Error in model switching: {e}")
update_loading_state("error", 0, str(e))
def load_linguacustodia_model(force_reload=False):
"""
Load the LinguaCustodia model with intelligent caching.
Strategy:
- If no model loaded: Load from cache if available, else download
- If same model already loaded: Skip (use loaded model)
- If different model requested: Clean memory, clean storage, then load new model
"""
global model, tokenizer, pipe, model_loaded, current_model_name
try:
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
from huggingface_hub import login
import torch
settings = get_app_settings()
model_config = get_model_config(settings.model_name)
requested_model_id = model_config.model_id
# Case 1: Same model already loaded in memory - reuse it
if model_loaded and current_model_name == requested_model_id and not force_reload:
logger.info(f"✅ Model {model_config.display_name} already loaded in memory, reusing")
return True
# Case 2: Different model requested - clean everything first
if model_loaded and current_model_name != requested_model_id:
logger.info(f"🔄 Model switch detected: {current_model_name}{requested_model_id}")
logger.info(f"🧹 Cleaning memory and storage for model switch...")
cleanup_model_memory()
# Note: HuggingFace will automatically use cached model files if available
# We only clean GPU memory, not disk cache
# Case 3: Force reload requested
if force_reload and model_loaded:
logger.info(f"🔄 Force reload requested for {requested_model_id}")
cleanup_model_memory()
# Authenticate with HuggingFace
login(token=settings.hf_token_lc, add_to_git_credential=False)
logger.info(f"✅ Authenticated with HuggingFace")
# Load model (will use cached files if available)
logger.info(f"🚀 Loading model: {model_config.display_name}")
logger.info(f"📦 Model ID: {requested_model_id}")
logger.info(f"💾 Will use cached files from {os.getenv('HF_HOME', '~/.cache/huggingface')} if available")
# Load tokenizer from cache or download
tokenizer = AutoTokenizer.from_pretrained(
requested_model_id,
token=settings.hf_token_lc,
trust_remote_code=True
)
logger.info(f"✅ Tokenizer loaded")
# Load model from cache or download
model = AutoModelForCausalLM.from_pretrained(
requested_model_id,
token=settings.hf_token_lc,
dtype=torch.bfloat16,
device_map="auto",
trust_remote_code=True
)
logger.info(f"✅ Model loaded")
# Create inference pipeline
pipe = pipeline(
"text-generation",
model=model,
tokenizer=tokenizer,
dtype=torch.bfloat16,
device_map="auto"
)
logger.info(f"✅ Pipeline created")
# Update global state
current_model_name = requested_model_id
model_loaded = True
logger.info(f"🎉 {model_config.display_name} ready for inference!")
return True
except Exception as e:
logger.error(f"❌ Failed to load model: {e}")
cleanup_model_memory()
return False
def cleanup_model_memory():
"""
Clean up model memory before loading a new model.
This clears GPU memory but keeps disk cache intact for faster reloading.
"""
global model, tokenizer, pipe, model_loaded
try:
import torch
import gc
logger.info("🧹 Starting memory cleanup...")
# Delete model objects from memory
if pipe is not None:
del pipe
pipe = None
logger.info(" ✓ Pipeline removed")
if model is not None:
del model
model = None
logger.info(" ✓ Model removed")
if tokenizer is not None:
del tokenizer
tokenizer = None
logger.info(" ✓ Tokenizer removed")
model_loaded = False
# Clear GPU cache if available
if torch.cuda.is_available():
allocated_before = torch.cuda.memory_allocated() / (1024**3)
torch.cuda.empty_cache()
torch.cuda.synchronize()
allocated_after = torch.cuda.memory_allocated() / (1024**3)
freed = allocated_before - allocated_after
logger.info(f" ✓ GPU cache cleared (freed ~{freed:.2f}GB)")
# Force garbage collection
gc.collect()
logger.info(" ✓ Garbage collection completed")
logger.info("✅ Memory cleanup completed successfully")
logger.info("💾 Disk cache preserved for faster model loading")
except Exception as e:
logger.warning(f"⚠️ Error during memory cleanup: {e}")
def run_inference(prompt: str, max_new_tokens: int = 150, temperature: float = 0.6):
"""Run inference with the loaded model."""
global pipe, model, tokenizer, model_loaded, current_model_name
if not model_loaded or pipe is None:
return {
"response": "",
"model_used": current_model_name,
"success": False,
"tokens_generated": 0,
"generation_params": {},
"error": "Model not loaded"
}
try:
# Generate response, passing generation parameters directly to the pipeline call
# (assigning them as attributes on the pipeline object has no effect on generation)
result = pipe(prompt, max_new_tokens=max_new_tokens, temperature=temperature, do_sample=True)
generated_text = result[0]['generated_text']
response_text = generated_text[len(prompt):].strip()
tokens_generated = len(tokenizer.encode(response_text))
return {
"response": response_text,
"model_used": current_model_name,
"success": True,
"tokens_generated": tokens_generated,
"generation_params": {
"max_new_tokens": max_new_tokens,
"temperature": temperature,
**GENERATION_CONFIG
}
}
except Exception as e:
logger.error(f"Inference error: {e}")
return {
"response": "",
"model_used": current_model_name,
"success": False,
"tokens_generated": 0,
"generation_params": {},
"error": str(e)
}
def get_gpu_memory_info():
"""Get GPU memory information."""
try:
import torch
if not torch.cuda.is_available():
return {"gpu_available": False}
allocated = torch.cuda.memory_allocated()
reserved = torch.cuda.memory_reserved()
total = torch.cuda.get_device_properties(0).total_memory
return {
"gpu_available": True,
"gpu_name": torch.cuda.get_device_name(0),
"gpu_memory_allocated": f"{allocated / (1024**3):.2f}GB",
"gpu_memory_reserved": f"{reserved / (1024**3):.2f}GB",
"gpu_memory_total": f"{total / (1024**3):.2f}GB"
}
except Exception as e:
return {"gpu_available": False, "error": str(e)}
@app.on_event("startup")
async def startup_event():
"""Initialize the application on startup."""
global storage_info, inference_backend
logger.info(f"🚀 Starting LinguaCustodia API - {ARCHITECTURE} v24.1.0 (vLLM Ready)...")
# Setup storage first and store globally
storage_info = setup_storage()
logger.info(f"📊 Storage configuration: {storage_info}")
# Initialize backend
inference_backend = create_inference_backend()
logger.info(f"🔧 Backend initialized: {inference_backend.backend_type}")
# Check for saved model preference (from restart-based model switching)
saved_preference = load_model_preference()
if saved_preference:
logger.info(f"🔄 Found saved model preference: {saved_preference}")
model_name = saved_preference
else:
# Use default from environment or settings
settings = get_app_settings()
model_name = settings.model_name
logger.info(f"📋 Using default model: {model_name}")
# Load the selected model
model_config = get_model_config(model_name)
success = inference_backend.load_model(model_config.model_id)
if success:
logger.info(f"✅ Model loaded successfully on startup using {inference_backend.backend_type} backend")
# For vLLM backend, check if we need to wake up from sleep
if inference_backend.backend_type == "vllm":
logger.info("🌅 Checking if vLLM needs to wake up from sleep...")
try:
wake_success = inference_backend.wake()
if wake_success:
logger.info("✅ vLLM wake-up successful")
else:
logger.info("ℹ️ vLLM wake-up not needed (fresh startup)")
except Exception as e:
logger.info(f"ℹ️ vLLM wake-up check completed (normal on fresh startup): {e}")
else:
logger.error("❌ Failed to load model on startup")
@app.on_event("shutdown")
async def shutdown_event():
"""Gracefully shutdown the application."""
global inference_backend
logger.info("🛑 Starting graceful shutdown...")
try:
if inference_backend:
logger.info(f"🧹 Cleaning up {inference_backend.backend_type} backend...")
inference_backend.cleanup()
logger.info("✅ Backend cleanup completed")
# Additional cleanup for global variables
cleanup_model_memory()
logger.info("✅ Global memory cleanup completed")
logger.info("✅ Graceful shutdown completed successfully")
except Exception as e:
logger.error(f"❌ Error during shutdown: {e}")
# Don't raise the exception to avoid preventing shutdown
@app.get("/health", response_model=HealthResponse)
async def health_check():
"""Health check endpoint."""
global storage_info, inference_backend, model_loading_state
if inference_backend is None:
return HealthResponse(
status="starting",
model_loaded=False,
current_model="unknown",
gpu_available=False,
memory_usage=None,
storage_info=storage_info,
architecture=f"{ARCHITECTURE} + INITIALIZING",
loading_status=model_loading_state
)
memory_info = inference_backend.get_memory_info()
return HealthResponse(
status="healthy" if inference_backend.engine else "model_not_loaded",
model_loaded=inference_backend.engine is not None,
current_model=getattr(inference_backend.model_config, 'model_id', 'unknown'),
gpu_available=memory_info.get("gpu_available", False),
memory_usage=memory_info if memory_info.get("gpu_available") else None,
storage_info=storage_info,
architecture=f"{ARCHITECTURE} + {inference_backend.backend_type.upper()}",
loading_status=model_loading_state
)
@app.get("/test/model-configs")
async def test_model_configs():
"""Test endpoint to verify actual model configurations from HuggingFace Hub."""
import os
models_to_test = [
"LinguaCustodia/Llama-Pro-Finance-Large",
"LinguaCustodia/LLM-Pro-Finance-Medium",
"LinguaCustodia/LLM-Pro-Finance-Small",
"LinguaCustodia/LLM-Pro-Finance-Mini",
"LinguaCustodia/Llama-Pro-Finance-Mini"
]
results = {}
# Get LinguaCustodia token for authentication
hf_token_lc = os.getenv("HF_TOKEN_LC")
headers = {}
if hf_token_lc:
headers["Authorization"] = f"Bearer {hf_token_lc}"
# Debug: Check if LinguaCustodia token is available
token_available = bool(hf_token_lc)
token_length = len(hf_token_lc) if hf_token_lc else 0
for model_name in models_to_test:
try:
# Use HuggingFace Hub library to get model config
from huggingface_hub import hf_hub_download
import json
# Download the config.json file using the LinguaCustodia token
config_path = hf_hub_download(
repo_id=model_name,
filename="config.json",
token=hf_token_lc if hf_token_lc else None
)
# Read the config file
with open(config_path, 'r') as f:
config = json.load(f)
# Extract context length
context_length = None
context_params = [
"max_position_embeddings",
"n_positions",
"max_sequence_length",
"context_length",
"max_context_length"
]
for param in context_params:
if param in config:
value = config[param]
if isinstance(value, dict) and "max_position_embeddings" in value:
context_length = value["max_position_embeddings"]
elif isinstance(value, int):
context_length = value
break
results[model_name] = {
"context_length": context_length,
"model_type": config.get("model_type", "unknown"),
"architectures": config.get("architectures", []),
"config_available": True
}
except Exception as e:
results[model_name] = {
"context_length": None,
"config_available": False,
"error": str(e)
}
return {
"test_results": results,
"expected_contexts": {
"LinguaCustodia/Llama-Pro-Finance-Large": "TBD",
"LinguaCustodia/LLM-Pro-Finance-Medium": "TBD",
"LinguaCustodia/LLM-Pro-Finance-Small": "TBD",
"LinguaCustodia/LLM-Pro-Finance-Mini": "TBD",
"LinguaCustodia/Llama-Pro-Finance-Mini": "TBD"
},
"debug_info": {
"token_available": token_available,
"token_length": token_length,
"headers_sent": bool(headers.get("Authorization"))
}
}
@app.get("/debug/env")
async def debug_env():
"""Debug endpoint to check environment variables."""
import os
env_vars = {}
for key, value in os.environ.items():
if "HF" in key or "TOKEN" in key:
# Mask sensitive values
if "TOKEN" in key or "SECRET" in key:
env_vars[key] = f"{value[:10]}..." if value else "None"
else:
env_vars[key] = value
return {"environment_variables": env_vars}
@app.get("/debug/token-test")
async def debug_token_test():
"""Test both HF_TOKEN_LC and HF_TOKEN_LC2 to see which can access v1.0 models."""
import requests
import os
# Get both tokens and clean them
hf_token_lc = os.getenv("HF_TOKEN_LC")
hf_token_lc2 = os.getenv("HF_TOKEN_LC2")
# Clean tokens (remove whitespace/newlines)
if hf_token_lc:
hf_token_lc = hf_token_lc.strip()
if hf_token_lc2:
hf_token_lc2 = hf_token_lc2.strip()
if not hf_token_lc and not hf_token_lc2:
return {"error": "No HF_TOKEN_LC or HF_TOKEN_LC2 found"}
test_model = "LinguaCustodia/qwen3-8b-fin-v1.0"
results = {}
# Test both tokens
tokens_to_test = {}
if hf_token_lc:
tokens_to_test["HF_TOKEN_LC"] = hf_token_lc
if hf_token_lc2:
tokens_to_test["HF_TOKEN_LC2"] = hf_token_lc2
for token_name, token_value in tokens_to_test.items():
headers = {"Authorization": f"Bearer {token_value}"}
token_results = {}
try:
# Test 1: HuggingFace Hub API
api_url = f"https://huggingface.co/api/models/{test_model}"
response1 = requests.get(api_url, headers=headers, timeout=30)
token_results["hub_api"] = {
"status_code": response1.status_code,
"response": response1.text[:200]
}
# Test 2: Try the collection API
collection_url = "https://huggingface.co/api/collections/LinguaCustodia/llm-pro-finance-suite-68de414b003d9078f76e43fc"
response2 = requests.get(collection_url, headers=headers, timeout=30)
token_results["collection_api"] = {
"status_code": response2.status_code,
"response": response2.text[:200]
}
# Test 3: Try raw config access
config_url = f"https://huggingface.co/{test_model}/raw/main/config.json"
response3 = requests.get(config_url, headers=headers, timeout=30)
token_results["raw_config"] = {
"status_code": response3.status_code,
"response": response3.text[:200]
}
results[token_name] = {
"token_length": len(token_value),
"headers_sent": headers.get("Authorization", "")[:20] + "...",
"test_results": token_results,
"success": any(r["status_code"] == 200 for r in token_results.values())
}
except Exception as e:
results[token_name] = {
"token_length": len(token_value),
"error": str(e),
"success": False
}
return {
"model": test_model,
"token_comparison": results,
"working_token": next((name for name, data in results.items() if data.get("success")), None)
}
@app.get("/backend")
async def backend_info():
"""Get backend information."""
global inference_backend
if inference_backend is None:
return {
"backend_type": "initializing",
"model_loaded": False,
"current_model": "unknown",
"vllm_config": None,
"memory_info": {"gpu_available": False}
}
vllm_config = None
if inference_backend.backend_type == "vllm":
if hasattr(inference_backend, 'platform'):
vllm_config = VLLM_CONFIG_HF if inference_backend.platform == "huggingface" else VLLM_CONFIG_SCW
else:
vllm_config = VLLM_CONFIG_HF # fallback
return {
"backend_type": inference_backend.backend_type,
"model_loaded": inference_backend.engine is not None,
"current_model": getattr(inference_backend.model_config, 'model_id', 'unknown'),
"platform": getattr(inference_backend, 'platform', 'unknown'),
"vllm_config": vllm_config,
"memory_info": inference_backend.get_memory_info()
}
@app.get("/")
async def root():
"""Root endpoint with API information."""
global storage_info
try:
settings = get_app_settings()
model_config = get_model_config(settings.model_name)
return {
"message": f"LinguaCustodia Financial AI API - {ARCHITECTURE}",
"version": "23.0.0",
"status": "running",
"model_loaded": model_loaded,
"current_model": settings.model_name,
"current_model_info": {
"display_name": model_config.display_name,
"model_id": model_config.model_id,
"architecture": model_config.architecture,
"parameters": model_config.parameters,
"memory_gb": model_config.memory_gb,
"vram_gb": model_config.vram_gb,
"vocab_size": model_config.vocab_size,
"eos_token_id": model_config.eos_token_id
},
"endpoints": {
"health": "/health",
"inference": "/inference",
"models": "/models",
"load-model": "/load-model",
"docs": "/docs",
"diagnose": "/diagnose"
},
"storage_info": storage_info,
"architecture": ARCHITECTURE
}
except Exception as e:
logger.error(f"Error in root endpoint: {e}")
return {
"message": f"LinguaCustodia Financial AI API - {ARCHITECTURE}",
"version": "23.0.0",
"status": "running",
"model_loaded": model_loaded,
"current_model": current_model_name,
"error": str(e),
"storage_info": storage_info,
"architecture": ARCHITECTURE
}
@app.get("/models")
async def list_models():
"""List all available models and their configurations."""
try:
settings = get_app_settings()
model_config = get_model_config(settings.model_name)
# Build simplified model info for all models
all_models = {}
for model_name, model_data in MODEL_CONFIG.items():
all_models[model_name] = {
"display_name": model_data["display_name"],
"model_id": model_data["model_id"],
"architecture": model_data["architecture"],
"parameters": model_data["parameters"],
"memory_gb": model_data["memory_gb"],
"vram_gb": model_data["vram_gb"]
}
return {
"current_model": settings.model_name,
"current_model_info": {
"display_name": model_config.display_name,
"model_id": model_config.model_id,
"architecture": model_config.architecture,
"parameters": model_config.parameters,
"memory_gb": model_config.memory_gb,
"vram_gb": model_config.vram_gb,
"vocab_size": model_config.vocab_size,
"eos_token_id": model_config.eos_token_id
},
"available_models": all_models,
"total_models": len(MODEL_CONFIG)
}
except Exception as e:
logger.error(f"Error listing models: {e}")
raise HTTPException(status_code=500, detail=f"Error listing models: {e}")
@app.post("/inference", response_model=InferenceResponse)
async def inference(request: InferenceRequest):
"""Run inference with the loaded model using backend abstraction."""
global inference_backend
if inference_backend is None:
raise HTTPException(status_code=503, detail="Backend is still initializing. Please wait and try again.")
try:
# Use the global inference backend
result = inference_backend.run_inference(
prompt=request.prompt,
max_new_tokens=request.max_new_tokens,
temperature=request.temperature
)
if not result["success"]:
raise HTTPException(status_code=500, detail=result.get("error", "Inference failed"))
return InferenceResponse(
response=result["response"],
model_used=result["model_used"],
success=result["success"],
tokens_generated=result.get("tokens_generated", 0),
generation_params=result.get("generation_params", {})
)
except Exception as e:
logger.error(f"Inference error: {e}")
raise HTTPException(status_code=500, detail=str(e))
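# Illustrative only: calling the native inference endpoint (host and port assumed,
# 7860 being the default APP_PORT used at the bottom of this file):
#
#   curl -X POST http://localhost:7860/inference \
#     -H "Content-Type: application/json" \
#     -d '{"prompt": "Summarize the role of Basel III capital ratios.", "max_new_tokens": 200, "temperature": 0.6}'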
@app.post("/load-model")
async def load_model(model_name: str):
"""Load a specific model by name (async with progress tracking)."""
global inference_backend, model_loading_state
try:
# Check if already loading
if model_loading_state["is_loading"]:
return {
"message": f"Model loading already in progress: {model_loading_state['loading_model']}",
"loading_status": model_loading_state["loading_status"],
"loading_progress": model_loading_state["loading_progress"],
"status": "loading"
}
# Validate model name
if model_name not in MODEL_CONFIG:
available_models = list(MODEL_CONFIG.keys())
raise HTTPException(
status_code=400,
detail=f"Model '{model_name}' not found. Available models: {available_models}"
)
# Set the model name in environment
os.environ['MODEL_NAME'] = model_name
# Get new model configuration
model_info = MODEL_CONFIG[model_name]
new_model_config = get_model_config(model_name)
# Start async model switching (via restart)
asyncio.create_task(load_model_async(model_name, model_info, new_model_config))
return {
"message": f"Model switch to '{model_info['display_name']}' initiated. Service will restart to load the new model.",
"model_name": model_name,
"model_id": model_info["model_id"],
"display_name": model_info["display_name"],
"backend_type": inference_backend.backend_type,
"status": "restart_initiated",
"loading_status": "saving_preference",
"loading_progress": 10,
"note": "vLLM doesn't support runtime model switching. The service will restart with the new model."
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error starting model loading: {e}")
raise HTTPException(status_code=500, detail=f"Error starting model loading: {e}")
@app.get("/loading-status")
async def get_loading_status():
"""Get current model loading status and progress."""
global model_loading_state
# Calculate elapsed time if loading
elapsed_time = None
if model_loading_state["loading_start_time"]:
elapsed_time = time.time() - model_loading_state["loading_start_time"]
return {
"is_loading": model_loading_state["is_loading"],
"loading_model": model_loading_state["loading_model"],
"loading_progress": model_loading_state["loading_progress"],
"loading_status": model_loading_state["loading_status"],
"loading_error": model_loading_state["loading_error"],
"elapsed_time_seconds": elapsed_time,
"estimated_time_remaining": None # Could be calculated based on model size
}
@app.post("/cleanup-storage")
async def cleanup_storage():
"""Clean up persistent storage (admin endpoint)."""
try:
import shutil
if os.path.exists('/data'):
shutil.rmtree('/data')
os.makedirs('/data', exist_ok=True)
return {"message": "Storage cleaned successfully", "status": "success"}
else:
return {"message": "No persistent storage found", "status": "info"}
except Exception as e:
logger.error(f"Storage cleanup error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/sleep")
async def put_to_sleep():
"""Put the backend into sleep mode (for HuggingFace Spaces)."""
global inference_backend
if inference_backend is None:
raise HTTPException(status_code=503, detail="Backend not initialized")
try:
success = inference_backend.sleep()
if success:
return {
"message": "Backend put to sleep successfully",
"status": "sleeping",
"backend": inference_backend.backend_type,
"note": "GPU memory released, ready for HuggingFace Space sleep"
}
else:
return {
"message": "Sleep mode not supported or failed",
"status": "error",
"backend": inference_backend.backend_type
}
except Exception as e:
logger.error(f"Error putting backend to sleep: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/wake")
async def wake_up():
"""Wake up the backend from sleep mode."""
global inference_backend
if inference_backend is None:
raise HTTPException(status_code=503, detail="Backend not initialized")
try:
success = inference_backend.wake()
if success:
return {
"message": "Backend woken up successfully",
"status": "awake",
"backend": inference_backend.backend_type,
"note": "Ready for inference"
}
else:
return {
"message": "Wake mode not supported or failed",
"status": "error",
"backend": inference_backend.backend_type
}
except Exception as e:
logger.error(f"Error waking up backend: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/diagnose")
async def diagnose():
"""Diagnose system status and configuration."""
global inference_backend
if inference_backend is None:
return {
"python_version": sys.version,
"architecture": ARCHITECTURE,
"model_loaded": False,
"current_model": "unknown",
"backend_type": "initializing",
"available_models": list(MODEL_CONFIG.keys()),
"storage_info": storage_info,
"gpu_info": {"gpu_available": False}
}
return {
"python_version": sys.version,
"architecture": ARCHITECTURE,
"model_loaded": inference_backend.engine is not None,
"current_model": getattr(inference_backend.model_config, 'model_id', 'unknown'),
"backend_type": inference_backend.backend_type,
"available_models": list(MODEL_CONFIG.keys()),
"storage_info": storage_info,
"gpu_info": inference_backend.get_memory_info()
}
# OpenAI-Compatible Endpoints - Helper Functions
def get_stop_tokens_for_model(model_name: str) -> List[str]:
"""Get model-specific stop tokens to prevent hallucinations."""
model_stops = {
"llama3.1-8b": ["<|end_of_text|>", "<|eot_id|>", "<|endoftext|>", "\nUser:", "\nAssistant:", "\nSystem:"],
"qwen": ["<|im_end|>", "<|endoftext|>", "</s>", "\nUser:", "\nAssistant:", "\nSystem:"],
"gemma": ["<end_of_turn>", "<eos>", "</s>", "\nUser:", "\nAssistant:", "\nSystem:"],
}
model_lower = model_name.lower()
for key in model_stops:
if key in model_lower:
return model_stops[key]
# Default comprehensive stop list
return ["<|endoftext|>", "</s>", "<eos>", "\nUser:", "\nAssistant:", "\nSystem:"]
def count_tokens_in_messages(messages: List[Dict[str, str]], model_name: str) -> int:
"""Count total tokens in a list of messages."""
try:
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained(f"LinguaCustodia/{model_name}")
total_tokens = 0
for message in messages:
content = message.get('content', '')
total_tokens += len(tokenizer.encode(content))
return total_tokens
except Exception:
# Fallback: rough estimation (4 chars per token)
total_chars = sum(len(msg.get('content', '')) for msg in messages)
return total_chars // 4
def manage_chat_context(messages: List[Dict[str, str]], model_name: str, max_context_tokens: int = 3800) -> List[Dict[str, str]]:
"""Manage chat context to stay within token limits."""
# Count total tokens
total_tokens = count_tokens_in_messages(messages, model_name)
# If under limit, return as-is (no truncation needed)
if total_tokens <= max_context_tokens:
return messages
# Only truncate if we're significantly over the limit
# This prevents unnecessary truncation for small overages
if total_tokens <= max_context_tokens + 200: # Allow 200 token buffer
return messages
# Strategy: Keep system message + recent messages
system_msg = messages[0] if messages and messages[0].get('role') == 'system' else None
recent_messages = messages[1:] if system_msg else messages
# Keep only recent messages that fit
result = []
if system_msg:
result.append(system_msg)
current_tokens = count_tokens_in_messages([system_msg] if system_msg else [], model_name)
for message in reversed(recent_messages):
message_tokens = count_tokens_in_messages([message], model_name)
if current_tokens + message_tokens > max_context_tokens:
break
result.insert(1 if system_msg else 0, message)
current_tokens += message_tokens
# Add context truncation notice if we had to truncate
if len(result) < len(messages):
truncation_notice = {
"role": "system",
"content": f"[Context truncated: {len(messages) - len(result)} messages removed to fit token limit]"
}
result.insert(1 if system_msg else 0, truncation_notice)
return result
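# Illustrative only: with the default max_context_tokens=3800, a conversation
# estimated at ~3900 tokens is returned unchanged (inside the 200-token buffer),
# while one at ~6000 tokens keeps the system message plus the most recent turns
# that fit and prepends a "[Context truncated: ...]" notice.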
def format_chat_messages(messages: List[Dict[str, str]], model_name: str) -> str:
"""Format chat messages with proper template to prevent hallucinations."""
# Better prompt formatting for different models
if "llama3.1" in model_name.lower():
# Llama 3.1 chat format
prompt = "<|begin_of_text|>"
for msg in messages:
role = msg.get("role", "user")
content = msg.get("content", "")
if role == "system":
prompt += f"<|start_header_id|>system<|end_header_id|>\n\n{content}<|eot_id|>"
elif role == "user":
prompt += f"<|start_header_id|>user<|end_header_id|>\n\n{content}<|eot_id|>"
elif role == "assistant":
prompt += f"<|start_header_id|>assistant<|end_header_id|>\n\n{content}<|eot_id|>"
prompt += "<|start_header_id|>assistant<|end_header_id|>\n\n"
return prompt
elif "qwen" in model_name.lower():
# Qwen chat format
prompt = ""
for msg in messages:
role = msg.get("role", "user")
content = msg.get("content", "")
if role == "system":
prompt += f"<|im_start|>system\n{content}<|im_end|>\n"
elif role == "user":
prompt += f"<|im_start|>user\n{content}<|im_end|>\n"
elif role == "assistant":
prompt += f"<|im_start|>assistant\n{content}<|im_end|>\n"
prompt += "<|im_start|>assistant\n"
return prompt
elif "gemma" in model_name.lower():
# Gemma chat format
prompt = "<bos>"
for msg in messages:
role = msg.get("role", "user")
content = msg.get("content", "")
if role == "user":
prompt += f"<start_of_turn>user\n{content}<end_of_turn>\n"
elif role == "assistant":
prompt += f"<start_of_turn>model\n{content}<end_of_turn>\n"
prompt += "<start_of_turn>model\n"
return prompt
else:
# Fallback: Simple format but with clear delimiters
prompt = ""
for msg in messages:
role = msg.get("role", "user")
content = msg.get("content", "")
prompt += f"### {role.capitalize()}\n{content}\n\n"
prompt += "### Assistant\n"
return prompt
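# Illustrative only: for a minimal two-message history and a Llama 3.1 model,
# format_chat_messages produces a prompt of the form
#
#   <|begin_of_text|><|start_header_id|>system<|end_header_id|>
#
#   You are a financial assistant.<|eot_id|><|start_header_id|>user<|end_header_id|>
#
#   What is EBITDA?<|eot_id|><|start_header_id|>assistant<|end_header_id|>
#
# leaving the assistant header open so the model completes the next turn.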
async def stream_chat_completion(prompt: str, model: str, temperature: float, max_tokens: int, request_id: str):
"""Generator for streaming chat completions with TRUE delta streaming."""
try:
from vllm import SamplingParams
# Get model-specific stop tokens
stop_tokens = get_stop_tokens_for_model(model)
# Create sampling params with stop tokens
sampling_params = SamplingParams(
temperature=temperature,
max_tokens=max_tokens,
top_p=0.9,
repetition_penalty=1.1, # Increased from 1.05 to prevent repetition
stop=stop_tokens # Add stop tokens to prevent hallucinations
)
# Track previous text to send only deltas
previous_text = ""
# Stream from vLLM
for output in inference_backend.engine.generate([prompt], sampling_params, use_tqdm=False):
if output.outputs:
current_text = output.outputs[0].text
# Calculate delta (only NEW text since last iteration)
if len(current_text) > len(previous_text):
new_text = current_text[len(previous_text):]
# Format as OpenAI SSE chunk with TRUE delta
chunk = {
"id": request_id,
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": model,
"choices": [{
"index": 0,
"delta": {"content": new_text}, # Only send NEW text
"finish_reason": None
}]
}
yield f"data: {json.dumps(chunk)}\n\n"
previous_text = current_text
# Send final chunk
final_chunk = {
"id": request_id,
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": model,
"choices": [{
"index": 0,
"delta": {},
"finish_reason": "stop"
}]
}
yield f"data: {json.dumps(final_chunk)}\n\n"
yield "data: [DONE]\n\n"
except Exception as e:
logger.error(f"Streaming error: {e}")
error_chunk = {"error": str(e)}
yield f"data: {json.dumps(error_chunk)}\n\n"
@app.post("/v1/chat/completions")
async def openai_chat_completions(request: dict):
"""OpenAI-compatible chat completions endpoint with streaming support."""
global inference_backend
if inference_backend is None:
raise HTTPException(status_code=503, detail="Backend is still initializing. Please wait and try again.")
try:
# Extract messages and parameters
messages = request.get("messages", [])
model = request.get("model", "linguacustodia")
temperature = request.get("temperature", 0.6)
max_tokens = request.get("max_tokens", 512) # Increased from 150 for better responses
stream = request.get("stream", False)
# Manage chat context to stay within token limits
managed_messages = manage_chat_context(messages, model, max_context_tokens=3800)
# Convert messages to prompt using proper chat template
prompt = format_chat_messages(managed_messages, model)
# Generate request ID
request_id = f"chatcmpl-{hash(prompt) % 10000000000}"
# Handle streaming
if stream and inference_backend.backend_type == "vllm":
return StreamingResponse(
stream_chat_completion(prompt, model, temperature, max_tokens, request_id),
media_type="text/event-stream"
)
# Non-streaming response
stop_tokens = get_stop_tokens_for_model(model)
result = inference_backend.run_inference(
prompt=prompt,
temperature=temperature,
max_new_tokens=max_tokens,
stop=stop_tokens,
repetition_penalty=1.1
)
if not result["success"]:
raise HTTPException(status_code=500, detail=result.get("error", "Inference failed"))
# Format OpenAI response
response = {
"id": request_id,
"object": "chat.completion",
"created": int(time.time()),
"model": model,
"choices": [{
"index": 0,
"message": {
"role": "assistant",
"content": result["response"]
},
"finish_reason": "stop"
}],
"usage": {
"prompt_tokens": len(prompt.split()),
"completion_tokens": result.get("tokens_generated", 0),
"total_tokens": len(prompt.split()) + result.get("tokens_generated", 0)
}
}
return response
except Exception as e:
logger.error(f"OpenAI chat completions error: {e}")
raise HTTPException(status_code=500, detail=str(e))
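# Illustrative only: an OpenAI-style chat call against this API (host and port
# assumed); add "stream": true to receive SSE chunks from stream_chat_completion above.
#
#   curl -X POST http://localhost:7860/v1/chat/completions \
#     -H "Content-Type: application/json" \
#     -d '{"model": "llama3.1-8b-v1.0", "messages": [{"role": "user", "content": "Explain duration risk."}], "max_tokens": 256}'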
@app.post("/v1/completions")
async def openai_completions(request: dict):
"""OpenAI-compatible completions endpoint."""
global inference_backend
if inference_backend is None:
raise HTTPException(status_code=503, detail="Backend is still initializing. Please wait and try again.")
try:
# Extract parameters
prompt = request.get("prompt", "")
model = request.get("model", "linguacustodia")
temperature = request.get("temperature", 0.6)
max_tokens = request.get("max_tokens", 150)
# Run inference
result = inference_backend.run_inference(
prompt=prompt,
temperature=temperature,
max_new_tokens=max_tokens
)
if not result["success"]:
raise HTTPException(status_code=500, detail=result.get("error", "Inference failed"))
# Format OpenAI response
response = {
"id": f"cmpl-{hash(prompt) % 10000000000}",
"object": "text_completion",
"created": int(__import__("time").time()),
"model": model,
"choices": [{
"text": result["response"],
"index": 0,
"finish_reason": "stop"
}],
"usage": {
"prompt_tokens": len(prompt.split()),
"completion_tokens": result.get("tokens_generated", 0),
"total_tokens": len(prompt.split()) + result.get("tokens_generated", 0)
}
}
return response
except Exception as e:
logger.error(f"OpenAI completions error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/v1/models")
async def openai_models():
"""OpenAI-compatible models endpoint."""
try:
models = []
for model_name, config in MODEL_CONFIG.items():
models.append({
"id": config["model_id"],
"object": "model",
"created": int(time.time()),
"owned_by": "linguacustodia",
"permission": [],
"root": config["model_id"],
"parent": None
})
return {
"object": "list",
"data": models
}
except Exception as e:
logger.error(f"OpenAI models error: {e}")
raise HTTPException(status_code=500, detail=str(e))
# Analytics Endpoints
@app.get("/analytics/performance")
async def analytics_performance():
"""Get performance analytics for the inference backend."""
global inference_backend
if inference_backend is None:
raise HTTPException(status_code=503, detail="Backend not initialized")
try:
memory_info = inference_backend.get_memory_info()
# Calculate performance metrics
if memory_info.get("gpu_available"):
gpu_allocated = memory_info.get("gpu_memory_allocated", 0)
gpu_reserved = memory_info.get("gpu_memory_reserved", 0)
gpu_utilization = (gpu_allocated / gpu_reserved * 100) if gpu_reserved > 0 else 0
else:
gpu_utilization = 0
return {
"backend": inference_backend.backend_type,
"model": getattr(inference_backend.model_config, 'model_id', 'unknown'),
"gpu_utilization_percent": round(gpu_utilization, 2),
"memory": {
"gpu_allocated_gb": round(memory_info.get("gpu_memory_allocated", 0) / (1024**3), 2),
"gpu_reserved_gb": round(memory_info.get("gpu_memory_reserved", 0) / (1024**3), 2),
"gpu_available": memory_info.get("gpu_available", False)
},
"platform": {
"deployment": os.getenv('DEPLOYMENT_ENV', 'huggingface'),
"hardware": "L40 GPU (48GB VRAM)" if memory_info.get("gpu_available") else "CPU"
}
}
except Exception as e:
logger.error(f"Performance analytics error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/analytics/costs")
async def analytics_costs():
"""Get token cost analytics based on LinguaCustodia pricing."""
# LinguaCustodia token pricing (estimated based on model size and hardware)
COST_PER_1K_INPUT_TOKENS = 0.0001 # $0.0001 per 1K input tokens
COST_PER_1K_OUTPUT_TOKENS = 0.0003 # $0.0003 per 1K output tokens
# Hardware costs
L40_HOURLY_COST = 1.80 # $1.80/hour for L40 GPU on HuggingFace
return {
"pricing": {
"model": "LinguaCustodia Financial Models",
"input_tokens": {
"cost_per_1k": COST_PER_1K_INPUT_TOKENS,
"currency": "USD"
},
"output_tokens": {
"cost_per_1k": COST_PER_1K_OUTPUT_TOKENS,
"currency": "USD"
}
},
"hardware": {
"type": "L40 GPU (48GB VRAM)",
"cost_per_hour": L40_HOURLY_COST,
"cost_per_day": round(L40_HOURLY_COST * 24, 2),
"cost_per_month": round(L40_HOURLY_COST * 24 * 30, 2),
"currency": "USD"
},
"examples": {
"100k_tokens_input": f"${round(COST_PER_1K_INPUT_TOKENS * 100, 4)}",
"100k_tokens_output": f"${round(COST_PER_1K_OUTPUT_TOKENS * 100, 4)}",
"1m_tokens_total": f"${round((COST_PER_1K_INPUT_TOKENS + COST_PER_1K_OUTPUT_TOKENS) * 500, 2)}"
},
"note": "Costs are estimates. Actual costs may vary based on usage patterns and model selection."
}
@app.get("/analytics/usage")
async def analytics_usage():
"""Get usage statistics for the API."""
global inference_backend
if inference_backend is None:
raise HTTPException(status_code=503, detail="Backend not initialized")
try:
memory_info = inference_backend.get_memory_info()
# Get current model info
model_config = inference_backend.model_config
model_id = getattr(model_config, 'model_id', 'unknown')
return {
"current_session": {
"model_loaded": inference_backend.engine is not None,
"model_id": model_id,
"backend": inference_backend.backend_type,
"uptime_status": "running"
},
"capabilities": {
"streaming": inference_backend.backend_type == "vllm",
"openai_compatible": True,
"max_context_length": 2048 if inference_backend.backend_type == "vllm" else 4096,
"supported_endpoints": [
"/v1/chat/completions",
"/v1/completions",
"/v1/models"
]
},
"performance": {
"gpu_available": memory_info.get("gpu_available", False),
"backend_optimizations": "vLLM with eager mode" if inference_backend.backend_type == "vllm" else "Transformers"
},
"note": "This API provides real-time access to LinguaCustodia financial AI models with OpenAI-compatible interface."
}
except Exception as e:
logger.error(f"Usage analytics error: {e}")
raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
port = int(os.getenv("APP_PORT", 7860))
uvicorn.run(app, host="0.0.0.0", port=port)
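# Illustrative only (assuming this module is saved as app.py):
#   python app.py                  # serves on 0.0.0.0:7860 unless APP_PORT is set
#   APP_PORT=8080 python app.py    # override the port via the environment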