# Source: Hugging Face Space file "app.py" (commit 7dcc422, verified; 22.5 kB).
# Last change: "Update app.py" by RushiMane2003.
from fastapi import FastAPI, HTTPException, Header, Request
from pydantic import BaseModel
import requests
import json
import hmac
import hashlib
import pandas as pd
import os
import re
import statistics
from datetime import datetime
from typing import Optional, Dict, Any, List
# Application object; the @app.post / @app.get decorators below register the
# route handlers on it, and uvicorn serves it when run as a script.
app = FastAPI()
# Retell.ai secret key (get from the Retell.ai dashboard).
# Prefer supplying it through the RETELL_SECRET_KEY environment variable; the
# literal below is kept only as a backward-compatible fallback.
# NOTE(review): this credential is committed to source and should be rotated.
RETELL_SECRET_KEY = os.environ.get("RETELL_SECRET_KEY") or "key_bdb05277a4587c7441bdad4a2c1b"
# --- WEATHER CONFIG ---
# OpenWeatherMap API key; the WEATHER_API_KEY environment variable overrides
# the fallback literal.
WEATHER_API_KEY = os.environ.get("WEATHER_API_KEY") or "ee75ffd59875aa5ca6c207e594336b30"
# Load CSV data on startup
def load_csv_data(data_dir: str = "/app/data") -> Dict[str, pd.DataFrame]:
    """Load all CSV datasets into memory.

    Args:
        data_dir: Directory holding ``contact_info.csv``, ``crop_advisory.csv``
            and ``government_schemes.csv``. Defaults to ``/app/data``, the
            location the original hard-coded paths pointed at.

    Returns:
        A dict with the keys ``contact_info``, ``crop_advisory`` and
        ``government_schemes``. A file that is missing or fails to load maps
        to an empty DataFrame, so callers can always test ``.empty``.
    """
    file_names = {
        'contact_info': 'contact_info.csv',
        'crop_advisory': 'crop_advisory.csv',
        'government_schemes': 'government_schemes.csv',
    }
    data = {}
    for key, file_name in file_names.items():
        file_path = os.path.join(data_dir, file_name)
        try:
            if not os.path.exists(file_path):
                print(f"Warning: {file_path} not found")
                data[key] = pd.DataFrame()
                continue
            frame = pd.read_csv(file_path)
            # Strip whitespace from column names and string values.
            frame.columns = frame.columns.str.strip()
            for col in frame.select_dtypes(include=['object']).columns:
                # NOTE: astype(str) also turns missing cells into the literal
                # string "nan"; downstream code sees text, never NaN, here.
                frame[col] = frame[col].astype(str).str.strip()
            data[key] = frame
            print(f"Loaded {key}: {len(frame)} records")
        except Exception as e:
            # Best effort: one unreadable file must not stop the service.
            print(f"Error loading {key}: {str(e)}")
            data[key] = pd.DataFrame()
    return data
# Load CSV data once at import time; the endpoint handlers below read from
# this module-level dict instead of touching the files on every request.
csv_data = load_csv_data()
def get_weather(city: str):
    """Fetch current weather for *city* from the OpenWeatherMap API.

    Args:
        city: City name to look up. An empty or missing value short-circuits
            without making a network call.

    Returns:
        A tuple ``(temperature, humidity, weather_description, pressure)``
        taken from the API response (requested with ``units=metric``), or
        ``(None, None, None, None)`` when the city is empty, the request
        fails, or the response does not report ``cod == 200``.
    """
    if not city:
        return None, None, None, None
    try:
        # Pass the query through `params` so the city name is URL-encoded;
        # interpolating it into the URL let characters such as "&" or "#"
        # corrupt or override the query string.
        response = requests.get(
            "https://api.openweathermap.org/data/2.5/weather",
            params={"q": city, "appid": WEATHER_API_KEY, "units": "metric"},
            timeout=5,
        )
        response.raise_for_status()
        data = response.json()
        # OpenWeather returns cod as int or string depending on response
        if str(data.get("cod")) == "200":
            weather_description = data['weather'][0]['description']
            readings = data['main']
            return (
                readings['temp'],
                readings['humidity'],
                weather_description,
                readings['pressure'],
            )
    except Exception as e:
        # Best effort: any failure degrades to the "no data" tuple below.
        print(f"Error fetching weather data: {e}")
    return None, None, None, None
