import json import spacy import re import os, requests, time import fitz # PyMuPDF We use PyMuPDF (fitz) to capture hierarchy (section → subsection → subsubsection → content/bullets). from collections import Counter from fastapi import FastAPI from pydantic import BaseModel from typing import Optional from langchain_community.vectorstores import Chroma from langchain_openai import OpenAIEmbeddings, ChatOpenAI # -------------------------- # HR Assistant Prompt Templates # -------------------------- hr_system_message = """ You are the Flykite Airlines HR Policy Assistant. Your role is to answer employee questions based on official HR documents (handbooks, policy PDFs, etc.). Each user question will start with the token: ###Question. ### Response Rules - Be clear, factual, and professional. - Use bullet points (-) or numbered lists (1., 2., etc.) for clarity. - Begin with a **one-line summary**, then details. - Cite the Specific policy references (Document → Section → Subsection → Sub-subsection) where the answer comes from. - If the answer is not in the source, reply 1 line from generic resonse and post fix with exactly: \n\n **"Could not find anything out from Flyline HR documentation around your query.\n\nPlease rephrase your query."** - Do **not** make assumptions or fabricate information. ### Ambiguity & Context - If a query could refer to multiple policies or depends on role/location/department, ask **one short clarifying question**. - If you assume a context, state it clearly (e.g., "Assuming HQ staff..."). - When policies differ by role/location, list variations clearly. ### Personalization - Tailor responses to any role, location, or employment type provided. - Mention if rules vary and what those differences are. ### Format 1. One-line summary. 2. Key details, steps, or rules. 3. Specific policy references (Document → Section → Subsection → Sub-subsection) where the answer comes from. 4. Optional follow-up suggestion or clarifying question. ### Important - Never guess or invent policy content. - Maintain confidentiality and avoid personal data. - User questions always begin with `###Question`. Respond only to those. """ hr_user_message_template = """ Consider the following ###Context and ###Question: ###Context {context} ###Question {question} """ # -------------------------- # PDF Parsing Utils # -------------------------- def clean_text_hidden(s: str) -> str: if not s: return "" s = re.sub(r"[\u200B-\u200F\u202A-\u202E\u00A0\u00AD]", " ", s) s = re.sub(r"\s+", " ", s) return s.strip() def is_line_fully_bold(spans): return all( ("Bold" in s["font"] or s["flags"] & 2 != 0) for s in spans if s.get("text", "").strip() ) def detect_font_levels(pdf_path): doc = fitz.open(pdf_path) font_sizes = [] for page in doc: blocks = page.get_text("dict")["blocks"] for b in blocks: for l in b.get("lines", []): for s in l.get("spans", []): font_sizes.append(round(s["size"], 1)) unique_sizes = sorted(set(font_sizes), reverse=True) if len(unique_sizes) > 3: candidate_sizes = unique_sizes[1:-1] else: candidate_sizes = unique_sizes section_size = candidate_sizes[0] if candidate_sizes else unique_sizes[0] subsubsection_size = candidate_sizes[1] if len(candidate_sizes) > 1 else section_size return section_size, subsubsection_size def most_common_size(sizes): return Counter(sizes).most_common(1)[0][0] if sizes else None def parse_flykite(pdf_path): section_size, subsubsection_size = detect_font_levels(pdf_path) doc = fitz.open(pdf_path) sections = [] current_section, current_subsection, current_subsubsection = None, None, None for page_num, page in enumerate(doc, start=1): blocks = page.get_text("dict")["blocks"] for b in blocks: for l in b.get("lines", []): spans = l.get("spans", []) line_text = "".join(s.get("text", "") for s in spans).strip() line_text = clean_text_hidden(line_text) if not line_text: continue span_sizes = [round(s["size"], 1) for s in spans] line_size = most_common_size(span_sizes) # SECTION/SUBSECTION if line_size == section_size: if is_line_fully_bold(spans) and "policy" in line_text.lower(): current_subsection = {"subsection": line_text, "subsubsections": [], "content": []} if current_section: current_section["subsections"].append(current_subsection) else: current_section = {"section": line_text, "subsections": []} sections.append(current_section) current_subsection = None current_subsubsection = None continue # SUB-SUBSECTION if re.match(r"^\d+\s*\.\s+", line_text): if line_size == subsubsection_size: is_heading = False if is_line_fully_bold(spans): is_heading = True else: if len(spans) > 1: first_span_text = clean_text_hidden(spans[0]["text"]).strip() if re.match(r"^\d+\.?$", first_span_text): rest_bold = all( ("Bold" in s["font"] or s["flags"] & 2 != 0) for s in spans[1:] if s.get("text", "").strip() ) if rest_bold: is_heading = True if is_heading: current_subsubsection = {"title": line_text, "content": []} if current_subsection: current_subsection["subsubsections"].append(current_subsubsection) elif current_section: auto_sub = {"subsection": current_section["section"], "subsubsections": []} current_section["subsections"].append(auto_sub) current_subsection = auto_sub current_subsection["subsubsections"].append(current_subsubsection) continue # otherwise treat as content if current_subsubsection: current_subsubsection["content"].append(line_text) elif current_subsection: current_subsection["content"].append(line_text) elif current_section: current_section.setdefault("content", []).append(line_text) else: if not sections: sections.append({"intro": [line_text]}) else: sections[0].setdefault("intro", []).append(line_text) return sections # (REST calls, no LangChain-OpenAI). class SimpleChat: def __init__(self, model="gpt-4o-mini"): self.model = model self.api_key = os.getenv("OPENAI_API_KEY") self.base_url = "https://api.openai.com/v1/chat/completions" def invoke(self, messages, temperature=0, max_tokens=1500): resp = requests.post( self.base_url, headers={"Authorization": f"Bearer {self.api_key}"}, json={ "model": self.model, "messages": messages, "temperature": temperature, "max_tokens": max_tokens } ) resp.raise_for_status() return resp.json()["choices"][0]["message"]["content"].strip() # -------------------------- # Chunking + RAG # -------------------------- # ADDED section_title & subsection_title alongside subsubsection_titLes into each chunk, # so that any Chunk as it gets embedded # >>>> It should have reference of the Parent level Section/Subsetion Titles information , in particular , as well , # >>>> Just in case , some End User says something at the level of Section Level mapped information. # Secondly this helps to Increase trust and compliance by citing sources (document name, section, subsection, subsubsection as well) for each response. # --- Flatten JSON to chunks --- # Load spaCy NER model nlp = spacy.load("en_core_web_sm") # --- spaCy Extraction --- def extract_with_spacy(text): doc = nlp(text) roles, locations, departments = [], [], [] for ent in doc.ents: if ent.label_ in ["GPE", "LOC"]: # e.g., "Singapore" locations.append(ent.text) elif ent.label_ in ["ORG"]: # e.g., "HR", "Finance" departments.append(ent.text) elif ent.label_ in ["PERSON"]: # sometimes job titles slip roles.append(ent.text) return { "roles": list(set(roles)), "locations": list(set(locations)), "departments": list(set(departments)) } # --- LLM Extraction --- def extract_with_llm(text): prompt = f""" You are an expert HR assistant for an airline company. Your Task: - Extract **Role(s)**, **Location(s)**, and **Department(s)** explicitly or implicitly mentioned in the following HR policy text. - Focus on aviation-related roles (e.g., Pilot, Cabin Crew, Engineer, Ground Staff, Field Staff), locations (e.g., India, UK, Singapore, Headquarters), and departments (e.g., HR, Finance, Compliance, Operations). - If something is implied (e.g., "field staff" → role=Field Staff, location unspecified), capture it. - If no information is found, return an empty list for that field. --- ### FEW SHOTS Examples Text: "Special leave for cabin crew in Singapore" Output: {{"roles": ["Cabin Crew"], "locations": ["Singapore"], "departments": []}} Text: "Pilots based in UK headquarters" Output: {{"roles": ["Pilot"], "locations": ["United Kingdom", "Headquarters"], "departments": []}} Text: "HR staff policies in India" Output: {{"roles": [], "locations": ["India"], "departments": ["HR"]}} Text: "Field staff in Dubai get separate insurance policy" Output: {{"roles": ["Field Staff"], "locations": ["Dubai"], "departments": []}} --- Now extract from: {text} Output: Return only valid JSON in this exact schema: {{ "roles": [list of roles], "locations": [list of locations], "departments": [list of departments] }} """ try: # (REST calls, no LangChain-OpenAI). os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY") llm = SimpleChat(model="gpt-4o-mini") messages = [ {"role": "user", "content": prompt} ] content = llm.invoke(messages, temperature=0, max_tokens=1500) # Enforce safe parsing if content.startswith("{"): extracted = json.loads(content) else: extracted = {"roles": [], "locations": [], "departments": []} except Exception: print("NOT ABLE TO RESOLVE LLM CALL XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX") extracted = {"roles": [], "locations": [], "departments": []} return extracted # --- Merge spaCy + LLM --- def enrich_metadata(text): spacy_res = extract_with_spacy(text) llm_res = extract_with_llm(text) return { "roles": list(set(spacy_res["roles"] + llm_res["roles"])), "locations": list(set(spacy_res["locations"] + llm_res["locations"])), "departments": list(set(spacy_res["departments"] + llm_res["departments"])) } # --- Ensure metadata is Chroma-compatible --- def sanitize_metadata(meta: dict) -> dict: safe_meta = {} for k, v in meta.items(): if isinstance(v, (str, int, float, bool)) or v is None: safe_meta[k] = v elif isinstance(v, (list, tuple)): safe_meta[k] = ", ".join(map(str, v)) # flatten lists elif isinstance(v, dict): safe_meta[k] = json.dumps(v, ensure_ascii=False) # dict → string else: safe_meta[k] = str(v) # fallback return safe_meta # --- Flatten JSON to chunks --- def flatten_json_to_chunks(structured_json, document_name="Flykite HR Policy Handbook"): chunks = [] for sec in structured_json: section_title = sec.get("section") for sub in sec.get("subsections", []): subsection_title = sub.get("subsection") # Sub-subsections for subsub in sub.get("subsubsections", []): content_text = " ".join(subsub.get("content", [])) if content_text.strip(): enriched_meta = enrich_metadata(content_text) meta = sanitize_metadata({ "document": document_name, "section": section_title, "subsection": subsection_title, "subsubsection": subsub.get("title"), **enriched_meta }) chunks.append({ "text": f"{section_title} | {subsection_title} | {subsub.get('title')}\n\n{content_text}", "metadata": meta }) # Fallback: orphaned content under subsection if sub.get("content"): content_text = " ".join(sub.get("content", [])) enriched_meta = enrich_metadata(content_text) meta = sanitize_metadata({ "document": document_name, "section": section_title, "subsection": subsection_title, "subsubsection": "", # None, : Chroma doesn’t allow None values. They must be strings (or removed), **enriched_meta }) chunks.append({ "text": f"{section_title} | {subsection_title}\n\n{content_text}", "metadata": meta }) # Fallback: orphaned content under section if sec.get("content"): content_text = " ".join(sec.get("content", [])) enriched_meta = enrich_metadata(content_text) meta = sanitize_metadata({ "document": document_name, "section": section_title, "subsection": "", # None, : Chroma doesn’t allow None values. They must be strings (or removed), "subsubsection": "", # None, : Chroma doesn’t allow None values. They must be strings (or removed), **enriched_meta }) chunks.append({ "text": f"{section_title}\n\n{content_text}", "metadata": meta }) return chunks def build_context(docs): context_parts = [] for d in docs: meta = d.metadata citation = f"{meta.get('document')} → {meta.get('section')}" if meta.get("subsection"): citation += f" / {meta.get('subsection')}" if meta.get("subsubsection"): citation += f" / {meta.get('subsubsection')}" context_parts.append(f"Source: {citation}\n{d.page_content}") return "\n\n---\n\n".join(context_parts) # ----------------------- # User Query Enrichment # ----------------------- def extract_metadata_from_query(query: str): """Use spaCy + LLM to extract role/location/department from user query.""" spacy_res = extract_with_spacy(query) print("spaCy results ## ==>", spacy_res) llm_res = extract_with_llm(query) print("LLM Extraction Results ## ==>", llm_res) return { "roles": list(set(spacy_res["roles"] + llm_res["roles"])), "locations": list(set(spacy_res["locations"] + llm_res["locations"])), "departments": list(set(spacy_res["departments"] + llm_res["departments"])) } # ----------------------- # Helper: Filter docs manually # ----------------------- def filter_docs_by_metadata(docs, metadata_filters): filtered = [] for d in docs: meta = d.metadata keep = True if metadata_filters.get("roles"): keep &= any(r in meta.get("roles", []) for r in metadata_filters["roles"]) if metadata_filters.get("locations"): keep &= any(l in meta.get("locations", []) for l in metadata_filters["locations"]) if metadata_filters.get("departments"): keep &= any(dep in meta.get("departments", []) for dep in metadata_filters["departments"]) if keep: filtered.append(d) return filtered def generate_rag_response(user_input, retriever, k=3, max_tokens=1500): # relevant_docs = retriever.get_relevant_documents(user_input)[:k] # When user asks a query, we enrich it by extracting role, location, department using the same spaCy + LLM pipeline. # Pass those extracted values as filters to the retriever → only chunks with matching metadata are considered. # If nothing matches, fallback to plain semantic search (so we don’t block valid answers). # Step 1: Extract personalization metadata from query query_metadata = extract_metadata_from_query(user_input) print("\n======================") print(" User Query:", user_input) print(" Extracted metadata from query:", query_metadata) # Investigatory log # 2. Retrieve top-k docs semantically retrieved_docs = retriever.get_relevant_documents(user_input, k=k) print(f" Retrieved {len(retrieved_docs)} docs before filtering") # 3. Apply metadata filtering filtered_docs = filter_docs_by_metadata(retrieved_docs, query_metadata) if filtered_docs: selected_docs = filtered_docs print(f"✅ {len(selected_docs)} docs kept after metadata filtering") else: selected_docs = retrieved_docs # fallback if no metadata match print("⚠️ No metadata match, falling back to semantic retrieval only") # Step 4: Log retrieved docs metadata print(f"✅ Retrieved {len(selected_docs)} docs") for i, d in enumerate(selected_docs, 1): print(f"\n--- Chunk {i} ---") print("Text:", d.page_content[:200], "...") # preview first 200 chars print("Metadata:", d.metadata) context_for_query = build_context(selected_docs) user_prompt = hr_user_message_template.format(context=context_for_query, question=user_input) messages = [ {"role": "system", "content": hr_system_message}, {"role": "user", "content": user_prompt}, ] #llm = ChatOpenAI(model="gpt-4o-mini", temperature=0, max_tokens=max_tokens) #response = llm.invoke(messages) #return {"answer": response.content, "sources": [d.metadata for d in relevant_docs]} # You still used ChatOpenAI (from langchain-openai) for generating answers. # That’s where the proxies keyword issue blew up, since that part was still using the buggy client. # Error: your container is pulling in a version of langchain-openai (and maybe openai) # that still tries to pass proxies to the OpenAI client, but in your current environment the client doesn’t accept that argument. # (REST calls, no LangChain-OpenAI). os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY") llm = SimpleChat(model="gpt-4o-mini") answer = llm.invoke(messages, temperature=0, max_tokens=max_tokens) return {"answer": answer, "sources": [d.metadata for d in selected_docs]} # -------------------------- # FastAPI App # -------------------------- #--================== START of API setup on reboot ===================== app = FastAPI() persist_dir = "./flykite_chromadb" retriever = None class QueryRequest(BaseModel): query: str top_k: Optional[int] = 3 #@app.on_event("startup") #def startup_event(): #global retriever time.sleep(2) # ✅ give Hugging Face time to inject secrets print("🔑 OPENAI_API_KEY loaded:", bool(os.getenv("OPENAI_API_KEY"))) pdf_path = "data/Dataset-FlykiteAirlines_HRP.pdf" #Place PDF IN the repo Boot # Parse PDF → JSON parsed_data = parse_flykite(pdf_path) print(json.dumps(parsed_data[:1], indent=2, ensure_ascii=False)) if not parsed_data: raise RuntimeError(" Parsed JSON is empty, cannot build chunks/vectorstore") # Flatten chunks chunks = flatten_json_to_chunks(parsed_data) print(f" Loaded {len(chunks)} chunks from JSON") # If no chunks, fail early if not chunks: raise RuntimeError("No chunks generated from structured JSON") # Build Chroma vectorstore # Define SimpleEmbeddings inline os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY") class SimpleEmbeddings: def __init__(self, model="text-embedding-3-small"): self.model = model self.api_key = os.getenv("OPENAI_API_KEY") self.base_url = "https://api.openai.com/v1/embeddings" def embed_documents(self, texts): embeddings = [] for text in texts: resp = requests.post( self.base_url, headers={"Authorization": f"Bearer {self.api_key}"}, json={"model": self.model, "input": text} ) resp.raise_for_status() embeddings.append(resp.json()["data"][0]["embedding"]) return embeddings def embed_query(self, query): resp = requests.post( self.base_url, headers={"Authorization": f"Bearer {self.api_key}"}, json={"model": self.model, "input": query} ) resp.raise_for_status() return resp.json()["data"][0]["embedding"] # Use SimpleEmbeddings instead of OpenAIEmbeddings embedding = SimpleEmbeddings(model="text-embedding-3-small") texts = [c["text"] for c in chunks] metadatas = [c["metadata"] for c in chunks] vectorstore = Chroma.from_texts( texts=texts, embedding=embedding, metadatas=metadatas, persist_directory=persist_dir, ids=[f"chunk_{i}" for i in range(len(chunks))] ) vectorstore.persist() #ensure data is saved to disk print("💾 Chroma vectorstore saved !!") retriever = vectorstore.as_retriever(search_kwargs={"k": 3}) print(" PDF parsed, chunks embedded, retriever initialized.") #--================== END of API setup on start ===================== #@app.post("/query") #def query_endpoint(req: QueryRequest): # return generate_rag_response(req.query, retriever, k=req.top_k) def wait_for_key(key_name="OPENAI_API_KEY", timeout=10): for _ in range(timeout): if os.getenv(key_name): print(f"✅ {key_name} available.") return True print(f"⏳ Waiting for {key_name}...") time.sleep(1) print(f"❌ {key_name} not found after {timeout} seconds.") return False # ============================= # Step 5: Chat Function # ============================= def format_answer(result): answer = result["answer"] sources = result.get("sources", []) formatted_sources = "\n".join([ f"- {s['document']} → {s['section']} / {s['subsection']} / {s['subsubsection']}" for s in sources ]) return f"""{answer} 📄 **Sources** {formatted_sources} """ def chat_fn(message, history): global retriever wait_for_key() if retriever is None: return "⚠️ Retriever not initialized. Please rebuild or check vector DB." answer = generate_rag_response(message, retriever) return format_answer(answer) #f"{answer}\n\n🧠 (Context retrieved from {pdf_path})" # ============================= # Step 6: Chat bubbles UI # ============================= import gradio as gr css1 = r""" #chatbot .user { background: linear-gradient(to bottom right, #93c5fd, #60a5fa); color: white; border-radius: 18px 18px 4px 18px; padding: 10px 14px; margin: 6px 0; text-align: right; max-width: 75%; margin-left: auto; box-shadow: 0 2px 6px rgba(37,99,235,0.25); } #chatbot .bot { background: #f3f4f6; color: #111827; border-radius: 18px 18px 18px 4px; padding: 10px 14px; margin: 6px 0; text-align: left; max-width: 75%; margin-right: auto; box-shadow: 0 2px 6px rgba(0,0,0,0.05); } @keyframes typing { 0%, 100% { opacity: 0.4; transform: translateY(0); } 50% { opacity: 1; transform: translateY(-4px); } } .typing-dot { animation: typing 1s infinite; } """ css = """ #chatbot { background-color: #f7f9fc; border-radius: 10px; padding: 15px; overflow-y: auto; } #chatbot .message { display: flex; margin: 10px 0; } #chatbot .message.user { justify-content: flex-end; } #chatbot .message.bot { justify-content: flex-start; } /* User bubble */ #chatbot .message.user .bubble { background: linear-gradient(135deg, #4CAF50, #81C784); color: white; border-radius: 16px 16px 0 16px; padding: 10px 15px; max-width: 70%; box-shadow: 0 2px 5px rgba(0,0,0,0.15); } /* Bot bubble */ #chatbot .message.bot .bubble { background: linear-gradient(135deg, #2196F3, #64B5F6); color: white; border-radius: 16px 16px 16px 0; padding: 10px 15px; max-width: 70%; box-shadow: 0 2px 5px rgba(0,0,0,0.15); } /* Optional: add smooth fade-in animation */ @keyframes bubblePop { from { transform: scale(0.95); opacity: 0; } to { transform: scale(1); opacity: 1; } } #chatbot .bubble { animation: bubblePop 0.2s ease-out; } #footer { position: fixed; bottom: 5px; left: 0; width: 100%; text-align: center; font-size: 12px; color: #777; padding: 5px 0; background: rgba(255, 255, 255, 0.7); } """ footer_html = """
""" # ============================= # Step 7: Launch App # ============================= #def respond(message, history): # return f"BubbleBot says: {message}" gr.ChatInterface( fn=chat_fn, title="Flyline Chatbot ✈ ️", description="Ask Flyline HR "+footer_html, theme="soft", css=css ).launch()