sikeaditya committed on
Commit
f776967
·
verified ·
1 Parent(s): 0456cfe

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +124 -164
app.py CHANGED
@@ -7,10 +7,8 @@ import pandas as pd
7
  import plotly.express as px
8
  import plotly.io as pio
9
  import dotenv
10
- import threading
11
- import tempfile
12
- import shutil
13
  import numpy as np
 
14
 
15
  dotenv.load_dotenv()
16
 
@@ -22,9 +20,10 @@ def clean_and_standardize(df):
22
  df = df.copy()
23
  df.columns = df.columns.str.replace('_x0020_', '_', regex=False)
24
  df.columns = df.columns.str.strip().str.lower().str.replace(' ', '_', regex=False)
 
25
  required_columns = [
26
  'state', 'district', 'market', 'commodity', 'variety',
27
- 'grade', 'arrival_date', 'min_price', 'max_price', 'modal_price'
28
  ]
29
  existing_columns = [col for col in required_columns if col in df.columns]
30
  return df[existing_columns]
@@ -70,156 +69,142 @@ def load_hierarchy_from_json(path='location_hierarchy.json'):
70
  print(f"CRITICAL ERROR: Could not load '{path}'. Error: {e}")
71
  return {}
72
 
73
- def fetch_market_data(state=None, district=None,
74
- cache_path='agmarknet_cache.csv',
75
- use_cache=True, force_refresh=False,
76
- sleep_between=0.15, page_size=1000,
77
- synchronous=True):
 
78
  """
79
- Single-request fetcher (the API returns the full dataset in one response).
80
  Returns a cleaned DataFrame with duplicate columns consolidated and arrival_date normalized.
81
  """
82
  api_key = os.environ.get('DATA_GOV_API_KEY',
83
  "579b464db66ec23bdd00000140925613394847c57ae13db180760f06")
84
  base_url = "https://api.data.gov.in/resource/35985678-0d79-46b4-9ed6-6f13308a1d24"
85
 
86
- # Use cache if present and not forcing refresh
87
- if use_cache and not force_refresh and os.path.exists(cache_path):
88
- try:
89
- df_cache = pd.read_csv(cache_path)
90
- print(f"[fetch_market_data] Loaded cache '{cache_path}' ({len(df_cache)} rows).")
91
- dataframes_to_combine = [df_cache]
92
- try:
93
- df_csv = pd.read_csv("final_price_data.csv")
94
- dataframes_to_combine.append(df_csv)
95
- except FileNotFoundError:
96
- pass
97
- df_combined = pd.concat(dataframes_to_combine, ignore_index=True, sort=False)
98
- # first, consolidate duplicate columns (if any)
99
- df_combined = consolidate_duplicate_columns(df_combined)
100
- cleaned = clean_and_standardize(df_combined)
101
- if 'arrival_date' in cleaned.columns:
102
- cleaned = cleaned.copy()
103
- cleaned.loc[:, 'arrival_date'] = pd.to_datetime(
104
- cleaned['arrival_date'].astype(str).str.replace('\\/', '-', regex=True),
105
- dayfirst=True, errors='coerce'
106
- )
107
- return cleaned
108
- except Exception as e:
109
- print(f"[fetch_market_data] Failed reading cache: {e}. Will fetch live.")
110
-
111
- # Background start support
112
- if not synchronous:
113
- t = threading.Thread(target=fetch_market_data, kwargs={
114
- 'state': state, 'district': district, 'cache_path': cache_path,
115
- 'use_cache': use_cache, 'force_refresh': force_refresh,
116
- 'sleep_between': sleep_between, 'page_size': page_size, 'synchronous': True
117
- }, daemon=True)
118
- t.start()
119
- print("[fetch_market_data] Started background fetcher thread (single-request mode).")
120
- return None
121
-
122
- # Build params for single request
123
  params = {
124
  "api-key": api_key,
125
- "format": "json"
 
 
126
  }
 
127
  if state:
128
  params["filters[State]"] = state
129
  if district:
130
  params["filters[District]"] = district
131
 
132
- temp_fd, temp_file = tempfile.mkstemp(suffix='.csv')
133
- os.close(temp_fd)
134
  try:
 
 
 
 
 
135
  try:
