"""Krishi Mitra API.

FastAPI service exposing market-price, scheme-eligibility, weather-advisory
and crop-advisory endpoints. Advisory data is read from CSV files once at
import time and kept in memory in ``csv_data``; weather is fetched live from
OpenWeatherMap.
"""

import hashlib
import hmac
import json
import os
import re
import statistics
from datetime import datetime
from typing import Optional, Dict, Any, List

import pandas as pd
import requests
from fastapi import FastAPI, HTTPException, Header, Request
from pydantic import BaseModel

app = FastAPI()

# SECURITY: credentials should be supplied through the environment. The
# literals below are kept only as a backward-compatible fallback so existing
# deployments keep working; they are committed to source control and should be
# rotated and removed.
# Your Retell.ai secret key (get from Retell.ai dashboard)
RETELL_SECRET_KEY = os.environ.get("RETELL_SECRET_KEY", "key_bdb05277a4587c7441bdad4a2c1b")

# --- WEATHER CONFIG ---
WEATHER_API_KEY = os.environ.get("WEATHER_API_KEY", "ee75ffd59875aa5ca6c207e594336b30")
WEATHER_API_URL = "https://api.openweathermap.org/data/2.5/weather"

# Matches the first number in a price string, with either Western ("2,180")
# or Indian ("1,00,000") digit grouping and an optional decimal part.
_PRICE_NUMBER_RE = re.compile(r"-?\d[\d,]*(?:\.\d+)?")

# "all" as a whole word, so that e.g. "small farmers" does not count.
_ALL_WORD_RE = re.compile(r"\ball\b")


# -------------------------
# Small internal helpers
# -------------------------
def _is_missing(value: Any) -> bool:
    """Return True for None and scalar NaN/NaT values; False for everything else."""
    if value is None:
        return True
    try:
        return bool(pd.isna(value))
    except (TypeError, ValueError):
        # pd.isna on list-likes returns an array whose truth value is ambiguous.
        return False


def _clean_str(value: Any) -> str:
    """Coerce an untrusted value to a stripped string ('' for None/NaN)."""
    if _is_missing(value):
        return ""
    return str(value).strip()


def _strip_if_str(value: Any) -> Any:
    """Strip whitespace from real strings and leave every other value (incl. NaN) untouched."""
    return value.strip() if isinstance(value, str) else value


def _contains_ci(series: pd.Series, needle: Any) -> pd.Series:
    """Boolean mask: case-insensitive *literal* substring match, never matching missing cells."""
    # regex=False: the needle comes from user input and must not be interpreted
    # as a pattern (an unbalanced "(" would otherwise raise re.error).
    hits = series.astype(str).str.contains(str(needle), case=False, na=False, regex=False)
    # astype(str) renders NaN as "nan"; mask those cells out again.
    return hits & series.notna()


def _json_safe_records(df: pd.DataFrame) -> List[Dict[str, Any]]:
    """Convert rows to dicts with NaN replaced by None (NaN is rejected by the JSON response)."""
    return [
        {key: (None if _is_missing(value) else value) for key, value in record.items()}
        for record in df.to_dict(orient="records")
    ]


def _query_section(payload: Any) -> Dict[str, Any]:
    """Return the ``query`` mapping of a request payload, or {} when absent/malformed."""
    query = payload.get("query") if isinstance(payload, dict) else None
    return query if isinstance(query, dict) else {}


async def _read_retell_args(request: Request) -> Dict[str, Any]:
    """Parse the raw request body and return its ``args`` mapping ({} when absent).

    Raises:
        HTTPException: 400 when the body is not valid UTF-8 JSON.
    """
    raw_body = await request.body()
    try:
        payload = json.loads(raw_body.decode('utf-8'))
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        raise HTTPException(status_code=400, detail="Request body must be valid JSON") from exc
    args = payload.get("args") if isinstance(payload, dict) else None
    return args if isinstance(args, dict) else {}


