""" Multi-environment chatbot: Detects and adapts to different hardware environments Supports: Local (Mac/Linux/Windows), HF Spaces (CPU Basic/Upgrade, ZeroGPU) """ import os import platform # IMPORTANT: Import spaces FIRST before any CUDA-related packages (torch, transformers) # This prevents "CUDA has been initialized" error on ZeroGPU try: import spaces ZEROGPU_AVAILABLE = True except ImportError: ZEROGPU_AVAILABLE = False # Now safe to import CUDA-related packages import gradio as gr from transformers import AutoModelForCausalLM, AutoTokenizer from huggingface_hub import snapshot_download import torch # ============================================================================ # Hardware Environment Detection # ============================================================================ def test_cuda_compatibility(): """ Test if CUDA actually works on this GPU. Returns: True if CUDA works, False otherwise Note: RTX 5080 and other Blackwell GPUs (sm_120) are supported with PyTorch nightly builds (CUDA 12.8+) """ if not torch.cuda.is_available(): return False try: # Try a simple tensor operation to verify CUDA works x = torch.randn(10, 10).cuda() y = torch.randn(10, 10).cuda() z = torch.matmul(x, y) z.cpu() return True except Exception as e: print(f"โš ๏ธ CUDA test failed: {e}") print(f" Will fall back to CPU mode") return False def detect_hardware_environment(): """ Comprehensive hardware environment detection Returns: dict: { 'platform': 'hf_spaces' | 'local', 'hardware': 'zerogpu' | 'cpu_upgrade' | 'cpu_basic' | 'local_gpu' | 'local_cpu', 'gpu_available': bool, 'gpu_name': str or None, 'cpu_count': int, 'os': 'Darwin' | 'Linux' | 'Windows', 'description': str, 'cuda_compatible': bool } """ env_info = { 'platform': 'local', 'hardware': 'local_cpu', 'gpu_available': False, 'gpu_name': None, 'cpu_count': os.cpu_count() or 1, 'os': platform.system(), 'description': '', 'cuda_compatible': False } # Check if running on HF Spaces is_hf_spaces = os.environ.get('SPACE_ID') is not None if is_hf_spaces: env_info['platform'] = 'hf_spaces' space_id = os.environ.get('SPACE_ID', 'unknown') # Check for ZeroGPU using already-imported status if ZEROGPU_AVAILABLE: env_info['hardware'] = 'zerogpu' env_info['gpu_available'] = True env_info['gpu_name'] = 'NVIDIA H200 (ZeroGPU)' env_info['description'] = f"๐Ÿš€ HF Spaces - ZeroGPU ({space_id})" env_info['cuda_compatible'] = True else: # Check CPU tier by memory/CPU count cpu_count = env_info['cpu_count'] if cpu_count >= 8: env_info['hardware'] = 'cpu_upgrade' env_info['description'] = f"โš™๏ธ HF Spaces - CPU Upgrade ({cpu_count} vCPU, 32GB RAM)" else: env_info['hardware'] = 'cpu_basic' env_info['description'] = f"๐Ÿ’ป HF Spaces - CPU Basic ({cpu_count} vCPU, 16GB RAM)" else: # Local environment detection if torch.cuda.is_available(): # CUDA is available, test if it actually works cuda_works = test_cuda_compatibility() try: gpu_name = torch.cuda.get_device_name(0) except: gpu_name = 'CUDA GPU' if cuda_works: env_info['hardware'] = 'local_gpu' env_info['gpu_available'] = True env_info['gpu_name'] = gpu_name env_info['description'] = f"๐Ÿ–ฅ๏ธ Local - GPU ({gpu_name})" env_info['cuda_compatible'] = True else: # CUDA detected but tensor operations failed env_info['hardware'] = 'local_cpu' env_info['gpu_available'] = False env_info['gpu_name'] = gpu_name + " (CUDA error - using CPU)" env_info['description'] = f"โš ๏ธ Local - CPU fallback ({gpu_name} CUDA error)" env_info['cuda_compatible'] = False elif torch.backends.mps.is_available(): env_info['hardware'] 
            env_info['hardware'] = 'local_gpu'
            env_info['gpu_available'] = True
            env_info['gpu_name'] = 'Apple Silicon GPU (MPS)'
            env_info['description'] = "๐ŸŽ Local - Apple Silicon GPU"
            env_info['cuda_compatible'] = False
        else:
            env_info['hardware'] = 'local_cpu'
            env_info['description'] = f"๐Ÿ’ป Local - CPU ({env_info['os']}, {env_info['cpu_count']} cores)"
            env_info['cuda_compatible'] = False

    return env_info