136
- print(f"[fetch_market_data] Sending single request to API (may be large). Params: { {k:v for k,v in params.items() if k!='api-key'} }")
137
- resp = requests.get(base_url, params=params, timeout=180)
138
- except Exception as e:
139
- print(f"[fetch_market_data] Network error on single request: {e}")
140
- # fallback to local CSV if present
141
- try:
142
- df_csv = pd.read_csv("final_price_data.csv")
143
- df_csv = consolidate_duplicate_columns(df_csv)
144
- return clean_and_standardize(df_csv)
145
- except FileNotFoundError:
146
- return pd.DataFrame()
147
-
148
- if resp.status_code != 200:
149
- print(f"[fetch_market_data] API returned {resp.status_code}: {resp.text[:500]}")
150
- try:
151
- df_csv = pd.read_csv("final_price_data.csv")
152
- df_csv = consolidate_duplicate_columns(df_csv)
153
- return clean_and_standardize(df_csv)
154
- except FileNotFoundError:
155
- return pd.DataFrame()
156
-
157
  try:
158
- data = resp.json()
159
- except Exception as e:
160
- print(f"[fetch_market_data] JSON decode error: {e}")
161
- try:
162
- df_csv = pd.read_csv("final_price_data.csv")
163
- df_csv = consolidate_duplicate_columns(df_csv)
164
- return clean_and_standardize(df_csv)
165
- except FileNotFoundError:
166
- return pd.DataFrame()
167
-
168
- records = data.get("records", [])
169
- if not records and isinstance(data, list):
170
- records = data
171
-
172
- if not records:
173
- print("[fetch_market_data] No records returned by API in single response.")
174
- try:
175
- df_csv = pd.read_csv("final_price_data.csv")
176
- df_csv = consolidate_duplicate_columns(df_csv)
177
- return clean_and_standardize(df_csv)
178
- except FileNotFoundError:
179
- return pd.DataFrame()
180
-
181
- df_api = pd.DataFrame.from_records(records)
182
- # Consolidate duplicate columns immediately
183
- df_api = consolidate_duplicate_columns(df_api)
184
 
185
- # write cache atomically
 
 
 
186
  try:
187
- df_api.to_csv(temp_file, index=False)
188
- shutil.move(temp_file, cache_path)
189
- print(f"[fetch_market_data] Single-request cache updated at '{cache_path}' ({len(df_api)} rows).")
190
- except Exception as e:
191
- print(f"[fetch_market_data] Failed to write cache atomically: {e}")
192
- try:
193
- df_api.to_csv(cache_path, index=False)
194
- except Exception as e2:
195
- print(f"[fetch_market_data] Fallback write also failed: {e2}")
196
-
197
- # Merge with final_price_data.csv if exists
198
- dataframes_to_combine = [df_api]
 
 
 
 
 
 
 
199
  try:
200
- df_csv = pd.read_csv("final_price_data.csv")
201
- df_csv = consolidate_duplicate_columns(df_csv)
202
- dataframes_to_combine.append(df_csv)
203
- except FileNotFoundError:
204
- pass
205
-
206
- df_combined = pd.concat(dataframes_to_combine, ignore_index=True, sort=False)
207
- df_combined = consolidate_duplicate_columns(df_combined)
208
- cleaned = clean_and_standardize(df_combined)
209
- if 'arrival_date' in cleaned.columns:
210
- cleaned = cleaned.copy()
211
- cleaned.loc[:, 'arrival_date'] = pd.to_datetime(
212
- cleaned['arrival_date'].astype(str).str.replace('\\/', '-', regex=True),
213
- dayfirst=True, errors='coerce'
214
- )
215
- return cleaned
216
-
217
- finally:
218
- if os.path.exists(temp_file):
219
- try:
220
- os.remove(temp_file)
221
- except Exception:
222
- pass
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
223
 
224
  # Utility to flatten/clean numeric-like columns safely
225
  def flatten_column(df, col):
@@ -317,7 +302,7 @@ def get_ai_insights(market_data, state, district, language="English"):
317
  top_commodities_str = ", ".join([f"{name} (Avg: ₹{price:.2f})" for name, price in top_commodities.items()])
