#!/usr/bin/env python3 """ Second LLM Trainer ================== Trains a second LLM on our comprehensive training data and integrates it with the dual LLM wavecaster system. """ import json import asyncio import time from pathlib import Path from typing import Dict, List, Any, Optional from datetime import datetime # Import our enhanced systems try: from enhanced_tokenizer_minimal import MinimalEnhancedTokenizer ENHANCED_TOKENIZER_AVAILABLE = True except ImportError: ENHANCED_TOKENIZER_AVAILABLE = False print("⚠️ Enhanced tokenizer not available") try: from kgirl.dual_llm_orchestrator import DualLLMOrchestrator, HTTPConfig, OrchestratorSettings DUAL_LLM_AVAILABLE = True except ImportError: DUAL_LLM_AVAILABLE = False print("⚠️ Dual LLM orchestrator not available") try: from kgirl.distributed_knowledge_base import DistributedKnowledgeBase, KnowledgeBaseConfig KNOWLEDGE_BASE_AVAILABLE = True except ImportError: KNOWLEDGE_BASE_AVAILABLE = False print("⚠️ Distributed knowledge base not available") class SecondLLMTrainer: """Trains a second LLM on comprehensive data and integrates with wavecaster.""" def __init__(self): self.enhanced_tokenizer = None self.training_data = [] self.second_llm_config = None self.knowledge_base = None self._initialize_components() def _initialize_components(self): """Initialize all training components.""" print("🚀 Initializing Second LLM Trainer...") if ENHANCED_TOKENIZER_AVAILABLE: try: self.enhanced_tokenizer = MinimalEnhancedTokenizer() print("✅ Enhanced Tokenizer initialized") except Exception as e: print(f"❌ Enhanced Tokenizer failed: {e}") if KNOWLEDGE_BASE_AVAILABLE: try: config = KnowledgeBaseConfig( db_path="second_llm_knowledge.db", faiss_index_path="second_llm_faiss_index", embedding_dimension=384 # Match sentence-transformers ) self.knowledge_base = DistributedKnowledgeBase(config) print("✅ Distributed Knowledge Base initialized") except Exception as e: print(f"❌ Knowledge Base failed: {e}") def load_comprehensive_training_data(self) -> List[Dict[str, Any]]: """Load all available training data sources.""" print("📁 Loading comprehensive training data...") training_files = [ "processed_training_data.jsonl", "comprehensive_training_data.jsonl", "matrix_training_data.jsonl", "training_data_emergent.jsonl" ] all_training_data = [] for file_path in training_files: if Path(file_path).exists(): print(f" 📄 Loading {file_path}") try: with open(file_path, 'r', encoding='utf-8') as f: for line_num, line in enumerate(f, 1): line = line.strip() if line: try: data = json.loads(line) data["source_file"] = file_path all_training_data.append(data) except json.JSONDecodeError as e: print(f" ⚠️ JSON decode error in line {line_num}: {e}") except Exception as e: print(f" ❌ Error loading {file_path}: {e}") else: print(f" ⚠️ File not found: {file_path}") print(f"✅ Loaded {len(all_training_data)} training entries") return all_training_data async def create_training_prompts(self, training_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Create training prompts from comprehensive data.""" print("🔧 Creating training prompts...") training_prompts = [] for i, entry in enumerate(training_data): # Extract content content = entry.get("content", "") if not content: # Try prompt/completion format prompt = entry.get("prompt", "") completion = entry.get("completion", "") content = f"{prompt} {completion}" if not content: continue # Process with enhanced tokenizer for analysis if self.enhanced_tokenizer: try: tokenizer_result = await self.enhanced_tokenizer.tokenize(content) # Create 
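    # ------------------------------------------------------------------
    # Illustrative sketch (not part of the original pipeline): the loader
    # above accepts JSONL entries in two shapes, a raw "content" record or
    # a "prompt"/"completion" pair. The example values below are
    # hypothetical and exist only to document the expected schema.
    @staticmethod
    def example_training_entries() -> List[Dict[str, Any]]:
        """Return sample entries in the two accepted schemas (illustrative only)."""
        return [
            {"id": "entry_1", "content": "Raw document text to analyze."},
            {"id": "entry_2", "prompt": "Explain X.", "completion": "X works by ..."},
        ]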
    async def create_training_prompts(self, training_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Create training prompts from comprehensive data."""
        print("🔧 Creating training prompts...")

        training_prompts = []
        for i, entry in enumerate(training_data):
            # Extract content
            content = entry.get("content", "")
            if not content:
                # Try prompt/completion format
                prompt = entry.get("prompt", "")
                completion = entry.get("completion", "")
                content = f"{prompt} {completion}".strip()
            if not content:
                continue

            # Process with enhanced tokenizer for analysis
            training_prompt = None
            if self.enhanced_tokenizer:
                try:
                    tokenizer_result = await self.enhanced_tokenizer.tokenize(content)

                    # Create training prompt based on content type
                    content_type = tokenizer_result.semantic_features.get("content_type", "general")
                    if content_type == "academic":
                        training_prompt = {
                            "prompt": "Analyze this academic content and provide insights:",
                            "completion": content,
                            "instruction": "Provide detailed academic analysis with key insights and implications.",
                            "category": "academic_analysis"
                        }
                    elif content_type == "code":
                        training_prompt = {
                            "prompt": "Explain this code and suggest improvements:",
                            "completion": content,
                            "instruction": "Provide code explanation, analysis, and improvement suggestions.",
                            "category": "code_analysis"
                        }
                    elif content_type == "mathematical":
                        training_prompt = {
                            "prompt": "Solve and explain this mathematical content:",
                            "completion": content,
                            "instruction": "Provide step-by-step mathematical solution and explanation.",
                            "category": "mathematical_analysis"
                        }
                    else:
                        training_prompt = {
                            "prompt": "Summarize and analyze this content:",
                            "completion": content,
                            "instruction": "Provide comprehensive summary and analysis.",
                            "category": "general_analysis"
                        }

                    # Add metadata
                    training_prompt.update({
                        "id": f"second_llm_{i+1}",
                        "source_entry": entry.get("id", f"entry_{i+1}"),
                        "source_file": entry.get("source_file", "unknown"),
                        "tokenizer_analysis": {
                            "tokens": tokenizer_result.token_count,
                            "entities": len(tokenizer_result.entities),
                            "math_expressions": len(tokenizer_result.math_expressions),
                            "fractal_features": tokenizer_result.fractal_features
                        },
                        "created_at": datetime.now().isoformat()
                    })
                except Exception as e:
                    print(f"   ⚠️ Tokenizer processing failed for entry {i+1}: {e}")
                    training_prompt = None

            if training_prompt is None:
                # Fall back to a basic training prompt when the tokenizer is
                # unavailable or failed for this entry
                training_prompt = {
                    "id": f"second_llm_{i+1}",
                    "prompt": "Analyze this content:",
                    "completion": content,
                    "instruction": "Provide analysis and insights.",
                    "category": "general_analysis",
                    "source_entry": entry.get("id", f"entry_{i+1}"),
                    "source_file": entry.get("source_file", "unknown"),
                    "created_at": datetime.now().isoformat()
                }

            training_prompts.append(training_prompt)

            # Small delay so we don't starve the event loop
            await asyncio.sleep(0.01)

        print(f"✅ Created {len(training_prompts)} training prompts")
        return training_prompts

    async def populate_knowledge_base(self, training_prompts: List[Dict[str, Any]]):
        """Populate the knowledge base with training data."""
        if not self.knowledge_base:
            print("⚠️ Knowledge base not available, skipping population")
            return

        print("🗄️ Populating knowledge base...")
        try:
            await self.knowledge_base.initialize()

            import numpy as np

            for prompt in training_prompts:
                content = f"{prompt['prompt']} {prompt['completion']}"

                # Create embedding (simplified - in production use real embeddings)
                embedding = np.random.randn(384)  # 384-dimensional embedding

                metadata = {
                    "category": prompt["category"],
                    "instruction": prompt["instruction"],
                    "source_file": prompt["source_file"],
                    "tokenizer_analysis": prompt.get("tokenizer_analysis", {})
                }

                node_id = await self.knowledge_base.add_knowledge_node(
                    content=content,
                    embedding=embedding,
                    source="second_llm_training",
                    metadata=metadata
                )
                print(f"   ✅ Added knowledge node: {node_id}")

            print(f"✅ Knowledge base populated with {len(training_prompts)} nodes")
        except Exception as e:
            print(f"❌ Knowledge base population failed: {e}")
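    # ------------------------------------------------------------------
    # Sketch: replacing the random placeholder embeddings above with real
    # ones, as the inline comment suggests. Assumes the optional
    # `sentence-transformers` package; the model name is an assumption
    # chosen because all-MiniLM-L6-v2 produces 384-dim vectors, matching
    # the embedding_dimension configured in _initialize_components().
    @staticmethod
    def embed_text(text: str):
        """Return a 384-dim embedding via sentence-transformers (sketch)."""
        from sentence_transformers import SentenceTransformer  # optional dependency
        # In practice, cache the model instead of reloading it per call.
        model = SentenceTransformer("all-MiniLM-L6-v2")  # 384-dim output
        return model.encode(text)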
    def create_second_llm_config(self, training_prompts: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Create configuration for the second LLM."""
        print("⚙️ Creating second LLM configuration...")

        # Analyze training data characteristics
        categories = {}
        total_tokens = 0
        total_entities = 0
        total_math_expressions = 0

        for prompt in training_prompts:
            category = prompt["category"]
            categories[category] = categories.get(category, 0) + 1

            tokenizer_analysis = prompt.get("tokenizer_analysis", {})
            total_tokens += tokenizer_analysis.get("tokens", 0)
            total_entities += tokenizer_analysis.get("entities", 0)
            total_math_expressions += tokenizer_analysis.get("math_expressions", 0)

        # Create specialized configuration
        config = {
            "model_name": "second_llm_wavecaster",
            "version": "1.0",
            "training_data": {
                "total_prompts": len(training_prompts),
                "categories": categories,
                "total_tokens": total_tokens,
                "total_entities": total_entities,
                "total_math_expressions": total_math_expressions
            },
            "capabilities": {
                "academic_analysis": "academic_analysis" in categories,
                "code_analysis": "code_analysis" in categories,
                "mathematical_analysis": "mathematical_analysis" in categories,
                "general_analysis": "general_analysis" in categories,
                "entity_recognition": total_entities > 0,
                "mathematical_processing": total_math_expressions > 0
            },
            "specialization": self._determine_specialization(categories),
            "performance_characteristics": {
                "avg_tokens_per_prompt": total_tokens / len(training_prompts) if training_prompts else 0,
                "complexity_score": (total_entities + total_math_expressions) / len(training_prompts) if training_prompts else 0
            },
            "integration": {
                "wavecaster_compatible": True,
                "dual_llm_ready": True,
                "knowledge_base_enabled": self.knowledge_base is not None
            },
            "created_at": datetime.now().isoformat()
        }

        self.second_llm_config = config

        print("✅ Second LLM configuration created")
        print(f"   🎯 Specialization: {config['specialization']}")
        print(f"   📊 Categories: {list(categories.keys())}")
        print(f"   🔢 Total prompts: {len(training_prompts)}")

        return config

    def _determine_specialization(self, categories: Dict[str, int]) -> str:
        """Determine the LLM's primary specialization."""
        if not categories:
            return "general"

        total = sum(categories.values())
        academic_ratio = categories.get("academic_analysis", 0) / total
        code_ratio = categories.get("code_analysis", 0) / total
        math_ratio = categories.get("mathematical_analysis", 0) / total

        if academic_ratio > 0.4:
            return "academic_research"
        elif code_ratio > 0.4:
            return "code_analysis"
        elif math_ratio > 0.4:
            return "mathematical_processing"
        elif academic_ratio + code_ratio + math_ratio > 0.6:
            return "multi_domain_analysis"
        else:
            return "general_analysis"
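    # Worked example for the thresholds above (illustrative): with
    # categories = {"academic_analysis": 5, "general_analysis": 5},
    # academic_ratio = 5/10 = 0.5 > 0.4, so the result is
    # "academic_research". With an even three-way split across academic,
    # code, and math (~0.33 each), no single ratio clears 0.4, but their
    # sum of 1.0 exceeds 0.6, yielding "multi_domain_analysis".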
    def create_dual_llm_integration(self) -> Dict[str, Any]:
        """Create dual LLM integration configuration."""
        print("🔗 Creating dual LLM integration...")

        if not DUAL_LLM_AVAILABLE:
            print("⚠️ Dual LLM orchestrator not available")
            return {}

        # Create configurations for dual LLM system
        local_configs = [
            {
                "base_url": "http://localhost:11434",  # Ollama/LMF2
                "mode": "llama-cpp",
                "model": "llama2",
                "timeout": 120
            }
        ]

        # Second LLM configuration (could be local or remote)
        second_llm_config = {
            "base_url": "http://localhost:11435",  # Second LLM endpoint
            "mode": "llama-cpp",
            "model": "second_llm_wavecaster",
            "timeout": 120
        }

        orchestrator_settings = {
            "temperature": 0.7,
            "max_tokens": 1024,
            "style": "analytical",
            "max_context_chars": 12000  # Increased for comprehensive analysis
        }

        integration_config = {
            "dual_llm_setup": {
                "primary_llm": local_configs[0],
                "secondary_llm": second_llm_config,
                "orchestrator_settings": orchestrator_settings
            },
            "specialization_roles": {
                "primary_llm": "general_inference_and_decision_making",
                "secondary_llm": "specialized_analysis_and_insights"
            },
            "workflow": {
                "step1": "Primary LLM processes user request",
                "step2": "Secondary LLM provides specialized analysis",
                "step3": "Orchestrator combines insights",
                "step4": "Final response with enhanced analysis"
            },
            "knowledge_integration": {
                "knowledge_base_enabled": self.knowledge_base is not None,
                "embedding_search": True,
                "context_enhancement": True
            }
        }

        print("✅ Dual LLM integration configuration created")
        return integration_config

    def save_training_results(self, training_prompts: List[Dict[str, Any]],
                              config: Dict[str, Any],
                              integration_config: Dict[str, Any]):
        """Save all training results and configurations."""
        print("💾 Saving training results...")

        # Save training prompts
        with open("second_llm_training_prompts.jsonl", 'w', encoding='utf-8') as f:
            for prompt in training_prompts:
                f.write(json.dumps(prompt, ensure_ascii=False) + '\n')

        # Save configuration
        with open("second_llm_config.json", 'w', encoding='utf-8') as f:
            json.dump(config, f, indent=2, ensure_ascii=False)

        # Save integration configuration
        with open("dual_llm_integration_config.json", 'w', encoding='utf-8') as f:
            json.dump(integration_config, f, indent=2, ensure_ascii=False)

        # Create comprehensive summary
        summary = {
            "second_llm_training": {
                "total_prompts": len(training_prompts),
                "categories": config["training_data"]["categories"],
                "specialization": config["specialization"],
                "capabilities": config["capabilities"]
            },
            "dual_llm_integration": {
                "primary_llm": integration_config.get("dual_llm_setup", {}).get("primary_llm", {}),
                "secondary_llm": integration_config.get("dual_llm_setup", {}).get("secondary_llm", {}),
                "specialization_roles": integration_config.get("specialization_roles", {})
            },
            "knowledge_base": {
                "enabled": self.knowledge_base is not None,
                "nodes_added": len(training_prompts) if self.knowledge_base else 0
            },
            "files_created": [
                "second_llm_training_prompts.jsonl",
                "second_llm_config.json",
                "dual_llm_integration_config.json"
            ],
            "timestamp": datetime.now().isoformat()
        }

        with open("second_llm_training_summary.json", 'w', encoding='utf-8') as f:
            json.dump(summary, f, indent=2, ensure_ascii=False)

        print("✅ Training results saved:")
        print("   📁 second_llm_training_prompts.jsonl")
        print("   📁 second_llm_config.json")
        print("   📁 dual_llm_integration_config.json")
        print("   📁 second_llm_training_summary.json")

    def print_training_summary(self, training_prompts: List[Dict[str, Any]],
                               config: Dict[str, Any],
                               integration_config: Dict[str, Any]):
        """Print comprehensive training summary."""
        print("\n📊 Second LLM Training Summary")
        print("=" * 40)

        print(f"📝 Total training prompts: {len(training_prompts)}")
        print(f"🎯 Specialization: {config['specialization']}")
        print(f"🔢 Total tokens: {config['training_data']['total_tokens']:,}")
        print(f"🏷️ Total entities: {config['training_data']['total_entities']}")
        print(f"🧮 Math expressions: {config['training_data']['total_math_expressions']}")

        print("\n📋 Capabilities:")
        for capability, enabled in config["capabilities"].items():
            status = "✅" if enabled else "❌"
            print(f"   {status} {capability}")

        print("\n📁 Categories:")
        for category, count in config["training_data"]["categories"].items():
            percentage = (count / len(training_prompts)) * 100
            print(f"   {category}: {count} prompts ({percentage:.1f}%)")

        print("\n🔗 Dual LLM Integration:")
        print(f"   Primary LLM: {integration_config.get('dual_llm_setup', {}).get('primary_llm', {}).get('model', 'N/A')}")
        print(f"   Secondary LLM: {integration_config.get('dual_llm_setup', {}).get('secondary_llm', {}).get('model', 'N/A')}")
        print(f"   Knowledge Base: {'✅ Enabled' if self.knowledge_base else '❌ Disabled'}")

        print("\n🚀 Second LLM ready for wavecaster integration!")
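    # ------------------------------------------------------------------
    # Sketch: turning the integration config built above into live
    # orchestrator objects. The HTTPConfig / OrchestratorSettings /
    # DualLLMOrchestrator call signatures below are assumptions inferred
    # from the dictionaries in create_dual_llm_integration(); adjust them
    # to the actual kgirl APIs before use.
    @staticmethod
    def build_orchestrator(integration_config: Dict[str, Any]):
        """Construct a DualLLMOrchestrator from an integration config (sketch)."""
        if not DUAL_LLM_AVAILABLE or not integration_config:
            return None
        setup = integration_config["dual_llm_setup"]
        primary = HTTPConfig(**setup["primary_llm"])          # assumed kwargs
        secondary = HTTPConfig(**setup["secondary_llm"])      # assumed kwargs
        settings = OrchestratorSettings(**setup["orchestrator_settings"])  # assumed kwargs
        return DualLLMOrchestrator(primary, secondary, settings)  # assumed constructor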
integration!") async def run_complete_training(self): """Run the complete second LLM training process.""" print("🚀 Second LLM Training Process") print("=" * 40) # 1. Load comprehensive training data training_data = self.load_comprehensive_training_data() if not training_data: print("❌ No training data available") return # 2. Create training prompts training_prompts = await self.create_training_prompts(training_data) if not training_prompts: print("❌ No training prompts created") return # 3. Populate knowledge base await self.populate_knowledge_base(training_prompts) # 4. Create second LLM configuration config = self.create_second_llm_config(training_prompts) # 5. Create dual LLM integration integration_config = self.create_dual_llm_integration() # 6. Save all results self.save_training_results(training_prompts, config, integration_config) # 7. Print summary self.print_training_summary(training_prompts, config, integration_config) return { "training_prompts": training_prompts, "config": config, "integration_config": integration_config } async def main(): """Main training function.""" trainer = SecondLLMTrainer() # Run complete training results = await trainer.run_complete_training() if results: print("\n🎉 Second LLM training completed successfully!") print("🔗 Ready for dual LLM wavecaster integration!") return results else: print("\n❌ Training failed") return None if __name__ == "__main__": asyncio.run(main())