# Load CSV data on startup
def load_csv_data() -> Dict[str, pd.DataFrame]:
    """Load all CSV files into memory.

    Returns:
        Mapping of dataset key to DataFrame. A dataset whose file is missing
        or fails to load is represented by an empty DataFrame, so every key is
        always present.
    """
    data = {}
    csv_files = {
        'contact_info': '/app/data/contact_info.csv',
        'crop_advisory': '/app/data/crop_advisory.csv',
        'government_schemes': '/app/data/government_schemes.csv'
    }

    for key, file_path in csv_files.items():
        try:
            if os.path.exists(file_path):
                frame = pd.read_csv(file_path)
                # Strip whitespace from column names and string values.
                # Missing cells are deliberately left as NaN (not turned into
                # the string "nan") so that later missing-value checks work.
                frame.columns = frame.columns.str.strip()
                for col in frame.select_dtypes(include=['object']).columns:
                    frame[col] = frame[col].map(_strip_if_str)
                data[key] = frame
                print(f"Loaded {key}: {len(data[key])} records")
            else:
                print(f"Warning: {file_path} not found")
                data[key] = pd.DataFrame()
        except Exception as e:
            # Best effort: a broken file must not prevent the service from starting.
            print(f"Error loading {key}: {str(e)}")
            data[key] = pd.DataFrame()

    return data


# Load CSV data
csv_data = load_csv_data()


def get_weather(city: str):
    """Fetches weather data from OpenWeatherMap API.

    Returns:
        ``(temperature, humidity, description, pressure)`` on success, or
        ``(None, None, None, None)`` when the city is empty, the request
        fails, or the response does not report ``cod == 200``.
    """
    if not city:
        return None, None, None, None

    try:
        # Pass the query via params so the city name is URL-encoded properly.
        response = requests.get(
            WEATHER_API_URL,
            params={"q": city, "appid": WEATHER_API_KEY, "units": "metric"},
            timeout=5,
        )
        response.raise_for_status()
        data = response.json()
        # OpenWeather returns cod as int or string depending on response
        if str(data.get("cod")) == "200":
            weather_description = data['weather'][0]['description']
            temperature = data['main']['temp']
            humidity = data['main']['humidity']
            pressure = data['main']['pressure']
            return temperature, humidity, weather_description, pressure
        print(f"Unexpected weather response code: {data.get('cod')}")
    except Exception as e:
        # Best effort: callers fall back to placeholder values.
        print(f"Error fetching weather data: {e}")
    # Always return a 4-tuple so callers can unpack unconditionally.
    return None, None, None, None


class RetellRequest(BaseModel):
    """Shape of a Retell.ai function-call payload."""

    name: str  # Function name
    call: Dict[str, Any]  # Call object with transcript and context
    args: Dict[str, Any]  # Function arguments


def verify_retell_signature(request_body: bytes, signature: str) -> bool:
    """Verify the request is from Retell.ai.

    Computes the HMAC-SHA256 hex digest of ``request_body`` keyed with
    ``RETELL_SECRET_KEY`` and compares it to ``signature`` in constant time.
    A missing or empty signature is rejected instead of raising.
    """
    if not signature:
        return False
    expected_signature = hmac.new(
        RETELL_SECRET_KEY.encode(),
        request_body,
        hashlib.sha256
    ).hexdigest()
    # Compare as bytes: compare_digest raises TypeError on non-ASCII str input.
    return hmac.compare_digest(signature.encode('utf-8'), expected_signature.encode('utf-8'))


def search_csv_data(df: pd.DataFrame, search_terms: Dict[str, str]) -> pd.DataFrame:
    """Search dataframe based on multiple criteria.

    Every ``column -> value`` pair whose column exists and whose value is
    non-empty narrows the result with a case-insensitive literal substring
    match. Unknown columns and empty values are ignored.
    """
    if df.empty:
        return df

    result = df.copy()
    for column, value in search_terms.items():
        if column in df.columns and value:
            result = result[_contains_ci(result[column], value)]

    return result


# -------------------------
# Helper utilities
# -------------------------
def find_column(df: pd.DataFrame, candidates: List[str]) -> Optional[str]:
    """Return first matching column name from candidates (case-insensitive) or None."""
    cols = {c.lower(): c for c in df.columns}
    for cand in candidates:
        if cand.lower() in cols:
            return cols[cand.lower()]
    return None


