# Spaces: Sleeping
# app.py
import asyncio
import hashlib
import hmac
import json
import os
import re
import statistics
import urllib.parse
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, List, Union

import pandas as pd
import requests
from fastapi import FastAPI, HTTPException, Header, Request, Query
from pydantic import BaseModel
# FastAPI application instance for this module, titled "Krishi Mitra API".
app = FastAPI(title="Krishi Mitra API")
# -------------------------
# Configuration (read from environment variables)
# -------------------------
# SECURITY: credentials must not live in source control. The literal fallback
# values that were previously embedded here have been removed; any key that was
# ever published that way should be treated as compromised and rotated.
# When a variable is unset the value is the empty string: signature checks then
# reject every signed request, and weather lookups fail and report the data as
# unavailable.
RETELL_SECRET_KEY = os.getenv("RETELL_SECRET_KEY", "")
WEATHER_API_KEY = os.getenv("WEATHER_API_KEY", "")
# -------------------------
# CSV loader
# -------------------------
def load_csv_data():
    """Read the four data CSVs into DataFrames keyed by dataset name.

    Column names are whitespace-trimmed, and every object-dtype column is
    converted to ``str`` and trimmed. A missing or unreadable file yields an
    empty DataFrame for its key; a one-line status is printed per file.

    Returns:
        dict mapping 'contact_info', 'crop_advisory', 'government_schemes'
        and 'market_prices' to a DataFrame (possibly empty).
    """
    sources = (
        ('contact_info', './data/contact_info.csv'),
        ('crop_advisory', './data/crop_advisory.csv'),
        ('government_schemes', './data/government_schemes.csv'),
        ('market_prices', './data/market_prices.csv'),
    )
    frames = {}
    for name, path in sources:
        frame = pd.DataFrame()
        try:
            if not os.path.exists(path):
                print(f"Warning: {path} not found - {name} will be empty")
            else:
                loaded = pd.read_csv(path)
                # Headers first, then the text cells.
                loaded.columns = loaded.columns.str.strip()
                text_columns = loaded.select_dtypes(include=['object']).columns
                for column in text_columns:
                    # astype(str) also turns missing cells into the text "nan".
                    loaded[column] = loaded[column].astype(str).str.strip()
                frame = loaded
                print(f"Loaded {name} ({path}): {len(loaded)} records")
        except Exception as e:
            # Best effort: one bad file must not prevent the others loading.
            print(f"Error loading {path}: {e}")
            frame = pd.DataFrame()
        frames[name] = frame
    return frames


# Loaded once at import time and shared by the helpers and endpoints below.
csv_data = load_csv_data()
# -------------------------
# Helpers
# -------------------------
def verify_retell_signature(request_body: bytes, signature: Optional[str]) -> bool:
    """Check a request signature against HMAC-SHA256 of the raw body.

    The expected value is the hex digest of ``request_body`` keyed with
    ``RETELL_SECRET_KEY``.

    Args:
        request_body: Raw request bytes exactly as received.
        signature: Value of the signature header, or None/"" when absent.

    Returns:
        True when the signature matches, or when no signature was supplied;
        False otherwise.
    """
    if not signature:
        # NOTE(review): fail-open by design for local testing - an unsigned
        # request is accepted. Tighten this before relying on it for auth.
        return True
    expected_signature = hmac.new(
        RETELL_SECRET_KEY.encode(),
        request_body,
        hashlib.sha256,
    ).hexdigest()
    # compare_digest() raises TypeError for str arguments that contain
    # non-ASCII characters, so a junk header would surface as a server error.
    # Comparing encoded bytes keeps the constant-time check and simply
    # rejects such values.
    return hmac.compare_digest(
        signature.encode("utf-8"),
        expected_signature.encode("utf-8"),
    )
def find_column(df: pd.DataFrame, candidates: List[str]) -> Optional[str]:
    """Resolve the first candidate that names a column of ``df``.

    Matching ignores case; the column's actual spelling is returned. Falsy
    candidates are skipped. Returns None when nothing matches.
    """
    by_lowercase = {}
    for actual in df.columns:
        # Later columns win if two differ only by case.
        by_lowercase[actual.lower()] = actual
    wanted = (name.lower() for name in candidates if name)
    return next((by_lowercase[key] for key in wanted if key in by_lowercase), None)
# A digit followed by any mix of digits and grouping commas, with an optional
# decimal part. Accepts ungrouped ("2180"), western ("2,180") and Indian
# ("1,25,000") digit grouping alike.
_PRICE_NUMBER_RE = re.compile(r"-?\d[\d,]*(?:\.\d+)?")


def extract_number_from_price(val: Any) -> Optional[float]:
    """Extract the first numeric value from a messy price cell.

    Examples: '₹2,180 per quintal' -> 2180.0, '2180' -> 2180.0,
    'Rs 1,25,000.50' -> 125000.5.

    Args:
        val: A number, a string containing a number, or a missing value.

    Returns:
        The value as a float, or None when ``val`` is missing (None/NaN) or
        contains no digits.
    """
    if pd.isna(val):
        return None
    if isinstance(val, (int, float)):
        return float(val)
    text = str(val).replace('₹', '').replace('Rs', '').replace('INR', '')
    match = _PRICE_NUMBER_RE.search(text)
    if match is None:
        return None
    try:
        return float(match.group(0).replace(',', ''))
    except ValueError:
        return None