class RetellRequest(BaseModel):
    """Shape of a Retell.ai function-call request body.

    NOTE(review): in the visible part of this file no handler takes this
    model as a parameter; the Retell endpoints parse the raw body with
    ``json.loads`` instead. Confirm whether it is still needed.
    """
    name: str  # Function name
    call: Dict[str, Any]  # Call object with transcript and context
    args: Dict[str, Any]  # Function arguments
def verify_retell_signature(request_body: bytes, signature: str) -> bool:
    """Check *signature* against an HMAC-SHA256 of the request body.

    Args:
        request_body: Raw request bytes exactly as received.
        signature: Value of the signature header; may be ``None`` or empty
            when the header was not sent.

    Returns:
        True when *signature* equals the hex HMAC-SHA256 digest of
        *request_body* keyed with ``RETELL_SECRET_KEY``; False otherwise,
        including when no signature was supplied.
    """
    # A missing header arrives as None; hmac.compare_digest would raise
    # TypeError on it, so reject up front instead of crashing.
    if not signature or not isinstance(signature, str):
        return False
    expected_signature = hmac.new(
        RETELL_SECRET_KEY.encode(),
        request_body,
        hashlib.sha256,
    ).hexdigest()
    # Compare as bytes: compare_digest only accepts ASCII-only str values and
    # would raise on a header containing non-ASCII characters.
    return hmac.compare_digest(
        signature.encode("utf-8"),
        expected_signature.encode("utf-8"),
    )
def search_csv_data(df: pd.DataFrame, search_terms: Dict[str, str]) -> pd.DataFrame:
    """Filter *df* down to the rows matching every given search term.

    Args:
        df: Frame to search. An empty frame is returned unchanged.
        search_terms: Mapping of column name to the text to look for. Terms
            whose column is absent from *df* or whose value is empty are
            ignored.

    Returns:
        A filtered copy of *df* holding the rows where each remaining term
        occurs in its column as a case-insensitive substring.
    """
    if df.empty:
        return df
    result = df.copy()
    for column, value in search_terms.items():
        if column in df.columns and value:
            # Case-insensitive partial matching. regex=False treats the value
            # as literal text, so input such as "rice (" or "c++" no longer
            # raises re.error or matches unintended rows.
            hits = result[column].astype(str).str.contains(
                str(value), case=False, na=False, regex=False
            )
            result = result[hits]
    return result
# -------------------------
# Helper utilities
# -------------------------
def find_column(df: pd.DataFrame, candidates: List[str]) -> Optional[str]:
    """Resolve the first candidate that names a column of *df*, ignoring case.

    Candidates are tried in order. The returned value is the column name as it
    is actually spelled in *df*, or None when no candidate matches.
    """
    # Lower-cased name -> real name; when two columns differ only by case,
    # the later one wins.
    actual_by_lower = {}
    for column in df.columns:
        actual_by_lower[column.lower()] = column
    lowered_candidates = (candidate.lower() for candidate in candidates)
    return next(
        (actual_by_lower[key] for key in lowered_candidates if key in actual_by_lower),
        None,
    )
def extract_number_from_price(val: Any) -> Optional[float]:
    """Extract the first numeric value from a price cell.

    Handles plain numbers as well as strings like "₹2,180 per quintal",
    "Rs. 2180.50" or "1,00,000".

    Args:
        val: Raw cell value (number, string, or a missing value).

    Returns:
        The parsed number as a float, or None when *val* is missing or holds
        no digits.
    """
    if pd.isna(val):
        return None
    if isinstance(val, (int, float)):
        return float(val)
    # Take the first run of digits, allowing embedded commas as thousands or
    # lakh separators and one optional decimal part. The search skips any
    # leading currency symbol or text. The previous alternation tried the
    # "1-3 digits plus comma groups" form first, so an unseparated value such
    # as "2180" was truncated to 218.
    match = re.search(r"-?\d[\d,]*(?:\.\d+)?", str(val))
    if match is None:
        return None
    try:
        return float(match.group(0).replace(',', ''))
    except ValueError:
        return None