def extract_number_from_price(val: Any) -> Optional[float]:
    """
    Try to extract numeric value from price strings like "₹2,180 per quintal" or "2180".
    Returns float or None if not parseable.

    The first number found in the text is used; thousands separators (Western
    or Indian grouping) are ignored.
    """
    if _is_missing(val):
        return None
    if isinstance(val, (int, float)):
        return float(val)
    match = _PRICE_NUMBER_RE.search(str(val))
    if match is None:
        return None
    try:
        return float(match.group(0).replace(',', ''))
    except ValueError:
        return None


def _to_float(value: Any, default: float = 0.0) -> float:
    """Coerce an untrusted value (number or text such as "2.5 acres") to float, else ``default``."""
    if _is_missing(value):
        return default
    try:
        return float(value)
    except (TypeError, ValueError):
        number = extract_number_from_price(value) if isinstance(value, str) else None
        return default if number is None else number


def format_scheme_row(row: pd.Series, mapping: Dict[str, str]) -> Dict[str, str]:
    """Build a consistent scheme dict from a CSV row using mapping of fields to column names.

    A field whose column is unmapped, absent from the row, or empty (NaN) in
    this row takes its default value.
    """
    def cell(field: str, default: str = "") -> Any:
        column = mapping.get(field)
        if not column:
            return default
        value = row.get(column, default)
        return default if _is_missing(value) else value

    return {
        "scheme": cell("name", "N/A"),
        "introduction": cell("introduction"),
        "objective": cell("objective"),
        "benefit": cell("benefit"),
        "eligibility": cell("eligibility"),
        "process": cell("process", "Contact local agriculture office"),
        "contact": cell("contact"),
        "extra": cell("extra"),
    }


def _score_scheme(scheme: Dict[str, Any], state: str, has_land: bool, crop: str, category: str) -> int:
    """Heuristic relevance score of one scheme; all text arguments must already be lower-case."""
    eligibility = str(scheme.get("eligibility", "") or "").lower()
    text_blob = " ".join(
        str(scheme.get(field, "") or "")
        for field in ("scheme", "introduction", "objective", "benefit", "eligibility", "extra")
    ).lower()

    score = 0
    # If scheme mentions the state explicitly -> higher relevance
    if state and state in text_blob:
        score += 2
    # Scheme talks about land(holding) and the user has land
    if has_land and "land" in text_blob:
        score += 2
    # If eligibility says "all farmers" or similar, raise modestly
    if _ALL_WORD_RE.search(eligibility):
        score += 1
    # crop-specific mention
    if crop and crop in text_blob:
        score += 2
    # farmer category mention
    if category and category in text_blob:
        score += 1
    return score


def get_schemes_from_csv(farmer_category: str, land_size: float, state: str, crop_type: str) -> List[Dict[str, str]]:
    """
    Read government_schemes dataframe and return a list of scheme dicts.
    This function attempts to surface the most relevant schemes first but will return
    all schemes if filtering doesn't match.
    """
    df = csv_data.get('government_schemes', pd.DataFrame())
    if df.empty:
        return []

    # build mapping for column names (supports different CSV header variants)
    mapping = {
        "name": find_column(df, ["Name", "scheme_name", "Scheme", "Scheme Name"]),
        "introduction": find_column(df, ["Introduction", "introduction", "Description"]),
        "objective": find_column(df, ["Objective", "objective"]),
        "benefit": find_column(df, ["Benefits", "Benefit", "benefit"]),
        "eligibility": find_column(df, ["Eligibility Criteria", "eligibility", "Eligibility", "eligibility_criteria"]),
        "process": find_column(df, ["Application Process & Required Documents", "application_process", "Process", "application_process & required_documents"]),
        "contact": find_column(df, ["Helpline & Website", "contact", "Helpline", "helpline"]),
        "extra": find_column(df, ["Extra Details", "extra_details", "Extra"])
    }

    state_lower = _clean_str(state).lower()
    category_lower = _clean_str(farmer_category).lower()
    crop_lower = _clean_str(crop_type).lower()
    has_land = _to_float(land_size) > 0

    scored = []
    for _, row in df.iterrows():
        scheme = format_scheme_row(row, mapping)
        scored.append((_score_scheme(scheme, state_lower, has_land, crop_lower, category_lower), scheme))

    # Stable sort: highest score first, CSV order preserved among equal scores.
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [scheme for _, scheme in scored]


