#!/usr/bin/env python3 """ Dual LLM Wavecaster Integration ============================== Integrates the second trained LLM with the dual LLM orchestrator and wavecaster system for advanced AI capabilities. """ import json import asyncio import time from pathlib import Path from typing import Dict, List, Any, Optional from datetime import datetime # Import our systems try: from kgirl.dual_llm_orchestrator import DualLLMOrchestrator, HTTPConfig, OrchestratorSettings DUAL_LLM_AVAILABLE = True except ImportError: DUAL_LLM_AVAILABLE = False print("⚠️ Dual LLM orchestrator not available") try: from kgirl.distributed_knowledge_base import DistributedKnowledgeBase, KnowledgeBaseConfig KNOWLEDGE_BASE_AVAILABLE = True except ImportError: KNOWLEDGE_BASE_AVAILABLE = False print("⚠️ Distributed knowledge base not available") try: from enhanced_tokenizer_minimal import MinimalEnhancedTokenizer ENHANCED_TOKENIZER_AVAILABLE = True except ImportError: ENHANCED_TOKENIZER_AVAILABLE = False print("⚠️ Enhanced tokenizer not available") class DualLLMWavecasterIntegration: """Integrates dual LLM system with wavecaster capabilities.""" def __init__(self): self.orchestrator = None self.knowledge_base = None self.enhanced_tokenizer = None self.second_llm_config = None self.integration_config = None self._initialize_components() def _initialize_components(self): """Initialize all integration components.""" print("🚀 Initializing Dual LLM Wavecaster Integration...") # Load configurations self._load_configurations() # Initialize enhanced tokenizer if ENHANCED_TOKENIZER_AVAILABLE: try: self.enhanced_tokenizer = MinimalEnhancedTokenizer() print("✅ Enhanced Tokenizer initialized") except Exception as e: print(f"❌ Enhanced Tokenizer failed: {e}") # Initialize knowledge base if KNOWLEDGE_BASE_AVAILABLE: try: config = KnowledgeBaseConfig( db_path="second_llm_knowledge.db", faiss_index_path="second_llm_faiss_index", embedding_dimension=384 ) self.knowledge_base = DistributedKnowledgeBase(config) print("✅ Distributed Knowledge Base initialized") except Exception as e: print(f"❌ Knowledge Base failed: {e}") # Initialize dual LLM orchestrator if DUAL_LLM_AVAILABLE and self.integration_config: try: self._setup_dual_llm_orchestrator() print("✅ Dual LLM Orchestrator initialized") except Exception as e: print(f"❌ Dual LLM Orchestrator failed: {e}") def _load_configurations(self): """Load second LLM and integration configurations.""" print("📁 Loading configurations...") # Load second LLM config if Path("second_llm_config.json").exists(): with open("second_llm_config.json", 'r') as f: self.second_llm_config = json.load(f) print("✅ Second LLM configuration loaded") else: print("⚠️ Second LLM configuration not found") # Load integration config if Path("dual_llm_integration_config.json").exists(): with open("dual_llm_integration_config.json", 'r') as f: self.integration_config = json.load(f) print("✅ Integration configuration loaded") else: print("⚠️ Integration configuration not found") def _setup_dual_llm_orchestrator(self): """Setup the dual LLM orchestrator.""" if not self.integration_config: return dual_llm_setup = self.integration_config.get("dual_llm_setup", {}) # Create local LLM configs (primary LLM) local_configs = [dual_llm_setup.get("primary_llm", {})] # Create resource LLM config (secondary LLM) remote_config = dual_llm_setup.get("secondary_llm", {}) # Create orchestrator settings settings = dual_llm_setup.get("orchestrator_settings", {}) # Create orchestrator from kgirl.dual_llm_orchestrator import create_orchestrator self.orchestrator = create_orchestrator( local_configs=local_configs, remote_config=remote_config, settings=settings ) async def initialize_knowledge_base(self): """Initialize the knowledge base.""" if self.knowledge_base: try: await self.knowledge_base.initialize() print("✅ Knowledge base initialized") return True except Exception as e: print(f"❌ Knowledge base initialization failed: {e}") return False return False async def search_knowledge_for_context(self, query: str, k: int = 5) -> List[Dict[str, Any]]: """Search knowledge base for relevant context.""" if not self.knowledge_base: return [] try: # Create query embedding using enhanced tokenizer if self.enhanced_tokenizer: tokenizer_result = await self.enhanced_tokenizer.tokenize(query) query_embedding = tokenizer_result.embeddings else: # Fallback embedding import numpy as np query_embedding = np.random.randn(384) # Search knowledge base knowledge_nodes = await self.knowledge_base.search_knowledge( query=query, query_embedding=query_embedding, k=k ) # Convert to context format context = [] for node in knowledge_nodes: context.append({ "content": node.content, "source": node.source, "coherence_score": node.coherence_score, "metadata": node.metadata }) return context except Exception as e: print(f"⚠️ Knowledge search failed: {e}") return [] async def enhanced_wavecaster_query(self, user_query: str, resource_paths: List[str] = None, inline_resources: List[str] = None) -> Dict[str, Any]: """Enhanced wavecaster query with dual LLM and knowledge integration.""" print(f"🌊 Processing wavecaster query: {user_query[:100]}...") start_time = time.time() # Step 1: Search knowledge base for relevant context knowledge_context = await self.search_knowledge_for_context(user_query) # Step 2: Enhance inline resources with knowledge context enhanced_inline_resources = inline_resources or [] if knowledge_context: knowledge_summary = "\n".join([ f"Knowledge Context {i+1}: {ctx['content'][:200]}..." for i, ctx in enumerate(knowledge_context[:3]) ]) enhanced_inline_resources.append(f"RELEVANT KNOWLEDGE:\n{knowledge_summary}") # Step 3: Process with enhanced tokenizer for analysis tokenizer_analysis = None if self.enhanced_tokenizer: try: tokenizer_result = await self.enhanced_tokenizer.tokenize(user_query) tokenizer_analysis = { "content_type": tokenizer_result.semantic_features.get("content_type", "general"), "entities": len(tokenizer_result.entities), "math_expressions": len(tokenizer_result.math_expressions), "complexity_score": tokenizer_result.semantic_features.get("avg_word_length", 0), "processing_time": tokenizer_result.processing_time } # Add tokenizer insights to inline resources if tokenizer_analysis["content_type"] in ["academic", "code", "mathematical"]: enhanced_inline_resources.append( f"CONTENT ANALYSIS: {tokenizer_analysis['content_type']} content detected with " f"{tokenizer_analysis['entities']} entities and {tokenizer_analysis['math_expressions']} math expressions." ) except Exception as e: print(f"⚠️ Tokenizer analysis failed: {e}") # Step 4: Process with dual LLM orchestrator if self.orchestrator: try: result = await self.orchestrator.run_async( user_prompt=user_query, resource_paths=resource_paths or [], inline_resources=enhanced_inline_resources ) # Step 5: Enhance result with wavecaster metadata enhanced_result = { "query": user_query, "response": result, "wavecaster_metadata": { "processing_time": time.time() - start_time, "knowledge_context_count": len(knowledge_context), "tokenizer_analysis": tokenizer_analysis, "enhanced_resources_count": len(enhanced_inline_resources), "timestamp": datetime.now().isoformat() }, "knowledge_context": knowledge_context, "specialization_used": self.second_llm_config.get("specialization", "general") if self.second_llm_config else "general" } print(f"✅ Wavecaster query completed in {time.time() - start_time:.2f}s") return enhanced_result except Exception as e: print(f"❌ Dual LLM processing failed: {e}") return { "query": user_query, "error": str(e), "wavecaster_metadata": { "processing_time": time.time() - start_time, "timestamp": datetime.now().isoformat() } } else: # Fallback response return { "query": user_query, "response": { "summary": "Dual LLM orchestrator not available", "final": "I'm sorry, the dual LLM system is not currently available. Please check the configuration.", "prompt": user_query }, "wavecaster_metadata": { "processing_time": time.time() - start_time, "timestamp": datetime.now().isoformat() } } async def batch_wavecaster_queries(self, queries: List[str]) -> List[Dict[str, Any]]: """Process multiple wavecaster queries in batch.""" print(f"🌊 Processing {len(queries)} wavecaster queries in batch...") results = [] for i, query in enumerate(queries): print(f" Processing query {i+1}/{len(queries)}") result = await self.enhanced_wavecaster_query(query) results.append(result) # Small delay to prevent overwhelming await asyncio.sleep(0.1) print(f"✅ Batch processing completed: {len(results)} results") return results def create_wavecaster_demo_queries(self) -> List[str]: """Create demo queries for wavecaster testing.""" demo_queries = [ "Explain the relationship between quantum computing and neural networks", "Analyze the mathematical foundations of holographic memory systems", "Create a SQL query for finding high-value customers with complex joins", "Describe the emergent properties of distributed knowledge systems", "What are the key principles of wavecaster technology?", "How does the dual LLM system enhance cognitive processing?", "Analyze the fractal patterns in artificial intelligence systems", "Explain the dimensional entanglement framework in LiMp", "What is the role of semantic embeddings in advanced tokenization?", "Describe the integration between matrix neurons and LLM systems" ] return demo_queries async def run_wavecaster_demo(self) -> Dict[str, Any]: """Run a comprehensive wavecaster demo.""" print("🚀 Running Dual LLM Wavecaster Demo") print("=" * 40) # Initialize knowledge base kb_initialized = await self.initialize_knowledge_base() # Create demo queries demo_queries = self.create_wavecaster_demo_queries() # Process queries print(f"📝 Processing {len(demo_queries)} demo queries...") results = await self.batch_wavecaster_queries(demo_queries) # Analyze results demo_analysis = { "total_queries": len(demo_queries), "successful_queries": len([r for r in results if "error" not in r]), "failed_queries": len([r for r in results if "error" in r]), "average_processing_time": sum( r["wavecaster_metadata"]["processing_time"] for r in results if "wavecaster_metadata" in r ) / len(results) if results else 0, "knowledge_base_used": kb_initialized, "specialization": self.second_llm_config.get("specialization", "general") if self.second_llm_config else "general", "results": results } # Save demo results with open("wavecaster_demo_results.json", 'w', encoding='utf-8') as f: json.dump(demo_analysis, f, indent=2, ensure_ascii=False) print(f"\n📊 Wavecaster Demo Summary:") print(f" ✅ Successful queries: {demo_analysis['successful_queries']}") print(f" ❌ Failed queries: {demo_analysis['failed_queries']}") print(f" ⏱️ Average processing time: {demo_analysis['average_processing_time']:.2f}s") print(f" 🗄️ Knowledge base: {'✅ Used' if kb_initialized else '❌ Not used'}") print(f" 🎯 Specialization: {demo_analysis['specialization']}") return demo_analysis def save_integration_status(self, demo_results: Dict[str, Any]): """Save integration status and capabilities.""" integration_status = { "dual_llm_wavecaster": { "orchestrator_available": self.orchestrator is not None, "knowledge_base_available": self.knowledge_base is not None, "enhanced_tokenizer_available": self.enhanced_tokenizer is not None, "second_llm_config_loaded": self.second_llm_config is not None, "integration_config_loaded": self.integration_config is not None }, "capabilities": { "dual_llm_processing": self.orchestrator is not None, "knowledge_enhanced_queries": self.knowledge_base is not None, "semantic_analysis": self.enhanced_tokenizer is not None, "batch_processing": True, "specialized_analysis": self.second_llm_config.get("specialization") if self.second_llm_config else None }, "demo_results": demo_results, "timestamp": datetime.now().isoformat() } with open("dual_llm_wavecaster_status.json", 'w', encoding='utf-8') as f: json.dump(integration_status, f, indent=2, ensure_ascii=False) print("✅ Integration status saved to dual_llm_wavecaster_status.json") async def main(): """Main function to run dual LLM wavecaster integration.""" print("🚀 Dual LLM Wavecaster Integration") print("=" * 40) # Initialize integration integration = DualLLMWavecasterIntegration() # Run demo demo_results = await integration.run_wavecaster_demo() # Save status integration.save_integration_status(demo_results) print("\n🎉 Dual LLM Wavecaster Integration Complete!") print("🔗 System ready for advanced AI applications!") return demo_results if __name__ == "__main__": asyncio.run(main())