import hashlib
import logging
import os

from langchain_community.vectorstores import Chroma
from langchain_community.retrievers import BM25Retriever
# from langchain.retrievers import EnsembleRetriever
from langchain_classic.retrievers.ensemble import EnsembleRetriever
from langchain_community.embeddings import HuggingFaceEmbeddings

from config.settings import settings

logger = logging.getLogger(__name__)


class RetrieverBuilder:
    """Build a hybrid (BM25 + Chroma vector) retriever over a document set.

    Documents written to the Chroma collection are keyed by the MD5 hex
    digest of their ``page_content`` so that repeated builds against the
    same persisted collection do not store the same text twice.
    """

    def __init__(self):
        """Initialize the retriever builder with local embeddings."""
        # Use sentence-transformers for local embeddings
        self.embeddings = HuggingFaceEmbeddings(
            model_name=settings.EMBEDDING_MODEL,
            model_kwargs={'device': 'cpu'},  # Use 'cuda' if you have GPU
            encode_kwargs={'normalize_embeddings': True},
        )
        logger.info("Initialized local embeddings: %s", settings.EMBEDDING_MODEL)

    @staticmethod
    def _content_id(doc):
        """Return the MD5 hex digest of ``doc.page_content`` (used as the Chroma ID)."""
        return hashlib.md5(doc.page_content.encode()).hexdigest()

    @classmethod
    def _unique_by_content(cls, docs, skip_ids=()):
        """Return ``(docs, ids)`` with one entry per distinct content hash.

        Args:
            docs: Documents exposing a ``page_content`` string.
            skip_ids: IDs that must be treated as already present.

        Returns:
            Two parallel lists: the first document seen for each new content
            hash, and that hash. Input order is preserved.
        """
        seen = set(skip_ids)
        unique_docs = []
        unique_ids = []
        for doc in docs:
            doc_id = cls._content_id(doc)
            # Also skips repeats inside this batch: submitting the same ID
            # twice in one write is rejected by the vector store.
            if doc_id in seen:
                continue
            seen.add(doc_id)
            unique_docs.append(doc)
            unique_ids.append(doc_id)
        return unique_docs, unique_ids

    def _create_vector_store(self, docs):
        """Create a Chroma collection from ``docs`` using content-hash IDs."""
        unique_docs, unique_ids = self._unique_by_content(docs)
        # Explicit IDs are required here; without them the store assigns its
        # own IDs and the hash comparison on later runs can never match.
        return Chroma.from_documents(
            documents=unique_docs,
            embedding=self.embeddings,
            ids=unique_ids,
            persist_directory=settings.CHROMA_DB_PATH,
            collection_name=settings.CHROMA_COLLECTION_NAME,
        )

    def _open_existing_vector_store(self, docs):
        """Open the persisted Chroma collection and add documents it lacks."""
        vector_store = Chroma(
            persist_directory=settings.CHROMA_DB_PATH,
            embedding_function=self.embeddings,
            collection_name=settings.CHROMA_COLLECTION_NAME,
        )

        # Get existing document IDs to check for new documents
        try:
            existing_data = vector_store.get()
            existing_ids = set(existing_data.get('ids', [])) if existing_data else set()
            logger.info("Found %d existing documents in ChromaDB", len(existing_ids))
        except Exception as e:
            logger.warning(
                "Could not retrieve existing IDs from ChromaDB: %s. Treating as empty.", e
            )
            existing_ids = set()

        new_docs, new_ids = self._unique_by_content(docs, skip_ids=existing_ids)
        if new_docs:
            logger.info("Adding %d new documents to ChromaDB", len(new_docs))
            vector_store.add_documents(new_docs, ids=new_ids)
            # Call persist() only when the wrapper provides it; a missing
            # method must not be misreported as a failure to load the store.
            persist = getattr(vector_store, "persist", None)
            if callable(persist):
                persist()
        else:
            logger.info("No new documents to add. Using existing ChromaDB.")
        return vector_store

    def build_hybrid_retriever(self, docs):
        """Build a hybrid retriever using BM25 and vector-based retrieval.

        Reuses the existing ChromaDB when its ``chroma.sqlite3`` file is
        present under ``settings.CHROMA_DB_PATH`` and only adds documents
        whose content hash is not stored yet; otherwise creates a new store.
        Entries stored without content-hash IDs are not recognized as
        duplicates.

        Args:
            docs: Non-empty sequence of documents exposing ``page_content``.
                The BM25 retriever is built over all of them.

        Returns:
            An ``EnsembleRetriever`` combining the BM25 and vector retrievers,
            weighted by ``settings.HYBRID_RETRIEVER_WEIGHTS``.

        Raises:
            ValueError: If ``docs`` is empty.
            Exception: Any error raised while building is logged and re-raised.
        """
        try:
            if not docs:
                raise ValueError(
                    "Cannot build a hybrid retriever from an empty document list."
                )

            # Check if ChromaDB already exists
            chroma_db_file = os.path.join(settings.CHROMA_DB_PATH, "chroma.sqlite3")
            if os.path.exists(chroma_db_file):
                logger.info("Loading existing ChromaDB from %s", settings.CHROMA_DB_PATH)
                try:
                    vector_store = self._open_existing_vector_store(docs)
                except Exception as e:
                    logger.warning(
                        "Failed to load existing ChromaDB: %s. Creating new one.", e
                    )
                    # Fall back to creating new DB
                    vector_store = self._create_vector_store(docs)
            else:
                logger.info("Creating new ChromaDB at %s", settings.CHROMA_DB_PATH)
                vector_store = self._create_vector_store(docs)
            logger.info("Vector store created successfully.")

            # Create BM25 retriever
            bm25 = BM25Retriever.from_documents(docs)
            logger.info("BM25 retriever created successfully.")

            # Create vector-based retriever
            vector_retriever = vector_store.as_retriever(
                search_kwargs={"k": settings.VECTOR_SEARCH_K}
            )
            logger.info("Vector retriever created successfully.")

            # Combine retrievers into a hybrid retriever
            hybrid_retriever = EnsembleRetriever(
                retrievers=[bm25, vector_retriever],
                weights=settings.HYBRID_RETRIEVER_WEIGHTS,
            )
            logger.info("Hybrid retriever created successfully.")
            return hybrid_retriever
        except Exception as e:
            # logger.exception keeps the traceback alongside the message.
            logger.exception("Failed to build hybrid retriever: %s", e)
            raise