# -------------------------
# End helpers
# -------------------------

@app.post("/api/market-prices")
async def market_prices(request: dict):
    """Return the average market price for a crop, optionally narrowed by state and district.

    Reads ``query.crop_name``, ``query.state`` and ``query.district`` from the
    JSON body and searches the crop_advisory dataset.
    """
    query = _query_section(request)
    crop_name = _clean_str(query.get("crop_name"))
    state = _clean_str(query.get("state"))
    district = _clean_str(query.get("district"))

    # Safely handle missing CSV or missing expected columns
    df = csv_data.get("crop_advisory")
    if df is not None and not df.empty:
        # find likely column names for crop, state, district, price
        crop_col = find_column(df, ["crop_name", "crop", "Crop", "Crop Name"])
        state_col = find_column(df, ["state", "State", "state_name"])
        district_col = find_column(df, ["district", "District", "district_name"])
        price_col = find_column(df, ["price", "Price", "market_price", "market price", "price_per_quintal"])

        # Build the mask progressively; keep it aligned with the frame's own index.
        mask = pd.Series(True, index=df.index)
        for column, wanted in ((crop_col, crop_name), (state_col, state), (district_col, district)):
            if column and wanted:
                mask &= _contains_ci(df[column], wanted)

        matches = df[mask]

        if not matches.empty:
            # average over the values of price_col that can be parsed as numbers
            parsed_prices = []
            if price_col:
                parsed_prices = [
                    number
                    for number in (extract_number_from_price(v) for v in matches[price_col].tolist())
                    if number is not None
                ]

            if parsed_prices:
                avg_price = statistics.mean(parsed_prices)
                result = f"The average market price of {crop_name} in {district}, {state} is ₹{avg_price:.2f} per quintal."
            else:
                # price column absent or non-numeric
                result = f"Market data found for {crop_name} in {district}, {state} but numeric price values were not available."

            return {
                "success": True,
                "result": result,
                "data": _json_safe_records(matches)
            }

    return {
        "success": False,
        "result": f"No market price data found for {crop_name} in {district}, {state}."
    }


def _default_schemes(land_size: float, state: str) -> List[Dict[str, str]]:
    """Built-in schemes used when the CSV yields nothing."""
    schemes = []

    # PM-KISAN eligibility
    if land_size > 0:
        schemes.append({
            "scheme": "PM-KISAN",
            "benefit": "₹6,000 per year in 3 installments",
            "description": "Direct income support to landholding farmer families.",
            "eligibility": "All landholding farmer families.",
            "process": "Apply online at pmkisan.gov.in or visit nearest CSC",
            "contact": "https://pmkisan.gov.in/"
        })

    # Crop Insurance
    schemes.append({
        "scheme": "Pradhan Mantri Fasal Bima Yojana",
        "benefit": "Comprehensive crop insurance coverage",
        "description": "Crop insurance against natural calamities, pests, and diseases.",
        "eligibility": "All farmers in notified crops/areas",
        "process": "Contact your nearest bank, CSC or PMFBY portal",
        "contact": "https://pmfby.gov.in/"
    })

    # State-specific schemes
    if state and state.lower() == "punjab":
        schemes.append({
            "scheme": "Punjab Crop Diversification Scheme",
            "benefit": "₹17,500 per hectare for diversification",
            "process": "Contact District Agriculture Officer",
            "contact": ""
        })

    return schemes


def _schemes_voice_text(eligible_schemes: List[Dict[str, Any]]) -> str:
    """Render the scheme list as a short spoken summary (details for the first three only)."""
    if not eligible_schemes:
        return "I couldn't find specific schemes for your profile. Please contact your local agriculture department for personalized advice."

    sentences = [f"You are eligible for {len(eligible_schemes)} government schemes: "]
    for position, scheme in enumerate(eligible_schemes[:3], start=1):
        how_to_apply = f" Apply through {scheme.get('process')}" if scheme.get('process') else ""
        if scheme.get('contact'):
            how_to_apply += f" or contact {scheme.get('contact')}"
        # CSV rows always carry a 'benefit' key, possibly empty, so fall back on emptiness too.
        headline = scheme.get('benefit') or scheme.get('description') or 'N/A'
        sentences.append(f"{position}. {scheme.get('scheme', 'N/A')} - {headline}.{how_to_apply}. ")

    if len(eligible_schemes) > 3:
        sentences.append(f"And {len(eligible_schemes)-3} more schemes available.")
    return "".join(sentences)


