# simple-chat / app.py
# alex4cip's picture
# docs: Update README with multi-environment support and remove redundant footer
# f1ac66c
"""
Multi-environment chatbot: Detects and adapts to different hardware environments
Supports: Local (Mac/Linux/Windows), HF Spaces (CPU Basic/Upgrade, ZeroGPU)
"""
import os
import platform
# IMPORTANT: Import spaces FIRST before any CUDA-related packages (torch, transformers)
# This prevents "CUDA has been initialized" error on ZeroGPU
try:
import spaces
ZEROGPU_AVAILABLE = True
except ImportError:
ZEROGPU_AVAILABLE = False
# Now safe to import CUDA-related packages
import gradio as gr
from transformers import AutoModelForCausalLM, AutoTokenizer
from huggingface_hub import snapshot_download
import torch
# ============================================================================
# Hardware Environment Detection
# ============================================================================
def test_cuda_compatibility():
    """
    Probe whether CUDA is usable in practice, not merely reported as present.

    Runs a small matrix multiplication on the GPU and copies the result back
    to the host. Any exception raised by the probe is printed and treated as
    "CUDA unusable" so the caller can fall back to CPU.

    Original author's note: RTX 5080 and other Blackwell GPUs (sm_120) are
    supported with PyTorch nightly builds (CUDA 12.8+).

    Returns:
        bool: True when the probe completes; False when CUDA is not
        available or the probe raises.
    """
    cuda_ok = False
    if torch.cuda.is_available():
        try:
            # Allocate on the device, multiply, and pull the result back.
            lhs = torch.randn(10, 10).cuda()
            rhs = torch.randn(10, 10).cuda()
            product = torch.matmul(lhs, rhs)
            product.cpu()
            cuda_ok = True
        except Exception as probe_error:
            print(f"⚠️ CUDA test failed: {probe_error}")
            print(" Will fall back to CPU mode")
    return cuda_ok
def detect_hardware_environment():
    """
    Detect the platform and accelerator this process is running on.

    The presence of the ``SPACE_ID`` environment variable selects the
    Hugging Face Spaces branch; otherwise the machine is treated as local
    and probed for CUDA, then Apple MPS, then plain CPU.

    Returns:
        dict: {
            'platform': 'hf_spaces' | 'local',
            'hardware': 'zerogpu' | 'cpu_upgrade' | 'cpu_basic' | 'local_gpu' | 'local_cpu',
            'gpu_available': bool,
            'gpu_name': str or None,
            'cpu_count': int,
            'os': value of platform.system(),
            'description': str,
            'cuda_compatible': bool
        }
    """
    env_info = {
        'platform': 'local',
        'hardware': 'local_cpu',
        'gpu_available': False,
        'gpu_name': None,
        'cpu_count': os.cpu_count() or 1,
        'os': platform.system(),
        'description': '',
        'cuda_compatible': False,
    }

    space_id = os.environ.get('SPACE_ID')
    if space_id is not None:
        # Running on HF Spaces
        env_info['platform'] = 'hf_spaces'

        # ZeroGPU is inferred from the import-time flag, i.e. from whether
        # the `spaces` package could be imported.
        # NOTE(review): if `spaces` is importable on CPU-only Spaces too, this
        # misreports them as ZeroGPU - confirm against the deployment image.
        if ZEROGPU_AVAILABLE:
            env_info['hardware'] = 'zerogpu'
            env_info['gpu_available'] = True
            env_info['gpu_name'] = 'NVIDIA H200 (ZeroGPU)'
            env_info['description'] = f"🚀 HF Spaces - ZeroGPU ({space_id})"
            env_info['cuda_compatible'] = True
            return env_info

        # CPU tier is guessed from the vCPU count alone; the RAM figures in
        # the descriptions are fixed text, not measured values.
        cpu_count = env_info['cpu_count']
        if cpu_count >= 8:
            env_info['hardware'] = 'cpu_upgrade'
            env_info['description'] = f"⚙️ HF Spaces - CPU Upgrade ({cpu_count} vCPU, 32GB RAM)"
        else:
            env_info['hardware'] = 'cpu_basic'
            env_info['description'] = f"💻 HF Spaces - CPU Basic ({cpu_count} vCPU, 16GB RAM)"
        return env_info

    # Local environment detection
    if torch.cuda.is_available():
        # CUDA is reported; verify that tensor operations actually work.
        cuda_works = test_cuda_compatibility()
        try:
            gpu_name = torch.cuda.get_device_name(0)
        except Exception:
            # Only ordinary errors are absorbed here; KeyboardInterrupt and
            # SystemExit propagate instead of being swallowed.
            gpu_name = 'CUDA GPU'

        if cuda_works:
            env_info['hardware'] = 'local_gpu'
            env_info['gpu_available'] = True
            env_info['gpu_name'] = gpu_name
            env_info['description'] = f"🖥️ Local - GPU ({gpu_name})"
            env_info['cuda_compatible'] = True
        else:
            # CUDA detected but tensor operations failed
            env_info['hardware'] = 'local_cpu'
            env_info['gpu_available'] = False
            env_info['gpu_name'] = gpu_name + " (CUDA error - using CPU)"
            env_info['description'] = f"⚠️ Local - CPU fallback ({gpu_name} CUDA error)"
            env_info['cuda_compatible'] = False
        return env_info

    # Guard the lookup: torch builds without an MPS backend module would
    # otherwise raise AttributeError here.
    mps_backend = getattr(torch.backends, 'mps', None)
    if mps_backend is not None and mps_backend.is_available():
        env_info['hardware'] = 'local_gpu'
        env_info['gpu_available'] = True
        env_info['gpu_name'] = 'Apple Silicon GPU (MPS)'
        env_info['description'] = "🍎 Local - Apple Silicon GPU"
        env_info['cuda_compatible'] = False
    else:
        env_info['hardware'] = 'local_cpu'
        env_info['description'] = f"💻 Local - CPU ({env_info['os']}, {env_info['cpu_count']} cores)"
        env_info['cuda_compatible'] = False
    return env_info