def format_scheme_row(row: pd.Series, mapping: Dict[str,str]) -> Dict[str,str]:
    """Project one schemes row onto the fixed set of response keys.

    ``mapping`` gives, per logical field, the column name to read from
    ``row``. A field whose column is unmapped or absent from the row falls
    back to the default listed below.
    """
    # (response key, mapping key, default when the column is unavailable)
    layout = (
        ("scheme", "name", "N/A"),
        ("introduction", "introduction", ""),
        ("objective", "objective", ""),
        ("benefit", "benefit", ""),
        ("eligibility", "eligibility", ""),
        ("process", "process", "Contact local agriculture office"),
        ("contact", "contact", ""),
        ("extra", "extra", ""),
    )
    return {
        out_key: row.get(mapping.get(map_key, ""), fallback)
        for out_key, map_key, fallback in layout
    }
def get_schemes_from_csv(farmer_category: str, land_size: float, state: str, crop_type: str) -> List[Dict[str,str]]:
    """Return every scheme from the government_schemes table, best match first.

    Each row is normalised with ``format_scheme_row`` and scored by keyword
    heuristics over its text:

    * +2 when the state name appears,
    * +2 when a land size was given and the text mentions "land",
    * +1 when the eligibility text contains the word "all",
    * +2 when the crop type appears,
    * +1 when the farmer category appears.

    Nothing is filtered out; rows are only reordered by descending score,
    and rows with equal scores keep their CSV order.

    Args:
        farmer_category: Free-text farmer category; may be empty.
        land_size: Land holding size; any non-zero value enables the
            land heuristic.
        state: State name; may be empty.
        crop_type: Crop name; may be empty.

    Returns:
        List of scheme dicts (keys as produced by ``format_scheme_row``);
        empty when the table is not loaded.
    """
    df = csv_data.get('government_schemes', pd.DataFrame())
    if df.empty:
        return []

    mapping = {
        "name": find_column(df, ["Name", "scheme_name", "Scheme", "Scheme Name"]),
        "introduction": find_column(df, ["Introduction", "introduction", "Description"]),
        "objective": find_column(df, ["Objective", "objective"]),
        "benefit": find_column(df, ["Benefits", "Benefit", "benefit"]),
        "eligibility": find_column(df, ["Eligibility Criteria", "eligibility", "Eligibility", "eligibility_criteria"]),
        "process": find_column(df, ["Application Process & Required Documents", "application_process", "Process", "application_process & required_documents"]),
        "contact": find_column(df, ["Helpline & Website", "contact", "Helpline", "helpline"]),
        "extra": find_column(df, ["Extra Details", "extra_details", "Extra"]),
    }

    state_lower = (state or "").lower()
    farmer_cat_lower = (farmer_category or "").lower()
    crop_lower = (crop_type or "").lower()
    blob_fields = ("scheme", "introduction", "objective", "benefit", "eligibility", "extra")

    scored = []
    for _, row in df.iterrows():
        scheme = format_scheme_row(row, mapping)
        elig = str(scheme.get("eligibility", "")).lower()
        text_blob = " ".join(str(scheme.get(field, "") or "") for field in blob_fields).lower()

        score = 0
        if state_lower and state_lower in text_blob:
            score += 2
        # The eligibility text is part of text_blob, so one test covers both.
        if land_size and "land" in text_blob:
            score += 2
        # Whole-word match: a plain substring test would also fire on words
        # such as "small" or "installment".
        if re.search(r"\ball\b", elig):
            score += 1
        if crop_lower and crop_lower in text_blob:
            score += 2
        if farmer_cat_lower and farmer_cat_lower in text_blob:
            score += 1
        scored.append((score, scheme))

    # list.sort is stable, so a single descending sort gives the same order
    # as sorting the ">= 2" and "< 2" groups separately and concatenating.
    scored.sort(key=lambda item: item[0], reverse=True)
    return [scheme for _, scheme in scored]
# -------------------------
# Weather helper (simple)
# -------------------------
def get_weather(city: str):
    """Fetch current weather for ``city`` from the OpenWeatherMap API.

    Args:
        city: Place name to look up. Blank or non-string input is treated
            as "no city".

    Returns:
        ``(temperature, humidity, description, pressure)`` on success
        (metric units are requested), or ``(None, None, None, None)`` when
        no city is given, the request fails, or the response is not a
        "200" payload.
    """
    city = city.strip() if isinstance(city, str) else ""
    if not city:
        return None, None, None, None
    try:
        # Let requests build the query string so characters such as '&',
        # '#' or spaces in the city name are escaped instead of altering
        # the request.
        resp = requests.get(
            "https://api.openweathermap.org/data/2.5/weather",
            params={"q": city, "appid": WEATHER_API_KEY, "units": "metric"},
            timeout=5,
        )
        resp.raise_for_status()
        data = resp.json()
        if str(data.get("cod")) == "200":
            main = data['main']
            return (
                main['temp'],
                main['humidity'],
                data['weather'][0]['description'],
                main['pressure'],
            )
    except Exception as e:
        # Best effort: callers fall back when weather is unavailable.
        # Only the exception type is printed because HTTP error messages
        # include the full request URL, and that URL carries the API key.
        print(f"Weather fetch error: {type(e).__name__}")
    return None, None, None, None