@app.post("/api/scheme-eligibility")
async def scheme_eligibility_endpoint(
    request: Request,
    x_retell_signature: str = Header(None, alias="X-Retell-Signature")
):
    """Handle scheme eligibility function call from Retell.ai"""
    # NOTE(review): x_retell_signature is accepted but never checked with
    # verify_retell_signature; confirm Retell's signing scheme before enforcing it.
    args = await _read_retell_args(request)

    # Extract arguments
    farmer_category = _clean_str(args.get("farmer_category"))
    land_size = _to_float(args.get("land_size"))
    state = _clean_str(args.get("state"))
    crop_type = _clean_str(args.get("crop_type"))

    try:
        eligible_schemes = []

        # Search government schemes CSV and apply simple relevance heuristics
        schemes_df = csv_data.get('government_schemes')
        if schemes_df is not None and not schemes_df.empty:
            eligible_schemes = get_schemes_from_csv(farmer_category, land_size, state, crop_type)

        # Add default schemes if no CSV data or as fallback
        if not eligible_schemes:
            eligible_schemes = _default_schemes(land_size, state)

        return {
            "result": _schemes_voice_text(eligible_schemes),
            "eligible_schemes": eligible_schemes
        }

    except Exception as e:
        # Endpoint boundary: always answer the voice agent with something speakable.
        return {
            "result": "I'm having trouble accessing scheme information right now. Please contact your local agriculture officer or visit the nearest CSC for scheme details.",
            "error": str(e)
        }


@app.post("/api/weather-advisory")
async def weather_advisory(request: dict):
    """Return current weather for ``query.location`` with a coarse condition label."""
    city = _clean_str(_query_section(request).get("location"))

    temperature, humidity, description, pressure = get_weather(city)

    if temperature is None:
        # Fallback values
        # NOTE(review): these placeholders are still reported with "success": True.
        temperature, humidity, description, pressure = 32.0, 60, "Not Available", 1012
        weather_condition = "NORMAL"
    else:
        desc_lower = str(description).lower()
        if "clear" in desc_lower:
            weather_condition = "SUNNY"
        elif "rain" in desc_lower:
            weather_condition = "RAINY"
        elif "wind" in desc_lower:
            weather_condition = "WINDY"
        else:
            weather_condition = "NORMAL"

    result = (
        f"Weather in {city}: {description}. "
        f"Temperature {temperature}°C, Humidity {humidity}%, Pressure {pressure} hPa. "
        f"Condition classified as {weather_condition}."
    )

    return {
        "success": True,
        "result": result,
        "data": {
            "city": city,
            "temperature": temperature,
            "humidity": humidity,
            "pressure": pressure,
            "description": description,
            "condition": weather_condition
        }
    }


def _advisory_from_csv(crop_name: str, issue_type: str) -> Optional[str]:
    """Build advisory text from the first crop_advisory row matching ``crop_name``, or None."""
    df = csv_data.get('crop_advisory')
    if df is None or df.empty or not crop_name:
        return None

    crop_col = find_column(df, ["crop", "crop_name", "Crop", "Crop Name"])
    if crop_col is None:
        return None

    crop_matches = df[_contains_ci(df[crop_col], crop_name)]
    if crop_matches.empty:
        return None
    crop_info = crop_matches.iloc[0]

    # Build advisory based on available data
    advisory_parts = []
    if issue_type == "general":
        for column, label in (
            ("sowing_time", "Sowing time"),
            ("fertilizer", "Recommended fertilizer"),
            ("season", "Best season"),
        ):
            text = _clean_str(crop_info.get(column))
            if text:
                advisory_parts.append(f"{label}: {text}")

    # Check for specific issues
    common_issues = _clean_str(crop_info.get('common_issues'))
    solution = _clean_str(crop_info.get('solution'))
    if common_issues and solution and issue_type in ('pest', 'disease', 'general'):
        advisory_parts.append(f"For {common_issues}: {solution}")

    if not advisory_parts:
        return None
    return f"For {crop_name}: " + ". ".join(advisory_parts)


