import os
import time
import random
import requests
from openai import OpenAI
from typing import Dict, List, Optional
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def get_api_keys(service_name: str, key_names: List[str]) -> List[str]:
    """Get API keys from multiple sources with priority for HuggingFace Spaces"""
    keys = []

    # 1. HuggingFace Spaces secrets (primary) - multiple keys
    for i in range(1, 6):  # Check for HF_GROQ_API_KEY_1 through HF_GROQ_API_KEY_5
        hf_key_name = f"HF_{service_name.upper()}_API_KEY_{i}"
        key = os.getenv(hf_key_name)
        if key and key.strip():
            keys.append(key.strip())
            logger.info(f"✅ Found {service_name} key {i} in HuggingFace secrets")

    # Also check for a single key (backward compatibility)
    if not keys:
        single_key_name = f"HF_{service_name.upper()}_API_KEY"
        key = os.getenv(single_key_name)
        if key and key.strip():
            keys.append(key.strip())
            logger.info(f"✅ Found {service_name} key in HuggingFace secrets")

    # 2. Standard environment variables (fallback)
    if not keys:
        for key_name in key_names:
            key = (
                os.getenv(f"{service_name.upper()}_{key_name.upper()}")  # e.g. GROQ_API_KEY, GROQ_API_KEY_1
                or os.getenv(key_name)
                or os.getenv(key_name.upper())
            )
            if key and key.strip():
                keys.append(key.strip())
                logger.info(f"✅ Found {service_name} key in environment")

    # 3. Streamlit secrets (last resort)
    if not keys:
        try:
            import streamlit as st
            if hasattr(st, 'secrets') and service_name in st.secrets:
                secrets = st.secrets[service_name]
                for key_name in key_names:
                    key = secrets.get(key_name)
                    if key and key.strip():
                        keys.append(key.strip())
                        logger.info(f"✅ Found {service_name} key in Streamlit secrets")
        except Exception:
            pass

    if not keys:
        logger.warning(f"⚠️ No {service_name} API keys found")
    else:
        logger.info(f"✅ Found {len(keys)} {service_name} API keys")
    return keys
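
# Illustrative configuration for the lookups above (names mirror the code; values are placeholders):
#   HuggingFace Spaces secrets:  HF_GROQ_API_KEY_1 ... HF_GROQ_API_KEY_5, or a single HF_GROQ_API_KEY
#   Environment variables:       GROQ_API_KEY, GROQ_API_KEY_1, ... (or the raw key_names passed in)
#   Streamlit secrets.toml:      [groq]
#                                api_key = "your-key-here"   # placeholder value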

def get_groq_api_keys():
    """Get Groq API keys for all environments"""
    # Look for multiple HF keys first, then fall back to standard names
    return get_api_keys("groq", ["api_key", "api_key_1", "api_key_2", "api_key_3"])

def get_ollama_url():
    """Get Ollama URL from multiple sources"""
    # 1. HuggingFace Spaces
    hf_url = os.getenv("HF_OLLAMA_URL")
    if hf_url:
        # Clean the URL - remove quotes if present
        hf_url = hf_url.strip('"\'')
        logger.info("✅ Found Ollama URL in HuggingFace secrets")
        return hf_url

    # 2. Environment variables
    env_url = os.getenv("OLLAMA_URL") or os.getenv("MODEL_URL")
    if env_url:
        env_url = env_url.strip('"\'')
        logger.info("✅ Found Ollama URL in environment")
        return env_url

    # 3. Streamlit secrets
    try:
        import streamlit as st
        if hasattr(st, 'secrets') and 'ollama' in st.secrets:
            url = st.secrets["ollama"].get("url")
            if url:
                url = url.strip('"\'')
                logger.info("✅ Found Ollama URL in Streamlit secrets")
                return url
    except (ImportError, AttributeError):
        pass

    logger.warning("⚠️ No Ollama URL configured - local models will not be available")
    return None
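
# Illustrative Ollama configuration for get_ollama_url() (host/port are placeholders):
#   HuggingFace Spaces secret:  HF_OLLAMA_URL = "http://my-ollama-host:11434"
#   Environment variables:      OLLAMA_URL or MODEL_URL
#   Streamlit secrets.toml:     [ollama]
#                               url = "http://my-ollama-host:11434"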

class MultiGroqGenerator:
    def __init__(self):
        self.providers = self._initialize_groq_providers()
        self.models = self._get_best_models()
        self.max_retries = 3
        self.retry_delay = 2  # seconds

    def _initialize_groq_providers(self):
        """Initialize multiple Groq API providers with different keys"""
        providers = []

        # Get all Groq API keys
        groq_keys = get_groq_api_keys()

        # Filter out None values and create providers
        for i, key in enumerate(groq_keys):
            if key and key.strip():
                providers.append({
                    'name': f'Groq-{i+1}',
                    'client': OpenAI(
                        api_key=key.strip(),
                        base_url="https://api.groq.com/openai/v1"
                    ),
                    'weight': 10,
                    'fail_count': 0,
                    'last_used': 0
                })

        if not providers:
            logger.warning("⚠️ No Groq API keys found")
            return []

        logger.info(f"✅ Initialized {len(providers)} Groq providers")
        return providers

    def _get_best_models(self):
        """Select optimal models for educational content"""
        return [
            {
                'id': 'llama-3.3-70b-versatile',
                'name': 'Llama 3.3 70B',
                'weight': 10,
                'max_tokens': 32768,
                'description': 'Best for complex explanations'
            },
            {
                'id': 'meta-llama/llama-4-maverick-17b-128e-instruct',
                'name': 'Llama 4 Maverick 17B',
                'weight': 9,
                'max_tokens': 128000,
                'description': 'Large context for big documents'
            },
            {
                'id': 'llama-3.1-8b-instant',
                'name': 'Llama 3.1 8B Instant',
                'weight': 8,
                'max_tokens': 32768,
                'description': 'Fast for most content'
            },
        ]

    def _select_provider(self):
        """Select provider based on weight and fail history"""
        if not self.providers:
            return None

        available_providers = [
            p for p in self.providers
            if p['fail_count'] < 3 and (time.time() - p['last_used']) > 30
        ]
        if not available_providers:
            # All providers are cooling down or failing: use everyone and forgive one failure
            available_providers = self.providers
            for p in available_providers:
                p['fail_count'] = max(0, p['fail_count'] - 1)

        weights = [p['weight'] for p in available_providers]
        selected = random.choices(available_providers, weights=weights, k=1)[0]
        selected['last_used'] = time.time()
        return selected

    def _select_model(self, prompt_length: int):
        """Select optimal model based on prompt size"""
        approx_tokens = prompt_length // 4
        if approx_tokens > 20000:
            return self.models[1]  # Maverick for huge docs
        elif approx_tokens > 10000:
            return self.models[1]  # Maverick for large docs
        elif approx_tokens > 6000:
            return self.models[0]  # 70B for medium-large
        elif approx_tokens > 3000:
            return self.models[0]  # 70B for quality
        else:
            return self.models[2]  # 8B for speed
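
    # Worked example of the thresholds above (using the ~4 characters per token estimate):
    #   a 40,000-character prompt -> ~10,000 tokens -> Llama 3.3 70B (threshold is strictly "> 10000")
    #   a 48,000-character prompt -> ~12,000 tokens -> Llama 4 Maverick (128K context)
    #   an 8,000-character prompt -> ~2,000 tokens  -> Llama 3.1 8B Instant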

    def generate(self, prompt: str) -> str:
        """Generate content with automatic failover"""
        if not self.providers:
            return "❌ Groq Error: No API keys configured. Please set HF_GROQ_API_KEY (HuggingFace secrets) or GROQ_API_KEY (environment variables)."

        last_error = None
        prompt_length = len(prompt)

        for attempt in range(self.max_retries + 1):
            provider = self._select_provider()
            model = self._select_model(prompt_length)
            if not provider:
                return "❌ Groq Error: No available providers"

            try:
                logger.info(f"🔄 Attempt {attempt + 1} with {provider['name']} using {model['name']}...")
                result = self._call_groq(provider, model, prompt)
                if result and not result.startswith(("[Error", "[RateLimit]", "[Quota]", "[Auth]", "[Empty]", "[ModelNotFound]")):
                    logger.info(f"✅ Success with {provider['name']} + {model['name']}")
                    provider['weight'] = min(20, provider['weight'] + 1)
                    provider['fail_count'] = max(0, provider['fail_count'] - 1)
                    return result
                else:
                    logger.warning(f"⚠️ Provider returned: {result}")
                    if "[ModelNotFound]" in result:
                        continue
            except Exception as e:
                last_error = str(e)
                logger.error(f"❌ {provider['name']} + {model['name']} failed: {last_error}")
                provider['weight'] = max(1, provider['weight'] - 2)
                provider['fail_count'] += 1

            if attempt < self.max_retries:
                delay = self.retry_delay * (2 ** attempt)
                logger.info(f"⏰ Waiting {delay}s before retry...")
                time.sleep(delay)

        return self._fallback_generate(prompt)

    def generate_large_content(self, prompt: str) -> str:
        """Handle large content generation for Groq - compatibility method"""
        logger.info("🔷 Using Groq for large content generation...")
        # Groq can handle large content directly thanks to its large context windows,
        # so this reuses generate() with model selection biased toward Maverick.
        prompt_length = len(prompt)
        if prompt_length > 20000:  # Very large prompt
            logger.info("📄 Large prompt detected, optimizing for Groq Maverick...")
            # Temporarily prioritize Maverick for large contexts; repeat the entry so that
            # _select_model and _fallback_generate can still index positions 0-2.
            original_models = self.models.copy()
            self.models = [original_models[1]] * 3  # Maverick has a 128K context
            try:
                result = self.generate(prompt)
                return result
            finally:
                self.models = original_models  # Restore original models
        else:
            # Use normal generation
            return self.generate(prompt)

    def _fallback_generate(self, prompt: str) -> str:
        """Fallback generation with simpler model selection"""
        logger.info("🔄 Trying fallback generation...")
        fallback_models = [self.models[2], self.models[0]]
        for model in fallback_models:
            for provider in self.providers:
                try:
                    logger.info(f"🔄 Fallback with {provider['name']} using {model['name']}...")
                    result = self._call_groq(provider, model, prompt)
                    if result and not result.startswith(("[Error", "[RateLimit]", "[Quota]", "[Auth]", "[Empty]", "[ModelNotFound]")):
                        logger.info(f"✅ Fallback success with {provider['name']} + {model['name']}")
                        return result
                except Exception as e:
                    logger.error(f"❌ Fallback failed: {e}")
                    continue
        return self._get_user_friendly_error("All models failed")

    def _call_groq(self, provider, model, prompt: str) -> str:
        """Call Groq API with specific provider and model"""
        try:
            prompt_tokens_approx = len(prompt) // 4
            available_tokens = model['max_tokens'] - prompt_tokens_approx - 500
            max_response_tokens = max(1000, min(8000, available_tokens))

            response = provider['client'].chat.completions.create(
                model=model['id'],
                messages=[{"role": "user", "content": prompt}],
                temperature=0.7,
                max_tokens=max_response_tokens,
                top_p=0.9
            )

            if (response and response.choices and len(response.choices) > 0 and
                    response.choices[0].message and response.choices[0].message.content):
                content = response.choices[0].message.content.strip()
                return content if content else "[Empty] No content generated"
            else:
                return "[Empty] Invalid response structure"
        except Exception as e:
            error_msg = str(e).lower()
            if "rate limit" in error_msg or "429" in error_msg:
                return f"[RateLimit] {provider['name']} rate limit exceeded"
            elif "quota" in error_msg:
                return f"[Quota] {provider['name']} quota exceeded"
            elif "authentication" in error_msg:
                return f"[Auth] {provider['name']} authentication failed"
            elif "context length" in error_msg:
                return f"[Length] {provider['name']} content too long"
            elif "model not found" in error_msg:
                return f"[ModelNotFound] {provider['name']}: {str(e)}"
            else:
                return f"[Error] {provider['name']}: {str(e)}"

    def _get_user_friendly_error(self, technical_error: str) -> str:
        """Convert technical errors to user-friendly messages"""
        error_lower = technical_error.lower()
        if "rate limit" in error_lower:
            return "🚫 **Service Busy** - Please wait a few minutes and try again"
        elif "quota" in error_lower:
            return "📅 **Daily Limit Reached** - Try again tomorrow"
        elif "length" in error_lower:
            return "📄 **Content Too Large** - Please break into smaller sections"
        else:
            return "⚠️ **Temporary Issue** - Please try again shortly"

    def get_service_status(self) -> dict:
        """Get current status of all providers"""
        status = {
            'total_providers': len(self.providers),
            'healthy_providers': len([p for p in self.providers if p['fail_count'] < 2]),
            'providers': [],
            'models': [m['name'] for m in self.models]
        }
        for provider in self.providers:
            if provider['fail_count'] >= 3:
                status_text = "🔴 Limited"
            elif provider['fail_count'] >= 1:
                status_text = "🟡 Slow"
            else:
                status_text = "🟢 Good"
            status['providers'].append({
                'name': provider['name'],
                'status': status_text,
                'failures': provider['fail_count']
            })
        return status
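
# Minimal usage sketch for MultiGroqGenerator (illustrative; assumes Groq keys are configured):
#   groq = MultiGroqGenerator()
#   print(groq.generate("Explain photosynthesis briefly"))
#   print(groq.get_service_status())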

class HFGenerator:
    """Phi-3 Generator with Auto-Pull, Smart Chunking, and Context Preservation"""

    def __init__(self, base_url: str = None):
        # Use environment variable or Streamlit secret as default
        self.base_url = base_url or get_ollama_url()
        self.model = "phi3:mini"
        self.current_requests = 0
        self.max_concurrent = 2
        self.model_available = False

        # Only try to connect if a base_url is provided
        if self.base_url:
            self._ensure_model_available()
        else:
            logger.warning("⚠️ Ollama URL not configured - Phi-3 will not be available")

    def _ensure_model_available(self):
        """Check if the model is available and pull it if needed"""
        try:
            response = requests.get(f"{self.base_url}/api/tags", timeout=10)
            if response.status_code == 200:
                models = response.json().get('models', [])
                self.model_available = any(model['name'] == self.model for model in models)
                if not self.model_available:
                    logger.info(f"🔄 Model {self.model} not found, pulling...")
                    self._pull_model()
                else:
                    logger.info(f"✅ Model {self.model} is available")
            else:
                logger.warning(f"⚠️ Could not check models: {response.status_code}")
        except Exception as e:
            logger.error(f"❌ Error checking models: {e}")

    def _pull_model(self):
        """Pull the Phi-3 model if not available"""
        try:
            logger.info(f"📥 Pulling {self.model}... This may take a few minutes.")
            payload = {"name": self.model}
            response = requests.post(
                f"{self.base_url}/api/pull",
                json=payload,
                timeout=300  # 5 minute timeout for the pull
            )
            if response.status_code == 200:
                logger.info(f"✅ Successfully pulled {self.model}")
                self.model_available = True
                return True
            else:
                logger.error(f"❌ Failed to pull model: {response.text}")
                return False
        except Exception as e:
            logger.error(f"❌ Error pulling model: {e}")
            return False

    def _estimate_tokens(self, text: str) -> int:
        """Rough token estimation (about 4 characters per token)"""
        return len(text) // 4

    def _chunk_content(self, content: str, max_tokens: int = 2500) -> list:
        """Split large content into manageable chunks"""
        paragraphs = content.split('\n\n')
        chunks = []
        current_chunk = ""
        current_tokens = 0

        for paragraph in paragraphs:
            para_tokens = self._estimate_tokens(paragraph)
            if para_tokens > max_tokens:
                # Paragraph is too big on its own - split by sentences
                sentences = paragraph.split('. ')
                for sentence in sentences:
                    sent_tokens = self._estimate_tokens(sentence)
                    if current_tokens + sent_tokens > max_tokens:
                        if current_chunk:
                            chunks.append(current_chunk.strip())
                        current_chunk = sentence
                        current_tokens = sent_tokens
                    else:
                        current_chunk += " " + sentence
                        current_tokens += sent_tokens
            else:
                if current_tokens + para_tokens > max_tokens:
                    if current_chunk:
                        chunks.append(current_chunk.strip())
                    current_chunk = paragraph
                    current_tokens = para_tokens
                else:
                    current_chunk += "\n\n" + paragraph
                    current_tokens += para_tokens

        if current_chunk:
            chunks.append(current_chunk.strip())
        return chunks
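
    # Example of the chunking budget above: a 20,000-character document is estimated at
    # ~5,000 tokens (len // 4), so with max_tokens=2500 it is split into roughly two
    # chunks, cut at paragraph boundaries (or sentence boundaries for oversized paragraphs).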

    def _create_context_summary(self, previous_chunks: list) -> str:
        """Create a context summary from previous chunks"""
        if not previous_chunks:
            return ""

        context_prompt = f"""
Here's a summary of previous sections:
{chr(10).join(previous_chunks)}
Provide a brief summary (2-3 sentences) of key points to help understand the next section.
"""
        try:
            payload = {
                "model": self.model,
                "messages": [{"role": "user", "content": context_prompt}],
                "stream": False,
                "options": {
                    "temperature": 0.3,
                    "top_p": 0.8,
                    "num_predict": 200
                }
            }
            response = requests.post(f"{self.base_url}/api/chat", json=payload, timeout=30)
            if response.status_code == 200:
                return response.json()['message']['content'].strip()
            return f"Previous sections covered: {', '.join(previous_chunks[:2])}..."
        except Exception:
            return f"Context from {len(previous_chunks)} previous sections"

    def _create_chunk_summary(self, content: str) -> str:
        """Create a very brief summary of a chunk's content"""
        try:
            payload = {
                "model": self.model,
                "messages": [{"role": "user", "content": f"Summarize key points in 1-2 sentences: {content}"}],
                "stream": False,
                "options": {
                    "temperature": 0.3,
                    "top_p": 0.8,
                    "num_predict": 100
                }
            }
            response = requests.post(f"{self.base_url}/api/chat", json=payload, timeout=20)
            if response.status_code == 200:
                return response.json()['message']['content'].strip()
            return content[:100] + "..."
        except Exception:
            return content[:100] + "..."

    def _call_ollama_with_retry(self, payload: dict, max_retries: int = 2) -> Dict:
        """Call the Ollama API with auto-pull retry"""
        for attempt in range(max_retries + 1):
            try:
                response = requests.post(
                    f"{self.base_url}/api/chat",
                    json=payload,
                    timeout=60
                )
                if response.status_code == 200:
                    return {"success": True, "data": response.json()}
                elif response.status_code == 404 and "not found" in response.text.lower():
                    logger.info(f"🔄 Model not found, attempting to pull... (attempt {attempt + 1})")
                    if self._pull_model():
                        continue  # Retry after a successful pull
                    else:
                        return {"success": False, "error": "Failed to pull model"}
                else:
                    return {"success": False, "error": f"API error {response.status_code}: {response.text}"}
            except requests.exceptions.Timeout:
                if attempt < max_retries:
                    logger.info(f"⏰ Timeout, retrying... (attempt {attempt + 1})")
                    time.sleep(2)
                else:
                    return {"success": False, "error": "Request timeout"}
            except Exception as e:
                return {"success": False, "error": f"Connection failed: {str(e)}"}
        return {"success": False, "error": "All retries failed"}

    def generate(self, prompt: str, user_type: str = "student",
                 academic_level: str = "undergraduate",
                 content_type: str = "simplified_explanation") -> str:
        """Generate educational content with auto-pull and smart features"""
        # Check if Ollama is configured
        if not self.base_url:
            return "❌ Phi-3 Error: Ollama URL not configured. Please set the MODEL_URL environment variable or add it to HuggingFace secrets."

        # Check if we need to pull the model first
        if not self.model_available:
            logger.info("🔄 Model not available, pulling before generation...")
            if not self._pull_model():
                return "❌ Phi-3 Error: Phi-3 model is not available and failed to pull. Please check the Ollama server."

        estimated_tokens = self._estimate_tokens(prompt)

        # Auto-detect large documents and use chunking
        if estimated_tokens > 3000:
            result = self.generate_large_content_with_context(prompt, user_type, academic_level, content_type)
            if isinstance(result, dict):
                return result.get("content", f"❌ Phi-3 Error: {result.get('error', 'Unknown error')}")
            return result

        # Queue management
        if self.current_requests >= self.max_concurrent:
            queue_position = self.current_requests - self.max_concurrent + 1
            estimated_wait = queue_position * 7
            return f"❌ Phi-3 Error: Service busy. You're #{queue_position} in queue (~{estimated_wait}s)"

        self.current_requests += 1
        try:
            # Increased token allocation for complete responses
            if estimated_tokens > 2000:
                max_output_tokens = 2000  # Increased from 500
            elif estimated_tokens > 1000:
                max_output_tokens = 2500  # Increased from 800
            else:
                max_output_tokens = 3000  # Increased from 1000

            payload = {
                "model": self.model,
                "messages": [{"role": "user", "content": prompt}],
                "stream": False,
                "options": {
                    "temperature": 0.7,
                    "top_p": 0.9,
                    "num_predict": max_output_tokens
                }
            }

            start_time = time.time()
            result = self._call_ollama_with_retry(payload)
            inference_time = time.time() - start_time

            if result["success"]:
                data = result["data"]
                content = data['message']['content'].strip()
                # Check if the content was cut off and retry with more tokens if needed
                if self._is_content_cut_off(content):
                    logger.info("⚠️ Content appears cut off, retrying with more tokens...")
                    payload["options"]["num_predict"] = 4000  # Max tokens for Phi-3
                    retry_result = self._call_ollama_with_retry(payload)
                    if retry_result["success"]:
                        data = retry_result["data"]
                        content = data['message']['content'].strip()
                return content
            else:
                return f"❌ Phi-3 Error: {result['error']}"
        except Exception as e:
            return f"❌ Phi-3 Error: {str(e)}"
        finally:
            self.current_requests -= 1

    def _is_content_cut_off(self, content: str) -> bool:
        """Check if content appears to be cut off mid-sentence"""
        if not content or len(content.strip()) < 100:
            return True
        # Ends with proper sentence-final punctuation -> looks complete
        if content.strip().endswith(('.', '!', '?', '."', '!"', '?"')):
            return False
        # Ends with punctuation that usually continues a sentence
        if any(content.strip().endswith(marker) for marker in [',', ';', ':', '-', '–', '—']):
            return True
        # Ends with a very short last paragraph (incomplete thought)
        last_paragraph = content.strip().split('\n')[-1]
        if len(last_paragraph.split()) < 5:
            return True
        return False

    def generate_large_content_with_context(self, prompt: str, user_type: str = "student",
                                            academic_level: str = "undergraduate",
                                            content_type: str = "simplified_explanation") -> str:
        """Handle large documents with context preservation"""
        estimated_tokens = self._estimate_tokens(prompt)
        if estimated_tokens <= 3000:
            return self.generate(prompt, user_type, academic_level, content_type)

        chunks = self._chunk_content(prompt, max_tokens=2500)
        if len(chunks) > 6:
            return f"❌ Phi-3 Error: Document too large ({estimated_tokens} tokens, {len(chunks)} chunks). Please use Groq or break into smaller sections."

        all_results = []
        previous_summaries = []
        for i, chunk in enumerate(chunks):
            logger.info(f"📄 Processing chunk {i+1}/{len(chunks)} with context...")
            context_summary = self._create_context_summary(previous_summaries)
            if context_summary:
                chunk_prompt = f"""Part {i+1} of {len(chunks)} - Building on previous context:
**PREVIOUS CONTEXT:**
{context_summary}
**CURRENT SECTION:**
{chunk}
Analyze this section while connecting to the overall context."""
            else:
                chunk_prompt = f"""Part {i+1} of {len(chunks)}:
**CONTENT:**
{chunk}
Please analyze this section."""

            chunk_result = self.generate(chunk_prompt, user_type, academic_level, content_type)
            if "❌ Phi-3 Error:" not in chunk_result:
                chunk_summary = self._create_chunk_summary(chunk_result)
                previous_summaries.append(chunk_summary)
                all_results.append({
                    "chunk_number": i + 1,
                    "content": chunk_result,
                    "context_used": bool(context_summary)
                })
            else:
                return f"❌ Phi-3 Error: Failed to process chunk {i+1}: {chunk_result}"

            if i < len(chunks) - 1:
                time.sleep(1)

        # Combine results
        combined_content = "\n\n".join([f"## Part {r['chunk_number']}\n{r['content']}" for r in all_results])
        return combined_content

    def health_check(self) -> Dict:
        """Comprehensive health check"""
        if not self.base_url:
            return {
                "server_healthy": False,
                "model_available": False,
                "error": "Ollama URL not configured"
            }
        try:
            response = requests.get(f"{self.base_url}/api/tags", timeout=10)
            if response.status_code == 200:
                models = response.json().get('models', [])
                model_available = any(model['name'] == self.model for model in models)
                return {
                    "server_healthy": True,
                    "model_available": model_available,
                    "available_models": [model['name'] for model in models],
                    "model_required": self.model
                }
            else:
                return {
                    "server_healthy": False,
                    "model_available": False,
                    "error": f"Server returned {response.status_code}"
                }
        except Exception as e:
            return {
                "server_healthy": False,
                "model_available": False,
                "error": str(e)
            }

    def get_available_models(self):
        """Get list of available models"""
        try:
            response = requests.get(f"{self.base_url}/api/tags", timeout=10)
            if response.status_code == 200:
                return [model['name'] for model in response.json().get('models', [])]
            return []
        except Exception:
            return []

    def get_queue_status(self):
        """Get current queue status"""
        return {
            "current_requests": self.current_requests,
            "max_concurrent": self.max_concurrent,
            "available_slots": max(0, self.max_concurrent - self.current_requests)
        }
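
# Minimal usage sketch for HFGenerator (illustrative; assumes an Ollama server is reachable
# at the configured URL):
#   phi3 = HFGenerator()
#   print(phi3.health_check())
#   print(phi3.generate("Explain photosynthesis briefly"))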

# Backward compatibility
class GroqGenerator(MultiGroqGenerator):
    def __init__(self, model="llama-3.3-70b-versatile"):
        super().__init__()


class ModelManager:
    """Unified model manager that handles both Groq and Phi-3 models"""

    def __init__(self):
        self.groq_generator = MultiGroqGenerator()
        self.phi3_generator = HFGenerator()

    def generate(self, prompt: str, model_choice: str = "phi3", **kwargs) -> str:
        """Generate content using the selected model"""
        logger.info(f"🎯 Using model: {model_choice}")
        if model_choice == "phi3":
            # Handle Phi-3 generation
            user_type = kwargs.get('user_type', 'student')
            academic_level = kwargs.get('student_level', 'undergraduate')
            content_type = kwargs.get('content_type', 'simplified_explanation')
            result = self.phi3_generator.generate(prompt, user_type, academic_level, content_type)
            return result
        else:
            # Use Groq for comparison - check if this is a large content request
            is_large_content = len(prompt) > 8000
            if is_large_content:
                return self.groq_generator.generate_large_content(prompt)
            else:
                return self.groq_generator.generate(prompt)

    def get_service_status(self) -> dict:
        """Get clean research-focused status"""
        groq_status = self.groq_generator.get_service_status()
        phi3_health = self.phi3_generator.health_check()

        # Clean Groq status
        clean_groq_status = {
            'healthy_providers': groq_status['healthy_providers'],
            'total_providers': groq_status['total_providers'],
            'providers': [
                {
                    'name': provider['name'],
                    'failures': provider['failures']
                }
                for provider in groq_status['providers']
            ]
        }

        # Enhanced Phi-3 status (use .get() because health_check omits these keys on errors)
        enhanced_phi3_status = {
            'server_healthy': phi3_health['server_healthy'],
            'model_available': phi3_health['model_available'],
            'available_models': phi3_health.get('available_models', []),
            'model_required': phi3_health.get('model_required', self.phi3_generator.model)
        }

        return {
            "groq": clean_groq_status,
            "phi3": enhanced_phi3_status
        }
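
# Minimal usage sketch for ModelManager (illustrative; the module-level `model_manager`
# instance created below can be used the same way):
#   manager = ModelManager()
#   answer = manager.generate("Explain photosynthesis briefly", model_choice="groq")
#   status = manager.get_service_status()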

# Global model manager instance
model_manager = ModelManager()


# Setup function for the Streamlit app
def setup_generators():
    """Setup both generators with health checks"""
    logger.info("🔧 Setting up generators...")
    groq_generator = MultiGroqGenerator()
    phi3_generator = HFGenerator()

    phi3_health = phi3_generator.health_check()
    logger.info(f"🏥 Phi-3 Health: {phi3_health}")
    if not phi3_health["server_healthy"]:
        logger.error("❌ Phi-3 server is not accessible")
    elif not phi3_health["model_available"]:
        logger.info("🔄 Phi-3 model needs to be pulled on first use")

    return {
        "groq": groq_generator,
        "phi3": phi3_generator
    }


# Test function
def test_generators():
    """Test both generators"""
    logger.info("🧪 Testing Generators...")
    generators = setup_generators()

    # Test Groq
    logger.info("🔷 Testing Groq...")
    groq_result = generators["groq"].generate("Explain photosynthesis briefly")
    if not groq_result.startswith("["):
        logger.info("✅ Groq working")
    else:
        logger.error(f"❌ Groq failed: {groq_result}")

    # Test Phi-3
    logger.info("🔶 Testing Phi-3...")
    phi3_result = generators["phi3"].generate("Explain photosynthesis briefly")
    if "❌ Phi-3 Error:" not in phi3_result:
        logger.info("✅ Phi-3 working")
    else:
        logger.error(f"❌ Phi-3 failed: {phi3_result}")

    # Test health
    logger.info("🏥 Health Check:")
    logger.info(f"Groq providers: {len(generators['groq'].providers)}")
    logger.info(f"Phi-3 healthy: {generators['phi3'].health_check()}")


if __name__ == "__main__":
    test_generators()