# -------------------------
# Market Prices Helper Functions (Updated for CSV)
# -------------------------
def get_market_prices_from_csv(state: str, district: Optional[str] = None, crop_name: Optional[str] = None):
    """Look up market price rows in the local market_prices table.

    Rows are selected by case-insensitive substring match on state, and
    additionally on district and commodity when those are given and the
    table has such columns. If the table was not loaded at startup, one
    attempt is made to read ``./data/market_prices.csv`` and cache it in
    ``csv_data``.

    Args:
        state: State name (or part of it) to match.
        district: Optional district filter.
        crop_name: Optional commodity filter.

    Returns:
        ``(success, records, message)``. ``records`` is a list of dicts with
        keys state, district, market, commodity, variety, arrival_date,
        min_price, max_price and modal_price (``""`` where the table has no
        such column). On failure ``success`` is False, ``records`` is empty
        and ``message`` explains why.
    """
    try:
        market_df = csv_data.get('market_prices', pd.DataFrame())

        # Not loaded at startup: try the file directly and cache the result.
        if market_df.empty:
            market_csv_path = './data/market_prices.csv'
            if not os.path.exists(market_csv_path):
                return False, [], f"Market prices CSV file not found at {market_csv_path}"
            market_df = pd.read_csv(market_csv_path)
            market_df.columns = market_df.columns.str.strip()
            for col in market_df.select_dtypes(include=['object']).columns:
                market_df[col] = market_df[col].astype(str).str.strip()
            csv_data['market_prices'] = market_df

        if market_df.empty:
            return False, [], "No market price data available"

        # Output key -> actual column name (None when the table lacks it).
        columns = {
            "state": find_column(market_df, ["State", "state"]),
            "district": find_column(market_df, ["District", "district"]),
            "market": find_column(market_df, ["Market", "market"]),
            "commodity": find_column(market_df, ["Commodity", "commodity", "Crop", "crop"]),
            "variety": find_column(market_df, ["Variety", "variety"]),
            "arrival_date": find_column(market_df, ["Arrival_Date", "arrival_date", "Date", "date"]),
            "min_price": find_column(market_df, ["Min_x0020_Price", "min_price", "Min_Price", "Minimum_Price"]),
            "max_price": find_column(market_df, ["Max_x0020_Price", "max_price", "Max_Price", "Maximum_Price"]),
            "modal_price": find_column(market_df, ["Modal_x0020_Price", "modal_price", "Modal_Price", "Average_Price"]),
        }
        if not columns["state"]:
            return False, [], "State column not found in market prices data"

        def _matching(frame, column, needle):
            # regex=False: the needle is user input and must be matched
            # literally. As a pattern, "(" would raise and "." would match
            # every row.
            mask = frame[column].astype(str).str.contains(needle, case=False, na=False, regex=False)
            return frame[mask]

        filtered_df = _matching(market_df, columns["state"], state)
        if district and columns["district"]:
            filtered_df = _matching(filtered_df, columns["district"], district)
        if crop_name and columns["commodity"]:
            filtered_df = _matching(filtered_df, columns["commodity"], crop_name)

        if filtered_df.empty:
            return False, [], "No market price data found for the specified criteria"

        processed_data = [
            {
                field: (record.get(col, "") if col else "")
                for field, col in columns.items()
            }
            for _, record in filtered_df.iterrows()
        ]
        return True, processed_data, f"Found {len(processed_data)} market price records"
    except Exception as e:
        # Reported to the caller as a failed lookup rather than raised.
        return False, [], f"Error processing market data: {str(e)}"
def format_market_prices_response(data: List[Dict], state: str, district: Optional[str] = None, crop_name: Optional[str] = None):
    """Turn market price records into one sentence-style string for voice.

    Three shapes are produced:

    * no records: a "no data" message for the location;
    * ``crop_name`` equals (ignoring case) a commodity in the records:
      details for that commodity, either the single record's min/max/modal
      prices or a range and average across its records;
    * otherwise: an overview of up to five commodities with their average
      modal price.
    """
    where = f"{district}, {state}" if district else state
    if not data:
        return f"No current market price data available for {where}. Please contact your local market or agriculture office for current rates."

    # Bucket records per commodity, keeping first-seen order.
    grouped = {}
    for entry in data:
        grouped.setdefault(entry.get("commodity", "Unknown"), []).append(entry)

    # Resolve the requested crop to the commodity's own spelling, if present.
    chosen = None
    found = False
    if crop_name:
        wanted = crop_name.lower()
        names = list(grouped)
        lowered = [name.lower() for name in names]
        if wanted in lowered:
            chosen = names[lowered.index(wanted)]
            found = True

    if not found:
        # General overview across commodities (capped at five for voice).
        pieces = [f"Current market prices in {where}: "]
        summaries = []
        for name, rows in list(grouped.items())[:5]:
            if not rows:
                continue
            modal_values = []
            for row in rows:
                value = extract_number_from_price(row.get('modal_price', ''))
                if value is not None:
                    modal_values.append(value)
            if modal_values:
                summaries.append(f"{name} at ₹{sum(modal_values) / len(modal_values):.0f}")
            else:
                summaries.append(f"{name} (price varies)")
        if summaries:
            pieces.append(", ".join(summaries) + " per quintal. ")
        if len(grouped) > 5:
            pieces.append(f"And {len(grouped) - 5} more commodities available.")
        return "".join(pieces)

    rows = grouped[chosen]
    if len(rows) == 1:
        # One record: quote its own prices and market.
        only = rows[0]
        pieces = [f"Market price for {chosen} in {only.get('market', where)}: "]
        low = extract_number_from_price(only.get('min_price', ''))
        high = extract_number_from_price(only.get('max_price', ''))
        modal = extract_number_from_price(only.get('modal_price', ''))
        if low is not None:
            pieces.append(f"Minimum ₹{low:.0f}, ")
        if high is not None:
            pieces.append(f"Maximum ₹{high:.0f}, ")
        if modal is None:
            pieces.append("per quintal. ")
        else:
            pieces.append(f"Modal price ₹{modal:.0f} per quintal. ")
        when = only.get('arrival_date')
        if when:
            pieces.append(f"Data from {when}.")
        return "".join(pieces)

    # Several records for the commodity: summarise range and average.
    lows, highs, modals = [], [], []
    for row in rows:
        for bucket, field in ((lows, 'min_price'), (highs, 'max_price'), (modals, 'modal_price')):
            value = extract_number_from_price(row.get(field, ''))
            if value is not None:
                bucket.append(value)
    pieces = [f"Market prices for {chosen} in {where}: "]
    if lows and highs:
        pieces.append(f"Price range ₹{min(lows):.0f} to ₹{max(highs):.0f} per quintal. ")
    if modals:
        pieces.append(f"Average modal price ₹{sum(modals) / len(modals):.0f} per quintal. ")
    pieces.append(f"Data from {len(rows)} markets.")
    return "".join(pieces)