318
  prompt = f'Analyze agricultural market data for {district}, {state}. Top commodities: {top_commodities_str}. Provide a JSON object with keys "crop_profitability", "market_analysis", "farmer_recommendations", each with an array of insights in {language}.'
319
  try:
320
- api_url = "https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:generateContent"
321
  headers = {"Content-Type": "application/json"}
322
  payload = {"contents": [{"parts": [{"text": prompt}]}], "generationConfig": {"responseMimeType": "application/json"}}
323
  response = requests.post(f"{api_url}?key={api_key}", headers=headers, json=payload, timeout=25)
@@ -375,33 +360,6 @@ LOCATION_HIERARCHY = load_hierarchy_from_json()
375
  print("Location hierarchy loaded.")
376
 
377
  # --- Flask Routes ---
378
- @app.route('/refresh_cache', methods=['POST'])
379
- def refresh_cache():
380
- state = request.form.get('state')
381
- district = request.form.get('district')
382
-
383
- def _bg():
384
- try:
385
- fetch_market_data(state=state, district=district, cache_path='agmarknet_cache.csv',
386
- use_cache=False, force_refresh=True, page_size=1000, synchronous=True)
387
- print("[refresh_cache] Background refresh finished.")
388
- except Exception as e:
389
- print(f"[refresh_cache] Background refresh failed: {e}")
390
-
391
- t = threading.Thread(target=_bg, daemon=True)
392
- t.start()
393
- return jsonify({'success': True, 'message': 'Background cache refresh started.'})
394
-
395
- @app.route('/download_full_sync', methods=['POST'])
396
- def download_full_sync():
397
- state = request.form.get('state')
398
- district = request.form.get('district')
399
- df = fetch_market_data(state=state, district=district, cache_path='agmarknet_cache.csv',
400
- use_cache=False, force_refresh=True, page_size=1000, synchronous=True)
401
- if df is None or df.empty:
402
- return jsonify({'success': False, 'message': 'Download produced no data.'})
403
- return jsonify({'success': True, 'message': f'Download complete. Cached {len(df)} rows.'})
404
-
405
  @app.route('/')
406
  def index():
407
  states = sorted(list(LOCATION_HIERARCHY.keys()))
@@ -439,18 +397,20 @@ def filter_data():
439
  if not state:
440
  return jsonify({'success': False, 'message': 'Please select a state.'})
441
 
442
- df_combined = fetch_market_data(state, district)
 
443
  if df_combined is None or df_combined.empty:
444
  return jsonify({'success': False, 'message': 'No data found from API or local CSV.'})
445
 
446
  # Defensive copy
447
  df_filtered = df_combined.copy()
448
 
 
449
  if state:
450
  df_filtered = df_filtered[df_filtered['state'].str.lower() == state.lower()]
451
  if district:
452
  df_filtered = df_filtered[df_filtered['district'].str.lower() == district.lower()]
453
- if market:
454
  df_filtered = df_filtered[df_filtered['market'].str.lower() == market.lower()]
455
  if commodity:
456
  df_filtered = df_filtered[df_filtered['commodity'].str.lower() == commodity.lower()]
@@ -498,4 +458,4 @@ def filter_data():
498
 
499
  if __name__ == '__main__':
500
  pio.templates.default = "plotly_white"
501
- app.run(debug=True, host='0.0.0.0', port=7860)
 
7
  import plotly.express as px
8
  import plotly.io as pio
9
  import dotenv
 
 
 
10
  import numpy as np
11
+ from datetime import datetime, timedelta
12
 
13
  dotenv.load_dotenv()
14
 
 
def clean_and_standardize(df):
    """Normalize column names and project onto the standard price columns.

    Column names have '_x0020_' (XML-escaped spaces) replaced with '_',
    are stripped, lower-cased, and remaining spaces become underscores.
    The 'grade' column is dropped when present, and only the standard
    location/price columns that actually exist in the frame are returned
    (missing ones are simply omitted).

    Parameters
    ----------
    df : pandas.DataFrame
        Raw frame from the API or a cached CSV; not mutated.

    Returns
    -------
    pandas.DataFrame
        Copy restricted to the standardized columns.
    """
    df = df.copy()
    df.columns = df.columns.str.replace('_x0020_', '_', regex=False)
    df.columns = df.columns.str.strip().str.lower().str.replace(' ', '_', regex=False)
    # errors='ignore' prevents a KeyError when the source has no 'grade'
    # column (the unconditional drop crashed on such frames).
    df = df.drop(columns=['grade'], errors='ignore')
    required_columns = [
        'state', 'district', 'market', 'commodity', 'variety',
        'arrival_date', 'min_price', 'max_price', 'modal_price'
    ]
    existing_columns = [col for col in required_columns if col in df.columns]
    return df[existing_columns]
 