# Probe the hardware once at import time; the rest of the module reads HW_ENV.
HW_ENV = detect_hardware_environment()
# Note: ZEROGPU_AVAILABLE already set at import time to prevent CUDA initialization errors

# Assemble the startup report line by line, then emit it in one go.
_RULE = "=" * 60
_report_lines = [
    _RULE,
    "Hardware Environment Detection",
    _RULE,
    f"Platform: {HW_ENV['platform']}",
    f"Hardware: {HW_ENV['hardware']}",
    f"GPU Available: {HW_ENV['gpu_available']}",
]
# The GPU name line is only shown when a name was recorded.
if HW_ENV['gpu_name']:
    _report_lines.append(f"GPU Name: {HW_ENV['gpu_name']}")
_report_lines += [
    f"CPU Cores: {HW_ENV['cpu_count']}",
    f"OS: {HW_ENV['os']}",
    f"Description: {HW_ENV['description']}",
    _RULE,
]
print("\n".join(_report_lines))
# Pull variables from a .env file when python-dotenv is installed;
# otherwise rely on the process environment as-is.
try:
    from dotenv import load_dotenv
except ImportError:
    print("⚠️ python-dotenv not installed, using system environment variables only")
else:
    load_dotenv()  # Merge .env entries into os.environ
    print("✅ .env file loaded")

# Hugging Face access token (None when unset); passed to from_pretrained calls.
HF_TOKEN = os.environ.get("HF_TOKEN")
print(
    f"✅ HF_TOKEN loaded (length: {len(HF_TOKEN)} chars)"
    if HF_TOKEN
    else "⚠️ HF_TOKEN not found in environment - some models may not be accessible"
)
# Model configurations
# Note: Gated models (marked with 🔒) require HF access approval at https://huggingface.co/[model-name]
#
# Each row is (Hub repository id, label shown in the UI dropdown).
_MODEL_TABLE = (
    ("LGAI-EXAONE/EXAONE-3.5-7.8B-Instruct", "EXAONE 3.5 7.8B Instruct ⭐ (파라미터 대비 최고 효율)"),
    ("LGAI-EXAONE/EXAONE-3.5-2.4B-Instruct", "EXAONE 3.5 2.4B Instruct ⚡ (초경량, 빠른 응답)"),
    ("beomi/Llama-3-Open-Ko-8B", "Llama-3 Open-Ko 8B 🔥 (Llama 3 생태계)"),
    ("Qwen/Qwen2.5-7B-Instruct", "Qwen2.5 7B Instruct (한글 지시응답 우수)"),
    ("Qwen/Qwen2.5-14B-Instruct", "Qwen2.5 14B Instruct (다국어·한글 강점, 여유 GPU 권장)"),
    ("meta-llama/Llama-3.1-8B-Instruct", "Llama 3.1 8B Instruct 🔒 (커뮤니티 Ko 튜닝 활발, 승인 필요)"),
    ("meta-llama/Llama-3.1-70B-Instruct", "Llama 3.1 70B Instruct 🔒 (대규모·한글 품질 우수, 승인 필요)"),
    ("01-ai/Yi-1.5-9B-Chat", "Yi 1.5 9B Chat (다국어/한글 안정적 대화)"),
    ("01-ai/Yi-1.5-34B-Chat", "Yi 1.5 34B Chat (긴 문맥·한글 생성 강점)"),
    ("mistralai/Mistral-7B-Instruct-v0.3", "Mistral 7B Instruct v0.3 (경량·한글 커뮤니티 튜닝 多)"),
    ("upstage/SOLAR-10.7B-Instruct-v1.0", "Solar 10.7B Instruct v1.0 (한국어 강점, 실전 지시응답)"),
    ("EleutherAI/polyglot-ko-5.8b", "Polyglot-Ko 5.8B (한국어 중심 베이스)"),
    ("CohereForAI/aya-23-8B", "Aya-23 8B 🔒 (다국어·한국어 지원 양호, 승인 필요)"),
)

