import json
import spacy
import re
import os, requests, time
import fitz # PyMuPDF We use PyMuPDF (fitz) to capture hierarchy (section → subsection → subsubsection → content/bullets).
from collections import Counter
from fastapi import FastAPI
from pydantic import BaseModel
from typing import Optional
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
# --------------------------
# HR Assistant Prompt Templates
# --------------------------
hr_system_message = """
You are the Flykite Airlines HR Policy Assistant.
Your role is to answer employee questions based on official HR documents (handbooks, policy PDFs, etc.).
Each user question will start with the token: ###Question.
### Response Rules
- Be clear, factual, and professional.
- Use bullet points (-) or numbered lists (1., 2., etc.) for clarity.
- Begin with a **one-line summary**, then details.
- Cite the Specific policy references (Document → Section → Subsection → Sub-subsection) where
the answer comes from.
- If the answer is not in the source, reply 1 line from generic resonse and post fix with exactly: \n\n **"Could not find anything out from Flyline HR documentation around your query.\n\nPlease rephrase your query."**
- Do **not** make assumptions or fabricate information.
### Ambiguity & Context
- If a query could refer to multiple policies or depends on role/location/department, ask **one short clarifying question**.
- If you assume a context, state it clearly (e.g., "Assuming HQ staff...").
- When policies differ by role/location, list variations clearly.
### Personalization
- Tailor responses to any role, location, or employment type provided.
- Mention if rules vary and what those differences are.
### Format
1. One-line summary.
2. Key details, steps, or rules.
3. Specific policy references (Document → Section → Subsection → Sub-subsection) where
the answer comes from.
4. Optional follow-up suggestion or clarifying question.
### Important
- Never guess or invent policy content.
- Maintain confidentiality and avoid personal data.
- User questions always begin with `###Question`. Respond only to those.
"""
hr_user_message_template = """
Consider the following ###Context and ###Question:
###Context
{context}
###Question
{question}
"""
# --------------------------
# PDF Parsing Utils
# --------------------------
def clean_text_hidden(s: str) -> str:
if not s:
return ""
s = re.sub(r"[\u200B-\u200F\u202A-\u202E\u00A0\u00AD]", " ", s)
s = re.sub(r"\s+", " ", s)
return s.strip()
def is_line_fully_bold(spans):
return all(
("Bold" in s["font"] or s["flags"] & 2 != 0)
for s in spans if s.get("text", "").strip()
)
def detect_font_levels(pdf_path):
doc = fitz.open(pdf_path)
font_sizes = []
for page in doc:
blocks = page.get_text("dict")["blocks"]
for b in blocks:
for l in b.get("lines", []):
for s in l.get("spans", []):
font_sizes.append(round(s["size"], 1))
unique_sizes = sorted(set(font_sizes), reverse=True)
if len(unique_sizes) > 3:
candidate_sizes = unique_sizes[1:-1]
else:
candidate_sizes = unique_sizes
section_size = candidate_sizes[0] if candidate_sizes else unique_sizes[0]
subsubsection_size = candidate_sizes[1] if len(candidate_sizes) > 1 else section_size
return section_size, subsubsection_size
def most_common_size(sizes):
return Counter(sizes).most_common(1)[0][0] if sizes else None
def parse_flykite(pdf_path):
section_size, subsubsection_size = detect_font_levels(pdf_path)
doc = fitz.open(pdf_path)
sections = []
current_section, current_subsection, current_subsubsection = None, None, None
for page_num, page in enumerate(doc, start=1):
blocks = page.get_text("dict")["blocks"]
for b in blocks:
for l in b.get("lines", []):
spans = l.get("spans", [])
line_text = "".join(s.get("text", "") for s in spans).strip()
line_text = clean_text_hidden(line_text)
if not line_text:
continue
span_sizes = [round(s["size"], 1) for s in spans]
line_size = most_common_size(span_sizes)
# SECTION/SUBSECTION
if line_size == section_size:
if is_line_fully_bold(spans) and "policy" in line_text.lower():
current_subsection = {"subsection": line_text, "subsubsections": [], "content": []}
if current_section:
current_section["subsections"].append(current_subsection)
else:
current_section = {"section": line_text, "subsections": []}
sections.append(current_section)
current_subsection = None
current_subsubsection = None
continue
# SUB-SUBSECTION
if re.match(r"^\d+\s*\.\s+", line_text):
if line_size == subsubsection_size:
is_heading = False
if is_line_fully_bold(spans):
is_heading = True
else:
if len(spans) > 1:
first_span_text = clean_text_hidden(spans[0]["text"]).strip()
if re.match(r"^\d+\.?$", first_span_text):
rest_bold = all(
("Bold" in s["font"] or s["flags"] & 2 != 0)
for s in spans[1:] if s.get("text", "").strip()
)
if rest_bold:
is_heading = True
if is_heading:
current_subsubsection = {"title": line_text, "content": []}
if current_subsection:
current_subsection["subsubsections"].append(current_subsubsection)
elif current_section:
auto_sub = {"subsection": current_section["section"], "subsubsections": []}
current_section["subsections"].append(auto_sub)
current_subsection = auto_sub
current_subsection["subsubsections"].append(current_subsubsection)
continue
# otherwise treat as content
if current_subsubsection:
current_subsubsection["content"].append(line_text)
elif current_subsection:
current_subsection["content"].append(line_text)
elif current_section:
current_section.setdefault("content", []).append(line_text)
else:
if not sections:
sections.append({"intro": [line_text]})
else:
sections[0].setdefault("intro", []).append(line_text)
return sections
# (REST calls, no LangChain-OpenAI).
class SimpleChat:
def __init__(self, model="gpt-4o-mini"):
self.model = model
self.api_key = os.getenv("OPENAI_API_KEY")
self.base_url = "https://api.openai.com/v1/chat/completions"
def invoke(self, messages, temperature=0, max_tokens=1500):
resp = requests.post(
self.base_url,
headers={"Authorization": f"Bearer {self.api_key}"},
json={
"model": self.model,
"messages": messages,
"temperature": temperature,
"max_tokens": max_tokens
}
)
resp.raise_for_status()
return resp.json()["choices"][0]["message"]["content"].strip()
# --------------------------
# Chunking + RAG
# --------------------------
# ADDED section_title & subsection_title alongside subsubsection_titLes into each chunk,
# so that any Chunk as it gets embedded
# >>>> It should have reference of the Parent level Section/Subsetion Titles information , in particular , as well ,
# >>>> Just in case , some End User says something at the level of Section Level mapped information.
# Secondly this helps to Increase trust and compliance by citing sources (document name, section, subsection, subsubsection as well) for each response.
# --- Flatten JSON to chunks ---
# Load spaCy NER model
nlp = spacy.load("en_core_web_sm")
# --- spaCy Extraction ---
def extract_with_spacy(text):
doc = nlp(text)
roles, locations, departments = [], [], []
for ent in doc.ents:
if ent.label_ in ["GPE", "LOC"]: # e.g., "Singapore"
locations.append(ent.text)
elif ent.label_ in ["ORG"]: # e.g., "HR", "Finance"
departments.append(ent.text)
elif ent.label_ in ["PERSON"]: # sometimes job titles slip
roles.append(ent.text)
return {
"roles": list(set(roles)),
"locations": list(set(locations)),
"departments": list(set(departments))
}
# --- LLM Extraction ---
def extract_with_llm(text):
prompt = f"""
You are an expert HR assistant for an airline company.
Your Task:
- Extract **Role(s)**, **Location(s)**, and **Department(s)** explicitly or implicitly mentioned
in the following HR policy text.
- Focus on aviation-related roles (e.g., Pilot, Cabin Crew, Engineer, Ground Staff, Field Staff),
locations (e.g., India, UK, Singapore, Headquarters), and departments (e.g., HR, Finance, Compliance, Operations).
- If something is implied (e.g., "field staff" → role=Field Staff, location unspecified), capture it.
- If no information is found, return an empty list for that field.
---
### FEW SHOTS Examples
Text: "Special leave for cabin crew in Singapore"
Output: {{"roles": ["Cabin Crew"], "locations": ["Singapore"], "departments": []}}
Text: "Pilots based in UK headquarters"
Output: {{"roles": ["Pilot"], "locations": ["United Kingdom", "Headquarters"], "departments": []}}
Text: "HR staff policies in India"
Output: {{"roles": [], "locations": ["India"], "departments": ["HR"]}}
Text: "Field staff in Dubai get separate insurance policy"
Output: {{"roles": ["Field Staff"], "locations": ["Dubai"], "departments": []}}
---
Now extract from:
{text}
Output:
Return only valid JSON in this exact schema:
{{
"roles": [list of roles],
"locations": [list of locations],
"departments": [list of departments]
}}
"""
try:
# (REST calls, no LangChain-OpenAI).
os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY")
llm = SimpleChat(model="gpt-4o-mini")
messages = [
{"role": "user", "content": prompt}
]
content = llm.invoke(messages, temperature=0, max_tokens=1500)
# Enforce safe parsing
if content.startswith("{"):
extracted = json.loads(content)
else:
extracted = {"roles": [], "locations": [], "departments": []}
except Exception:
print("NOT ABLE TO RESOLVE LLM CALL XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")
extracted = {"roles": [], "locations": [], "departments": []}
return extracted
# --- Merge spaCy + LLM ---
def enrich_metadata(text):
spacy_res = extract_with_spacy(text)
llm_res = extract_with_llm(text)
return {
"roles": list(set(spacy_res["roles"] + llm_res["roles"])),
"locations": list(set(spacy_res["locations"] + llm_res["locations"])),
"departments": list(set(spacy_res["departments"] + llm_res["departments"]))
}
# --- Ensure metadata is Chroma-compatible ---
def sanitize_metadata(meta: dict) -> dict:
safe_meta = {}
for k, v in meta.items():
if isinstance(v, (str, int, float, bool)) or v is None:
safe_meta[k] = v
elif isinstance(v, (list, tuple)):
safe_meta[k] = ", ".join(map(str, v)) # flatten lists
elif isinstance(v, dict):
safe_meta[k] = json.dumps(v, ensure_ascii=False) # dict → string
else:
safe_meta[k] = str(v) # fallback
return safe_meta
# --- Flatten JSON to chunks ---
def flatten_json_to_chunks(structured_json, document_name="Flykite HR Policy Handbook"):
chunks = []
for sec in structured_json:
section_title = sec.get("section")
for sub in sec.get("subsections", []):
subsection_title = sub.get("subsection")
# Sub-subsections
for subsub in sub.get("subsubsections", []):
content_text = " ".join(subsub.get("content", []))
if content_text.strip():
enriched_meta = enrich_metadata(content_text)
meta = sanitize_metadata({
"document": document_name,
"section": section_title,
"subsection": subsection_title,
"subsubsection": subsub.get("title"),
**enriched_meta
})
chunks.append({
"text": f"{section_title} | {subsection_title} | {subsub.get('title')}\n\n{content_text}",
"metadata": meta
})
# Fallback: orphaned content under subsection
if sub.get("content"):
content_text = " ".join(sub.get("content", []))
enriched_meta = enrich_metadata(content_text)
meta = sanitize_metadata({
"document": document_name,
"section": section_title,
"subsection": subsection_title,
"subsubsection": "", # None, : Chroma doesn’t allow None values. They must be strings (or removed),
**enriched_meta
})
chunks.append({
"text": f"{section_title} | {subsection_title}\n\n{content_text}",
"metadata": meta
})
# Fallback: orphaned content under section
if sec.get("content"):
content_text = " ".join(sec.get("content", []))
enriched_meta = enrich_metadata(content_text)
meta = sanitize_metadata({
"document": document_name,
"section": section_title,
"subsection": "", # None, : Chroma doesn’t allow None values. They must be strings (or removed),
"subsubsection": "", # None, : Chroma doesn’t allow None values. They must be strings (or removed),
**enriched_meta
})
chunks.append({
"text": f"{section_title}\n\n{content_text}",
"metadata": meta
})
return chunks
def build_context(docs):
context_parts = []
for d in docs:
meta = d.metadata
citation = f"{meta.get('document')} → {meta.get('section')}"
if meta.get("subsection"):
citation += f" / {meta.get('subsection')}"
if meta.get("subsubsection"):
citation += f" / {meta.get('subsubsection')}"
context_parts.append(f"Source: {citation}\n{d.page_content}")
return "\n\n---\n\n".join(context_parts)
# -----------------------
# User Query Enrichment
# -----------------------
def extract_metadata_from_query(query: str):
"""Use spaCy + LLM to extract role/location/department from user query."""
spacy_res = extract_with_spacy(query)
print("spaCy results ## ==>", spacy_res)
llm_res = extract_with_llm(query)
print("LLM Extraction Results ## ==>", llm_res)
return {
"roles": list(set(spacy_res["roles"] + llm_res["roles"])),
"locations": list(set(spacy_res["locations"] + llm_res["locations"])),
"departments": list(set(spacy_res["departments"] + llm_res["departments"]))
}
# -----------------------
# Helper: Filter docs manually
# -----------------------
def filter_docs_by_metadata(docs, metadata_filters):
filtered = []
for d in docs:
meta = d.metadata
keep = True
if metadata_filters.get("roles"):
keep &= any(r in meta.get("roles", []) for r in metadata_filters["roles"])
if metadata_filters.get("locations"):
keep &= any(l in meta.get("locations", []) for l in metadata_filters["locations"])
if metadata_filters.get("departments"):
keep &= any(dep in meta.get("departments", []) for dep in metadata_filters["departments"])
if keep:
filtered.append(d)
return filtered
def generate_rag_response(user_input, retriever, k=3, max_tokens=1500):
# relevant_docs = retriever.get_relevant_documents(user_input)[:k]
# When user asks a query, we enrich it by extracting role, location, department using the same spaCy + LLM pipeline.
# Pass those extracted values as filters to the retriever → only chunks with matching metadata are considered.
# If nothing matches, fallback to plain semantic search (so we don’t block valid answers).
# Step 1: Extract personalization metadata from query
query_metadata = extract_metadata_from_query(user_input)
print("\n======================")
print(" User Query:", user_input)
print(" Extracted metadata from query:", query_metadata) # Investigatory log
# 2. Retrieve top-k docs semantically
retrieved_docs = retriever.get_relevant_documents(user_input, k=k)
print(f" Retrieved {len(retrieved_docs)} docs before filtering")
# 3. Apply metadata filtering
filtered_docs = filter_docs_by_metadata(retrieved_docs, query_metadata)
if filtered_docs:
selected_docs = filtered_docs
print(f"✅ {len(selected_docs)} docs kept after metadata filtering")
else:
selected_docs = retrieved_docs # fallback if no metadata match
print("⚠️ No metadata match, falling back to semantic retrieval only")
# Step 4: Log retrieved docs metadata
print(f"✅ Retrieved {len(selected_docs)} docs")
for i, d in enumerate(selected_docs, 1):
print(f"\n--- Chunk {i} ---")
print("Text:", d.page_content[:200], "...") # preview first 200 chars
print("Metadata:", d.metadata)
context_for_query = build_context(selected_docs)
user_prompt = hr_user_message_template.format(context=context_for_query, question=user_input)
messages = [
{"role": "system", "content": hr_system_message},
{"role": "user", "content": user_prompt},
]
#llm = ChatOpenAI(model="gpt-4o-mini", temperature=0, max_tokens=max_tokens)
#response = llm.invoke(messages)
#return {"answer": response.content, "sources": [d.metadata for d in relevant_docs]}
# You still used ChatOpenAI (from langchain-openai) for generating answers.
# That’s where the proxies keyword issue blew up, since that part was still using the buggy client.
# Error: your container is pulling in a version of langchain-openai (and maybe openai)
# that still tries to pass proxies to the OpenAI client, but in your current environment the client doesn’t accept that argument.
# (REST calls, no LangChain-OpenAI).
os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY")
llm = SimpleChat(model="gpt-4o-mini")
answer = llm.invoke(messages, temperature=0, max_tokens=max_tokens)
return {"answer": answer, "sources": [d.metadata for d in selected_docs]}
# --------------------------
# FastAPI App
# --------------------------
#--================== START of API setup on reboot =====================
app = FastAPI()
persist_dir = "./flykite_chromadb"
retriever = None
class QueryRequest(BaseModel):
query: str
top_k: Optional[int] = 3
#@app.on_event("startup")
#def startup_event():
#global retriever
time.sleep(2) # ✅ give Hugging Face time to inject secrets
print("🔑 OPENAI_API_KEY loaded:", bool(os.getenv("OPENAI_API_KEY")))
pdf_path = "data/Dataset-FlykiteAirlines_HRP.pdf" #Place PDF IN the repo Boot
# Parse PDF → JSON
parsed_data = parse_flykite(pdf_path)
print(json.dumps(parsed_data[:1], indent=2, ensure_ascii=False))
if not parsed_data:
raise RuntimeError(" Parsed JSON is empty, cannot build chunks/vectorstore")
# Flatten chunks
chunks = flatten_json_to_chunks(parsed_data)
print(f" Loaded {len(chunks)} chunks from JSON")
# If no chunks, fail early
if not chunks:
raise RuntimeError("No chunks generated from structured JSON")
# Build Chroma vectorstore
# Define SimpleEmbeddings inline
os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY")
class SimpleEmbeddings:
def __init__(self, model="text-embedding-3-small"):
self.model = model
self.api_key = os.getenv("OPENAI_API_KEY")
self.base_url = "https://api.openai.com/v1/embeddings"
def embed_documents(self, texts):
embeddings = []
for text in texts:
resp = requests.post(
self.base_url,
headers={"Authorization": f"Bearer {self.api_key}"},
json={"model": self.model, "input": text}
)
resp.raise_for_status()
embeddings.append(resp.json()["data"][0]["embedding"])
return embeddings
def embed_query(self, query):
resp = requests.post(
self.base_url,
headers={"Authorization": f"Bearer {self.api_key}"},
json={"model": self.model, "input": query}
)
resp.raise_for_status()
return resp.json()["data"][0]["embedding"]
# Use SimpleEmbeddings instead of OpenAIEmbeddings
embedding = SimpleEmbeddings(model="text-embedding-3-small")
texts = [c["text"] for c in chunks]
metadatas = [c["metadata"] for c in chunks]
vectorstore = Chroma.from_texts(
texts=texts,
embedding=embedding,
metadatas=metadatas,
persist_directory=persist_dir,
ids=[f"chunk_{i}" for i in range(len(chunks))]
)
vectorstore.persist() #ensure data is saved to disk
print("💾 Chroma vectorstore saved !!")
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})
print(" PDF parsed, chunks embedded, retriever initialized.")
#--================== END of API setup on start =====================
#@app.post("/query")
#def query_endpoint(req: QueryRequest):
# return generate_rag_response(req.query, retriever, k=req.top_k)
def wait_for_key(key_name="OPENAI_API_KEY", timeout=10):
for _ in range(timeout):
if os.getenv(key_name):
print(f"✅ {key_name} available.")
return True
print(f"⏳ Waiting for {key_name}...")
time.sleep(1)
print(f"❌ {key_name} not found after {timeout} seconds.")
return False
# =============================
# Step 5: Chat Function
# =============================
def format_answer(result):
answer = result["answer"]
sources = result.get("sources", [])
formatted_sources = "\n".join([
f"- {s['document']} → {s['section']} / {s['subsection']} / {s['subsubsection']}"
for s in sources
])
return f"""{answer}
📄 **Sources**
{formatted_sources}
"""
def chat_fn(message, history):
global retriever
wait_for_key()
if retriever is None:
return "⚠️ Retriever not initialized. Please rebuild or check vector DB."
answer = generate_rag_response(message, retriever)
return format_answer(answer) #f"{answer}\n\n🧠 (Context retrieved from {pdf_path})"
# =============================
# Step 6: Chat bubbles UI
# =============================
import gradio as gr
css1 = r"""
#chatbot .user {
background: linear-gradient(to bottom right, #93c5fd, #60a5fa);
color: white;
border-radius: 18px 18px 4px 18px;
padding: 10px 14px;
margin: 6px 0;
text-align: right;
max-width: 75%;
margin-left: auto;
box-shadow: 0 2px 6px rgba(37,99,235,0.25);
}
#chatbot .bot {
background: #f3f4f6;
color: #111827;
border-radius: 18px 18px 18px 4px;
padding: 10px 14px;
margin: 6px 0;
text-align: left;
max-width: 75%;
margin-right: auto;
box-shadow: 0 2px 6px rgba(0,0,0,0.05);
}
@keyframes typing {
0%, 100% { opacity: 0.4; transform: translateY(0); }
50% { opacity: 1; transform: translateY(-4px); }
}
.typing-dot {
animation: typing 1s infinite;
}
"""
css = """
#chatbot {
background-color: #f7f9fc;
border-radius: 10px;
padding: 15px;
overflow-y: auto;
}
#chatbot .message {
display: flex;
margin: 10px 0;
}
#chatbot .message.user {
justify-content: flex-end;
}
#chatbot .message.bot {
justify-content: flex-start;
}
/* User bubble */
#chatbot .message.user .bubble {
background: linear-gradient(135deg, #4CAF50, #81C784);
color: white;
border-radius: 16px 16px 0 16px;
padding: 10px 15px;
max-width: 70%;
box-shadow: 0 2px 5px rgba(0,0,0,0.15);
}
/* Bot bubble */
#chatbot .message.bot .bubble {
background: linear-gradient(135deg, #2196F3, #64B5F6);
color: white;
border-radius: 16px 16px 16px 0;
padding: 10px 15px;
max-width: 70%;
box-shadow: 0 2px 5px rgba(0,0,0,0.15);
}
/* Optional: add smooth fade-in animation */
@keyframes bubblePop {
from { transform: scale(0.95); opacity: 0; }
to { transform: scale(1); opacity: 1; }
}
#chatbot .bubble {
animation: bubblePop 0.2s ease-out;
}
#footer {
position: fixed;
bottom: 5px;
left: 0;
width: 100%;
text-align: center;
font-size: 12px;
color: #777;
padding: 5px 0;
background: rgba(255, 255, 255, 0.7);
}
"""
footer_html = """