69
  print(f"CRITICAL ERROR: Could not load '{path}'. Error: {e}")
70
  return {}
71
 
72
def get_last_year_date():
    """Return the date exactly 365 days ago, formatted as dd/MM/yyyy."""
    return (datetime.now() - timedelta(days=365)).strftime("%d/%m/%Y")
76
+
77
def _load_local_csv_fallback(market=None):
    """Best-effort load of 'final_price_data.csv' as a fallback data source.

    Returns a consolidated, cleaned (and optionally market-filtered)
    DataFrame, or an empty DataFrame when the file is missing, empty, or
    unreadable.  Extracted because the identical fallback logic was
    previously duplicated at every API failure branch.
    """
    try:
        if os.path.exists("final_price_data.csv") and os.path.getsize("final_price_data.csv") > 0:
            df_csv = pd.read_csv("final_price_data.csv", encoding='utf-8', on_bad_lines='skip')
            if not df_csv.empty:
                df_csv = consolidate_duplicate_columns(df_csv)
                cleaned = clean_and_standardize(df_csv)
                if market and 'market' in cleaned.columns:
                    cleaned = cleaned[cleaned['market'].str.lower() == market.lower()]
                return cleaned
    except Exception as csv_err:
        print(f"[fetch_market_data] Could not load final_price_data.csv: {csv_err}")
    return pd.DataFrame()


def fetch_market_data(state=None, district=None, market=None):
    """Fetch agricultural market price data from the data.gov.in API.

    Requests records filtered to the arrival date one year ago (see
    get_last_year_date) and optionally by state/district, merges the API
    result with the local 'final_price_data.csv' when present, and
    returns a cleaned DataFrame with duplicate columns consolidated and
    arrival_date parsed to datetime.  On any network/API/parsing failure
    the local CSV alone is used as a fallback; an empty DataFrame is
    returned when no data is available at all.

    Parameters
    ----------
    state, district : str or None
        Optional API-side filters.
    market : str or None
        Optional market name; applied to the raw API rows and again
        after standardization (and to the CSV fallback).
    """
    api_key = os.environ.get('DATA_GOV_API_KEY',
                             "579b464db66ec23bdd00000140925613394847c57ae13db180760f06")
    base_url = "https://api.data.gov.in/resource/35985678-0d79-46b4-9ed6-6f13308a1d24"

    # Build params for API request with arrival date from last year
    arrival_date = get_last_year_date()
    params = {
        "api-key": api_key,
        "format": "json",
        "limit": 1000,
        "filters[Arrival_Date]": arrival_date
    }
    if state:
        params["filters[State]"] = state
    if district:
        params["filters[District]"] = district

    try:
        print(f"[fetch_market_data] Sending request to API with arrival date: {arrival_date}. Params: {params}")
        resp = requests.get(base_url, params=params, timeout=180)
    except Exception as e:
        print(f"[fetch_market_data] Network error on request: {e}")
        return _load_local_csv_fallback(market)

    if resp.status_code != 200:
        print(f"[fetch_market_data] API returned {resp.status_code}: {resp.text[:500]}")
        return _load_local_csv_fallback(market)

    try:
        data = resp.json()
    except Exception as e:
        print(f"[fetch_market_data] JSON decode error: {e}")
        return _load_local_csv_fallback(market)

    # Accept both response shapes: a dict with a "records" key, or a bare
    # list of records.  (Checking isinstance first avoids the AttributeError
    # the previous code hit when calling .get() on a list.)
    if isinstance(data, dict):
        records = data.get("records", [])
    elif isinstance(data, list):
        records = data
    else:
        records = []

    if not records:
        print("[fetch_market_data] No records returned by API in response.")
        return _load_local_csv_fallback(market)

    df_api = pd.DataFrame.from_records(records)

    # Filter by market if specified (raw API column name is 'Market')
    if market and 'Market' in df_api.columns:
        df_api = df_api[df_api['Market'].str.lower() == market.lower()]

    # Consolidate duplicate columns immediately
    df_api = consolidate_duplicate_columns(df_api)
    print(f"[fetch_market_data] Retrieved {len(df_api)} rows from API.")

    # Merge with final_price_data.csv if it exists
    dataframes_to_combine = [df_api]
    try:
        if os.path.exists("final_price_data.csv") and os.path.getsize("final_price_data.csv") > 0:
            df_csv = pd.read_csv("final_price_data.csv", encoding='utf-8', on_bad_lines='skip')
            if not df_csv.empty:
                dataframes_to_combine.append(consolidate_duplicate_columns(df_csv))
    except Exception as csv_err:
        print(f"[fetch_market_data] Could not load final_price_data.csv for merging: {csv_err}")

    df_combined = pd.concat(dataframes_to_combine, ignore_index=True, sort=False)
    df_combined = consolidate_duplicate_columns(df_combined)
    cleaned = clean_and_standardize(df_combined)
    if 'arrival_date' in cleaned.columns:
        cleaned = cleaned.copy()
        # Dates arrive as dd/MM/yyyy; normalize '/' to '-' then parse
        # day-first, coercing unparseable values to NaT.
        cleaned.loc[:, 'arrival_date'] = pd.to_datetime(
            cleaned['arrival_date'].astype(str).str.replace('\\/', '-', regex=True),
            dayfirst=True, errors='coerce'
        )

    # Additional market filtering after standardization
    if market and 'market' in cleaned.columns:
        cleaned = cleaned[cleaned['market'].str.lower() == market.lower()]

    return cleaned
 