# -------------------------
# Request models (if needed)
# -------------------------
class RetellRequest(BaseModel):
    """Request body with three required fields: ``name``, ``call``, ``args``.

    NOTE(review): presumably mirrors the Retell function-call payload, where
    ``args`` would carry the tool arguments - confirm against the Retell
    documentation.
    """

    # A string identifier.
    name: str
    # Free-form mapping; no keys are enforced by this model.
    call: Dict[str, Any]
    # Free-form mapping; no keys are enforced by this model.
    args: Dict[str, Any]
# -------------------------
# Endpoints
# -------------------------
# Root and health
# The handler had no route decorator, so it was never registered on `app`.
# NOTE(review): "/" is assumed from the "Root" banner above - confirm.
@app.get("/")
async def root():
    """Return a liveness message and the list of available endpoints."""
    return {
        "message": "Krishi Mitra API is running!",
        "endpoints": [
            "/api/market-prices (GET|POST)",
            "/api/scheme-eligibility (GET|POST)",
            "/api/weather-advisory (GET|POST)",
            "/api/crop-advisory (GET|POST)",
            "/api/csv-status (GET)",
            "/health (GET)"
        ]
    }
# The handler had no route decorator, so it was never registered on `app`.
# Path and method follow the "/health (GET)" entry in the root endpoint list.
@app.get("/health")
async def health_check():
    """Report service status and the row count of each loaded CSV table."""
    return {
        "status": "healthy",
        "service": "Krishi Mitra API",
        "csv_files_loaded": {key: len(df) for key, df in csv_data.items()}
    }
# The handler had no route decorator, so it was never registered on `app`.
# Path and method follow the "/api/csv-status (GET)" entry in the root
# endpoint list.
@app.get("/api/csv-status")
async def csv_status():
    """Describe each CSV table: whether it loaded, row count, column names."""
    return {
        key: {
            "loaded": not df.empty,
            "records": len(df),
            "columns": list(df.columns) if not df.empty else [],
        }
        for key, df in csv_data.items()
    }
# -------------------------
# Market prices (Updated for CSV)
# -------------------------
_MARKET_PRICES_TROUBLE = "I'm having trouble accessing market price data right now. Please contact your local mandi for current rates."


def _market_first_text(*values: Any) -> str:
    """Return the first value that is a non-blank string, stripped; else ''."""
    for value in values:
        if isinstance(value, str) and value.strip():
            return value.strip()
    return ""


def _market_section(body: Any, key: str) -> Dict[str, Any]:
    """Return ``body[key]`` when it is a dict, otherwise an empty dict."""
    section = body.get(key) if isinstance(body, dict) else None
    return section if isinstance(section, dict) else {}


def _market_prices_result(state: str, district: str, crop_name: str) -> Dict[str, Any]:
    """Build the market-price response shared by the GET and POST routes.

    All arguments are expected to be already stripped; ``state`` must be
    non-empty.
    """
    success, data, message = get_market_prices_from_csv(state, district or None, crop_name or None)
    if success:
        return {
            "success": True,
            "result": format_market_prices_response(data, state, district or None, crop_name or None),
            "data": data[:10],  # Limit response data for voice interface
            "total_records": len(data)
        }
    location_text = f"{district}, {state}" if district else state
    return {
        "success": False,
        "result": f"Current market price data for {location_text} is not available right now. Please contact your local mandi or agriculture market committee for current rates.",
        "data": [],
        "error": message
    }


# The handlers had no route decorators, so they were never registered on
# `app`. Paths and methods follow the "/api/market-prices (GET|POST)" entry
# in the root endpoint list.
@app.post("/api/market-prices")
async def market_prices_post(request: Request):
    """Get market prices from local CSV data (JSON body).

    ``crop_name``, ``state`` and ``district`` are each read from the first
    place that has a non-blank string: ``body["query"]``, ``body["args"]``,
    then the top level of the body. ``state`` is required.
    """
    try:
        body = await request.json() if (await request.body()) else {}
        if not isinstance(body, dict):
            body = {}
        query_params = _market_section(body, "query")
        args_params = _market_section(body, "args")

        def pick(key: str) -> str:
            return _market_first_text(query_params.get(key), args_params.get(key), body.get(key))

        crop_name = pick("crop_name")
        state = pick("state")
        district = pick("district")
        if not state:
            return {
                "success": False,
                "result": "Please provide state name to get market prices.",
                "data": []
            }
        return _market_prices_result(state, district, crop_name)
    except Exception as e:
        # Endpoint boundary: always answer with a speakable message.
        return {
            "success": False,
            "result": _MARKET_PRICES_TROUBLE,
            "data": [],
            "error": str(e)
        }