# Detect hardware environment
HW_ENV = detect_hardware_environment()

# Note: ZEROGPU_AVAILABLE already set at import time to prevent CUDA initialization errors

# Print environment info
print("=" * 60)
print("Hardware Environment Detection")
print("=" * 60)
print(f"Platform: {HW_ENV['platform']}")
print(f"Hardware: {HW_ENV['hardware']}")
print(f"GPU Available: {HW_ENV['gpu_available']}")
if HW_ENV['gpu_name']:
    print(f"GPU Name: {HW_ENV['gpu_name']}")
print(f"CPU Cores: {HW_ENV['cpu_count']}")
print(f"OS: {HW_ENV['os']}")
print(f"Description: {HW_ENV['description']}")
print("=" * 60)

# Load environment variables from .env file
try:
    from dotenv import load_dotenv
    load_dotenv()  # Load .env file into environment
    print("โœ… .env file loaded")
except ImportError:
    print("โš ๏ธ python-dotenv not installed, using system environment variables only")

# Get HF token from environment
HF_TOKEN = os.getenv("HF_TOKEN", None)
if HF_TOKEN:
    print(f"โœ… HF_TOKEN loaded (length: {len(HF_TOKEN)} chars)")
else:
    print("โš ๏ธ HF_TOKEN not found in environment - some models may not be accessible")

# Model configurations
# Note: Gated models (marked with ๐Ÿ”’) require HF access approval at https://huggingface.co/[model-name]
MODEL_CONFIGS = [
    {
        "MODEL_NAME": "LGAI-EXAONE/EXAONE-3.5-7.8B-Instruct",
        "MODEL_CONFIG": {
            "name": "EXAONE 3.5 7.8B Instruct โญ (ํŒŒ๋ผ๋ฏธํ„ฐ ๋Œ€๋น„ ์ตœ๊ณ  ํšจ์œจ)",
            "max_length": 150,
        },
    },
    {
        "MODEL_NAME": "LGAI-EXAONE/EXAONE-3.5-2.4B-Instruct",
        "MODEL_CONFIG": {
            "name": "EXAONE 3.5 2.4B Instruct โšก (์ดˆ๊ฒฝ๋Ÿ‰, ๋น ๋ฅธ ์‘๋‹ต)",
            "max_length": 150,
        },
    },
    {
        "MODEL_NAME": "beomi/Llama-3-Open-Ko-8B",
        "MODEL_CONFIG": {
            "name": "Llama-3 Open-Ko 8B ๐Ÿ”ฅ (Llama 3 ์ƒํƒœ๊ณ„)",
            "max_length": 150,
        },
    },
    {
        "MODEL_NAME": "Qwen/Qwen2.5-7B-Instruct",
        "MODEL_CONFIG": {
            "name": "Qwen2.5 7B Instruct (ํ•œ๊ธ€ ์ง€์‹œ์‘๋‹ต ์šฐ์ˆ˜)",
            "max_length": 150,
        },
    },
    {
        "MODEL_NAME": "Qwen/Qwen2.5-14B-Instruct",
        "MODEL_CONFIG": {
            "name": "Qwen2.5 14B Instruct (๋‹ค๊ตญ์–ดยทํ•œ๊ธ€ ๊ฐ•์ , ์—ฌ์œ  GPU ๊ถŒ์žฅ)",
            "max_length": 150,
        },
    },
    {
        "MODEL_NAME": "meta-llama/Llama-3.1-8B-Instruct",
        "MODEL_CONFIG": {
            "name": "Llama 3.1 8B Instruct ๐Ÿ”’ (์ปค๋ฎค๋‹ˆํ‹ฐ Ko ํŠœ๋‹ ํ™œ๋ฐœ, ์Šน์ธ ํ•„์š”)",
            "max_length": 150,
        },
    },
    {
        "MODEL_NAME": "meta-llama/Llama-3.1-70B-Instruct",
        "MODEL_CONFIG": {
            "name": "Llama 3.1 70B Instruct ๐Ÿ”’ (๋Œ€๊ทœ๋ชจยทํ•œ๊ธ€ ํ’ˆ์งˆ ์šฐ์ˆ˜, ์Šน์ธ ํ•„์š”)",
            "max_length": 150,
        },
    },
    {
        "MODEL_NAME": "01-ai/Yi-1.5-9B-Chat",
        "MODEL_CONFIG": {
            "name": "Yi 1.5 9B Chat (๋‹ค๊ตญ์–ด/ํ•œ๊ธ€ ์•ˆ์ •์  ๋Œ€ํ™”)",
            "max_length": 150,
        },
    },
    {
        "MODEL_NAME": "01-ai/Yi-1.5-34B-Chat",
        "MODEL_CONFIG": {
            "name": "Yi 1.5 34B Chat (๊ธด ๋ฌธ๋งฅยทํ•œ๊ธ€ ์ƒ์„ฑ ๊ฐ•์ )",
            "max_length": 150,
        },
    },
    {
        "MODEL_NAME": "mistralai/Mistral-7B-Instruct-v0.3",
        "MODEL_CONFIG": {
            "name": "Mistral 7B Instruct v0.3 (๊ฒฝ๋Ÿ‰ยทํ•œ๊ธ€ ์ปค๋ฎค๋‹ˆํ‹ฐ ํŠœ๋‹ ๅคš)",
            "max_length": 150,
        },
    },
    {
        "MODEL_NAME": "upstage/SOLAR-10.7B-Instruct-v1.0",
        "MODEL_CONFIG": {
            "name": "Solar 10.7B Instruct v1.0 (ํ•œ๊ตญ์–ด ๊ฐ•์ , ์‹ค์ „ ์ง€์‹œ์‘๋‹ต)",
            "max_length": 150,
        },
    },
    {
        "MODEL_NAME": "EleutherAI/polyglot-ko-5.8b",
        "MODEL_CONFIG": {
            "name": "Polyglot-Ko 5.8B (ํ•œ๊ตญ์–ด ์ค‘์‹ฌ ๋ฒ ์ด์Šค)",
            "max_length": 150,
        },
    },
    {
        "MODEL_NAME": "CohereForAI/aya-23-8B",
        "MODEL_CONFIG": {
            "name": "Aya-23 8B ๐Ÿ”’ (๋‹ค๊ตญ์–ดยทํ•œ๊ตญ์–ด ์ง€์› ์–‘ํ˜ธ, ์Šน์ธ ํ•„์š”)",
            "max_length": 150,
        },
    },
]

