from langgraph.graph import StateGraph, END from typing import TypedDict, List, Dict from .research_agent import ResearchAgent from .verification_agent import VerificationAgent from .relevance_checker import RelevanceChecker from langchain_core.documents.base import Document # from langchain.retrievers import EnsembleRetriever from langchain_classic.retrievers.ensemble import EnsembleRetriever import logging logger = logging.getLogger(__name__) class AgentState(TypedDict): question: str documents: List[Document] draft_answer: str verification_report: str is_relevant: bool retriever: EnsembleRetriever class AgentWorkflow: def __init__(self): self.researcher = ResearchAgent() self.verifier = VerificationAgent() self.relevance_checker = RelevanceChecker() self.compiled_workflow = self.build_workflow() # Compile once during initialization def build_workflow(self): """Create and compile the multi-agent workflow.""" workflow = StateGraph(AgentState) # Add nodes workflow.add_node("check_relevance", self._check_relevance_step) workflow.add_node("research", self._research_step) workflow.add_node("verify", self._verification_step) # Define edges workflow.set_entry_point("check_relevance") workflow.add_conditional_edges( "check_relevance", self._decide_after_relevance_check, { "relevant": "research", "irrelevant": END } ) workflow.add_edge("research", "verify") workflow.add_conditional_edges( "verify", self._decide_next_step, { "re_research": "research", "end": END } ) return workflow.compile() def _check_relevance_step(self, state: AgentState) -> Dict: retriever = state["retriever"] classification = self.relevance_checker.check( question=state["question"], retriever=retriever, k=20 ) if classification == "CAN_ANSWER": # We have enough info to proceed return {"is_relevant": True} elif classification == "PARTIAL": # There's partial coverage, but we can still proceed return { "is_relevant": True } else: # classification == "NO_MATCH" # Generate verification report for out-of-context questions verification_report = self.verifier.generate_out_of_context_report() return { "is_relevant": False, "draft_answer": "This question isn't related (or there's no data) for your query. Please ask another question relevant to the uploaded document(s).", "verification_report": verification_report } def _decide_after_relevance_check(self, state: AgentState) -> str: decision = "relevant" if state["is_relevant"] else "irrelevant" print(f"[DEBUG] _decide_after_relevance_check -> {decision}") return decision def full_pipeline(self, question: str, retriever: EnsembleRetriever): try: print(f"[DEBUG] Starting full_pipeline with question='{question}'") documents = retriever.invoke(question) logger.info(f"Retrieved {len(documents)} relevant documents (from .invoke)") # print(documents) initial_state = AgentState( question=question, documents=documents, draft_answer="", verification_report="", is_relevant=False, retriever=retriever ) final_state = self.compiled_workflow.invoke(initial_state) return { "draft_answer": final_state["draft_answer"], "verification_report": final_state["verification_report"] } except Exception as e: logger.error(f"Workflow execution failed: {e}") raise def _research_step(self, state: AgentState) -> Dict: print(f"[DEBUG] Entered _research_step with question='{state['question']}'") result = self.researcher.generate(state["question"], state["documents"]) print("[DEBUG] Researcher returned draft answer.") return {"draft_answer": result["draft_answer"]} def _verification_step(self, state: AgentState) -> Dict: print("[DEBUG] Entered _verification_step. Verifying the draft answer...") result = self.verifier.check(state["draft_answer"], state["documents"]) print("[DEBUG] VerificationAgent returned a verification report.") return {"verification_report": result["verification_report"]} def _decide_next_step(self, state: AgentState) -> str: verification_report = state["verification_report"] print(f"[DEBUG] _decide_next_step with verification_report='{verification_report}'") if "Supported: NO" in verification_report or "Relevant: NO" in verification_report: logger.info("[DEBUG] Verification indicates re-research needed.") return "re_research" else: logger.info("[DEBUG] Verification successful, ending workflow.") return "end"