@app.get("/api/market-prices")
async def market_prices_get(
    crop_name: Optional[str] = Query("", alias="crop_name"),
    state: Optional[str] = Query("", alias="state"),
    district: Optional[str] = Query("", alias="district")
):
    """Get market prices via GET request from local CSV. ``state`` is required."""
    # Strip before validating: a whitespace-only state would otherwise pass
    # the emptiness check and then match every row.
    state = _market_first_text(state)
    district = _market_first_text(district)
    crop_name = _market_first_text(crop_name)
    if not state:
        return {
            "success": False,
            "result": "Please provide state parameter to get market prices.",
            "data": []
        }
    try:
        return _market_prices_result(state, district, crop_name)
    except Exception as e:
        # Endpoint boundary: always answer with a speakable message.
        return {
            "success": False,
            "result": _MARKET_PRICES_TROUBLE,
            "data": [],
            "error": str(e)
        }
# -------------------------
# Scheme eligibility (POST for Retell style, GET for easy testing)
# -------------------------
def _scheme_text(value: Any) -> str:
    """Return ``value`` as stripped text; None, NaN and the text 'nan' give ''.

    The CSV loader converts missing cells to the string "nan", which must not
    be read out to the caller.
    """
    if value is None:
        return ""
    text = str(value).strip()
    return "" if text.lower() == "nan" else text


def _scheme_land_size(value: Any) -> float:
    """Coerce a land size to float; missing or unparseable input gives 0.0."""
    try:
        return float(value) if value not in (None, "") else 0.0
    except (TypeError, ValueError):
        return 0.0


def _scheme_fallbacks(land_size: float, state: str) -> List[Dict[str, str]]:
    """Built-in default schemes used when the CSV yields nothing."""
    schemes = []
    if land_size > 0:
        schemes.append({
            "scheme": "PM-KISAN",
            "benefit": "₹6,000 per year in 3 installments",
            "description": "Direct income support to landholding farmer families.",
            "eligibility": "All landholding farmer families.",
            "process": "Apply via pmkisan.gov.in or your nearest CSC",
            "contact": "https://pmkisan.gov.in/"
        })
    schemes.append({
        "scheme": "Pradhan Mantri Fasal Bima Yojana",
        "benefit": "Comprehensive crop insurance coverage",
        "description": "Crop insurance against natural calamities, pests, and diseases.",
        "eligibility": "All farmers in notified crops/areas",
        "process": "Contact your nearest bank, CSC or PMFBY portal",
        "contact": "https://pmfby.gov.in/"
    })
    if state.strip().lower() == "punjab":
        schemes.append({
            "scheme": "Punjab Crop Diversification Scheme",
            "benefit": "₹17,500 per hectare for diversification",
            "process": "Contact District Agriculture Officer",
            "contact": ""
        })
    return schemes


def _scheme_speech(schemes: List[Dict[str, Any]]) -> str:
    """Build the voice-friendly summary: total count plus the first three."""
    if not schemes:
        return "I couldn't find specific schemes for your profile. Please contact your local agriculture department for personalized advice."
    parts = [f"You are eligible for {len(schemes)} government schemes: "]
    for number, scheme in enumerate(schemes[:3], start=1):
        name = _scheme_text(scheme.get('scheme')) or 'N/A'
        benefit = _scheme_text(scheme.get('benefit')) or _scheme_text(scheme.get('description')) or 'N/A'
        process = _scheme_text(scheme.get('process')) or 'Contact local agriculture office'
        contact = _scheme_text(scheme.get('contact'))
        how = f" Apply through {process}"
        if contact:
            how += f" or contact {contact}"
        parts.append(f"{number}. {name} - {benefit}.{how}. ")
    if len(schemes) > 3:
        parts.append(f"And {len(schemes)-3} more schemes available.")
    return "".join(parts)


def _scheme_eligibility_result(farmer_category: Any, land_size: Any, state: Any, crop_type: Any) -> Dict[str, Any]:
    """Compute the response shared by the GET and POST routes.

    CSV schemes are used when available (``get_schemes_from_csv`` returns an
    empty list when the table is not loaded); otherwise the built-in
    defaults apply.
    """
    size = _scheme_land_size(land_size)
    state_text = _scheme_text(state)
    eligible_schemes = get_schemes_from_csv(
        _scheme_text(farmer_category), size, state_text, _scheme_text(crop_type)
    )
    if not eligible_schemes:
        eligible_schemes = _scheme_fallbacks(size, state_text)
    return {
        "result": _scheme_speech(eligible_schemes),
        "eligible_schemes": eligible_schemes
    }


# The handlers had no route decorators, so they were never registered on
# `app`. Paths and methods follow the "/api/scheme-eligibility (GET|POST)"
# entry in the root endpoint list.
@app.post("/api/scheme-eligibility")
async def scheme_eligibility_endpoint(
    request: Request,
    x_retell_signature: Optional[str] = Header(None, alias="X-Retell-Signature")
):
    """Scheme eligibility from a JSON body.

    ``farmer_category``, ``land_size``, ``state`` and ``crop_type`` are read
    from ``payload["args"]`` first, then from the top level of the payload.

    Raises:
        HTTPException: 401 when a signature header is present but invalid.
    """
    request_body = await request.body()
    # The signature is checked only when the header is present; unsigned
    # requests are accepted.
    if x_retell_signature and not verify_retell_signature(request_body, x_retell_signature):
        raise HTTPException(status_code=401, detail="Invalid Retell signature")
    try:
        payload = json.loads(request_body.decode('utf-8')) if request_body else {}
    except ValueError:
        # Covers malformed JSON and undecodable bytes alike.
        payload = {}
    # A JSON array or scalar body would otherwise crash on .get() below.
    if not isinstance(payload, dict):
        payload = {}
    args = payload.get("args")
    if not isinstance(args, dict):
        args = {}

    def pick(key: str, default: Any) -> Any:
        return args.get(key, default) or payload.get(key, default)

    try:
        return _scheme_eligibility_result(
            pick("farmer_category", ""),
            pick("land_size", 0),
            pick("state", ""),
            pick("crop_type", ""),
        )
    except Exception as e:
        # Endpoint boundary: always answer with a speakable message.
        return {
            "result": "I'm having trouble accessing scheme information right now. Please contact your local agriculture officer.",
            "error": str(e)
        }


