Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, HTTPException, Header, Request | |
| from pydantic import BaseModel | |
| import requests | |
| import json | |
| import hmac | |
| import hashlib | |
| import pandas as pd | |
| import os | |
| import re | |
| import statistics | |
| from datetime import datetime | |
| from typing import Optional, Dict, Any, List | |
| app = FastAPI() | |
| # Your Retell.ai secret key (get from Retell.ai dashboard) | |
| RETELL_SECRET_KEY = "key_bdb05277a4587c7441bdad4a2c1b" | |
| # --- WEATHER CONFIG --- | |
| WEATHER_API_KEY = "ee75ffd59875aa5ca6c207e594336b30" | |
| # Load CSV data on startup | |
| def load_csv_data(): | |
| """Load all CSV files into memory""" | |
| data = {} | |
| csv_files = { | |
| 'contact_info': '/app/data/contact_info.csv', | |
| 'crop_advisory': '/app/data/crop_advisory.csv', | |
| 'government_schemes': '/app/data/government_schemes.csv' | |
| } | |
| for key, file_path in csv_files.items(): | |
| try: | |
| if os.path.exists(file_path): | |
| data[key] = pd.read_csv(file_path) | |
| # Strip whitespace from column names and string values | |
| data[key].columns = data[key].columns.str.strip() | |
| for col in data[key].select_dtypes(include=['object']).columns: | |
| data[key][col] = data[key][col].astype(str).str.strip() | |
| print(f"Loaded {key}: {len(data[key])} records") | |
| else: | |
| print(f"Warning: {file_path} not found") | |
| data[key] = pd.DataFrame() | |
| except Exception as e: | |
| print(f"Error loading {key}: {str(e)}") | |
| data[key] = pd.DataFrame() | |
| return data | |
| # Load CSV data | |
| csv_data = load_csv_data() | |
| def get_weather(city: str): | |
| """Fetches weather data from OpenWeatherMap API.""" | |
| if not city: | |
| return None, None, None, None | |
| url = f"https://api.openweathermap.org/data/2.5/weather?q={city}&appid={WEATHER_API_KEY}&units=metric" | |
| try: | |
| response = requests.get(url, timeout=5) | |
| response.raise_for_status() | |
| data = response.json() | |
| # OpenWeather returns cod as int or string depending on response | |
| if str(data.get("cod")) == "200": | |
| weather_description = data['weather'][0]['description'] | |
| temperature = data['main']['temp'] | |
| humidity = data['main']['humidity'] | |
| pressure = data['main']['pressure'] | |
| return temperature, humidity, weather_description, pressure | |
| except Exception as e: | |
| print(f"Error fetching weather data: {e}") | |
| return None, None, None, None | |
| class RetellRequest(BaseModel): | |
| name: str # Function name | |
| call: Dict[str, Any] # Call object with transcript and context | |
| args: Dict[str, Any] # Function arguments | |
| def verify_retell_signature(request_body: bytes, signature: str) -> bool: | |
| """Verify the request is from Retell.ai""" | |
| expected_signature = hmac.new( | |
| RETELL_SECRET_KEY.encode(), | |
| request_body, | |
| hashlib.sha256 | |
| ).hexdigest() | |
| return hmac.compare_digest(signature, expected_signature) | |
| def search_csv_data(df: pd.DataFrame, search_terms: Dict[str, str]) -> pd.DataFrame: | |
| """Search dataframe based on multiple criteria""" | |
| if df.empty: | |
| return df | |
| result = df.copy() | |
| for column, value in search_terms.items(): | |
| if column in df.columns and value: | |
| # Case-insensitive partial matching | |
| result = result[result[column].astype(str).str.contains(value, case=False, na=False)] | |
| return result | |
| # ------------------------- | |
| # Helper utilities | |
| # ------------------------- | |
| def find_column(df: pd.DataFrame, candidates: List[str]) -> Optional[str]: | |
| """Return first matching column name from candidates (case-insensitive) or None.""" | |
| cols = {c.lower(): c for c in df.columns} | |
| for cand in candidates: | |
| if cand.lower() in cols: | |
| return cols[cand.lower()] | |
| return None | |
| def extract_number_from_price(val: Any) -> Optional[float]: | |
| """ | |
| Try to extract numeric value from price strings like "₹2,180 per quintal" or "2180". | |
| Returns float or None if not parseable. | |
| """ | |
| if pd.isna(val): | |
| return None | |
| if isinstance(val, (int, float)): | |
| return float(val) | |
| s = str(val) | |
| # remove currency symbols and non-digit characters except dot and minus | |
| # first try to find first numeric group | |
| # remove common words like per, quintal | |
| # Use regex to capture numbers like 2,180.50 or 2180.5 | |
| match = re.search(r"(-?\d{1,3}(?:[,]\d{3})*(?:\.\d+)?|-?\d+(?:\.\d+)?)", s.replace('₹','').replace('Rs','').replace('INR','')) | |
| if match: | |
| num = match.group(0).replace(',', '') | |
| try: | |
| return float(num) | |
| except: | |
| return None | |
| return None | |
| def format_scheme_row(row: pd.Series, mapping: Dict[str,str]) -> Dict[str,str]: | |
| """Build a consistent scheme dict from a CSV row using mapping of fields to column names.""" | |
| return { | |
| "scheme": row.get(mapping.get("name", ""), "N/A"), | |
| "introduction": row.get(mapping.get("introduction", ""), ""), | |
| "objective": row.get(mapping.get("objective", ""), ""), | |
| "benefit": row.get(mapping.get("benefit", ""), ""), | |
| "eligibility": row.get(mapping.get("eligibility", ""), ""), | |
| "process": row.get(mapping.get("process", ""), "Contact local agriculture office"), | |
| "contact": row.get(mapping.get("contact", ""), ""), | |
| "extra": row.get(mapping.get("extra", ""), ""), | |
| } | |
| def get_schemes_from_csv(farmer_category: str, land_size: float, state: str, crop_type: str) -> List[Dict[str,str]]: | |
| """ | |
| Read government_schemes dataframe and return a list of scheme dicts. | |
| This function attempts to surface the most relevant schemes first but will | |
| return all schemes if filtering doesn't match. | |
| """ | |
| schemes_out = [] | |
| df = csv_data.get('government_schemes', pd.DataFrame()) | |
| if df.empty: | |
| return [] | |
| # build mapping for column names (supports different CSV header variants) | |
| mapping = { | |
| "name": find_column(df, ["Name", "scheme_name", "Scheme", "Scheme Name"]), | |
| "introduction": find_column(df, ["Introduction", "introduction", "Description"]), | |
| "objective": find_column(df, ["Objective", "objective"]), | |
| "benefit": find_column(df, ["Benefits", "Benefit", "benefit"]), | |
| "eligibility": find_column(df, ["Eligibility Criteria", "eligibility", "Eligibility", "eligibility_criteria"]), | |
| "process": find_column(df, ["Application Process & Required Documents", "application_process", "Process", "application_process & required_documents"]), | |
| "contact": find_column(df, ["Helpline & Website", "contact", "Helpline", "helpline"]), | |
| "extra": find_column(df, ["Extra Details", "extra_details", "Extra"]) | |
| } | |
| # Build list of all schemes with formatting | |
| all_schemes = [] | |
| for _, r in df.iterrows(): | |
| all_schemes.append(format_scheme_row(r, mapping)) | |
| # Try to filter schemes based on simple heuristics: | |
| prioritized = [] | |
| others = [] | |
| state_lower = state.lower() if state else "" | |
| farmer_cat_lower = farmer_category.lower() if farmer_category else "" | |
| crop_lower = crop_type.lower() if crop_type else "" | |
| for s in all_schemes: | |
| elig = str(s.get("eligibility", "")).lower() | |
| text_blob = " ".join([ | |
| str(s.get("scheme","") or ""), | |
| str(s.get("introduction","") or ""), | |
| str(s.get("objective","") or ""), | |
| str(s.get("benefit","") or ""), | |
| str(s.get("eligibility","") or ""), | |
| str(s.get("extra","") or "") | |
| ]).lower() | |
| score = 0 | |
| # If scheme mentions the state explicitly -> higher relevance | |
| if state_lower and state_lower in text_blob: | |
| score += 2 | |
| # If eligibility explicitly mentions landholding and user has land_size > 0 | |
| if land_size and ("land" in elig or "landholding" in elig or "land" in text_blob): | |
| score += 2 | |
| # If eligibility says "all farmers" or similar, raise modestly | |
| if "all" in elig or "all farmers" in elig or "all landholding" in elig: | |
| score += 1 | |
| # crop-specific mention | |
| if crop_lower and crop_lower in text_blob: | |
| score += 2 | |
| # farmer category mention | |
| if farmer_cat_lower and farmer_cat_lower in text_blob: | |
| score += 1 | |
| # Put high-scored into prioritized list | |
| if score >= 2: | |
| prioritized.append((score, s)) | |
| else: | |
| others.append((score, s)) | |
| # sort priority by score desc | |
| prioritized.sort(key=lambda x: x[0], reverse=True) | |
| others.sort(key=lambda x: x[0], reverse=True) | |
| # return only scheme dicts, prioritized first | |
| schemes_out = [s for _, s in prioritized] + [s for _, s in others] | |
| return schemes_out | |
| # ------------------------- | |
| # End helpers | |
| # ------------------------- | |
| async def market_prices(request: dict): | |
| # Keep your request shape usage intact | |
| crop_name = request.get("query", {}).get("crop_name", "").strip() | |
| state = request.get("query", {}).get("state", "").strip() | |
| district = request.get("query", {}).get("district", "").strip() | |
| # Safely handle missing CSV or missing expected columns | |
| if "crop_advisory" in csv_data and not csv_data["crop_advisory"].empty: | |
| df = csv_data["crop_advisory"].copy() | |
| # find likely column names for crop, state, district, price | |
| crop_col = find_column(df, ["crop_name", "crop", "Crop", "Crop Name"]) | |
| state_col = find_column(df, ["state", "State", "state_name"]) | |
| district_col = find_column(df, ["district", "District", "district_name"]) | |
| price_col = find_column(df, ["price", "Price", "market_price", "market price", "price_per_quintal"]) | |
| # build mask progressively (use contains if exact match column not present) | |
| mask = pd.Series([True] * len(df)) | |
| if crop_col and crop_name: | |
| mask = mask & df[crop_col].astype(str).str.contains(crop_name, case=False, na=False) | |
| if state_col and state: | |
| mask = mask & df[state_col].astype(str).str.contains(state, case=False, na=False) | |
| if district_col and district: | |
| mask = mask & df[district_col].astype(str).str.contains(district, case=False, na=False) | |
| matches = df[mask] | |
| if not matches.empty: | |
| # compute average over numeric-parsable values in price_col if exists | |
| avg_price = None | |
| parsed_prices = [] | |
| if price_col: | |
| for v in matches[price_col].tolist(): | |
| num = extract_number_from_price(v) | |
| if num is not None: | |
| parsed_prices.append(num) | |
| if parsed_prices: | |
| try: | |
| avg_price = statistics.mean(parsed_prices) | |
| except Exception: | |
| avg_price = None | |
| if avg_price is not None: | |
| result = f"The average market price of {crop_name} in {district}, {state} is ₹{avg_price:.2f} per quintal." | |
| else: | |
| # If price_col absent or non-numeric, fallback to your previous text but mention CSV found | |
| result = f"Market data found for {crop_name} in {district}, {state} but numeric price values were not available." | |
| return { | |
| "success": True, | |
| "result": result, | |
| "data": matches.to_dict(orient="records") | |
| } | |
| # fallback to previous mock behavior (keeps your logic) | |
| return { | |
| "success": False, | |
| "result": f"No market price data found for {crop_name} in {district}, {state}." | |
| } | |
| async def scheme_eligibility_endpoint( | |
| request: Request, | |
| x_retell_signature: str = Header(None, alias="X-Retell-Signature") | |
| ): | |
| """Handle scheme eligibility function call from Retell.ai""" | |
| request_body = await request.body() | |
| retell_request = json.loads(request_body.decode('utf-8')) | |
| # Extract arguments | |
| farmer_category = retell_request["args"].get("farmer_category", "") | |
| land_size = retell_request["args"].get("land_size", 0) | |
| state = retell_request["args"].get("state", "") | |
| crop_type = retell_request["args"].get("crop_type", "") | |
| try: | |
| eligible_schemes = [] | |
| # Search government schemes CSV and apply simple relevance heuristics | |
| if not csv_data['government_schemes'].empty: | |
| eligible_schemes = get_schemes_from_csv(farmer_category, land_size, state, crop_type) | |
| # Add default schemes if no CSV data or as fallback | |
| if not eligible_schemes: | |
| # PM-KISAN eligibility | |
| if land_size and float(land_size) > 0: | |
| eligible_schemes.append({ | |
| "scheme": "PM-KISAN", | |
| "benefit": "₹6,000 per year in 3 installments", | |
| "description": "Direct income support to landholding farmer families.", | |
| "eligibility": "All landholding farmer families.", | |
| "process": "Apply online at pmkisan.gov.in or visit nearest CSC", | |
| "contact": "https://pmkisan.gov.in/" | |
| }) | |
| # Crop Insurance | |
| eligible_schemes.append({ | |
| "scheme": "Pradhan Mantri Fasal Bima Yojana", | |
| "benefit": "Comprehensive crop insurance coverage", | |
| "description": "Crop insurance against natural calamities, pests, and diseases.", | |
| "eligibility": "All farmers in notified crops/areas", | |
| "process": "Contact your nearest bank, CSC or PMFBY portal", | |
| "contact": "https://pmfby.gov.in/" | |
| }) | |
| # State-specific schemes | |
| if state and state.lower() == "punjab": | |
| eligible_schemes.append({ | |
| "scheme": "Punjab Crop Diversification Scheme", | |
| "benefit": "₹17,500 per hectare for diversification", | |
| "process": "Contact District Agriculture Officer", | |
| "contact": "" | |
| }) | |
| # Format response for voice (limit to first 3 items, keep original style) | |
| if eligible_schemes: | |
| schemes_text = f"You are eligible for {len(eligible_schemes)} government schemes: " | |
| for i, scheme in enumerate(eligible_schemes[:3]): # Limit to first 3 for voice response | |
| contact_info = f" Apply through {scheme.get('process','Contact local agriculture office')}" if scheme.get('process') else "" | |
| if scheme.get('contact'): | |
| contact_info += f" or contact {scheme.get('contact')}" | |
| schemes_text += f"{i+1}. {scheme.get('scheme','N/A')} - {scheme.get('benefit', scheme.get('description','N/A'))}.{contact_info}. " | |
| if len(eligible_schemes) > 3: | |
| schemes_text += f"And {len(eligible_schemes)-3} more schemes available." | |
| else: | |
| schemes_text = "I couldn't find specific schemes for your profile. Please contact your local agriculture department for personalized advice." | |
| return { | |
| "result": schemes_text, | |
| "eligible_schemes": eligible_schemes | |
| } | |
| except Exception as e: | |
| return { | |
| "result": "I'm having trouble accessing scheme information right now. Please contact your local agriculture officer or visit the nearest CSC for scheme details.", | |
| "error": str(e) | |
| } | |
| async def weather_advisory(request: dict): | |
| # Keep your request shape usage intact | |
| city = request.get("query", {}).get("location", "").strip() | |
| temperature, humidity, description, pressure = get_weather(city) | |
| if temperature is None: | |
| # Fallback values | |
| temperature, humidity, description, pressure = 32.0, 60, "Not Available", 1012 | |
| weather_condition = "NORMAL" | |
| else: | |
| desc_lower = description.lower() | |
| if "clear" in desc_lower: | |
| weather_condition = "SUNNY" | |
| elif "rain" in desc_lower: | |
| weather_condition = "RAINY" | |
| elif "wind" in desc_lower: | |
| weather_condition = "WINDY" | |
| else: | |
| weather_condition = "NORMAL" | |
| result = ( | |
| f"Weather in {city}: {description}. " | |
| f"Temperature {temperature}°C, Humidity {humidity}%, Pressure {pressure} hPa. " | |
| f"Condition classified as {weather_condition}." | |
| ) | |
| return { | |
| "success": True, | |
| "result": result, | |
| "data": { | |
| "city": city, | |
| "temperature": temperature, | |
| "humidity": humidity, | |
| "pressure": pressure, | |
| "description": description, | |
| "condition": weather_condition | |
| } | |
| } | |
| async def crop_advisory_endpoint( | |
| request: Request, | |
| x_retell_signature: str = Header(None, alias="X-Retell-Signature") | |
| ): | |
| """Handle crop advisory function call from Retell.ai""" | |
| request_body = await request.body() | |
| retell_request = json.loads(request_body.decode('utf-8')) | |
| crop_name = retell_request["args"].get("crop_name", "") | |
| growth_stage = retell_request["args"].get("growth_stage", "") | |
| issue_type = retell_request["args"].get("issue_type", "general") | |
| state = retell_request["args"].get("state", "") | |
| try: | |
| advisory = None | |
| contact_info = "" | |
| # Search crop advisory CSV | |
| if not csv_data['crop_advisory'].empty: | |
| search_terms = {} | |
| if crop_name: | |
| # Search by crop name | |
| crop_matches = csv_data['crop_advisory'][ | |
| csv_data['crop_advisory']['crop'].str.contains(crop_name, case=False, na=False) | |
| ] | |
| if not crop_matches.empty: | |
| crop_info = crop_matches.iloc[0] | |
| # Build advisory based on available data | |
| advisory_parts = [] | |
| if issue_type == "general": | |
| if pd.notna(crop_info.get('sowing_time')): | |
| advisory_parts.append(f"Sowing time: {crop_info['sowing_time']}") | |
| if pd.notna(crop_info.get('fertilizer')): | |
| advisory_parts.append(f"Recommended fertilizer: {crop_info['fertilizer']}") | |
| if pd.notna(crop_info.get('season')): | |
| advisory_parts.append(f"Best season: {crop_info['season']}") | |
| # Check for specific issues | |
| if pd.notna(crop_info.get('common_issues')) and pd.notna(crop_info.get('solution')): | |
| if issue_type in ['pest', 'disease'] or issue_type == 'general': | |
| advisory_parts.append(f"For {crop_info['common_issues']}: {crop_info['solution']}") | |
| if advisory_parts: | |
| advisory = f"For {crop_name}: " + ". ".join(advisory_parts) | |
| # Search contact info CSV | |
| if not csv_data['contact_info'].empty and state: | |
| # Search by state | |
| contact_matches = csv_data['contact_info'][ | |
| csv_data['contact_info']['state'].str.contains(state, case=False, na=False) | |
| ] | |
| if not contact_matches.empty: | |
| contact_match = contact_matches.iloc[0] | |
| contact_parts = [] | |
| if pd.notna(contact_match.get('agriculture_officer')): | |
| contact_parts.append(f"Agriculture Officer at {contact_match['agriculture_officer']}") | |
| if pd.notna(contact_match.get('kvk_contact')): | |
| contact_parts.append(f"KVK at {contact_match['kvk_contact']}") | |
| if pd.notna(contact_match.get('kisan_call_center')): | |
| contact_parts.append(f"Kisan Call Center at {contact_match['kisan_call_center']}") | |
| if contact_parts: | |
| contact_info = f"For detailed advice in {state}, contact: " + " or ".join(contact_parts) + "." | |
| # Fallback advisory | |
| if not advisory: | |
| if crop_name.lower() == "wheat" and issue_type == "pest": | |
| advisory = "For wheat pest control: If you see aphids, spray Imidacloprid 200 SL at 0.3ml per liter of water. Spray during evening hours. Avoid over-irrigation which attracts pests." | |
| elif crop_name.lower() == "rice" and issue_type == "disease": | |
| advisory = "For rice disease management: If you see brown spots on leaves, it might be blast disease. Apply Tricyclazole 75% WP at 0.6g per liter. Ensure proper drainage." | |
| else: | |
| advisory = f"For {crop_name} at {growth_stage} stage: Monitor crop regularly, maintain proper spacing, apply fertilizers as per soil test recommendations." | |
| if not contact_info: | |
| contact_info = f"For detailed advice, contact your local Krishi Vigyan Kendra or Agriculture Officer in {state}. You can also call the Kisan Call Centre at 1800-1801-551." | |
| result_text = f"{advisory} {contact_info}" | |
| return { | |
| "result": result_text, | |
| "recommendations": advisory, | |
| "contact_info": contact_info | |
| } | |
| except Exception as e: | |
| return { | |
| "result": f"I couldn't provide specific advice for {crop_name} right now. Please contact your local agriculture extension officer for crop-specific guidance.", | |
| "error": str(e) | |
| } | |
| async def csv_status(): | |
| """Check status of loaded CSV files""" | |
| status = {} | |
| for key, df in csv_data.items(): | |
| status[key] = { | |
| "loaded": not df.empty, | |
| "records": len(df), | |
| "columns": list(df.columns) if not df.empty else [] | |
| } | |
| return status | |
| # Health check endpoint | |
| async def health_check(): | |
| return { | |
| "status": "healthy", | |
| "service": "Krishi Mitra API", | |
| "csv_files_loaded": {key: len(df) for key, df in csv_data.items()} | |
| } | |
| async def root(): | |
| return { | |
| "message": "Krishi Mitra API is running!", | |
| "endpoints": [ | |
| "/api/market-prices", | |
| "/api/scheme-eligibility", | |
| "/api/weather-advisory", | |
| "/api/crop-advisory", | |
| "/api/csv-status", | |
| "/health" | |
| ] | |
| } | |
| if __name__ == "__main__": | |
| import uvicorn | |
| uvicorn.run(app, host="0.0.0.0", port=7860) | |