# Expand the table into the list-of-dicts shape the rest of the module reads.
# Every entry uses the same max_length (passed to generate() as max_new_tokens).
MODEL_CONFIGS = [
    {
        "MODEL_NAME": repo_id,
        "MODEL_CONFIG": {
            "name": display_name,
            "max_length": 150,
        },
    }
    for repo_id, display_name in _MODEL_TABLE
]
# Index into MODEL_CONFIGS of the model currently selected in the UI
current_model_index = 0
# Repository id of the model held in memory; None until the first load
loaded_model_name = None

# Process-wide model/tokenizer cache, filled lazily by load_model_once()
model = None
tokenizer = None

# Counts shown in the page header; an entry is "gated" when its label has 🔒
TOTAL_MODEL_COUNT = len(MODEL_CONFIGS)
GATED_MODEL_COUNT = sum("🔒" in cfg["MODEL_CONFIG"]["name"] for cfg in MODEL_CONFIGS)
PUBLIC_MODEL_COUNT = TOTAL_MODEL_COUNT - GATED_MODEL_COUNT
def check_model_cached(model_name):
    """
    Report whether the local Hugging Face cache has an entry for a repo id.

    Args:
        model_name: Hub repository id to look for.

    Returns:
        bool: True when some cached repo has exactly this id. False when
        none does, or when the cache cannot be scanned (the error is
        printed and the model is assumed to be absent).
    """
    try:
        from huggingface_hub import scan_cache_dir

        cached_ids = (entry.repo_id for entry in scan_cache_dir().repos)
        return any(cached_id == model_name for cached_id in cached_ids)
    except Exception as scan_error:
        # Best effort: an unreadable cache is treated as "not cached".
        print(f" ⚠️ Unable to check cache: {scan_error}")
        return False