def format_scheme_row(row: pd.Series, mapping: Dict[str,str]) -> Dict[str,str]:
    """Turn one CSV row into the scheme dict used across the scheme endpoints.

    *mapping* translates logical field names ("name", "benefit", ...) into the
    column names present in the CSV. A field with no usable column falls back
    to its default: "N/A" for the scheme name, a generic instruction for the
    application process, and an empty string for everything else.
    """
    # (output key, key in `mapping`, default when the column is unavailable)
    field_specs = (
        ("scheme", "name", "N/A"),
        ("introduction", "introduction", ""),
        ("objective", "objective", ""),
        ("benefit", "benefit", ""),
        ("eligibility", "eligibility", ""),
        ("process", "process", "Contact local agriculture office"),
        ("contact", "contact", ""),
        ("extra", "extra", ""),
    )
    formatted = {}
    for out_key, mapping_key, fallback in field_specs:
        column_name = mapping.get(mapping_key, "")
        formatted[out_key] = row.get(column_name, fallback)
    return formatted
def get_schemes_from_csv(farmer_category: str, land_size: float, state: str, crop_type: str) -> List[Dict[str,str]]:
    """Return every scheme from the government_schemes CSV, most relevant first.

    No scheme is filtered out: each one gets a heuristic relevance score from
    the caller's profile and the list is ordered by that score, ties keeping
    their CSV order.

    Args:
        farmer_category: Farmer category text (+1 if mentioned by the scheme).
        land_size: Land holding size; any truthy value enables the land bonus.
        state: State name (+2 if mentioned by the scheme).
        crop_type: Crop name (+2 if mentioned by the scheme).

    Returns:
        A list of scheme dicts as built by ``format_scheme_row``; empty when
        the CSV is not loaded.
    """
    df = csv_data.get('government_schemes', pd.DataFrame())
    if df.empty:
        return []
    # build mapping for column names (supports different CSV header variants)
    mapping = {
        "name": find_column(df, ["Name", "scheme_name", "Scheme", "Scheme Name"]),
        "introduction": find_column(df, ["Introduction", "introduction", "Description"]),
        "objective": find_column(df, ["Objective", "objective"]),
        "benefit": find_column(df, ["Benefits", "Benefit", "benefit"]),
        "eligibility": find_column(df, ["Eligibility Criteria", "eligibility", "Eligibility", "eligibility_criteria"]),
        "process": find_column(df, ["Application Process & Required Documents", "application_process", "Process", "application_process & required_documents"]),
        "contact": find_column(df, ["Helpline & Website", "contact", "Helpline", "helpline"]),
        "extra": find_column(df, ["Extra Details", "extra_details", "Extra"])
    }
    state_lower = state.lower() if state else ""
    farmer_cat_lower = farmer_category.lower() if farmer_category else ""
    crop_lower = crop_type.lower() if crop_type else ""
    blob_fields = ("scheme", "introduction", "objective", "benefit", "eligibility", "extra")

    scored = []
    for _, r in df.iterrows():
        s = format_scheme_row(r, mapping)
        elig = str(s.get("eligibility", "")).lower()
        text_blob = " ".join(str(s.get(field, "") or "") for field in blob_fields).lower()
        score = 0
        # If scheme mentions the state explicitly -> higher relevance
        if state_lower and state_lower in text_blob:
            score += 2
        # Land is mentioned and the caller reported a land holding. The
        # eligibility text is part of text_blob, so one check covers both.
        if land_size and "land" in text_blob:
            score += 2
        # Eligibility open to "all ..." -> modest boost. Match the whole word
        # so "small", "shall" or "install" no longer count as "all".
        if re.search(r"\ball\b", elig):
            score += 1
        # crop-specific mention
        if crop_lower and crop_lower in text_blob:
            score += 2
        # farmer category mention
        if farmer_cat_lower and farmer_cat_lower in text_blob:
            score += 1
        scored.append((score, s))

    # One stable descending sort gives the same order as sorting the
    # score >= 2 group and the remainder separately and concatenating them.
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [s for _, s in scored]
# -------------------------
# End helpers
# -------------------------
@app.post("/api/market-prices")
async def market_prices(request: dict):
    """Report the average market price for a crop, optionally by state/district.

    Expects a JSON body shaped like
    ``{"query": {"crop_name": ..., "state": ..., "district": ...}}``; every
    field is optional and matched as a case-insensitive substring against the
    crop_advisory CSV.

    Returns:
        On a match: ``{"success": True, "result": <sentence>, "data": <rows>}``.
        Otherwise: ``{"success": False, "result": <sentence>}``.
    """
    # Tolerate a missing/null "query" and null field values instead of
    # raising AttributeError on .get()/.strip().
    query = request.get("query")
    if not isinstance(query, dict):
        query = {}
    crop_name = str(query.get("crop_name") or "").strip()
    state = str(query.get("state") or "").strip()
    district = str(query.get("district") or "").strip()

    # Safely handle missing CSV or missing expected columns
    df = csv_data.get("crop_advisory")
    if df is not None and not df.empty:
        # find likely column names for crop, state, district, price
        crop_col = find_column(df, ["crop_name", "crop", "Crop", "Crop Name"])
        state_col = find_column(df, ["state", "State", "state_name"])
        district_col = find_column(df, ["district", "District", "district_name"])
        price_col = find_column(df, ["price", "Price", "market_price", "market price", "price_per_quintal"])

        # Build the mask on df's own index so the boolean operations stay
        # aligned even if the frame does not carry a default RangeIndex.
        mask = pd.Series(True, index=df.index)
        for column, needle in ((crop_col, crop_name), (state_col, state), (district_col, district)):
            if column and needle:
                # regex=False: user text is literal, never a pattern.
                mask &= df[column].astype(str).str.contains(needle, case=False, na=False, regex=False)
        matches = df[mask]

        if not matches.empty:
            # Average over the price cells that parse as numbers, if any.
            parsed_prices = []
            if price_col:
                parsed_prices = [
                    price
                    for price in map(extract_number_from_price, matches[price_col].tolist())
                    if price is not None
                ]
            if parsed_prices:
                avg_price = statistics.mean(parsed_prices)
                result = f"The average market price of {crop_name} in {district}, {state} is ₹{avg_price:.2f} per quintal."
            else:
                # If price_col absent or non-numeric, fallback to your previous text but mention CSV found
                result = f"Market data found for {crop_name} in {district}, {state} but numeric price values were not available."
            # NaN is not valid JSON and makes the response encoder fail, so
            # emit missing cells as null.
            records = matches.astype(object).where(matches.notna(), None).to_dict(orient="records")
            return {
                "success": True,
                "result": result,
                "data": records
            }
    # fallback to previous mock behavior (keeps your logic)
    return {
        "success": False,
        "result": f"No market price data found for {crop_name} in {district}, {state}."
    }
