"""
PDF Parser Service for RAG Chatbot

Extracts text from PDF and splits into chunks for indexing
"""

import os
import re
import tempfile
from dataclasses import dataclass
from typing import Dict, List, Optional

import pypdfium2 as pdfium


@dataclass
class PDFChunk:
    """Represents a chunk of text extracted from a PDF page"""
    text: str
    page_number: int
    chunk_index: int
    metadata: Dict


class PDFParser:
    """Parse PDF files and prepare for RAG indexing"""

    def __init__(
        self,
        chunk_size: int = 500,
        chunk_overlap: int = 50,
        min_chunk_size: int = 50
    ):
        # The overlap must be smaller than the chunk size, or the sliding
        # window in chunk_text could never advance
        if chunk_overlap >= chunk_size:
            raise ValueError("chunk_overlap must be smaller than chunk_size")
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.min_chunk_size = min_chunk_size

    def extract_text_from_pdf(self, pdf_path: str) -> Dict[int, str]:
        """
        Extract text from PDF file

        Args:
            pdf_path: Path to PDF file

        Returns:
            Dictionary mapping 1-based page number to text content
        """
        pdf_text = {}

        try:
            pdf = pdfium.PdfDocument(pdf_path)

            for page_num in range(len(pdf)):
                page = pdf[page_num]
                textpage = page.get_textpage()
                text = textpage.get_text_range()

                text = self._clean_text(text)
                pdf_text[page_num + 1] = text  # pages are numbered from 1

            return pdf_text

        except Exception as e:
            raise RuntimeError(f"Error reading PDF: {e}") from e

    def _clean_text(self, text: str) -> str:
        """Clean extracted text"""
        # Collapse runs of whitespace (including newlines) into single spaces
        text = re.sub(r'\s+', ' ', text)
        # Strip null bytes that sometimes appear in extracted PDF text
        text = text.replace('\x00', '')
        return text.strip()

    def chunk_text(self, text: str, page_number: int) -> List[PDFChunk]:
        """
        Split text into overlapping chunks

        Args:
            text: Text to chunk
            page_number: Page number this text came from

        Returns:
            List of PDFChunk objects
        """
        words = text.split()

        # Pages shorter than min_chunk_size become a single chunk, or
        # nothing at all if the page is empty
        if len(words) < self.min_chunk_size:
            if words:
                return [PDFChunk(
                    text=text,
                    page_number=page_number,
                    chunk_index=0,
                    metadata={'page': page_number, 'chunk': 0}
                )]
            return []

        chunks = []
        chunk_index = 0
        start = 0

        while start < len(words):
            end = min(start + self.chunk_size, len(words))

            chunks.append(PDFChunk(
                text=' '.join(words[start:end]),
                page_number=page_number,
                chunk_index=chunk_index,
                metadata={
                    'page': page_number,
                    'chunk': chunk_index,
                    'start_word': start,
                    'end_word': end
                }
            ))

            chunk_index += 1

            # Stop once the final words have been emitted
            if end >= len(words):
                break

            # Slide the window forward, keeping chunk_overlap words of context
            start = end - self.chunk_overlap

        return chunks

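    # Worked example of the sliding window above (an arithmetic sketch with
    # the default settings, not output from a real document): with
    # chunk_size=500 and chunk_overlap=50, a 1,000-word page yields word
    # ranges [0, 500), [450, 950), and [900, 1000); each chunk re-reads the
    # last 50 words of its predecessor so sentences spanning a boundary
    # stay retrievable.
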
    def parse_pdf(
        self,
        pdf_path: str,
        document_metadata: Optional[Dict] = None
    ) -> List[PDFChunk]:
        """
        Parse entire PDF into chunks

        Args:
            pdf_path: Path to PDF file
            document_metadata: Additional metadata for the document

        Returns:
            List of all chunks from the PDF
        """
        pages_text = self.extract_text_from_pdf(pdf_path)

        all_chunks = []
        for page_num, text in pages_text.items():
            chunks = self.chunk_text(text, page_num)

            # Propagate document-level metadata to every chunk
            if document_metadata:
                for chunk in chunks:
                    chunk.metadata.update(document_metadata)

            all_chunks.extend(chunks)

        return all_chunks

    def parse_pdf_bytes(
        self,
        pdf_bytes: bytes,
        document_metadata: Optional[Dict] = None
    ) -> List[PDFChunk]:
        """
        Parse PDF from bytes (for uploaded files)

        Args:
            pdf_bytes: PDF file as bytes
            document_metadata: Additional metadata

        Returns:
            List of chunks
        """
        # Write the bytes to a temporary file so the path-based parser can
        # be reused, then clean up regardless of success
        with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp:
            tmp.write(pdf_bytes)
            tmp_path = tmp.name

        try:
            return self.parse_pdf(tmp_path, document_metadata)
        finally:
            if os.path.exists(tmp_path):
                os.unlink(tmp_path)

    def get_pdf_info(self, pdf_path: str) -> Dict:
        """
        Get basic info about PDF

        Args:
            pdf_path: Path to PDF file

        Returns:
            Dictionary with PDF information
        """
        try:
            pdf = pdfium.PdfDocument(pdf_path)
            return {
                'num_pages': len(pdf),
                'file_path': pdf_path,
            }
        except Exception as e:
            raise RuntimeError(f"Error reading PDF info: {e}") from e

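# Example usage of PDFParser (a minimal sketch; "report.pdf" and the
# metadata values are hypothetical):
#
#   parser = PDFParser(chunk_size=500, chunk_overlap=50)
#   chunks = parser.parse_pdf("report.pdf", {"title": "Annual Report"})
#   for chunk in chunks[:3]:
#       print(chunk.page_number, chunk.chunk_index, chunk.text[:80])
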
class PDFIndexer:
    """Index PDF chunks into RAG system"""

    def __init__(self, embedding_service, qdrant_service, documents_collection):
        self.embedding_service = embedding_service
        self.qdrant_service = qdrant_service
        self.documents_collection = documents_collection
        self.parser = PDFParser()

    def index_pdf(
        self,
        pdf_path: str,
        document_id: str,
        document_metadata: Optional[Dict] = None
    ) -> Dict:
        """
        Index entire PDF into RAG system

        Args:
            pdf_path: Path to PDF file
            document_id: Unique ID for this document
            document_metadata: Additional metadata (title, author, etc.)

        Returns:
            Indexing results
        """
        chunks = self.parser.parse_pdf(pdf_path, document_metadata)

        indexed_count = 0
        chunk_ids = []

        for chunk in chunks:
            # Deterministic ID encoding document, page, and chunk position
            chunk_id = f"{document_id}_p{chunk.page_number}_c{chunk.chunk_index}"

            embedding = self.embedding_service.encode_text(chunk.text)

            metadata = {
                'text': chunk.text,
                'document_id': document_id,
                'page': chunk.page_number,
                'chunk_index': chunk.chunk_index,
                'source': 'pdf',
                **chunk.metadata
            }

            self.qdrant_service.index_data(
                doc_id=chunk_id,
                embedding=embedding,
                metadata=metadata
            )

            chunk_ids.append(chunk_id)
            indexed_count += 1

        # Store a document-level record alongside the vector index
        doc_info = {
            'document_id': document_id,
            'type': 'pdf',
            'file_path': pdf_path,
            'num_chunks': indexed_count,
            'chunk_ids': chunk_ids,
            'metadata': document_metadata or {},
            'pdf_info': self.parser.get_pdf_info(pdf_path)
        }
        self.documents_collection.insert_one(doc_info)

        return {
            'success': True,
            'document_id': document_id,
            'chunks_indexed': indexed_count,
            'chunk_ids': chunk_ids[:5]  # preview of the first few IDs
        }

    def index_pdf_bytes(
        self,
        pdf_bytes: bytes,
        document_id: str,
        filename: str,
        document_metadata: Optional[Dict] = None
    ) -> Dict:
        """
        Index PDF from bytes (for uploaded files)

        Args:
            pdf_bytes: PDF file as bytes
            document_id: Unique ID for this document
            filename: Original filename
            document_metadata: Additional metadata

        Returns:
            Indexing results
        """
        # Copy so the caller's dict is not mutated
        doc_metadata = dict(document_metadata or {})
        doc_metadata['filename'] = filename

        chunks = self.parser.parse_pdf_bytes(pdf_bytes, doc_metadata)

        indexed_count = 0
        chunk_ids = []

        for chunk in chunks:
            chunk_id = f"{document_id}_p{chunk.page_number}_c{chunk.chunk_index}"

            embedding = self.embedding_service.encode_text(chunk.text)

            # Per-chunk payload, named distinctly so it does not clobber the
            # document-level metadata stored below
            chunk_metadata = {
                'text': chunk.text,
                'document_id': document_id,
                'page': chunk.page_number,
                'chunk_index': chunk.chunk_index,
                'source': 'pdf',
                'filename': filename,
                **chunk.metadata
            }

            self.qdrant_service.index_data(
                doc_id=chunk_id,
                embedding=embedding,
                metadata=chunk_metadata
            )

            chunk_ids.append(chunk_id)
            indexed_count += 1

        doc_info = {
            'document_id': document_id,
            'type': 'pdf',
            'filename': filename,
            'num_chunks': indexed_count,
            'chunk_ids': chunk_ids,
            'metadata': doc_metadata
        }
        self.documents_collection.insert_one(doc_info)

        return {
            'success': True,
            'document_id': document_id,
            'filename': filename,
            'chunks_indexed': indexed_count,
            'chunk_ids': chunk_ids[:5]  # preview of the first few IDs
        }
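

# A minimal wiring sketch for PDFIndexer. The stubs below stand in for the
# real embedding, Qdrant, and MongoDB dependencies; their interfaces are
# assumed from the calls made above (encode_text, index_data, insert_one),
# and "report.pdf" is a hypothetical path.
if __name__ == "__main__":

    class StubEmbeddingService:
        def encode_text(self, text: str) -> List[float]:
            # Placeholder vector; a real service would return a model embedding
            return [0.0] * 384

    class StubQdrantService:
        def index_data(self, doc_id: str, embedding: List[float], metadata: Dict) -> None:
            print(f"indexed {doc_id} ({len(metadata['text'])} chars)")

    class StubDocumentsCollection:
        def insert_one(self, doc: Dict) -> None:
            print(f"stored record {doc['document_id']} with {doc['num_chunks']} chunks")

    indexer = PDFIndexer(
        embedding_service=StubEmbeddingService(),
        qdrant_service=StubQdrantService(),
        documents_collection=StubDocumentsCollection(),
    )
    result = indexer.index_pdf("report.pdf", document_id="doc-001")
    print(result)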