def _contact_from_csv(state: str) -> str:
    """Build contact text from the first contact_info row matching ``state``, or ''."""
    df = csv_data.get('contact_info')
    if df is None or df.empty or not state:
        return ""

    state_col = find_column(df, ["state", "State", "state_name"])
    if state_col is None:
        return ""

    contact_matches = df[_contains_ci(df[state_col], state)]
    if contact_matches.empty:
        return ""
    contact_match = contact_matches.iloc[0]

    contact_parts = []
    for column, label in (
        ("agriculture_officer", "Agriculture Officer"),
        ("kvk_contact", "KVK"),
        ("kisan_call_center", "Kisan Call Center"),
    ):
        text = _clean_str(contact_match.get(column))
        if text:
            contact_parts.append(f"{label} at {text}")

    if not contact_parts:
        return ""
    return f"For detailed advice in {state}, contact: " + " or ".join(contact_parts) + "."


@app.post("/api/crop-advisory")
async def crop_advisory_endpoint(
    request: Request,
    x_retell_signature: str = Header(None, alias="X-Retell-Signature")
):
    """Handle crop advisory function call from Retell.ai"""
    # NOTE(review): x_retell_signature is accepted but never checked with
    # verify_retell_signature; confirm Retell's signing scheme before enforcing it.
    args = await _read_retell_args(request)

    crop_name = _clean_str(args.get("crop_name"))
    growth_stage = _clean_str(args.get("growth_stage"))
    issue_type = _clean_str(args.get("issue_type")).lower() or "general"
    state = _clean_str(args.get("state"))

    try:
        advisory = _advisory_from_csv(crop_name, issue_type)
        contact_info = _contact_from_csv(state)

        # Fallback advisory
        if not advisory:
            if crop_name.lower() == "wheat" and issue_type == "pest":
                advisory = "For wheat pest control: If you see aphids, spray Imidacloprid 200 SL at 0.3ml per liter of water. Spray during evening hours. Avoid over-irrigation which attracts pests."
            elif crop_name.lower() == "rice" and issue_type == "disease":
                advisory = "For rice disease management: If you see brown spots on leaves, it might be blast disease. Apply Tricyclazole 75% WP at 0.6g per liter. Ensure proper drainage."
            else:
                advisory = f"For {crop_name} at {growth_stage} stage: Monitor crop regularly, maintain proper spacing, apply fertilizers as per soil test recommendations."

        if not contact_info:
            contact_info = f"For detailed advice, contact your local Krishi Vigyan Kendra or Agriculture Officer in {state}. You can also call the Kisan Call Centre at 1800-1801-551."

        result_text = f"{advisory} {contact_info}"

        return {
            "result": result_text,
            "recommendations": advisory,
            "contact_info": contact_info
        }

    except Exception as e:
        # Endpoint boundary: always answer the voice agent with something speakable.
        return {
            "result": f"I couldn't provide specific advice for {crop_name} right now. Please contact your local agriculture extension officer for crop-specific guidance.",
            "error": str(e)
        }


@app.get("/api/csv-status")
async def csv_status():
    """Check status of loaded CSV files"""
    return {
        key: {
            "loaded": not df.empty,
            "records": len(df),
            "columns": list(df.columns) if not df.empty else []
        }
        for key, df in csv_data.items()
    }


# Health check endpoint
@app.get("/health")
async def health_check():
    """Report service liveness and the record count of each loaded CSV."""
    return {
        "status": "healthy",
        "service": "Krishi Mitra API",
        "csv_files_loaded": {key: len(df) for key, df in csv_data.items()}
    }


@app.get("/")
async def root():
    """List the available endpoints."""
    return {
        "message": "Krishi Mitra API is running!",
        "endpoints": [
            "/api/market-prices",
            "/api/scheme-eligibility",
            "/api/weather-advisory",
            "/api/crop-advisory",
            "/api/csv-status",
            "/health"
        ]
    }


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)