def _default_schemes(land_size: Any, state: str) -> List[Dict[str, str]]:
    """Build the built-in scheme list used when the CSV yields no schemes."""
    schemes = []
    # PM-KISAN eligibility
    if land_size and float(land_size) > 0:
        schemes.append({
            "scheme": "PM-KISAN",
            "benefit": "₹6,000 per year in 3 installments",
            "description": "Direct income support to landholding farmer families.",
            "eligibility": "All landholding farmer families.",
            "process": "Apply online at pmkisan.gov.in or visit nearest CSC",
            "contact": "https://pmkisan.gov.in/"
        })
    # Crop Insurance
    schemes.append({
        "scheme": "Pradhan Mantri Fasal Bima Yojana",
        "benefit": "Comprehensive crop insurance coverage",
        "description": "Crop insurance against natural calamities, pests, and diseases.",
        "eligibility": "All farmers in notified crops/areas",
        "process": "Contact your nearest bank, CSC or PMFBY portal",
        "contact": "https://pmfby.gov.in/"
    })
    # State-specific schemes
    if state and state.lower() == "punjab":
        schemes.append({
            "scheme": "Punjab Crop Diversification Scheme",
            "benefit": "₹17,500 per hectare for diversification",
            "process": "Contact District Agriculture Officer",
            "contact": ""
        })
    return schemes