@app.get("/api/scheme-eligibility")
async def scheme_eligibility_get(
    farmer_category: Optional[str] = Query("", alias="farmer_category"),
    land_size: Optional[float] = Query(0.0, alias="land_size"),
    state: Optional[str] = Query("", alias="state"),
    crop_type: Optional[str] = Query("", alias="crop_type")
):
    """Scheme eligibility from query parameters (same logic as the POST route)."""
    try:
        return _scheme_eligibility_result(farmer_category, land_size, state, crop_type)
    except Exception as e:
        return {"result": "Error computing schemes", "error": str(e)}
# -------------------------
# Weather advisory (POST and GET)
# -------------------------
def _weather_condition(description: Optional[str]) -> str:
    """Classify a weather description as SUNNY, RAINY, WINDY or NORMAL."""
    desc_lower = (description or "").lower()
    for keyword, label in (("clear", "SUNNY"), ("rain", "RAINY"), ("wind", "WINDY")):
        if keyword in desc_lower:
            return label
    return "NORMAL"


async def _weather_advisory(city: Any) -> Dict[str, Any]:
    """Build the weather response shared by the GET and POST routes.

    When the lookup fails, the response says so (``success`` False, numeric
    fields None) instead of presenting placeholder readings as if they were
    real measurements.
    """
    city = city.strip() if isinstance(city, str) else ""
    # get_weather() performs a blocking HTTP request; run it in the default
    # executor so the event loop stays free for other requests.
    loop = asyncio.get_running_loop()
    temperature, humidity, description, pressure = await loop.run_in_executor(None, get_weather, city)

    if temperature is None:
        return {
            "success": False,
            "result": f"Weather data for {city or 'your location'} is not available right now. Please try again later.",
            "data": {
                "city": city,
                "temperature": None,
                "humidity": None,
                "pressure": None,
                "description": "Not Available",
                "condition": "NORMAL"
            }
        }

    weather_condition = _weather_condition(description)
    result = (
        f"Weather in {city or 'your location'}: {description}. "
        f"Temperature {temperature}°C, Humidity {humidity}%, Pressure {pressure} hPa. "
        f"Condition classified as {weather_condition}."
    )
    return {
        "success": True,
        "result": result,
        "data": {
            "city": city,
            "temperature": temperature,
            "humidity": humidity,
            "pressure": pressure,
            "description": description,
            "condition": weather_condition
        }
    }


# The handlers had no route decorators, so they were never registered on
# `app`. Paths and methods follow the "/api/weather-advisory (GET|POST)"
# entry in the root endpoint list.
@app.post("/api/weather-advisory")
async def weather_advisory_post(request: Request):
    """Weather advisory from a JSON body.

    The location is read from ``body["query"]["location"]``, then
    ``body["args"]["location"]``, then ``body["location"]`` - the same lookup
    order the market-prices POST route uses for its parameters.
    """
    try:
        body = await request.json() if (await request.body()) else {}
    except ValueError:
        # Malformed JSON is treated like an empty body.
        body = {}
    if not isinstance(body, dict):
        body = {}

    city = ""
    for container in (body.get("query"), body.get("args"), body):
        if isinstance(container, dict):
            candidate = container.get("location")
            if isinstance(candidate, str) and candidate.strip():
                city = candidate
                break
    return await _weather_advisory(city)


@app.get("/api/weather-advisory")
async def weather_advisory_get(location: Optional[str] = Query("", alias="location")):
    """Weather advisory from the ``location`` query parameter."""
    return await _weather_advisory(location)