# Default model
current_model_index = 0
loaded_model_name = None  # Track which model is currently loaded

# Global model cache
model = None
tokenizer = None

# Dynamic model count
TOTAL_MODEL_COUNT = len(MODEL_CONFIGS)
PUBLIC_MODEL_COUNT = sum(1 for cfg in MODEL_CONFIGS if "๐Ÿ”’" not in cfg["MODEL_CONFIG"]["name"])
GATED_MODEL_COUNT = TOTAL_MODEL_COUNT - PUBLIC_MODEL_COUNT


def check_model_cached(model_name):
    """Check if model is already downloaded in HF cache"""
    try:
        from huggingface_hub import scan_cache_dir
        cache_info = scan_cache_dir()
        # Check if model exists in cache
        for repo in cache_info.repos:
            if repo.repo_id == model_name:
                return True
        return False
    except Exception as e:
        # If unable to check cache, assume not cached
        print(f"   โš ๏ธ Unable to check cache: {e}")
        return False


def load_model_once(model_index=None):
    """Load model and tokenizer based on selected index (lazy loading)"""
    global model, tokenizer, current_model_index, loaded_model_name

    if model_index is None:
        model_index = current_model_index

    # Get model config
    model_name = MODEL_CONFIGS[model_index]["MODEL_NAME"]

    # Check if we need to reload (different model or not loaded yet)
    if loaded_model_name != model_name:
        print(f"๐Ÿ”„ Loading model: {model_name}")
        print(f"   Previous model: {loaded_model_name or 'None'}")

        # Check if model is already cached
        is_cached = check_model_cached(model_name)
        if is_cached:
            print("   โœ… Model found in cache, loading from disk...")
        else:
            print("   ๐Ÿ“ฅ Model not in cache, will download (~4-14GB depending on model)...")

        # Clear previous model
        if model is not None:
            print("   ๐Ÿ—‘๏ธ Unloading previous model from memory...")
            del model
            del tokenizer
            if HW_ENV['cuda_compatible']:
                torch.cuda.empty_cache()

        # Load tokenizer
        print("   ๐Ÿ“ Loading tokenizer...")
        tokenizer = AutoTokenizer.from_pretrained(
            model_name,
            token=HF_TOKEN,
            trust_remote_code=True,
        )
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token

        # Detect device - use hardware environment detection
        use_gpu = HW_ENV['gpu_available'] and HW_ENV['cuda_compatible']
        device = "cuda" if use_gpu else "cpu"
        print(f"๐Ÿ“ Using device: {device}")

        # Load model with appropriate settings
        if is_cached:
            print("   ๐Ÿ“€ Loading model from disk cache (15-30 seconds)...")
        else:
            print("   ๐ŸŒ Downloading model from network (5-20 minutes, first time only)...")

        if device == "cuda":
            # GPU available and compatible
            model = AutoModelForCausalLM.from_pretrained(
                model_name,
                token=HF_TOKEN,
                dtype=torch.float16,  # Use float16 for GPU
                low_cpu_mem_usage=True,
                trust_remote_code=True,
                device_map="auto",
            )
        else:
            # CPU only
            model = AutoModelForCausalLM.from_pretrained(
                model_name,
                token=HF_TOKEN,
                dtype=torch.float32,  # Use float32 for CPU
                low_cpu_mem_usage=True,
                trust_remote_code=True,
            )
            model.to(device)

        model.eval()
        current_model_index = model_index
        loaded_model_name = model_name
        print(f"โœ… Model {model_name} loaded successfully")
    else:
        print(f"โ„น๏ธ Model {model_name} already loaded, reusing...")

    return model, tokenizer