def load_model_once(model_index=None):
    """
    Return the (model, tokenizer) pair for a MODEL_CONFIGS entry, loading lazily.

    If the requested model is the one already held in the module-level cache
    it is returned as-is. Otherwise the previous model is released, and the
    new tokenizer and model are loaded and published to the module globals.

    The new pair is committed to the globals only after both loads succeed.
    If loading raises, the cache is left empty (model/tokenizer None,
    loaded_model_name None) so the next call retries cleanly instead of
    hitting a deleted global or reporting a stale model as loaded.

    Args:
        model_index: Index into MODEL_CONFIGS; defaults to current_model_index.

    Returns:
        tuple: (model, tokenizer)

    Raises:
        Whatever AutoTokenizer/AutoModelForCausalLM.from_pretrained raise
        (propagated unchanged), or IndexError for an out-of-range index.
    """
    global model, tokenizer, current_model_index, loaded_model_name
    import gc  # local import: only used when a previous model is released

    if model_index is None:
        model_index = current_model_index

    model_name = MODEL_CONFIGS[model_index]["MODEL_NAME"]

    # Fast path: requested model is already resident.
    if loaded_model_name == model_name:
        print(f"ℹ️ Model {model_name} already loaded, reusing...")
        return model, tokenizer

    print(f"🔄 Loading model: {model_name}")
    print(f" Previous model: {loaded_model_name or 'None'}")

    is_cached = check_model_cached(model_name)
    if is_cached:
        print(" ✅ Model found in cache, loading from disk...")
    else:
        print(" 📥 Model not in cache, will download (~4-14GB depending on model)...")

    # Release the previous model. The globals are rebound to None rather than
    # deleted: `del` on a global removes the name itself, so a failed load
    # afterwards would make every later `model is not None` check raise
    # NameError.
    if model is not None:
        print(" 🗑️ Unloading previous model from memory...")
        model = None
        tokenizer = None
        loaded_model_name = None
        gc.collect()
        if HW_ENV['cuda_compatible']:
            torch.cuda.empty_cache()

    print(" 📝 Loading tokenizer...")
    new_tokenizer = AutoTokenizer.from_pretrained(
        model_name,
        token=HF_TOKEN,
        trust_remote_code=True,
    )
    # Some tokenizers ship without a pad token; reuse EOS so padding works.
    if new_tokenizer.pad_token is None:
        new_tokenizer.pad_token = new_tokenizer.eos_token

    # Only CUDA counts as a usable GPU here; an MPS machine has
    # gpu_available=True but cuda_compatible=False and therefore runs on CPU.
    use_gpu = HW_ENV['gpu_available'] and HW_ENV['cuda_compatible']
    device = "cuda" if use_gpu else "cpu"
    print(f"📍 Using device: {device}")

    if is_cached:
        print(" 📀 Loading model from disk cache (15-30 seconds)...")
    else:
        print(" 🌐 Downloading model from network (5-20 minutes, first time only)...")

    if device == "cuda":
        # GPU: half precision, placement delegated to device_map="auto"
        new_model = AutoModelForCausalLM.from_pretrained(
            model_name,
            token=HF_TOKEN,
            dtype=torch.float16,
            low_cpu_mem_usage=True,
            trust_remote_code=True,
            device_map="auto",
        )
    else:
        # CPU: full precision, moved explicitly
        new_model = AutoModelForCausalLM.from_pretrained(
            model_name,
            token=HF_TOKEN,
            dtype=torch.float32,
            low_cpu_mem_usage=True,
            trust_remote_code=True,
        )
        new_model.to(device)

    new_model.eval()

    # Publish the fully loaded pair in one step.
    model, tokenizer = new_model, new_tokenizer
    current_model_index = model_index
    loaded_model_name = model_name
    print(f"✅ Model {model_name} loaded successfully")
    return model, tokenizer
def generate_response_impl(message, history):
    """
    Generate one assistant reply and return the history with it appended.

    The prompt is a plain-text transcript ("사용자: ..." / "어시스턴트: ...")
    built from up to the last six prior messages plus the new user message.

    Args:
        message: The new user message. Blank or None returns `history` as-is.
        history: List of {"role", "content"} dicts. chat_wrapper in this file
            passes a history whose last entry is already the new user
            message; that trailing entry is skipped when building the prompt
            so the message is not fed to the model twice.

    Returns:
        list: `history` plus one assistant message (the reply, or an error
        notice if anything raised). `history` itself is not mutated.
    """
    if not message or not message.strip():
        return history

    try:
        # Ensure model is loaded
        current_model, current_tokenizer = load_model_once()

        if current_model is None or current_tokenizer is None:
            return history + [{"role": "assistant", "content": "❌ 모델을 로드할 수 없습니다."}]

        device = next(current_model.parameters()).device

        # Drop the trailing copy of the current user message, if present.
        prior_turns = history
        if prior_turns:
            last_turn = prior_turns[-1]
            if last_turn["role"] == "user" and last_turn["content"] == message:
                prior_turns = prior_turns[:-1]

        # Build conversation context: last 3 turns (3 user + 3 assistant)
        transcript_parts = []
        for msg in prior_turns[-6:]:
            if msg["role"] == "user":
                transcript_parts.append(f"사용자: {msg['content']}\n")
            elif msg["role"] == "assistant":
                transcript_parts.append(f"어시스턴트: {msg['content']}\n")
        transcript_parts.append(f"사용자: {message}\n어시스턴트:")
        conversation = "".join(transcript_parts)

        # Truncate from the left so that an over-long prompt loses its oldest
        # text; right-side truncation would cut off the newest user message
        # and the trailing "어시스턴트:" cue.
        current_tokenizer.truncation_side = "left"
        encoded = current_tokenizer(
            conversation,
            return_tensors="pt",
            truncation=True,
            max_length=512,
            padding=True,
        )
        inputs = encoded['input_ids'].to(device)
        attention_mask = encoded['attention_mask'].to(device)

        # Per-model generation settings
        model_config = MODEL_CONFIGS[current_model_index]["MODEL_CONFIG"]

        with torch.no_grad():
            outputs = current_model.generate(
                inputs,
                attention_mask=attention_mask,
                max_new_tokens=model_config["max_length"],
                temperature=0.7,
                top_p=0.9,
                do_sample=True,
                pad_token_id=current_tokenizer.pad_token_id,
                eos_token_id=current_tokenizer.eos_token_id,
            )

        # Decode only the tokens generated after the prompt. Splitting the
        # full decoded text on "어시스턴트:" depends on the prompt decoding
        # back verbatim and picks up any later turn the model invents.
        prompt_length = inputs.shape[-1]
        generated_ids = outputs[0][prompt_length:]
        response = current_tokenizer.decode(generated_ids, skip_special_tokens=True)

        # If the model keeps writing the transcript, keep only its own turn.
        response = response.split("사용자:")[0].strip()

        if not response:
            response = "죄송합니다. 응답을 생성할 수 없었습니다."

        return history + [{"role": "assistant", "content": response}]

    except Exception as e:
        # Boundary handler: log the full traceback, show a short notice in chat.
        import traceback

        error_msg = str(e)
        print("=" * 50)
        print(f"Error: {error_msg}")
        print(traceback.format_exc())
        print("=" * 50)
        return history + [{"role": "assistant", "content": f"❌ 오류: {error_msg[:200]}"}]