| # ------------------------- | |
| # Crop advisory (POST - Retell style; GET - query params) | |
| # ------------------------- | |
| async def crop_advisory_endpoint( | |
| request: Request, | |
| x_retell_signature: Optional[str] = Header(None, alias="X-Retell-Signature") | |
| ): | |
| request_body = await request.body() | |
| if x_retell_signature and not verify_retell_signature(request_body, x_retell_signature): | |
| raise HTTPException(status_code=401, detail="Invalid Retell signature") | |
| try: | |
| payload = json.loads(request_body.decode('utf-8')) if request_body else {} | |
| except Exception: | |
| payload = {} | |
| crop_name = (payload.get("args", {}).get("crop_name", "") or payload.get("crop_name", "") or "").strip() | |
| growth_stage = (payload.get("args", {}).get("growth_stage", "") or payload.get("growth_stage", "") or "").strip() | |
| issue_type = (payload.get("args", {}).get("issue_type", "general") or payload.get("issue_type", "general") or "general").strip().lower() | |
| state = (payload.get("args", {}).get("state", "") or payload.get("state", "") or "").strip() | |
| try: | |
| advisory = None | |
| contact_info = "" | |
| # Search crop_advisory CSV robustly (support various column names) | |
| df = csv_data.get('crop_advisory', pd.DataFrame()) | |
| if not df.empty: | |
| # find likely columns | |
| crop_col = find_column(df, ["crop", "Crop", "Crop Name", "crop_name"]) | |
| sowing_col = find_column(df, ["sowing_time", "Sowing_Time", "Sowing Time", "Sowing"]) | |
| fertilizer_col = find_column(df, ["fertilizer", "Fertilizer"]) | |
| season_col = find_column(df, ["season", "Season"]) | |
| issues_col = find_column(df, ["common_issues", "Common_Issues", "Common Issues", "Common_Issues"]) | |
| solution_col = find_column(df, ["solution", "Solution", "Solution"]) | |
| if crop_col and crop_name: | |
| matches = df[df[crop_col].astype(str).str.contains(crop_name, case=False, na=False)] | |
| elif crop_col: | |
| matches = df.copy() | |
| else: | |
| matches = pd.DataFrame() | |
| if not matches.empty: | |
| crop_info = matches.iloc[0] | |
| parts = [] | |
| if issue_type == "general": | |
| if sowing_col and pd.notna(crop_info.get(sowing_col)): | |
| parts.append(f"Sowing time: {crop_info[sowing_col]}") | |
| if fertilizer_col and pd.notna(crop_info.get(fertilizer_col)): | |
| parts.append(f"Recommended fertilizer: {crop_info[fertilizer_col]}") | |
| if season_col and pd.notna(crop_info.get(season_col)): | |
| parts.append(f"Best season: {crop_info[season_col]}") | |
| if issues_col and solution_col and pd.notna(crop_info.get(issues_col)) and pd.notna(crop_info.get(solution_col)): | |
| if issue_type in ['pest', 'disease'] or issue_type == 'general': | |
| parts.append(f"For {crop_info[issues_col]}: {crop_info[solution_col]}") | |
| if parts: | |
| advisory = f"For {crop_name or crop_info.get(crop_col,'the crop')}: " + ". ".join(parts) | |
| # contact info from contact_info CSV | |
| df_contact = csv_data.get('contact_info', pd.DataFrame()) | |
| if not df_contact.empty and state: | |
| state_col = find_column(df_contact, ["state", "State", "state_name"]) | |
| if state_col: | |
| contact_matches = df_contact[df_contact[state_col].astype(str).str.contains(state, case=False, na=False)] | |
| if not contact_matches.empty: | |
| contact_match = contact_matches.iloc[0] | |
| contact_parts = [] | |
| if 'agriculture_officer' in contact_match and pd.notna(contact_match.get('agriculture_officer')): | |
| contact_parts.append(f"Agriculture Officer at {contact_match['agriculture_officer']}") | |
| if 'kvk_contact' in contact_match and pd.notna(contact_match.get('kvk_contact')): | |
| contact_parts.append(f"KVK at {contact_match['kvk_contact']}") | |
| if 'kisan_call_center' in contact_match and pd.notna(contact_match.get('kisan_call_center')): | |
| contact_parts.append(f"Kisan Call Center at {contact_match['kisan_call_center']}") | |
| if contact_parts: | |
| contact_info = f"For detailed advice in {state}, contact: " + " or ".join(contact_parts) + "." | |
| # fallback advisory if none found | |
| if not advisory: | |
| if crop_name and crop_name.lower() == "wheat" and issue_type == "pest": | |
| advisory = "For wheat pest control: If you see aphids, spray Imidacloprid 200 SL at 0.3ml per liter of water. Spray during evening hours. Avoid over-irrigation." | |
| elif crop_name and crop_name.lower() == "rice" and issue_type == "disease": | |
| advisory = "For rice disease management: If you see brown spots on leaves, it might be blast disease. Apply Tricyclazole 75% WP at 0.6g per liter. Ensure proper drainage." | |
| else: | |
| advisory = f"For {crop_name or 'the crop'} at {growth_stage or 'current'} stage: Monitor crop regularly, maintain proper spacing, apply fertilizers as per soil test recommendations." | |
| if not contact_info: | |
| contact_info = f"For detailed advice, contact your local Krishi Vigyan Kendra or Agriculture Officer in {state or 'your state'}. You can also call the Kisan Call Centre at 1800-1801-551." | |
| result_text = f"{advisory} {contact_info}" | |
| return { | |
| "result": result_text, | |
| "recommendations": advisory, | |
| "contact_info": contact_info | |
| } | |
| except Exception as e: | |
| return { | |
| "result": f"I couldn't provide specific advice for {crop_name} right now. Please contact your local agriculture extension officer for crop-specific guidance.", | |
| "error": str(e) | |
| } | |
async def crop_advisory_get(
    crop_name: Optional[str] = Query("", alias="crop_name"),
    growth_stage: Optional[str] = Query("", alias="growth_stage"),
    issue_type: Optional[str] = Query("general", alias="issue_type"),
    state: Optional[str] = Query("", alias="state")
):
    """Return crop advice and an extension-service contact line (query-parameter variant).

    The advisory is built from the first row of the ``crop_advisory`` CSV whose
    crop column contains ``crop_name`` (case-insensitive, literal substring).
    When ``crop_name`` is empty the first row of the table is used. If the CSV
    yields nothing, a canned advisory is returned instead.

    Args:
        crop_name: Crop to look up; blank means "no filter".
        growth_stage: Only used in the generic fallback sentence.
        issue_type: ``"general"`` adds sowing/fertilizer/season details;
            ``"general"``, ``"pest"`` and ``"disease"`` add the issue/solution
            pair. Any other value falls through to the fallback advisory.
        state: State used to look up a row in the ``contact_info`` CSV.

    Returns:
        A dict with ``result`` (advisory + contact sentence), ``recommendations``
        and ``contact_info``. If anything raises, a dict with a generic
        ``result`` message and the exception text under ``error``.
    """
    try:
        crop_name = (crop_name or "").strip()
        growth_stage = (growth_stage or "").strip()
        issue_type = (issue_type or "general").strip().lower()
        state = (state or "").strip()

        advisory = None
        contact_info = ""

        df = csv_data.get('crop_advisory', pd.DataFrame())
        if not df.empty:
            crop_col = find_column(df, ["crop", "Crop", "Crop Name", "crop_name"])
            sowing_col = find_column(df, ["sowing_time", "Sowing_Time", "Sowing Time", "Sowing"])
            fertilizer_col = find_column(df, ["fertilizer", "Fertilizer"])
            season_col = find_column(df, ["season", "Season"])
            issues_col = find_column(df, ["common_issues", "Common_Issues", "Common Issues"])
            solution_col = find_column(df, ["solution", "Solution"])

            if crop_col and crop_name:
                # regex=False: crop_name is untrusted user text, so match it
                # literally instead of compiling it as a regular expression
                # (input such as "rice(" would otherwise raise re.error).
                crop_mask = df[crop_col].astype(str).str.contains(
                    crop_name, case=False, na=False, regex=False
                )
                matches = df[crop_mask]
            elif crop_col:
                # No crop filter: only the first row is read, so no copy needed.
                matches = df
            else:
                matches = pd.DataFrame()

            if not matches.empty:
                crop_info = matches.iloc[0]
                parts = []

                if issue_type == "general":
                    general_fields = (
                        ("Sowing time", sowing_col),
                        ("Recommended fertilizer", fertilizer_col),
                        ("Best season", season_col),
                    )
                    for label, col in general_fields:
                        if col and _csv_cell_has_value(crop_info.get(col)):
                            parts.append(f"{label}: {crop_info[col]}")

                if (
                    issue_type in ("general", "pest", "disease")
                    and issues_col
                    and solution_col
                    and _csv_cell_has_value(crop_info.get(issues_col))
                    and _csv_cell_has_value(crop_info.get(solution_col))
                ):
                    parts.append(f"For {crop_info[issues_col]}: {crop_info[solution_col]}")

                if parts:
                    row_crop = crop_info.get(crop_col)
                    crop_label = crop_name or (
                        row_crop if _csv_cell_has_value(row_crop) else 'the crop'
                    )
                    advisory = f"For {crop_label}: " + ". ".join(parts)

        # contact info
        df_contact = csv_data.get('contact_info', pd.DataFrame())
        if not df_contact.empty and state:
            state_col = find_column(df_contact, ["state", "State", "state_name"])
            if state_col:
                # Literal match for the same reason as the crop lookup above.
                state_mask = df_contact[state_col].astype(str).str.contains(
                    state, case=False, na=False, regex=False
                )
                contact_matches = df_contact[state_mask]
                if not contact_matches.empty:
                    contact_match = contact_matches.iloc[0]
                    contact_fields = (
                        ("Agriculture Officer", 'agriculture_officer'),
                        ("KVK", 'kvk_contact'),
                        ("Kisan Call Center", 'kisan_call_center'),
                    )
                    # Series.get returns None for an absent column, which the
                    # helper treats as "no value".
                    contact_parts = [
                        f"{label} at {contact_match[col]}"
                        for label, col in contact_fields
                        if _csv_cell_has_value(contact_match.get(col))
                    ]
                    if contact_parts:
                        contact_info = f"For detailed advice in {state}, contact: " + " or ".join(contact_parts) + "."

        # Fallback advisory when the CSV produced nothing usable.
        if not advisory:
            if crop_name and crop_name.lower() == "wheat" and issue_type == "pest":
                advisory = "For wheat pest control: If you see aphids, spray Imidacloprid 200 SL at 0.3ml per liter of water. Spray during evening hours. Avoid over-irrigation."
            elif crop_name and crop_name.lower() == "rice" and issue_type == "disease":
                advisory = "For rice disease management: If you see brown spots on leaves, it might be blast disease. Apply Tricyclazole 75% WP at 0.6g per liter. Ensure proper drainage."
            else:
                advisory = f"For {crop_name or 'the crop'} at {growth_stage or 'current'} stage: Monitor crop regularly, maintain proper spacing, apply fertilizers as per soil test recommendations."

        if not contact_info:
            contact_info = f"For detailed advice, contact your local Krishi Vigyan Kendra or Agriculture Officer in {state or 'your state'}. You can also call the Kisan Call Centre at 1800-1801-551."

        result_text = f"{advisory} {contact_info}"
        return {
            "result": result_text,
            "recommendations": advisory,
            "contact_info": contact_info
        }
    except Exception as e:
        # Deliberate best-effort boundary: always answer the caller with a
        # speakable message rather than an HTTP 500.
        return {
            "result": "I couldn't provide specific advice right now. Please contact your local agriculture extension officer.",
            "error": str(e)
        }


def _csv_cell_has_value(value) -> bool:
    """Return True when a CSV cell holds real text.

    The CSV loader casts text columns with ``astype(str)``, which turns missing
    cells into the literal string ``"nan"``; ``pd.notna`` alone therefore never
    filters them out. Empty strings and ``"nan"`` are treated as missing here.
    """
    if value is None or pd.isna(value):
        return False
    text = str(value).strip()
    return bool(text) and text.lower() != "nan"
| # ------------------------- | |
| # Run server (for local dev) | |
| # ------------------------- | |
if __name__ == "__main__":
    # Local development entry point: serve the app on all interfaces, taking
    # the port from the PORT environment variable (7860 when unset).
    import uvicorn

    listen_port = int(os.getenv("PORT", 7860))
    uvicorn.run(app, host="0.0.0.0", port=listen_port)