def generate_response_impl(message, history):
    """Core generation logic (same for both ZeroGPU and CPU)"""
    if not message or not message.strip():
        return history

    try:
        # Ensure model is loaded
        current_model, current_tokenizer = load_model_once()
        if current_model is None or current_tokenizer is None:
์—†์Šต๋‹ˆ๋‹ค."}] # Get device device = next(current_model.parameters()).device # Build conversation context (last 3 turns) conversation = "" for msg in history[-6:]: # Last 3 turns (6 messages: 3 user + 3 assistant) if msg["role"] == "user": conversation += f"์‚ฌ์šฉ์ž: {msg['content']}\n" elif msg["role"] == "assistant": conversation += f"์–ด์‹œ์Šคํ„ดํŠธ: {msg['content']}\n" conversation += f"์‚ฌ์šฉ์ž: {message}\n์–ด์‹œ์Šคํ„ดํŠธ:" # Tokenize with attention_mask encoded = current_tokenizer( conversation, return_tensors="pt", truncation=True, max_length=512, padding=True, ) inputs = encoded['input_ids'].to(device) attention_mask = encoded['attention_mask'].to(device) # Get current model config model_config = MODEL_CONFIGS[current_model_index]["MODEL_CONFIG"] # Generate response with torch.no_grad(): outputs = current_model.generate( inputs, attention_mask=attention_mask, max_new_tokens=model_config["max_length"], temperature=0.7, top_p=0.9, do_sample=True, pad_token_id=current_tokenizer.pad_token_id, eos_token_id=current_tokenizer.eos_token_id, ) # Decode response full_response = current_tokenizer.decode(outputs[0], skip_special_tokens=True) # Extract only the assistant's response if "์–ด์‹œ์Šคํ„ดํŠธ:" in full_response: response = full_response.split("์–ด์‹œ์Šคํ„ดํŠธ:")[-1].strip() else: response = full_response[len(conversation):].strip() if not response: response = "์ฃ„์†กํ•ฉ๋‹ˆ๋‹ค. ์‘๋‹ต์„ ์ƒ์„ฑํ•  ์ˆ˜ ์—†์—ˆ์Šต๋‹ˆ๋‹ค." return history + [{"role": "assistant", "content": response}] except Exception as e: import traceback error_msg = str(e) print("=" * 50) print(f"Error: {error_msg}") print(traceback.format_exc()) print("=" * 50) return history + [{"role": "assistant", "content": f"โŒ ์˜ค๋ฅ˜: {error_msg[:200]}"}] # Conditionally apply ZeroGPU decorator if ZEROGPU_AVAILABLE: @spaces.GPU(duration=120) def generate_response(message, history): """GPU-accelerated response generation (ZeroGPU mode)""" return generate_response_impl(message, history) else: def generate_response(message, history): """Standard response generation (CPU Upgrade mode)""" return generate_response_impl(message, history) def chat_wrapper(message, history): """Wrapper for Gradio ChatInterface""" # When type="messages", history includes user message already from Gradio # So we add it first, then generate response updated_history = history + [{"role": "user", "content": message}] response_history = generate_response(message, updated_history) return response_history print(f"โœ… App initialized - {HW_ENV['description']}") # Custom CSS for button alignment custom_css = """ .input-row { align-items: center !important; } .input-row > div:last-child button { height: 100% !important; min-height: 42px !important; } """ # Create Gradio interface with gr.Blocks(title="๐Ÿค– Multi-Model Chatbot", css=custom_css) as demo: # Dynamic header based on hardware environment header = f""" # ๐Ÿค– ๋‹ค์ค‘ ๋ชจ๋ธ ์ฑ—๋ด‡ {HW_ENV['description']} **ํ™˜๊ฒฝ ์ •๋ณด**: - **ํ”Œ๋žซํผ**: {HW_ENV['platform'].upper().replace('_', ' ')} - **ํ•˜๋“œ์›จ์–ด**: {HW_ENV['hardware'].upper().replace('_', ' ')} - **GPU**: {'โœ… ' + HW_ENV['gpu_name'] if HW_ENV['gpu_available'] else 'โŒ CPU only'} - **CPU ์ฝ”์–ด**: {HW_ENV['cpu_count']} - **์šด์˜์ฒด์ œ**: {HW_ENV['os']} **๋ชจ๋ธ ์„ ํƒ**: - ๐ŸŽฏ {TOTAL_MODEL_COUNT}๊ฐ€์ง€ ํ•œ๊ธ€ ์ตœ์ ํ™” ๋ชจ๋ธ ({PUBLIC_MODEL_COUNT} Public + {GATED_MODEL_COUNT} Gated) - ๐Ÿ”„ ๋ชจ๋ธ ์ „ํ™˜ ์‹œ ์ž๋™ ์žฌ๋กœ๋”ฉ (์ฑ„ํŒ… ํžˆ์Šคํ† ๋ฆฌ ์ดˆ๊ธฐํ™”) - โฑ๏ธ ์ฒซ ์‘๋‹ต์€ ๋ชจ๋ธ ๋กœ๋”ฉ ์‹œ๊ฐ„ ํฌํ•จ **ํ…Œ์ŠคํŠธ ์˜ˆ์‹œ**: - 
"์•ˆ๋…•ํ•˜์„ธ์š”" - "์ธ๊ณต์ง€๋Šฅ์— ๋Œ€ํ•ด ์„ค๋ช…ํ•ด์ฃผ์„ธ์š”" - "ํ•œ๊ตญ์˜ ์ˆ˜๋„๋Š” ์–ด๋””์ธ๊ฐ€์š”?" """ # Add hardware-specific features if HW_ENV['hardware'] == 'zerogpu': header += """ **ZeroGPU ํŠน์ง•**: - โšก ์ดˆ๊ณ ์† ์‘๋‹ต (3-5์ดˆ, GPU ๊ฐ€์†) - ๐Ÿš€ NVIDIA H200 ์ž๋™ ํ• ๋‹น - ๐Ÿ’ฐ PRO ๊ตฌ๋… ์‹œ ํ•˜๋ฃจ 25๋ถ„ ๋ฌด๋ฃŒ """ elif HW_ENV['hardware'] == 'cpu_upgrade': header += """ **CPU Upgrade ํŠน์ง•**: - โฐ ๋ฌด์ œํ•œ ์‚ฌ์šฉ ์‹œ๊ฐ„ - โณ CPU ํ™˜๊ฒฝ (์‘๋‹ต 30์ดˆ~1๋ถ„) - ๐Ÿ’ฐ ์‹œ๊ฐ„๋‹น $0.03 (์›” ์•ฝ $22) """ elif HW_ENV['hardware'] == 'cpu_basic': header += """ **CPU Basic ํŠน์ง•**: - ๐Ÿ’ก ๋ฌด๋ฃŒ ํ‹ฐ์–ด - โณ CPU ํ™˜๊ฒฝ (์‘๋‹ต 1~2๋ถ„) - ๐Ÿ”’ ๊ฒฝ๋Ÿ‰ ๋ชจ๋ธ ๊ถŒ์žฅ (EXAONE 2.4B, Mistral 7B) """ elif HW_ENV['hardware'] == 'local_gpu': header += f""" **๋กœ์ปฌ GPU ํŠน์ง•**: - ๐Ÿ–ฅ๏ธ ๊ฐœ์ธ GPU: {HW_ENV['gpu_name']} - โšก ๋น ๋ฅธ ์‘๋‹ต (GPU ๊ฐ€์†) - ๐Ÿ”“ ๋ฌด์ œํ•œ ์‚ฌ์šฉ """ else: # local_cpu header += """ **๋กœ์ปฌ CPU ํŠน์ง•**: - ๐Ÿ’ป ๋กœ์ปฌ ๊ฐœ๋ฐœ ํ™˜๊ฒฝ - โณ CPU ํ™˜๊ฒฝ (๋А๋ฆฐ ์‘๋‹ต) - ๐Ÿ”’ ๊ฒฝ๋Ÿ‰ ๋ชจ๋ธ ๊ถŒ์žฅ """ gr.Markdown(header) # Model selector model_choices = [f"{cfg['MODEL_CONFIG']['name']}" for cfg in MODEL_CONFIGS] model_dropdown = gr.Dropdown( choices=model_choices, value=model_choices[0], label="๐Ÿค– ๋ชจ๋ธ ์„ ํƒ", interactive=True, ) chatbot = gr.Chatbot(height=400, type="messages", show_label=False) with gr.Row(elem_classes="input-row"): msg = gr.Textbox( placeholder="ํ•œ๊ธ€๋กœ ๋ฉ”์‹œ์ง€๋ฅผ ์ž…๋ ฅํ•˜์„ธ์š”...", show_label=False, scale=9, container=False, ) btn = gr.Button("์ „์†ก", scale=1, variant="primary", min_width=80) clear = gr.Button("๐Ÿ—‘๏ธ ๋Œ€ํ™” ์ดˆ๊ธฐํ™”", size="sm") def change_model(selected_model): """Handle model change""" global current_model_index # Find index of selected model for idx, cfg in enumerate(MODEL_CONFIGS): if cfg['MODEL_CONFIG']['name'] == selected_model: current_model_index = idx break # Clear chat history when changing model return [] def submit(message, history): global loaded_model_name, current_model_index # Immediately show user message updated_history = history + [{"role": "user", "content": message}] yield updated_history, "" # Check if model needs to be loaded selected_model_name = MODEL_CONFIGS[current_model_index]["MODEL_NAME"] if loaded_model_name != selected_model_name: # Check if model is cached is_cached = check_model_cached(selected_model_name) if is_cached: # Model is cached, just loading from disk loading_history = updated_history + [{"role": "assistant", "content": "๐Ÿ’พ ์บ์‹œ๋œ ๋ชจ๋ธ ๋””์Šคํฌ์—์„œ ๋กœ๋”ฉ ์ค‘... (15-30์ดˆ, ๋‹ค์šด๋กœ๋“œ ์•ˆ ํ•จ)"}] else: # Model needs to be downloaded loading_history = updated_history + [{"role": "assistant", "content": "๐Ÿ“ฅ ๋ชจ๋ธ ๋‹ค์šด๋กœ๋“œ ์‹œ์ž‘... (4-14GB, ์ฒซ ์‚ฌ์šฉ ์‹œ 5-20๋ถ„ ์†Œ์š”)"}] yield loading_history, "" else: # Show "thinking" indicator thinking_history = updated_history + [{"role": "assistant", "content": "๐Ÿค” ์‘๋‹ต ์ƒ์„ฑ ์ค‘..."}] yield thinking_history, "" # Generate and add bot response (this will load model if needed) final_history = chat_wrapper(message, history) yield final_history, "" # Event handlers model_dropdown.change(change_model, inputs=[model_dropdown], outputs=[chatbot]) btn.click(submit, [msg, chatbot], [chatbot, msg]) msg.submit(submit, [msg, chatbot], [chatbot, msg]) clear.click(lambda: [], outputs=chatbot) if __name__ == "__main__": demo.launch(server_name="0.0.0.0", server_port=7860)