# Single entry point for generation; wrapped for ZeroGPU only when available.
def generate_response(message, history):
    """Produce the updated history by delegating to generate_response_impl."""
    return generate_response_impl(message, history)


if ZEROGPU_AVAILABLE:
    # Same effect as decorating the definition with @spaces.GPU(duration=120).
    generate_response = spaces.GPU(duration=120)(generate_response)
def chat_wrapper(message, history):
    """
    Append the user's message to the history and generate the reply.

    Args:
        message: Text the user just submitted.
        history: Messages-format history that does not yet contain `message`.

    Returns:
        The value returned by generate_response for the extended history.
    """
    user_turn = {"role": "user", "content": message}
    history_with_user = history + [user_turn]
    return generate_response(message, history_with_user)
print(f"✅ App initialized - {HW_ENV['description']}")

# Custom CSS for button alignment: vertically centre the input row and let
# the send button fill the row height.
custom_css = "\n".join(
    [
        "",
        ".input-row {",
        "align-items: center !important;",
        "}",
        ".input-row > div:last-child button {",
        "height: 100% !important;",
        "min-height: 42px !important;",
        "}",
        "",
    ]
)
# Create Gradio interface
with gr.Blocks(title="🤖 Multi-Model Chatbot", css=custom_css) as demo:
    # Page header (Markdown). The common part reports the detected
    # environment; a hardware-specific section is appended below.
    header = f"""
# 🤖 다중 모델 챗봇 {HW_ENV['description']}
**환경 정보**:
- **플랫폼**: {HW_ENV['platform'].upper().replace('_', ' ')}
- **하드웨어**: {HW_ENV['hardware'].upper().replace('_', ' ')}
- **GPU**: {'✅ ' + HW_ENV['gpu_name'] if HW_ENV['gpu_available'] else '❌ CPU only'}
- **CPU 코어**: {HW_ENV['cpu_count']}
- **운영체제**: {HW_ENV['os']}
**모델 선택**:
- 🎯 {TOTAL_MODEL_COUNT}가지 한글 최적화 모델 ({PUBLIC_MODEL_COUNT} Public + {GATED_MODEL_COUNT} Gated)
- 🔄 모델 전환 시 자동 재로딩 (채팅 히스토리 초기화)
- ⏱️ 첫 응답은 모델 로딩 시간 포함
**테스트 예시**:
- "안녕하세요"
- "인공지능에 대해 설명해주세요"
- "한국의 수도는 어디인가요?"
"""
    # Add hardware-specific features
    if HW_ENV['hardware'] == 'zerogpu':
        header += """
**ZeroGPU 특징**:
- ⚡ 초고속 응답 (3-5초, GPU 가속)
- 🚀 NVIDIA H200 자동 할당
- 💰 PRO 구독 시 하루 25분 무료
"""
    elif HW_ENV['hardware'] == 'cpu_upgrade':
        header += """
**CPU Upgrade 특징**:
- ⏰ 무제한 사용 시간
- ⏳ CPU 환경 (응답 30초~1분)
- 💰 시간당 $0.03 (월 약 $22)
"""
    elif HW_ENV['hardware'] == 'cpu_basic':
        header += """
**CPU Basic 특징**:
- 💡 무료 티어
- ⏳ CPU 환경 (응답 1~2분)
- 🔒 경량 모델 권장 (EXAONE 2.4B, Mistral 7B)
"""
    elif HW_ENV['hardware'] == 'local_gpu':
        header += f"""
**로컬 GPU 특징**:
- 🖥️ 개인 GPU: {HW_ENV['gpu_name']}
- ⚡ 빠른 응답 (GPU 가속)
- 🔓 무제한 사용
"""
    else:  # local_cpu
        header += """
**로컬 CPU 특징**:
- 💻 로컬 개발 환경
- ⏳ CPU 환경 (느린 응답)
- 🔒 경량 모델 권장
"""
    gr.Markdown(header)

    # Model selector: one dropdown entry per MODEL_CONFIGS label
    model_choices = [cfg['MODEL_CONFIG']['name'] for cfg in MODEL_CONFIGS]
    model_dropdown = gr.Dropdown(
        choices=model_choices,
        value=model_choices[0],
        label="🤖 모델 선택",
        interactive=True,
    )
    chatbot = gr.Chatbot(height=400, type="messages", show_label=False)
    with gr.Row(elem_classes="input-row"):
        msg = gr.Textbox(
            placeholder="한글로 메시지를 입력하세요...",
            show_label=False,
            scale=9,
            container=False,
        )
        btn = gr.Button("전송", scale=1, variant="primary", min_width=80)
    clear = gr.Button("🗑️ 대화 초기화", size="sm")

    def change_model(selected_model):
        """
        Record the newly selected model and clear the chat.

        Only current_model_index is updated here; the model itself is loaded
        on the next submit. An unknown label leaves the index unchanged.

        Returns:
            list: Empty history for the chatbot component.
        """
        global current_model_index
        for idx, cfg in enumerate(MODEL_CONFIGS):
            if cfg['MODEL_CONFIG']['name'] == selected_model:
                current_model_index = idx
                break
        # Clear chat history when changing model
        return []

    def submit(message, history):
        """
        Stream chat updates for one submitted message.

        Yields (chatbot_history, textbox_value) pairs: first the user
        message, then a loading/thinking placeholder, then the final history
        with the assistant reply. The textbox is cleared on every yield.

        Blank input is ignored: the history is yielded unchanged so that no
        empty user bubble or placeholder is added to the chat.
        """
        if not message or not message.strip():
            yield history, ""
            return

        # Immediately show user message
        updated_history = history + [{"role": "user", "content": message}]
        yield updated_history, ""

        # Pick a placeholder that matches what load_model_once() is about to do.
        selected_model_name = MODEL_CONFIGS[current_model_index]["MODEL_NAME"]
        if loaded_model_name != selected_model_name:
            if check_model_cached(selected_model_name):
                # Model is cached, just loading from disk
                placeholder = "💾 캐시된 모델 디스크에서 로딩 중... (15-30초, 다운로드 안 함)"
            else:
                # Model needs to be downloaded
                placeholder = "📥 모델 다운로드 시작... (4-14GB, 첫 사용 시 5-20분 소요)"
        else:
            # Model already resident: plain "thinking" indicator
            placeholder = "🤔 응답 생성 중..."
        yield updated_history + [{"role": "assistant", "content": placeholder}], ""

        # Generate and add bot response (this will load the model if needed).
        # chat_wrapper appends the user message itself, so it receives the
        # history as it was before this submit.
        final_history = chat_wrapper(message, history)
        yield final_history, ""

    # Event handlers
    model_dropdown.change(change_model, inputs=[model_dropdown], outputs=[chatbot])
    btn.click(submit, [msg, chatbot], [chatbot, msg])
    msg.submit(submit, [msg, chatbot], [chatbot, msg])
    clear.click(lambda: [], outputs=chatbot)
if __name__ == "__main__":
    # Serve on all network interfaces, port 7860, when run as a script.
    launch_options = {"server_name": "0.0.0.0", "server_port": 7860}
    demo.launch(**launch_options)