| """ | |
| Multi-environment chatbot: Detects and adapts to different hardware environments | |
| Supports: Local (Mac/Linux/Windows), HF Spaces (CPU Basic/Upgrade, ZeroGPU) | |
| """ | |
| import os | |
| import platform | |
| # IMPORTANT: Import spaces FIRST before any CUDA-related packages (torch, transformers) | |
| # This prevents "CUDA has been initialized" error on ZeroGPU | |
| try: | |
| import spaces | |
| ZEROGPU_AVAILABLE = True | |
| except ImportError: | |
| ZEROGPU_AVAILABLE = False | |
| # Now safe to import CUDA-related packages | |
| import gradio as gr | |
| from transformers import AutoModelForCausalLM, AutoTokenizer | |
| from huggingface_hub import snapshot_download | |
| import torch | |

# ============================================================================
# Hardware Environment Detection
# ============================================================================

def test_cuda_compatibility():
    """
    Test whether CUDA actually works on this GPU.

    Returns: True if CUDA works, False otherwise.

    Note: RTX 5080 and other Blackwell GPUs (sm_120) are supported with
    PyTorch nightly builds (CUDA 12.8+).
    """
    if not torch.cuda.is_available():
        return False
    try:
        # Run a small tensor operation to verify that CUDA actually works
        x = torch.randn(10, 10).cuda()
        y = torch.randn(10, 10).cuda()
        z = torch.matmul(x, y)
        z.cpu()
        return True
    except Exception as e:
        print(f"⚠️ CUDA test failed: {e}")
        print("   Will fall back to CPU mode")
        return False

def detect_hardware_environment():
    """
    Comprehensive hardware environment detection.

    Returns:
        dict: {
            'platform': 'hf_spaces' | 'local',
            'hardware': 'zerogpu' | 'cpu_upgrade' | 'cpu_basic' | 'local_gpu' | 'local_cpu',
            'gpu_available': bool,
            'gpu_name': str or None,
            'cpu_count': int,
            'os': 'Darwin' | 'Linux' | 'Windows',
            'description': str,
            'cuda_compatible': bool
        }
    """
    env_info = {
        'platform': 'local',
        'hardware': 'local_cpu',
        'gpu_available': False,
        'gpu_name': None,
        'cpu_count': os.cpu_count() or 1,
        'os': platform.system(),
        'description': '',
        'cuda_compatible': False,
    }

    # Check if running on HF Spaces
    is_hf_spaces = os.environ.get('SPACE_ID') is not None

    if is_hf_spaces:
        env_info['platform'] = 'hf_spaces'
        space_id = os.environ.get('SPACE_ID', 'unknown')

        # Check for ZeroGPU using the import status determined at module load
        if ZEROGPU_AVAILABLE:
            env_info['hardware'] = 'zerogpu'
            env_info['gpu_available'] = True
            env_info['gpu_name'] = 'NVIDIA H200 (ZeroGPU)'
            env_info['description'] = f"🚀 HF Spaces - ZeroGPU ({space_id})"
            env_info['cuda_compatible'] = True
        else:
            # Estimate the CPU tier from the vCPU count
            cpu_count = env_info['cpu_count']
            if cpu_count >= 8:
                env_info['hardware'] = 'cpu_upgrade'
                env_info['description'] = f"⚙️ HF Spaces - CPU Upgrade ({cpu_count} vCPU, 32GB RAM)"
            else:
                env_info['hardware'] = 'cpu_basic'
                env_info['description'] = f"💻 HF Spaces - CPU Basic ({cpu_count} vCPU, 16GB RAM)"
    else:
        # Local environment detection
        if torch.cuda.is_available():
            # CUDA is available; test whether it actually works
            cuda_works = test_cuda_compatibility()
            try:
                gpu_name = torch.cuda.get_device_name(0)
            except Exception:
                gpu_name = 'CUDA GPU'

            if cuda_works:
                env_info['hardware'] = 'local_gpu'
                env_info['gpu_available'] = True
                env_info['gpu_name'] = gpu_name
                env_info['description'] = f"🖥️ Local - GPU ({gpu_name})"
                env_info['cuda_compatible'] = True
            else:
                # CUDA detected but tensor operations failed
                env_info['hardware'] = 'local_cpu'
                env_info['gpu_available'] = False
                env_info['gpu_name'] = gpu_name + " (CUDA error - using CPU)"
                env_info['description'] = f"⚠️ Local - CPU fallback ({gpu_name} CUDA error)"
                env_info['cuda_compatible'] = False
        elif torch.backends.mps.is_available():
            env_info['hardware'] = 'local_gpu'
            env_info['gpu_available'] = True
            env_info['gpu_name'] = 'Apple Silicon GPU (MPS)'
            env_info['description'] = "🍎 Local - Apple Silicon GPU"
            env_info['cuda_compatible'] = False
        else:
            env_info['hardware'] = 'local_cpu'
            env_info['description'] = f"💻 Local - CPU ({env_info['os']}, {env_info['cpu_count']} cores)"
            env_info['cuda_compatible'] = False

    return env_info
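
# Example (illustrative only; actual values such as cpu_count and the Space id
# vary by machine) of the dict returned on a ZeroGPU Space:
#   {'platform': 'hf_spaces', 'hardware': 'zerogpu', 'gpu_available': True,
#    'gpu_name': 'NVIDIA H200 (ZeroGPU)', 'cpu_count': 8, 'os': 'Linux',
#    'description': '🚀 HF Spaces - ZeroGPU (org/space-name)', 'cuda_compatible': True}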

# Detect hardware environment
HW_ENV = detect_hardware_environment()
# Note: ZEROGPU_AVAILABLE is already set at import time to prevent CUDA initialization errors

# Print environment info
print("=" * 60)
print("Hardware Environment Detection")
print("=" * 60)
print(f"Platform: {HW_ENV['platform']}")
print(f"Hardware: {HW_ENV['hardware']}")
print(f"GPU Available: {HW_ENV['gpu_available']}")
if HW_ENV['gpu_name']:
    print(f"GPU Name: {HW_ENV['gpu_name']}")
print(f"CPU Cores: {HW_ENV['cpu_count']}")
print(f"OS: {HW_ENV['os']}")
print(f"Description: {HW_ENV['description']}")
print("=" * 60)

# Load environment variables from a .env file
try:
    from dotenv import load_dotenv
    load_dotenv()  # Load .env file into the environment
    print("✅ .env file loaded")
except ImportError:
    print("⚠️ python-dotenv not installed, using system environment variables only")

# Get HF token from the environment
HF_TOKEN = os.getenv("HF_TOKEN", None)
if HF_TOKEN:
    print(f"✅ HF_TOKEN loaded (length: {len(HF_TOKEN)} chars)")
else:
    print("⚠️ HF_TOKEN not found in environment - some models may not be accessible")

# Model configurations
# Note: Gated models (marked with 🔒) require HF access approval at https://huggingface.co/[model-name]
MODEL_CONFIGS = [
    {
        "MODEL_NAME": "LGAI-EXAONE/EXAONE-3.5-7.8B-Instruct",
        "MODEL_CONFIG": {
            "name": "EXAONE 3.5 7.8B Instruct ⭐ (파라미터 대비 최고 효율)",
            "max_length": 150,
        },
    },
    {
        "MODEL_NAME": "LGAI-EXAONE/EXAONE-3.5-2.4B-Instruct",
        "MODEL_CONFIG": {
            "name": "EXAONE 3.5 2.4B Instruct ⚡ (초경량, 빠른 응답)",
            "max_length": 150,
        },
    },
    {
        "MODEL_NAME": "beomi/Llama-3-Open-Ko-8B",
        "MODEL_CONFIG": {
            "name": "Llama-3 Open-Ko 8B 🔥 (Llama 3 생태계)",
            "max_length": 150,
        },
    },
    {
        "MODEL_NAME": "Qwen/Qwen2.5-7B-Instruct",
        "MODEL_CONFIG": {
            "name": "Qwen2.5 7B Instruct (한글 지시응답 우수)",
            "max_length": 150,
        },
    },
    {
        "MODEL_NAME": "Qwen/Qwen2.5-14B-Instruct",
        "MODEL_CONFIG": {
            "name": "Qwen2.5 14B Instruct (다국어·한글 강점, 여유 GPU 권장)",
            "max_length": 150,
        },
    },
    {
        "MODEL_NAME": "meta-llama/Llama-3.1-8B-Instruct",
        "MODEL_CONFIG": {
            "name": "Llama 3.1 8B Instruct 🔒 (커뮤니티 Ko 튜닝 활발, 승인 필요)",
            "max_length": 150,
        },
    },
    {
        "MODEL_NAME": "meta-llama/Llama-3.1-70B-Instruct",
        "MODEL_CONFIG": {
            "name": "Llama 3.1 70B Instruct 🔒 (대규모·한글 품질 우수, 승인 필요)",
            "max_length": 150,
        },
    },
    {
        "MODEL_NAME": "01-ai/Yi-1.5-9B-Chat",
        "MODEL_CONFIG": {
            "name": "Yi 1.5 9B Chat (다국어/한글 안정적 대화)",
            "max_length": 150,
        },
    },
    {
        "MODEL_NAME": "01-ai/Yi-1.5-34B-Chat",
        "MODEL_CONFIG": {
            "name": "Yi 1.5 34B Chat (긴 문맥·한글 생성 강점)",
            "max_length": 150,
        },
    },
    {
        "MODEL_NAME": "mistralai/Mistral-7B-Instruct-v0.3",
        "MODEL_CONFIG": {
            "name": "Mistral 7B Instruct v0.3 (경량·한글 커뮤니티 튜닝 多)",
            "max_length": 150,
        },
    },
    {
        "MODEL_NAME": "upstage/SOLAR-10.7B-Instruct-v1.0",
        "MODEL_CONFIG": {
            "name": "Solar 10.7B Instruct v1.0 (한국어 강점, 실전 지시응답)",
            "max_length": 150,
        },
    },
    {
        "MODEL_NAME": "EleutherAI/polyglot-ko-5.8b",
        "MODEL_CONFIG": {
            "name": "Polyglot-Ko 5.8B (한국어 중심 베이스)",
            "max_length": 150,
        },
    },
    {
        "MODEL_NAME": "CohereForAI/aya-23-8B",
        "MODEL_CONFIG": {
            "name": "Aya-23 8B 🔒 (다국어·한국어 지원 양호, 승인 필요)",
            "max_length": 150,
        },
    },
]
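
# To add another model, append an entry with the same shape. The repo id below is
# a hypothetical placeholder, not a real model:
# MODEL_CONFIGS.append({
#     "MODEL_NAME": "your-org/your-korean-model",
#     "MODEL_CONFIG": {"name": "Your Model (description)", "max_length": 150},
# })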

# Default model
current_model_index = 0
loaded_model_name = None  # Track which model is currently loaded

# Global model cache
model = None
tokenizer = None

# Dynamic model counts
TOTAL_MODEL_COUNT = len(MODEL_CONFIGS)
PUBLIC_MODEL_COUNT = sum(1 for cfg in MODEL_CONFIGS if "🔒" not in cfg["MODEL_CONFIG"]["name"])
GATED_MODEL_COUNT = TOTAL_MODEL_COUNT - PUBLIC_MODEL_COUNT

def check_model_cached(model_name):
    """Check whether the model is already downloaded to the local HF cache."""
    try:
        from huggingface_hub import scan_cache_dir
        cache_info = scan_cache_dir()
        # Check if the model exists in the cache
        for repo in cache_info.repos:
            if repo.repo_id == model_name:
                return True
        return False
    except Exception as e:
        # If the cache cannot be inspected, assume the model is not cached
        print(f"   ⚠️ Unable to check cache: {e}")
        return False
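
# Note: scan_cache_dir() inspects the standard Hugging Face cache (by default
# ~/.cache/huggingface/hub); if HF_HOME or HF_HUB_CACHE is set, that location
# is scanned instead.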

def load_model_once(model_index=None):
    """Load the model and tokenizer for the selected index (lazy loading)."""
    global model, tokenizer, current_model_index, loaded_model_name

    if model_index is None:
        model_index = current_model_index

    # Get model config
    model_name = MODEL_CONFIGS[model_index]["MODEL_NAME"]

    # Reload only if a different model is selected or nothing is loaded yet
    if loaded_model_name != model_name:
        print(f"🔄 Loading model: {model_name}")
        print(f"   Previous model: {loaded_model_name or 'None'}")

        # Check if the model is already cached
        is_cached = check_model_cached(model_name)
        if is_cached:
            print("   ✅ Model found in cache, loading from disk...")
        else:
            print("   📥 Model not in cache, will download (~4-14GB depending on model)...")

        # Clear the previous model
        if model is not None:
            print("   🗑️ Unloading previous model from memory...")
            del model
            del tokenizer
            if HW_ENV['cuda_compatible']:
                torch.cuda.empty_cache()

        # Load tokenizer
        print("   📝 Loading tokenizer...")
        tokenizer = AutoTokenizer.from_pretrained(
            model_name,
            token=HF_TOKEN,
            trust_remote_code=True,
        )
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token

        # Select the device based on the hardware environment detection
        use_gpu = HW_ENV['gpu_available'] and HW_ENV['cuda_compatible']
        device = "cuda" if use_gpu else "cpu"
        print(f"📍 Using device: {device}")

        # Load the model with settings appropriate for the device
        if is_cached:
            print("   📀 Loading model from disk cache (15-30 seconds)...")
        else:
            print("   🌐 Downloading model from network (5-20 minutes, first time only)...")

        if device == "cuda":
            # GPU available and compatible
            model = AutoModelForCausalLM.from_pretrained(
                model_name,
                token=HF_TOKEN,
                dtype=torch.float16,  # Use float16 on GPU
                low_cpu_mem_usage=True,
                trust_remote_code=True,
                device_map="auto",  # accelerate places the weights; no manual .to() needed
            )
        else:
            # CPU only
            model = AutoModelForCausalLM.from_pretrained(
                model_name,
                token=HF_TOKEN,
                dtype=torch.float32,  # Use float32 on CPU
                low_cpu_mem_usage=True,
                trust_remote_code=True,
            )
            model.to(device)

        model.eval()

        current_model_index = model_index
        loaded_model_name = model_name
        print(f"✅ Model {model_name} loaded successfully")
    else:
        print(f"ℹ️ Model {model_name} already loaded, reusing...")

    return model, tokenizer

def generate_response_impl(message, history):
    """Core generation logic (shared by the ZeroGPU and CPU paths)."""
    if not message or not message.strip():
        return history

    try:
        # Ensure the model is loaded
        current_model, current_tokenizer = load_model_once()
        if current_model is None or current_tokenizer is None:
            return history + [{"role": "assistant", "content": "❌ 모델을 로드할 수 없습니다."}]

        # Get the device the model lives on
        device = next(current_model.parameters()).device

        # Build the conversation context from the last 3 turns.
        # The last history entry is the current user message (added by chat_wrapper),
        # so exclude it here to avoid repeating it in the prompt.
        conversation = ""
        for msg in history[:-1][-6:]:  # Last 3 turns (6 messages: 3 user + 3 assistant)
            if msg["role"] == "user":
                conversation += f"사용자: {msg['content']}\n"
            elif msg["role"] == "assistant":
                conversation += f"어시스턴트: {msg['content']}\n"
        conversation += f"사용자: {message}\n어시스턴트:"

        # Tokenize with attention_mask
        encoded = current_tokenizer(
            conversation,
            return_tensors="pt",
            truncation=True,
            max_length=512,
            padding=True,
        )
        inputs = encoded['input_ids'].to(device)
        attention_mask = encoded['attention_mask'].to(device)

        # Get the current model config
        model_config = MODEL_CONFIGS[current_model_index]["MODEL_CONFIG"]

        # Generate the response
        with torch.no_grad():
            outputs = current_model.generate(
                inputs,
                attention_mask=attention_mask,
                max_new_tokens=model_config["max_length"],
                temperature=0.7,
                top_p=0.9,
                do_sample=True,
                pad_token_id=current_tokenizer.pad_token_id,
                eos_token_id=current_tokenizer.eos_token_id,
            )

        # Decode the response
        full_response = current_tokenizer.decode(outputs[0], skip_special_tokens=True)

        # Extract only the assistant's reply
        if "어시스턴트:" in full_response:
            response = full_response.split("어시스턴트:")[-1].strip()
        else:
            response = full_response[len(conversation):].strip()

        if not response:
            response = "죄송합니다. 응답을 생성할 수 없었습니다."

        return history + [{"role": "assistant", "content": response}]

    except Exception as e:
        import traceback
        error_msg = str(e)
        print("=" * 50)
        print(f"Error: {error_msg}")
        print(traceback.format_exc())
        print("=" * 50)
        return history + [{"role": "assistant", "content": f"❌ 오류: {error_msg[:200]}"}]

# Conditionally apply the ZeroGPU decorator
if ZEROGPU_AVAILABLE:
    @spaces.GPU
    def generate_response(message, history):
        """GPU-accelerated response generation (ZeroGPU mode)"""
        return generate_response_impl(message, history)
else:
    def generate_response(message, history):
        """Standard response generation (CPU / local mode)"""
        return generate_response_impl(message, history)
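
# If generation regularly exceeds the default ZeroGPU time budget, the decorator
# can also be given an explicit allocation, e.g. @spaces.GPU(duration=120);
# the exact limits depend on the Space's ZeroGPU quota.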

def chat_wrapper(message, history):
    """Wrapper used by the Gradio submit handler."""
    # Append the user message to the messages-format history, then let
    # generate_response append the assistant reply on top of it.
    updated_history = history + [{"role": "user", "content": message}]
    response_history = generate_response(message, updated_history)
    return response_history

print(f"✅ App initialized - {HW_ENV['description']}")

# Custom CSS for button alignment
custom_css = """
.input-row {
    align-items: center !important;
}
.input-row > div:last-child button {
    height: 100% !important;
    min-height: 42px !important;
}
"""

# Create the Gradio interface
with gr.Blocks(title="🤖 Multi-Model Chatbot", css=custom_css) as demo:
    # Dynamic header based on the detected hardware environment
    header = f"""
# 🤖 다중 모델 챗봇 {HW_ENV['description']}

**환경 정보**:
- **플랫폼**: {HW_ENV['platform'].upper().replace('_', ' ')}
- **하드웨어**: {HW_ENV['hardware'].upper().replace('_', ' ')}
- **GPU**: {'✅ ' + HW_ENV['gpu_name'] if HW_ENV['gpu_available'] else '❌ CPU only'}
- **CPU 코어**: {HW_ENV['cpu_count']}
- **운영체제**: {HW_ENV['os']}

**모델 선택**:
- 🎯 {TOTAL_MODEL_COUNT}가지 한글 최적화 모델 ({PUBLIC_MODEL_COUNT} Public + {GATED_MODEL_COUNT} Gated)
- 🔄 모델 전환 시 자동 재로딩 (채팅 히스토리 초기화)
- ⏱️ 첫 응답은 모델 로딩 시간 포함

**테스트 예시**:
- "안녕하세요"
- "인공지능에 대해 설명해주세요"
- "한국의 수도는 어디인가요?"
"""

    # Add hardware-specific notes
    if HW_ENV['hardware'] == 'zerogpu':
        header += """
**ZeroGPU 특징**:
- ⚡ 초고속 응답 (3-5초, GPU 가속)
- 🚀 NVIDIA H200 자동 할당
- 💰 PRO 구독 시 하루 25분 무료
"""
    elif HW_ENV['hardware'] == 'cpu_upgrade':
        header += """
**CPU Upgrade 특징**:
- ⏰ 무제한 사용 시간
- ⏳ CPU 환경 (응답 30초~1분)
- 💰 시간당 $0.03 (월 약 $22)
"""
    elif HW_ENV['hardware'] == 'cpu_basic':
        header += """
**CPU Basic 특징**:
- 💡 무료 티어
- ⏳ CPU 환경 (응답 1~2분)
- 🔒 경량 모델 권장 (EXAONE 2.4B, Mistral 7B)
"""
    elif HW_ENV['hardware'] == 'local_gpu':
        header += f"""
**로컬 GPU 특징**:
- 🖥️ 개인 GPU: {HW_ENV['gpu_name']}
- ⚡ 빠른 응답 (GPU 가속)
- 🔓 무제한 사용
"""
    else:  # local_cpu
        header += """
**로컬 CPU 특징**:
- 💻 로컬 개발 환경
- ⏳ CPU 환경 (느린 응답)
- 🔒 경량 모델 권장
"""

    gr.Markdown(header)

    # Model selector
    model_choices = [cfg['MODEL_CONFIG']['name'] for cfg in MODEL_CONFIGS]
    model_dropdown = gr.Dropdown(
        choices=model_choices,
        value=model_choices[0],
        label="🤖 모델 선택",
        interactive=True,
    )

    chatbot = gr.Chatbot(height=400, type="messages", show_label=False)

    with gr.Row(elem_classes="input-row"):
        msg = gr.Textbox(
            placeholder="한글로 메시지를 입력하세요...",
            show_label=False,
            scale=9,
            container=False,
        )
        btn = gr.Button("전송", scale=1, variant="primary", min_width=80)

    clear = gr.Button("🗑️ 대화 초기화", size="sm")

    def change_model(selected_model):
        """Handle a model change from the dropdown."""
        global current_model_index
        # Find the index of the selected model
        for idx, cfg in enumerate(MODEL_CONFIGS):
            if cfg['MODEL_CONFIG']['name'] == selected_model:
                current_model_index = idx
                break
        # Clear the chat history when the model changes
        return []

    def submit(message, history):
        global loaded_model_name, current_model_index

        # Immediately show the user message
        updated_history = history + [{"role": "user", "content": message}]
        yield updated_history, ""

        # Check whether the selected model still needs to be loaded
        selected_model_name = MODEL_CONFIGS[current_model_index]["MODEL_NAME"]
        if loaded_model_name != selected_model_name:
            # Check if the model is already in the local cache
            is_cached = check_model_cached(selected_model_name)
            if is_cached:
                # Cached: only loading from disk
                loading_history = updated_history + [{"role": "assistant", "content": "💾 캐시된 모델 디스크에서 로딩 중... (15-30초, 다운로드 안 함)"}]
            else:
                # Not cached: needs to be downloaded first
                loading_history = updated_history + [{"role": "assistant", "content": "📥 모델 다운로드 시작... (4-14GB, 첫 사용 시 5-20분 소요)"}]
            yield loading_history, ""
        else:
            # Show a "thinking" indicator
            thinking_history = updated_history + [{"role": "assistant", "content": "🤔 응답 생성 중..."}]
            yield thinking_history, ""

        # Generate and append the bot response (this loads the model if needed)
        final_history = chat_wrapper(message, history)
        yield final_history, ""

    # Event handlers
    model_dropdown.change(change_model, inputs=[model_dropdown], outputs=[chatbot])
    btn.click(submit, [msg, chatbot], [chatbot, msg])
    msg.submit(submit, [msg, chatbot], [chatbot, msg])
    clear.click(lambda: [], outputs=chatbot)

if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860)
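
# Local usage (assuming this file is saved as app.py):
#   pip install gradio torch transformers accelerate huggingface_hub python-dotenv
#   python app.py        # then open http://localhost:7860
# On Hugging Face Spaces the file is launched automatically; the `spaces` package
# is preinstalled on ZeroGPU hardware, so the import at the top succeeds there.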