def _schemes_to_speech(schemes: List[Dict[str, Any]]) -> str:
    """Render the scheme list as one spoken sentence, detailing at most three."""
    if not schemes:
        return "I couldn't find specific schemes for your profile. Please contact your local agriculture department for personalized advice."
    parts = [f"You are eligible for {len(schemes)} government schemes: "]
    for i, scheme in enumerate(schemes[:3]):  # Limit to first 3 for voice response
        contact_info = f" Apply through {scheme.get('process','Contact local agriculture office')}" if scheme.get('process') else ""
        if scheme.get('contact'):
            contact_info += f" or contact {scheme.get('contact')}"
        parts.append(f"{i+1}. {scheme.get('scheme','N/A')} - {scheme.get('benefit', scheme.get('description','N/A'))}.{contact_info}. ")
    if len(schemes) > 3:
        parts.append(f"And {len(schemes)-3} more schemes available.")
    return "".join(parts)


@app.post("/api/scheme-eligibility")
async def scheme_eligibility_endpoint(
    request: Request,
    x_retell_signature: str = Header(None, alias="X-Retell-Signature")
):
    """Handle scheme eligibility function call from Retell.ai.

    Reads ``farmer_category``, ``land_size``, ``state`` and ``crop_type`` from
    the ``args`` object of the JSON body; a missing ``args`` is treated as
    empty.

    Returns:
        ``{"result": <spoken text>, "eligible_schemes": [...]}``, or
        ``{"result": <apology>, "error": <message>}`` if the lookup fails.

    Raises:
        HTTPException: 400 when the body is not valid JSON.
    """
    # NOTE(review): x_retell_signature is accepted but never checked; the
    # verify_retell_signature helper in this file is not called here.
    request_body = await request.body()
    try:
        retell_request = json.loads(request_body.decode('utf-8'))
    except ValueError as exc:
        # Covers both undecodable bytes and malformed JSON, which previously
        # escaped as an unhandled 500.
        raise HTTPException(status_code=400, detail="Request body must be valid JSON") from exc
    # Extract arguments
    args = retell_request.get("args") if isinstance(retell_request, dict) else None
    if not isinstance(args, dict):
        args = {}
    farmer_category = str(args.get("farmer_category") or "")
    land_size = args.get("land_size", 0)
    state = str(args.get("state") or "")
    crop_type = str(args.get("crop_type") or "")
    try:
        eligible_schemes = []
        # Search government schemes CSV and apply simple relevance heuristics
        if not csv_data.get('government_schemes', pd.DataFrame()).empty:
            eligible_schemes = get_schemes_from_csv(farmer_category, land_size, state, crop_type)
        # Add default schemes if no CSV data or as fallback
        if not eligible_schemes:
            eligible_schemes = _default_schemes(land_size, state)
        # Format response for voice (limit to first 3 items, keep original style)
        return {
            "result": _schemes_to_speech(eligible_schemes),
            "eligible_schemes": eligible_schemes
        }
    except Exception as e:
        # Voice agent boundary: always answer with something speakable.
        return {
            "result": "I'm having trouble accessing scheme information right now. Please contact your local agriculture officer or visit the nearest CSC for scheme details.",
            "error": str(e)
        }
