""" ⚡ Speed-Optimized Multi-Agent RAG System for Complex Questions 병렬 처리, 스마트 캐싱, 동적 파이프라인으로 복잡한 질문도 빠르게 처리 """ import os import json import time import asyncio import hashlib from typing import Optional, List, Dict, Any, Tuple, Generator, AsyncGenerator from datetime import datetime, timedelta from enum import Enum from collections import deque import threading import queue from concurrent.futures import ThreadPoolExecutor, as_completed import aiohttp import requests import gradio as gr from pydantic import BaseModel, Field from dotenv import load_dotenv # 환경변수 로드 load_dotenv() # ============================================================================ # 데이터 모델 정의 # ============================================================================ class AgentRole(Enum): """에이전트 역할 정의""" SUPERVISOR = "supervisor" CREATIVE = "creative" CRITIC = "critic" FINALIZER = "finalizer" class ExecutionMode(Enum): """실행 모드 정의""" PARALLEL = "parallel" # 병렬 처리 SEQUENTIAL = "sequential" # 순차 처리 HYBRID = "hybrid" # 하이브리드 class Message(BaseModel): role: str content: str timestamp: Optional[datetime] = None class AgentResponse(BaseModel): role: AgentRole content: str processing_time: float metadata: Optional[Dict] = None # ============================================================================ # 스마트 캐싱 시스템 # ============================================================================ class SmartCache: """지능형 캐싱 시스템""" def __init__(self, max_size: int = 100, ttl_hours: int = 24): self.cache = {} self.access_count = {} self.timestamps = {} self.max_size = max_size self.ttl = timedelta(hours=ttl_hours) self.reasoning_patterns = self._init_reasoning_patterns() def _init_reasoning_patterns(self) -> Dict: """자주 사용되는 추론 패턴 초기화""" return { "analysis": { "structure": ["현황 분석", "핵심 요인", "영향 평가", "전략 제안"], "keywords": ["분석", "평가", "영향", "전략"] }, "comparison": { "structure": ["대상 정의", "비교 기준", "장단점 분석", "결론"], "keywords": ["비교", "차이", "장단점", "vs"] }, "creative": { "structure": ["문제 정의", "창의적 접근", "구현 방법", "예상 효과"], "keywords": ["창의적", "혁신적", "새로운", "아이디어"] }, "technical": { "structure": ["기술 개요", "핵심 원리", "구현 상세", "실용 예시"], "keywords": ["기술", "구현", "코드", "시스템"] } } def get_query_hash(self, query: str) -> str: """쿼리 해시 생성""" return hashlib.md5(query.encode()).hexdigest() def get(self, query: str) -> Optional[Dict]: """캐시에서 조회""" query_hash = self.get_query_hash(query) if query_hash in self.cache: # TTL 체크 if datetime.now() - self.timestamps[query_hash] < self.ttl: self.access_count[query_hash] += 1 return self.cache[query_hash] else: # 만료된 캐시 삭제 del self.cache[query_hash] del self.timestamps[query_hash] del self.access_count[query_hash] return None def set(self, query: str, response: Dict): """캐시에 저장""" query_hash = self.get_query_hash(query) # 캐시 크기 관리 if len(self.cache) >= self.max_size: # LRU 정책: 가장 적게 사용된 항목 제거 least_used = min(self.access_count, key=self.access_count.get) del self.cache[least_used] del self.timestamps[least_used] del self.access_count[least_used] self.cache[query_hash] = response self.timestamps[query_hash] = datetime.now() self.access_count[query_hash] = 1 def get_reasoning_pattern(self, query: str) -> Optional[Dict]: """쿼리에 적합한 추론 패턴 반환""" query_lower = query.lower() for pattern_type, pattern_data in self.reasoning_patterns.items(): if any(keyword in query_lower for keyword in pattern_data["keywords"]): return { "type": pattern_type, "structure": pattern_data["structure"] } return None # ============================================================================ # 병렬 처리 최적화 Brave Search # ============================================================================ class AsyncBraveSearch: """비동기 Brave 검색 클라이언트""" def __init__(self, api_key: Optional[str] = None): self.api_key = api_key or os.getenv("BRAVE_SEARCH_API_KEY") self.base_url = "https://api.search.brave.com/res/v1/web/search" async def search_async(self, query: str, count: int = 5) -> List[Dict]: """비동기 검색""" if not self.api_key: return [] headers = { "Accept": "application/json", "X-Subscription-Token": self.api_key } params = { "q": query, "count": count, "text_decorations": False, "search_lang": "ko", "country": "KR" } try: async with aiohttp.ClientSession() as session: async with session.get( self.base_url, headers=headers, params=params, timeout=aiohttp.ClientTimeout(total=5) ) as response: if response.status == 200: data = await response.json() results = [] if "web" in data and "results" in data["web"]: for item in data["web"]["results"][:count]: results.append({ "title": item.get("title", ""), "url": item.get("url", ""), "description": item.get("description", ""), "age": item.get("age", "") }) return results except: return [] return [] # ============================================================================ # 최적화된 Fireworks 클라이언트 # ============================================================================ class OptimizedFireworksClient: """최적화된 LLM 클라이언트""" def __init__(self, api_key: Optional[str] = None): self.api_key = api_key or os.getenv("FIREWORKS_API_KEY") if not self.api_key: raise ValueError("FIREWORKS_API_KEY is required!") self.base_url = "https://api.fireworks.ai/inference/v1/chat/completions" self.headers = { "Accept": "application/json", "Content-Type": "application/json", "Authorization": f"Bearer {self.api_key}" } # 항상 최고 성능 모델 사용 (복잡한 질문 전제) self.model = "accounts/fireworks/models/qwen3-235b-a22b-instruct-2507" async def chat_stream_async( self, messages: List[Dict], **kwargs ) -> AsyncGenerator[str, None]: """비동기 스트리밍 대화""" payload = { "model": self.model, "messages": messages, "max_tokens": kwargs.get("max_tokens", 2000), "temperature": kwargs.get("temperature", 0.7), "top_p": kwargs.get("top_p", 1.0), "top_k": kwargs.get("top_k", 40), "stream": True } try: async with aiohttp.ClientSession() as session: async with session.post( self.base_url, headers={**self.headers, "Accept": "text/event-stream"}, json=payload, timeout=aiohttp.ClientTimeout(total=30) ) as response: async for line in response.content: line_str = line.decode('utf-8').strip() if line_str.startswith("data: "): data_str = line_str[6:] if data_str == "[DONE]": break try: data = json.loads(data_str) if "choices" in data and len(data["choices"]) > 0: delta = data["choices"][0].get("delta", {}) if "content" in delta: yield delta["content"] except json.JSONDecodeError: continue except Exception as e: yield f"오류: {str(e)}" # ============================================================================ # 경량화된 추론 체인 # ============================================================================ class LightweightReasoningChain: """빠른 추론을 위한 템플릿 기반 시스템""" def __init__(self): self.templates = { "problem_solving": { "steps": ["문제 분해", "핵심 요인", "해결 방안", "구현 전략"], "prompt": "체계적으로 단계별로 분석하고 해결책을 제시하세요." }, "creative_thinking": { "steps": ["기존 접근", "창의적 대안", "혁신 포인트", "실행 방법"], "prompt": "기존 방식을 넘어선 창의적이고 혁신적인 접근을 제시하세요." }, "critical_analysis": { "steps": ["현황 평가", "강점/약점", "기회/위협", "개선 방향"], "prompt": "비판적 관점에서 철저히 분석하고 개선점을 도출하세요." } } def get_reasoning_structure(self, query_type: str) -> Dict: """쿼리 유형에 맞는 추론 구조 반환""" # 기본값은 problem_solving return self.templates.get(query_type, self.templates["problem_solving"]) # ============================================================================ # 조기 종료 메커니즘 # ============================================================================ class QualityChecker: """품질 체크 및 조기 종료 결정""" def __init__(self, min_quality: float = 0.75): self.min_quality = min_quality self.quality_metrics = { "length": 0.2, "structure": 0.3, "completeness": 0.3, "clarity": 0.2 } def evaluate_response(self, response: str, query: str) -> Tuple[float, bool]: """응답 품질 평가""" scores = {} # 길이 평가 scores["length"] = min(len(response) / 1000, 1.0) # 1000자 기준 # 구조 평가 structure_markers = ["1.", "2.", "•", "-", "첫째", "둘째", "결론", "요약"] scores["structure"] = sum(1 for m in structure_markers if m in response) / len(structure_markers) # 완전성 평가 (쿼리 키워드 포함 여부) query_words = set(query.split()) response_words = set(response.split()) scores["completeness"] = len(query_words & response_words) / max(len(query_words), 1) # 명확성 평가 (문장 구조) sentences = response.split('.') avg_sentence_length = sum(len(s.split()) for s in sentences) / max(len(sentences), 1) scores["clarity"] = min(avg_sentence_length / 20, 1.0) # 20단어 기준 # 가중 평균 계산 total_score = sum( scores[metric] * weight for metric, weight in self.quality_metrics.items() ) should_continue = total_score < self.min_quality return total_score, should_continue # ============================================================================ # 스트리밍 최적화 # ============================================================================ class OptimizedStreaming: """스트리밍 버퍼 최적화""" def __init__(self, chunk_size: int = 100, flush_interval: float = 0.1): self.chunk_size = chunk_size self.flush_interval = flush_interval self.buffer = "" self.last_flush = time.time() async def buffer_and_yield( self, stream: AsyncGenerator[str, None] ) -> AsyncGenerator[str, None]: """버퍼링된 스트리밍""" async for chunk in stream: self.buffer += chunk current_time = time.time() if (len(self.buffer) >= self.chunk_size or current_time - self.last_flush >= self.flush_interval): yield self.buffer self.buffer = "" self.last_flush = current_time # 남은 버퍼 플러시 if self.buffer: yield self.buffer # ============================================================================ # 통합 최적화 멀티 에이전트 시스템 # ============================================================================ class SpeedOptimizedMultiAgentSystem: """속도 최적화된 멀티 에이전트 시스템""" def __init__(self): self.llm = OptimizedFireworksClient() self.search = AsyncBraveSearch() self.cache = SmartCache() self.reasoning = LightweightReasoningChain() self.quality_checker = QualityChecker() self.streaming = OptimizedStreaming() # 컴팩트 프롬프트 self.compact_prompts = self._init_compact_prompts() # 병렬 처리 풀 self.executor = ThreadPoolExecutor(max_workers=4) def _init_compact_prompts(self) -> Dict: """압축된 고효율 프롬프트""" return { AgentRole.SUPERVISOR: """[감독자-구조설계] 즉시분석: 핵심의도+필요정보+답변구조 출력: 5개 핵심포인트(각 1문장) 추론체계 명시""", AgentRole.CREATIVE: """[창의성생성자] 입력구조 따라 창의적 확장 실용예시+혁신접근+구체조언 불필요설명 제거""", AgentRole.CRITIC: """[비평자-검증] 신속검토: 정확성/논리성/실용성 개선포인트 3개만 각 2문장 이내""", AgentRole.FINALIZER: """[최종통합] 모든의견 종합→최적답변 명확구조+실용정보+창의균형 핵심먼저+상세는후순위""" } async def parallel_process_agents( self, query: str, search_results: List[Dict], show_progress: bool = True ) -> AsyncGenerator[Tuple[str, str], None]: """병렬 처리 파이프라인""" start_time = time.time() search_context = self._format_search_results(search_results) accumulated_response = "" agent_thoughts = "" # 캐시 확인 cached = self.cache.get(query) if cached: yield cached["response"], "✨ 캐시에서 즉시 로드" return # 추론 패턴 결정 reasoning_pattern = self.cache.get_reasoning_pattern(query) try: # === 1단계: 감독자 + 검색 병렬 실행 === if show_progress: agent_thoughts = "### 🚀 병렬 처리 시작\n" agent_thoughts += "👔 감독자 분석 + 🔍 추가 검색 동시 진행...\n\n" yield accumulated_response, agent_thoughts # 감독자 프롬프트 supervisor_prompt = f""" 질문: {query} 검색결과: {search_context} 추론패턴: {reasoning_pattern} 즉시 핵심구조 5개 제시""" supervisor_response = "" supervisor_task = self.llm.chat_stream_async( messages=[ {"role": "system", "content": self.compact_prompts[AgentRole.SUPERVISOR]}, {"role": "user", "content": supervisor_prompt} ], temperature=0.3, max_tokens=500 ) # 감독자 스트리밍 (버퍼링) async for chunk in self.streaming.buffer_and_yield(supervisor_task): supervisor_response += chunk if show_progress and len(supervisor_response) < 300: agent_thoughts = f"### 👔 감독자 분석\n{supervisor_response[:300]}...\n\n" yield accumulated_response, agent_thoughts # === 2단계: 창의성 + 비평 준비 병렬 === if show_progress: agent_thoughts += "### 🎨 창의성 생성자 + 🔍 비평자 준비...\n\n" yield accumulated_response, agent_thoughts # 창의성 생성 시작 creative_prompt = f""" 질문: {query} 감독자구조: {supervisor_response} 검색결과: {search_context} 창의적+실용적 답변 즉시생성""" creative_response = "" creative_partial = "" # 비평자용 부분 응답 critic_started = False critic_response = "" creative_task = self.llm.chat_stream_async( messages=[ {"role": "system", "content": self.compact_prompts[AgentRole.CREATIVE]}, {"role": "user", "content": creative_prompt} ], temperature=0.8, max_tokens=1500 ) # 창의성 스트리밍 + 비평자 조기 시작 async for chunk in self.streaming.buffer_and_yield(creative_task): creative_response += chunk creative_partial += chunk # 창의성 응답이 500자 넘으면 비평자 시작 if len(creative_partial) > 500 and not critic_started: critic_started = True # 비평자 비동기 시작 critic_prompt = f""" 원본질문: {query} 창의성답변(일부): {creative_partial} 신속검토→개선점3개""" critic_task = asyncio.create_task( self._run_critic_async(critic_prompt) ) if show_progress: display_creative = creative_response[:400] + "..." if len(creative_response) > 400 else creative_response agent_thoughts = f"### 🎨 창의성 생성자\n{display_creative}\n\n" yield accumulated_response, agent_thoughts # 비평자 결과 대기 if critic_started: critic_response = await critic_task if show_progress: agent_thoughts += f"### 🔍 비평자 검토\n{critic_response[:200]}...\n\n" yield accumulated_response, agent_thoughts # === 3단계: 품질 체크 및 조기 종료 === quality_score, need_more = self.quality_checker.evaluate_response( creative_response, query ) if not need_more and quality_score > 0.85: # 품질이 충분히 높으면 바로 반환 accumulated_response = creative_response if show_progress: agent_thoughts += f"### ✅ 품질 충족 (점수: {quality_score:.2f})\n조기 완료!\n" # 캐시 저장 self.cache.set(query, { "response": accumulated_response, "timestamp": datetime.now() }) yield accumulated_response, agent_thoughts return # === 4단계: 최종 통합 (스트리밍) === if show_progress: agent_thoughts += "### ✅ 최종 통합 중...\n\n" yield accumulated_response, agent_thoughts final_prompt = f""" 질문: {query} 창의성답변: {creative_response} 비평피드백: {critic_response} 감독자구조: {supervisor_response} 최종통합→완벽답변""" final_task = self.llm.chat_stream_async( messages=[ {"role": "system", "content": self.compact_prompts[AgentRole.FINALIZER]}, {"role": "user", "content": final_prompt} ], temperature=0.5, max_tokens=2500 ) # 최종 답변 스트리밍 accumulated_response = "" async for chunk in self.streaming.buffer_and_yield(final_task): accumulated_response += chunk yield accumulated_response, agent_thoughts # 처리 시간 추가 processing_time = time.time() - start_time accumulated_response += f"\n\n---\n⚡ 처리 시간: {processing_time:.1f}초" # 캐시 저장 self.cache.set(query, { "response": accumulated_response, "timestamp": datetime.now() }) yield accumulated_response, agent_thoughts except Exception as e: error_msg = f"❌ 오류 발생: {str(e)}" yield error_msg, agent_thoughts async def _run_critic_async(self, prompt: str) -> str: """비평자 비동기 실행""" try: response = "" async for chunk in self.llm.chat_stream_async( messages=[ {"role": "system", "content": self.compact_prompts[AgentRole.CRITIC]}, {"role": "user", "content": prompt} ], temperature=0.2, max_tokens=500 ): response += chunk return response except: return "비평 처리 중 오류" def _format_search_results(self, results: List[Dict]) -> str: """검색 결과 압축 포맷""" if not results: return "검색결과없음" formatted = [] for i, r in enumerate(results[:3], 1): # 상위 3개만 formatted.append(f"[{i}]{r.get('title','')[:50]}:{r.get('description','')[:100]}") return " | ".join(formatted) # ============================================================================ # Gradio UI (최적화 버전) # ============================================================================ def create_optimized_gradio_interface(): """최적화된 Gradio 인터페이스""" # 시스템 초기화 system = SpeedOptimizedMultiAgentSystem() def process_query_optimized( message: str, history: List[Dict], use_search: bool, show_agent_thoughts: bool, search_count: int ): """최적화된 쿼리 처리 - 동기 버전""" if not message: yield history, "", "" return # 비동기 함수를 동기적으로 실행 try: import nest_asyncio nest_asyncio.apply() except ImportError: pass # nest_asyncio가 없어도 진행 def run_async_function(coro): """비동기 함수를 동기적으로 실행하는 헬퍼""" try: loop = asyncio.get_event_loop() if loop.is_running(): # 이미 실행 중인 루프가 있으면 새 스레드에서 실행 import concurrent.futures with concurrent.futures.ThreadPoolExecutor() as executor: future = executor.submit(asyncio.run, coro) return future.result() else: return loop.run_until_complete(coro) except RuntimeError: # 루프가 없으면 새로 생성 return asyncio.run(coro) try: # 검색 수행 (동기화) search_results = [] search_display = "" if use_search: # 검색 상태 표시 history_with_message = history + [ {"role": "user", "content": message}, {"role": "assistant", "content": "⚡ 고속 처리 중..."} ] yield history_with_message, "", "" # 비동기 검색을 동기적으로 실행 search_results = run_async_function( system.search.search_async(message, count=search_count) ) if search_results: search_display = "## 📚 참고 자료\n\n" for i, result in enumerate(search_results[:3], 1): search_display += f"**{i}. [{result['title'][:50]}]({result['url']})**\n" search_display += f" {result['description'][:100]}...\n\n" # 사용자 메시지 추가 current_history = history + [{"role": "user", "content": message}] # 병렬 처리 실행을 동기적으로 수집 async def collect_responses(): responses = [] async for response, thoughts in system.parallel_process_agents( query=message, search_results=search_results, show_progress=show_agent_thoughts ): responses.append((response, thoughts)) return responses # 모든 응답 수집 all_responses = run_async_function(collect_responses()) # 수집된 응답을 yield for response, thoughts in all_responses: updated_history = current_history + [ {"role": "assistant", "content": response} ] yield updated_history, thoughts, search_display except Exception as e: error_history = history + [ {"role": "user", "content": message}, {"role": "assistant", "content": f"❌ 오류: {str(e)}"} ] yield error_history, "", "" # Gradio 인터페이스 with gr.Blocks( title="⚡ Speed-Optimized Multi-Agent System", theme=gr.themes.Soft(), css=""" .gradio-container { max-width: 1400px !important; margin: auto !important; } """ ) as demo: gr.Markdown(""" # ⚡ 고속 Multi-Agent RAG System ### 복잡한 질문도 5초 이내 처리 목표 **최적화 기술:** - 🚀 병렬 처리: 에이전트 동시 실행 - 💾 스마트 캐싱: 자주 묻는 패턴 즉시 응답 - ⚡ 스트리밍 버퍼: 네트워크 최적화 - 🎯 조기 종료: 품질 충족 시 즉시 완료 """) with gr.Row(): with gr.Column(scale=3): chatbot = gr.Chatbot( height=500, label="💬 대화", type="messages" ) msg = gr.Textbox( label="복잡한 질문 입력", placeholder="분석, 전략, 창의적 해결이 필요한 복잡한 질문을 입력하세요...", lines=3 ) with gr.Row(): submit = gr.Button("⚡ 고속 처리", variant="primary") clear = gr.Button("🔄 초기화") with gr.Accordion("🤖 에이전트 처리 과정", open=False): agent_thoughts = gr.Markdown() with gr.Accordion("📚 검색 소스", open=False): search_sources = gr.Markdown() with gr.Column(scale=1): gr.Markdown("### ⚙️ 설정") use_search = gr.Checkbox( label="🔍 웹 검색 사용", value=True ) show_agent_thoughts = gr.Checkbox( label="🧠 처리 과정 표시", value=True ) search_count = gr.Slider( minimum=3, maximum=10, value=5, step=1, label="검색 결과 수" ) gr.Markdown(""" ### ⚡ 최적화 상태 **활성화된 최적화:** - ✅ 병렬 처리 - ✅ 스마트 캐싱 - ✅ 버퍼 스트리밍 - ✅ 조기 종료 - ✅ 압축 프롬프트 **예상 처리 시간:** - 캐시 히트: < 1초 - 일반 질문: 3-5초 - 복잡한 질문: 5-8초 """) # 복잡한 질문 예제 gr.Examples( examples=[ "AI 기술이 향후 10년간 한국 경제에 미칠 영향을 다각도로 분석하고 대응 전략을 제시해줘", "스타트업이 대기업과 경쟁하기 위한 혁신적인 전략을 단계별로 수립해줘", "기후변화 대응을 위한 창의적인 비즈니스 모델 5가지를 구체적으로 설계해줘", "양자컴퓨터가 현재 암호화 체계에 미칠 영향과 대안을 기술적으로 분석해줘", "메타버스 시대의 교육 혁신 방안을 실제 구현 가능한 수준으로 제안해줘" ], inputs=msg ) # 이벤트 바인딩 submit.click( process_query_optimized, inputs=[msg, chatbot, use_search, show_agent_thoughts, search_count], outputs=[chatbot, agent_thoughts, search_sources] ).then( lambda: "", None, msg ) msg.submit( process_query_optimized, inputs=[msg, chatbot, use_search, show_agent_thoughts, search_count], outputs=[chatbot, agent_thoughts, search_sources] ).then( lambda: "", None, msg ) clear.click( lambda: ([], "", ""), None, [chatbot, agent_thoughts, search_sources] ) return demo # ============================================================================ # 메인 실행 # ============================================================================ if __name__ == "__main__": print(""" ╔══════════════════════════════════════════════════════════════╗ ║ ⚡ Speed-Optimized Multi-Agent RAG System ⚡ ║ ║ ║ ║ 복잡한 질문도 5초 이내 처리하는 고속 AI 시스템 ║ ║ ║ ║ 최적화 기술: ║ ║ • 병렬 처리 파이프라인 ║ ║ • 스마트 캐싱 시스템 ║ ║ • 스트리밍 버퍼 최적화 ║ ║ • 품질 기반 조기 종료 ║ ║ • 압축 프롬프트 엔지니어링 ║ ╚══════════════════════════════════════════════════════════════╝ """) # API 키 확인 if not os.getenv("FIREWORKS_API_KEY"): print("\n⚠️ FIREWORKS_API_KEY가 설정되지 않았습니다.") if not os.getenv("BRAVE_SEARCH_API_KEY"): print("\n⚠️ BRAVE_SEARCH_API_KEY가 설정되지 않았습니다.") # Gradio 앱 실행 demo = create_optimized_gradio_interface() is_hf_spaces = os.getenv("SPACE_ID") is not None if is_hf_spaces: print("\n🤗 Hugging Face Spaces에서 최적화 모드로 실행 중...") demo.launch(server_name="0.0.0.0", server_port=7860) else: print("\n💻 로컬 환경에서 최적화 모드로 실행 중...") demo.launch(server_name="0.0.0.0", server_port=7860, share=False)