import os
import hashlib
import pickle
from datetime import datetime, timedelta
from pathlib import Path
from typing import List

from docling.document_converter import DocumentConverter
from langchain_text_splitters import MarkdownHeaderTextSplitter

from config import constants
from config.settings import settings
from utils.logging import logger


class DocumentProcessor:
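    """Convert uploaded files into Markdown-header chunks, with on-disk caching
    and cross-file deduplication of chunk content."""
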
    def __init__(self):
        self.headers = [("#", "Header 1"), ("##", "Header 2")]
        self.cache_dir = Path(settings.CACHE_DIR)
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def validate_files(self, files: List) -> None:
        """Validate the total size of the uploaded files."""
        total_size = 0
        for f in files:
            # Handle both Gradio file objects and string paths
            file_path = f.name if hasattr(f, 'name') else f
            try:
                total_size += os.path.getsize(file_path)
            except Exception as e:
                logger.warning(f"Could not get size for {file_path}: {e}")
                continue
        if total_size > constants.MAX_TOTAL_SIZE:
            raise ValueError(f"Total size exceeds {constants.MAX_TOTAL_SIZE//1024//1024}MB limit")

    def process(self, files: List) -> List:
        """Process files with caching for subsequent queries."""
        self.validate_files(files)
        all_chunks = []
        seen_hashes = set()

        for file in files:
            try:
                # Handle both Gradio file objects and string paths
                file_path = file.name if hasattr(file, 'name') else file

                # Generate content-based hash for caching
                with open(file_path, "rb") as f:
                    file_hash = self._generate_hash(f.read())
                cache_path = self.cache_dir / f"{file_hash}.pkl"

                if self._is_cache_valid(cache_path):
                    logger.info(f"Loading from cache: {file_path}")
                    chunks = self._load_from_cache(cache_path)
                else:
                    logger.info(f"Processing and caching: {file_path}")
                    chunks = self._process_file(file_path)
                    self._save_to_cache(chunks, cache_path)

                # Deduplicate chunks across files
                for chunk in chunks:
                    chunk_hash = self._generate_hash(chunk.page_content.encode())
                    if chunk_hash not in seen_hashes:
                        all_chunks.append(chunk)
                        seen_hashes.add(chunk_hash)
            except Exception as e:
                file_path_display = file.name if hasattr(file, 'name') else file
                logger.error(f"Failed to process {file_path_display}: {str(e)}")
                continue

        logger.info(f"Total unique chunks: {len(all_chunks)}")
        return all_chunks

    def _process_file(self, file) -> List:
        """Original processing logic with Docling."""
        # Handle both Gradio file objects and string paths
        file_path = file.name if hasattr(file, 'name') else file
        if not file_path.endswith(('.pdf', '.docx', '.txt', '.md')):
            logger.warning(f"Skipping unsupported file type: {file_path}")
            return []
        converter = DocumentConverter()
        markdown = converter.convert(file_path).document.export_to_markdown()
        splitter = MarkdownHeaderTextSplitter(self.headers)
        return splitter.split_text(markdown)

    def _generate_hash(self, content: bytes) -> str:
        """Return the SHA-256 hex digest of the given bytes."""
        return hashlib.sha256(content).hexdigest()

    def _save_to_cache(self, chunks: List, cache_path: Path):
        """Pickle the chunks together with a timestamp."""
        with open(cache_path, "wb") as f:
            pickle.dump({
                "timestamp": datetime.now().timestamp(),
                "chunks": chunks
            }, f)

    def _load_from_cache(self, cache_path: Path) -> List:
        """Load previously cached chunks from disk."""
        with open(cache_path, "rb") as f:
            data = pickle.load(f)
        return data["chunks"]

    def _is_cache_valid(self, cache_path: Path) -> bool:
        """Check that a cache file exists and is younger than the expiry window."""
        if not cache_path.exists():
            return False
        cache_age = datetime.now() - datetime.fromtimestamp(cache_path.stat().st_mtime)
        return cache_age < timedelta(days=settings.CACHE_EXPIRE_DAYS)
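

# Usage sketch (illustrative, not part of the original module). Assumes the
# configured CACHE_DIR is writable and that "sample.pdf" is a hypothetical
# local file; in the Gradio app, uploaded file objects are passed to process()
# instead of plain paths.
if __name__ == "__main__":
    processor = DocumentProcessor()
    chunks = processor.process(["sample.pdf"])
    for chunk in chunks[:3]:
        # Each chunk is a langchain Document carrying the Markdown header metadata
        print(chunk.metadata, chunk.page_content[:80])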