@app.post("/api/weather-advisory")
async def weather_advisory(request: dict):
    """Describe the weather for a location and classify the condition.

    Expects a JSON body shaped like ``{"query": {"location": <city>}}``.

    Returns:
        ``{"success": True, "result": <sentence>, "data": {...}}`` where
        ``data`` carries city, temperature, humidity, pressure, description
        and a condition of SUNNY, RAINY, WINDY or NORMAL.
    """
    # Tolerate a missing/null "query" or "location" instead of raising
    # AttributeError on .get()/.strip().
    query = request.get("query")
    if not isinstance(query, dict):
        query = {}
    city = str(query.get("location") or "").strip()

    temperature, humidity, description, pressure = get_weather(city)
    weather_condition = "NORMAL"
    if temperature is None:
        # Fallback values
        # NOTE(review): these are placeholder readings, yet the response is
        # still reported with success=True; callers cannot distinguish them
        # from live data except by the "Not Available" description.
        temperature, humidity, description, pressure = 32.0, 60, "Not Available", 1012
    else:
        desc_lower = description.lower()
        # First matching keyword wins, in this order.
        for keyword, label in (("clear", "SUNNY"), ("rain", "RAINY"), ("wind", "WINDY")):
            if keyword in desc_lower:
                weather_condition = label
                break

    result = (
        f"Weather in {city}: {description}. "
        f"Temperature {temperature}°C, Humidity {humidity}%, Pressure {pressure} hPa. "
        f"Condition classified as {weather_condition}."
    )
    return {
        "success": True,
        "result": result,
        "data": {
            "city": city,
            "temperature": temperature,
            "humidity": humidity,
            "pressure": pressure,
            "description": description,
            "condition": weather_condition
        }
    }
def _has_value(value: Any) -> bool:
    """Tell whether a CSV cell holds real content.

    The CSV loader casts text columns to str, which turns missing cells into
    the literal string "nan"; treat that, blanks, None and NaN as empty.
    """
    if value is None or pd.isna(value):
        return False
    return str(value).strip().lower() not in ("", "nan")