209
  # Utility to flatten/clean numeric-like columns safely
210
  def flatten_column(df, col):
 
302
  top_commodities_str = ", ".join([f"{name} (Avg: ₹{price:.2f})" for name, price in top_commodities.items()])
303
  prompt = f'Analyze agricultural market data for {district}, {state}. Top commodities: {top_commodities_str}. Provide a JSON object with keys "crop_profitability", "market_analysis", "farmer_recommendations", each with an array of insights in {language}.'
304
  try:
305
+ api_url = "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:generateContent"
306
  headers = {"Content-Type": "application/json"}
307
  payload = {"contents": [{"parts": [{"text": prompt}]}], "generationConfig": {"responseMimeType": "application/json"}}
308
  response = requests.post(f"{api_url}?key={api_key}", headers=headers, json=payload, timeout=25)
 
360
  print("Location hierarchy loaded.")
361
 
362
  # --- Flask Routes ---
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
363
  @app.route('/')
364
  def index():
365
  states = sorted(list(LOCATION_HIERARCHY.keys()))
 
397
  if not state:
398
  return jsonify({'success': False, 'message': 'Please select a state.'})
399
 
400
+ # Pass market parameter to fetch_market_data for API filtering
401
+ df_combined = fetch_market_data(state, district, market)
402
  if df_combined is None or df_combined.empty:
403
  return jsonify({'success': False, 'message': 'No data found from API or local CSV.'})
404
 
405
  # Defensive copy
406
  df_filtered = df_combined.copy()
407
 
408
+ # Additional frontend filtering (in case not filtered by API)
409
  if state:
410
  df_filtered = df_filtered[df_filtered['state'].str.lower() == state.lower()]
411
  if district:
412
  df_filtered = df_filtered[df_filtered['district'].str.lower() == district.lower()]
413
+ if market and 'market' in df_filtered.columns:
414
  df_filtered = df_filtered[df_filtered['market'].str.lower() == market.lower()]
415
  if commodity:
416
  df_filtered = df_filtered[df_filtered['commodity'].str.lower() == commodity.lower()]
 
458
 
459
# Script entry point: run the Flask development server, listening on all
# interfaces at port 7860 (the default port for Hugging Face Spaces).
if __name__ == '__main__':
    # Use the clean white Plotly theme for all generated charts.
    pio.templates.default = "plotly_white"
    app.run(debug=True, host='0.0.0.0', port=7860)