@app.post("/api/crop-advisory")
async def crop_advisory_endpoint(
    request: Request,
    x_retell_signature: str = Header(None, alias="X-Retell-Signature")
):
    """Handle crop advisory function call from Retell.ai.

    Reads ``crop_name``, ``growth_stage``, ``issue_type`` (default "general")
    and ``state`` from the ``args`` object of the JSON body. Advice comes from
    the crop_advisory CSV when the crop is found there, otherwise from
    built-in fallback text; contact details come from the contact_info CSV
    when the state is found there.

    Returns:
        ``{"result": ..., "recommendations": ..., "contact_info": ...}``, or
        ``{"result": <apology>, "error": <message>}`` if the lookup fails.

    Raises:
        HTTPException: 400 when the body is not valid JSON.
    """
    # NOTE(review): x_retell_signature is accepted but never checked; the
    # verify_retell_signature helper in this file is not called here.
    request_body = await request.body()
    try:
        retell_request = json.loads(request_body.decode('utf-8'))
    except ValueError as exc:
        # Covers both undecodable bytes and malformed JSON, which previously
        # escaped as an unhandled 500.
        raise HTTPException(status_code=400, detail="Request body must be valid JSON") from exc
    args = retell_request.get("args") if isinstance(retell_request, dict) else None
    if not isinstance(args, dict):
        args = {}
    crop_name = str(args.get("crop_name") or "")
    growth_stage = args.get("growth_stage", "")
    issue_type = args.get("issue_type", "general")
    state = str(args.get("state") or "")
    try:
        advisory = None
        contact_info = ""
        # Search crop advisory CSV
        crops = csv_data.get('crop_advisory', pd.DataFrame())
        # Resolve the crop column by header variants (as market_prices does)
        # rather than assuming it is spelled exactly "crop".
        crop_col = find_column(crops, ["crop", "crop_name", "Crop", "Crop Name"]) if not crops.empty else None
        if crop_col and crop_name:
            # regex=False: the crop name is literal text, never a pattern.
            crop_matches = crops[
                crops[crop_col].astype(str).str.contains(crop_name, case=False, na=False, regex=False)
            ]
            if not crop_matches.empty:
                crop_info = crop_matches.iloc[0]
                # Build advisory based on available data
                advisory_parts = []
                if issue_type == "general":
                    general_fields = (
                        ('sowing_time', "Sowing time"),
                        ('fertilizer', "Recommended fertilizer"),
                        ('season', "Best season"),
                    )
                    for field, label in general_fields:
                        if _has_value(crop_info.get(field)):
                            advisory_parts.append(f"{label}: {crop_info[field]}")
                # Check for specific issues
                if _has_value(crop_info.get('common_issues')) and _has_value(crop_info.get('solution')):
                    if issue_type in ('pest', 'disease', 'general'):
                        advisory_parts.append(f"For {crop_info['common_issues']}: {crop_info['solution']}")
                if advisory_parts:
                    advisory = f"For {crop_name}: " + ". ".join(advisory_parts)
        # Search contact info CSV
        contacts = csv_data.get('contact_info', pd.DataFrame())
        state_col = find_column(contacts, ["state", "State", "state_name"]) if not contacts.empty else None
        if state_col and state:
            contact_matches = contacts[
                contacts[state_col].astype(str).str.contains(state, case=False, na=False, regex=False)
            ]
            if not contact_matches.empty:
                contact_match = contact_matches.iloc[0]
                contact_fields = (
                    ('agriculture_officer', "Agriculture Officer at"),
                    ('kvk_contact', "KVK at"),
                    ('kisan_call_center', "Kisan Call Center at"),
                )
                contact_parts = [
                    f"{label} {contact_match[field]}"
                    for field, label in contact_fields
                    if _has_value(contact_match.get(field))
                ]
                if contact_parts:
                    contact_info = f"For detailed advice in {state}, contact: " + " or ".join(contact_parts) + "."
        # Fallback advisory
        if not advisory:
            if crop_name.lower() == "wheat" and issue_type == "pest":
                advisory = "For wheat pest control: If you see aphids, spray Imidacloprid 200 SL at 0.3ml per liter of water. Spray during evening hours. Avoid over-irrigation which attracts pests."
            elif crop_name.lower() == "rice" and issue_type == "disease":
                advisory = "For rice disease management: If you see brown spots on leaves, it might be blast disease. Apply Tricyclazole 75% WP at 0.6g per liter. Ensure proper drainage."
            else:
                advisory = f"For {crop_name} at {growth_stage} stage: Monitor crop regularly, maintain proper spacing, apply fertilizers as per soil test recommendations."
        if not contact_info:
            contact_info = f"For detailed advice, contact your local Krishi Vigyan Kendra or Agriculture Officer in {state}. You can also call the Kisan Call Centre at 1800-1801-551."
        result_text = f"{advisory} {contact_info}"
        return {
            "result": result_text,
            "recommendations": advisory,
            "contact_info": contact_info
        }
    except Exception as e:
        # Voice agent boundary: always answer with something speakable.
        return {
            "result": f"I couldn't provide specific advice for {crop_name} right now. Please contact your local agriculture extension officer for crop-specific guidance.",
            "error": str(e)
        }
@app.get("/api/csv-status")
async def csv_status():
    """Report, per CSV dataset, whether it has rows, how many, and its columns."""
    return {
        name: {
            "loaded": not frame.empty,
            "records": len(frame),
            # Column names are only listed for datasets that have rows.
            "columns": [] if frame.empty else list(frame.columns),
        }
        for name, frame in csv_data.items()
    }
# Health check endpoint
@app.get("/health")
async def health_check():
    """Return a static health payload plus the row count of each CSV dataset."""
    row_counts = {}
    for name, frame in csv_data.items():
        row_counts[name] = len(frame)
    payload = {
        "status": "healthy",
        "service": "Krishi Mitra API",
        "csv_files_loaded": row_counts,
    }
    return payload
@app.get("/")
async def root():
    """Return a banner message and the list of endpoint paths."""
    endpoint_paths = [
        "/api/market-prices",
        "/api/scheme-eligibility",
        "/api/weather-advisory",
        "/api/crop-advisory",
        "/api/csv-status",
        "/health",
    ]
    banner = "Krishi Mitra API is running!"
    return {"message": banner, "endpoints": endpoint_paths}
# Script entry point: serve the app with uvicorn on all interfaces, port 7860.
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when run as a script.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)