# app.py
"""Grafana-like demo dashboard built with Gradio, matplotlib, folium and grafanalib.

The app loads a time series (random demo data, an uploaded CSV, or a CSV
behind a Google Drive / Google Sheets link), renders three matplotlib panels
and an interactive folium map, and emits a matching grafanalib dashboard JSON.
"""
import html
import json
import re
import tempfile
from datetime import datetime, timedelta

import branca.colormap as bcm
import folium
import gradio as gr
import matplotlib.dates as mdates
import matplotlib.pyplot as plt  # noqa: F401  (kept from the original import list)
import numpy as np
import pandas as pd
from dateutil import tz
from grafanalib.core import (
    Dashboard, Graph, Row, Target, YAxis, YAxes, Time
)
from matplotlib import cm
from matplotlib.figure import Figure

TAIPEI = tz.gettz("Asia/Taipei")

# Bounding box used when random coordinates have to be generated.
_LAT_RANGE = (21.8, 25.3)
_LON_RANGE = (120.0, 122.0)
# Map centre used when no row has usable coordinates.
_DEFAULT_CENTER = (sum(_LAT_RANGE) / 2, sum(_LON_RANGE) / 2)

# Columns that are never offered as plottable series.
_RESERVED_COLUMNS = ("time", "lat", "lon", "pid")
# Accepted names for the time column, in priority order.
_TIME_COLUMN_CANDIDATES = ("time", "timestamp", "datetime", "date")

_FIGSIZE = (6.5, 3.6)
_BASE_RADIUS = 3.0
_MAX_SIZE_VALUE = 50.0

# -----------------------------
# Google Drive link handling
# -----------------------------
DRIVE_PRESETS = [
    "https://drive.google.com/file/d/15yZ4QicICKZCnX6vjcD9JNXjnJmMFJD4/view?usp=drivesdk",
    "https://drive.google.com/file/d/1dqazYh_YzNNMbkUpgLRKSE9Y3ioPhtFu/view?usp=drivesdk",
    "https://drive.google.com/file/d/1A23f4q8DXHpoRIN5UQsDd6eM8jJ_Ruf8/view?usp=drivesdk",
]

_SHEET_RE = re.compile(r"https://docs\.google\.com/spreadsheets/d/([A-Za-z0-9_-]+)")
_SHEET_GID_RE = re.compile(r"[?&#]gid=(\d+)")
_DRIVE_FILE_RE = re.compile(
    r"https://drive\.google\.com/file/d/([A-Za-z0-9_-]+)(?:[/?#]|$)"
)
_DRIVE_ID_PARAM_RE = re.compile(
    r"https://drive\.google\.com/(?:open|uc)\?(?:[^#\s]*&)?id=([A-Za-z0-9_-]+)"
)


def normalize_drive_url(url: str) -> str:
    """Turn a Google Drive / Google Sheets share link into a direct CSV URL.

    - Sheets: ``.../spreadsheets/d/<ID>/edit`` becomes
      ``.../spreadsheets/d/<ID>/export?format=csv`` (``&gid=<N>`` is appended
      when the link names a specific tab).
    - Drive file: ``.../file/d/<ID>/view``, ``open?id=<ID>`` and ``uc?id=<ID>``
      become ``https://drive.google.com/uc?export=download&id=<ID>``.

    Any other URL is returned unchanged (after stripping whitespace).

    Raises:
        ValueError: if ``url`` is not a non-empty string.
    """
    if not isinstance(url, str) or not url.strip():
        raise ValueError("請提供有效的 Google 連結")
    url = url.strip()

    sheet = _SHEET_RE.search(url)
    if sheet:
        export = f"https://docs.google.com/spreadsheets/d/{sheet.group(1)}/export?format=csv"
        gid = _SHEET_GID_RE.search(url)
        return f"{export}&gid={gid.group(1)}" if gid else export

    drive = _DRIVE_FILE_RE.search(url) or _DRIVE_ID_PARAM_RE.search(url)
    if drive:
        return f"https://drive.google.com/uc?export=download&id={drive.group(1)}"

    return url


# -----------------------------
# Demo / Data loading
# -----------------------------
def make_demo_dataframe() -> pd.DataFrame:
    """Return 61 one-minute samples of random demo data ending now (Taipei time).

    Columns: ``time``, ``amplitude`` (0-1), ``count`` (0-10), ``lat``, ``lon``
    and a sequential ``pid``.
    """
    t0 = datetime.now(tz=TAIPEI) - timedelta(minutes=60)
    times = [t0 + timedelta(minutes=i) for i in range(61)]
    n = len(times)
    df = pd.DataFrame({
        "time": times,
        "amplitude": np.random.rand(n),
        "count": np.random.randint(0, 11, size=n),
        "lat": np.random.uniform(*_LAT_RANGE, size=n),
        "lon": np.random.uniform(*_LON_RANGE, size=n),
    })
    df["pid"] = np.arange(n)
    return df


def _finalize_time(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy with a tz-aware (Asia/Taipei) ``time`` column, sorted by time.

    The first column found among time/timestamp/datetime/date is renamed to
    ``time``. Naive timestamps are interpreted as Taipei local time; aware
    timestamps are converted to Taipei. The input frame is not modified.

    Raises:
        ValueError: if none of the accepted time columns is present.
    """
    time_col = next((c for c in _TIME_COLUMN_CANDIDATES if c in df.columns), None)
    if time_col is None:
        raise ValueError("資料需包含時間欄位(time/timestamp/datetime/date 其一)")

    # rename() returns a new frame, so the caller's DataFrame stays untouched.
    df = df.rename(columns={time_col: "time"})
    stamps = pd.to_datetime(df["time"])
    if stamps.dt.tz is None:
        stamps = stamps.dt.tz_localize(TAIPEI)
    else:
        stamps = stamps.dt.tz_convert(TAIPEI)
    df["time"] = stamps
    return df.sort_values("time").reset_index(drop=True)


def _ensure_geo_and_pid(df: pd.DataFrame) -> pd.DataFrame:
    """Add random ``lat``/``lon`` and a sequential ``pid`` when they are missing.

    Random coordinates keep the map from being empty for plain time series.
    """
    n = len(df)
    if "lat" not in df.columns or "lon" not in df.columns:
        df["lat"] = np.random.uniform(*_LAT_RANGE, size=n)
        df["lon"] = np.random.uniform(*_LON_RANGE, size=n)
    if "pid" not in df.columns:
        df["pid"] = np.arange(n)
    return df


def load_csv(file: gr.File | None) -> pd.DataFrame:
    """Read an uploaded CSV.

    ``file`` may be a path string or an object exposing the path as ``.name``.

    Raises:
        ValueError: if no file was provided or the CSV has no time column.
    """
    if file is None:
        raise ValueError("請上傳 CSV 檔")
    path = getattr(file, "name", file)
    return _ensure_geo_and_pid(_finalize_time(pd.read_csv(path)))


def load_drive_csv(sheet_or_file_url: str) -> pd.DataFrame:
    """Read a CSV from a Google Sheets or Google Drive file link."""
    url = normalize_drive_url(sheet_or_file_url)
    return _ensure_geo_and_pid(_finalize_time(pd.read_csv(url)))


def load_data(source: str, file: gr.File | None = None, sheet_url: str = "") -> pd.DataFrame:
    """Load data for ``source``: ``"drive"``, ``"upload"``, or anything else for demo data.

    Raises:
        ValueError: if the selected source is missing its link or file.
    """
    if source == "drive":
        if not sheet_url:
            raise ValueError("請選擇 Google 連結")
        return load_drive_csv(sheet_url)
    if source == "upload":
        if file is None:
            raise ValueError("請上傳 CSV 檔")
        return load_csv(file)
    return make_demo_dataframe()


def _numeric_columns(df: pd.DataFrame) -> list[str]:
    """Return the numeric columns that can be offered as plottable series."""
    return [
        c for c in df.columns
        if c not in _RESERVED_COLUMNS and pd.api.types.is_numeric_dtype(df[c])
    ]


# -----------------------------
# grafanalib JSON builder
# -----------------------------
def _graph(title: str, targets: list, *, lines: bool, bars: bool) -> Graph:
    """Build one Graph panel with the settings shared by every panel here."""
    return Graph(
        title=title,
        dataSource="(example)",
        targets=targets,
        lines=lines,
        bars=bars,
        points=False,
        yAxes=YAxes(left=YAxis(format="short"), right=YAxis(format="short")),
    )


def build_grafanalib_dashboard(series_columns: list[str], dual_axis: bool, rolling_window: int) -> dict:
    """Describe the rendered panels as a grafanalib dashboard.

    Panels: a line panel for the first series, an optional bar (or bar + line
    when ``dual_axis``) panel for the second series, and a rolling-mean panel
    for the first series.

    Returns:
        The result of ``Dashboard.to_json_data()``. Nested grafanalib objects
        are still present, so serialise it with ``_grafana_json_default``.

    Raises:
        ValueError: if ``series_columns`` is empty.
    """
    if not series_columns:
        raise ValueError("沒有可用的數值欄位")

    first = series_columns[0]
    panels = [
        _graph(
            f"{first}",
            [Target(expr=f"{first}", legendFormat=first)],
            lines=True,
            bars=False,
        )
    ]

    if len(series_columns) > 1:
        second = series_columns[1]
        targets = [Target(expr=f"{second}", legendFormat=second)]
        lines, title = False, f"{second} (bar)"
        if dual_axis:
            targets.append(Target(expr=f"{first}", legendFormat=f"{first} (line)"))
            lines = True
            title = f"{second} (bar) + {first} (line)"
        panels.append(_graph(title, targets, lines=lines, bars=True))

    rolling_name = f"{first}_rolling{rolling_window}"
    panels.append(
        _graph(
            f"{first} rolling({rolling_window})",
            [Target(expr=rolling_name, legendFormat=rolling_name)],
            lines=True,
            bars=False,
        )
    )

    dashboard = Dashboard(
        title="Grafana-like Demo (grafanalib + Gradio)",
        rows=[Row(panels=panels)],
        timezone="browser",
        time=Time("now-1h", "now"),
    ).auto_panel_ids()  # give every panel a unique id
    return dashboard.to_json_data()


def _grafana_json_default(obj):
    """``json.dumps`` fallback that expands nested grafanalib objects.

    Objects exposing ``to_json_data()`` are serialised through it; anything
    else falls back to ``str`` so dumping never fails.
    """
    to_json_data = getattr(obj, "to_json_data", None)
    return to_json_data() if callable(to_json_data) else str(obj)


# -----------------------------
# Matplotlib helpers
# -----------------------------
def _new_axes():
    """Create a figure and axes without registering them in pyplot's global state.

    Figures created through pyplot stay alive until explicitly closed, which
    leaks memory in a long-running server process.
    """
    fig = Figure(figsize=_FIGSIZE)
    return fig, fig.subplots()


def _style_time_axis(ax):
    """Apply the shared date ticks, grid and margins to ``ax``."""
    locator = mdates.AutoDateLocator(minticks=3, maxticks=6)
    ax.xaxis.set_major_locator(locator)
    ax.xaxis.set_major_formatter(mdates.ConciseDateFormatter(locator))
    ax.tick_params(axis="x", labelrotation=20, labelsize=9)
    ax.tick_params(axis="y", labelsize=9)
    ax.grid(True, which="major", alpha=0.25)
    # Set margins on the given axes, not on whichever axes is "current".
    ax.margins(x=0.02, y=0.05)


def _normalize_times(series: pd.Series) -> pd.Series:
    """Return tz-naive timestamps for plotting; aware values are converted to UTC first."""
    s = series.copy()
    if getattr(s.dt, "tz", None) is not None:
        s = s.dt.tz_convert("UTC").dt.tz_localize(None)
    return s


def _bar_width_days(times: pd.Series) -> float:
    """Bar width in days: 80% of the median sample spacing, at least 10 seconds.

    Falls back to a 60-second spacing when it cannot be measured (a single
    row, or duplicated timestamps).
    """
    step = times.diff().dt.total_seconds().median()
    if pd.isna(step) or step <= 0:
        step = 60.0
    return max(10.0, step * 0.8) / 86400.0


def render_line(df, col):
    """Plot ``df[col]`` against ``df["time"]`` as a line and return the figure."""
    times = _normalize_times(df["time"])
    fig, ax = _new_axes()
    ax.plot(times, df[col], linewidth=1.6)
    ax.set_title(col, fontsize=12, pad=8)
    ax.set_xlabel("Time")
    ax.set_ylabel(col)
    _style_time_axis(ax)
    fig.tight_layout()
    return fig


def render_bar_or_dual(df, second_col, first_col, dual_axis):
    """Plot ``second_col`` as bars, optionally with ``first_col`` as a line on a twin axis."""
    times = _normalize_times(df["time"])
    # Matplotlib date numbers, so the bar width can be given in days.
    x = mdates.date2num(times.to_numpy())
    fig, ax = _new_axes()
    ax.bar(x, df[second_col], width=_bar_width_days(times), align="center", label=second_col)

    title = f"{second_col} (bar)"
    if dual_axis:
        ax2 = ax.twinx()
        ax2.plot(x, df[first_col], linewidth=1.6, color="C1", label=f"{first_col} (line)")
        title = f"{second_col} (bar) + {first_col} (line)"
        h1, l1 = ax.get_legend_handles_labels()
        h2, l2 = ax2.get_legend_handles_labels()
        ax.legend(h1 + h2, l1 + l2, loc="upper left")
    else:
        ax.legend(loc="upper left")

    ax.set_title(title, fontsize=12, pad=8)
    _style_time_axis(ax)
    fig.tight_layout()
    return fig


def render_rolling(df, col, window=5):
    """Plot the rolling mean of ``df[col]``.

    The rolling column ``{col}_rolling{window}`` is added to ``df`` in place
    when it is not already there.

    Returns:
        ``(figure, df)`` where ``df`` includes the rolling column.
    """
    window = max(1, int(window))  # rolling() rejects windows below 1
    times = _normalize_times(df["time"])
    roll_col = f"{col}_rolling{window}"
    if roll_col not in df.columns:
        df[roll_col] = df[col].rolling(window=window, min_periods=1).mean()

    fig, ax = _new_axes()
    ax.plot(times, df[roll_col], linewidth=1.6)
    ax.set_title(f"{col} rolling({window})", fontsize=12, pad=8)
    ax.set_xlabel("Time")
    ax.set_ylabel(roll_col)
    _style_time_axis(ax)
    fig.tight_layout()
    return fig, df


# -----------------------------
# Folium helpers (map + legend)
# -----------------------------
def _to_hex_color(value: float, cmap=cm.viridis) -> str:
    """Map ``value`` through ``cmap`` and return it as ``#rrggbb``."""
    rgba = cmap(value)
    return "#{:02x}{:02x}{:02x}".format(int(rgba[0]*255), int(rgba[1]*255), int(rgba[2]*255))


def _format_time(value, fmt: str) -> str:
    """Format a timestamp with ``fmt``; missing or unparsable values become ``"NaT"``."""
    stamp = pd.to_datetime(value, errors="coerce")
    return "NaT" if pd.isna(stamp) else stamp.strftime(fmt)


def _marker_radius(raw) -> float:
    """Marker radius for a size value; non-numeric, NaN or negative values get the base radius."""
    try:
        size = float(raw)
    except (TypeError, ValueError):
        return _BASE_RADIUS
    if not np.isfinite(size):
        return _BASE_RADIUS
    return min(max(size, 0.0), _MAX_SIZE_VALUE) + _BASE_RADIUS


def render_map_folium(
    df: pd.DataFrame,
    value_col: str = "amplitude",
    size_col: str = "count",
    cmap_name: str = "viridis",
    tiles: str = "OpenStreetMap",
) -> str:
    """Render the points of ``df`` on a folium map and return it as embeddable HTML.

    Marker colour encodes ``value_col`` and marker radius encodes ``size_col``
    (a fixed radius is used when that column is absent). Rows without
    coordinates are skipped. If the installed folium rejects ``tiles``, the
    map falls back to OpenStreetMap.
    """
    points = df.dropna(subset=["lat", "lon"])
    if points.empty:
        center = list(_DEFAULT_CENTER)
    else:
        center = [points["lat"].mean(), points["lon"].mean()]

    try:
        m = folium.Map(location=center, zoom_start=7, tiles=tiles)
    except ValueError:
        # Raised for tile names that need an explicit attribution.
        m = folium.Map(location=center, zoom_start=7, tiles="OpenStreetMap")

    vmin, vmax = points[value_col].min(), points[value_col].max()
    if not (np.isfinite(vmin) and np.isfinite(vmax)):
        vmin, vmax = 0.0, 1.0  # no usable values at all
    elif vmax == vmin:
        vmax = vmin + 1.0  # constant series: avoid a zero-width colour scale
    span = vmax - vmin

    cmap = getattr(cm, cmap_name, cm.viridis)
    colormap = bcm.LinearColormap(
        [_to_hex_color(i, cmap) for i in np.linspace(0, 1, 256)],
        vmin=float(vmin),
        vmax=float(vmax),
    )
    colormap.caption = f"{value_col} (color scale)"
    colormap.add_to(m)

    # Column names come from user-supplied CSVs, so escape them for the popup.
    value_label = html.escape(str(value_col))
    size_label = html.escape(str(size_col))
    has_size = size_col in points.columns

    for _, row in points.iterrows():
        norm_val = (row[value_col] - vmin) / span
        lines = [
            f"#ID: {int(row['pid'])}",
            f"time: {_format_time(row['time'], '%Y-%m-%d %H:%M:%S')}",
            f"{value_label}: {row[value_col]:.4f}",
        ]
        if has_size:
            lines.append(f"{size_label}: {html.escape(str(row[size_col]))}")
        lines.append(f"lat/lon: {row['lat']:.5f}, {row['lon']:.5f}")
        popup_html = "<br>".join(lines)

        folium.CircleMarker(
            location=[row["lat"], row["lon"]],
            radius=_marker_radius(row[size_col]) if has_size else _BASE_RADIUS,
            color="black",
            weight=1,
            fill=True,
            fill_opacity=0.7,
            fill_color=_to_hex_color(norm_val, cmap),
            popup=folium.Popup(popup_html, max_width=300),
        ).add_to(m)

    return m._repr_html_()


# -----------------------------
# Detail helpers
# -----------------------------
def make_point_choices(df: pd.DataFrame) -> list[str]:
    """Build one dropdown label per row: ``#pid | HH:MM:SS | amp=... cnt=...``.

    The ``amp`` / ``cnt`` parts are included only when numeric ``amplitude`` /
    ``count`` columns exist and the row has a value for them.
    """
    is_numeric = pd.api.types.is_numeric_dtype
    has_amp = "amplitude" in df.columns and is_numeric(df["amplitude"])
    has_cnt = "count" in df.columns and is_numeric(df["count"])

    labels = []
    for _, r in df.iterrows():
        label = f"#{int(r['pid'])} | {_format_time(r['time'], '%H:%M:%S')}"
        extras = []
        if has_amp and pd.notna(r["amplitude"]):
            extras.append(f"amp={r['amplitude']:.3f}")
        if has_cnt and pd.notna(r["count"]):
            extras.append(f"cnt={int(r['count'])}")
        if extras:
            label += " | " + " ".join(extras)
        labels.append(label)
    return labels


def pick_detail(df: pd.DataFrame, choice: str) -> pd.DataFrame:
    """Return the rows whose ``pid`` matches the ``#pid`` prefix of ``choice``.

    Returns an empty DataFrame when ``choice`` is empty or malformed, or when
    ``df`` is not a DataFrame with a ``pid`` column.
    """
    if not isinstance(choice, str) or not choice:
        return pd.DataFrame()
    if not isinstance(df, pd.DataFrame) or "pid" not in df.columns:
        return pd.DataFrame()

    pid_text = choice.split("|", 1)[0].strip().lstrip("#")
    try:
        pid = int(pid_text)
    except ValueError:
        return pd.DataFrame()

    # Compare numerically: pid may come back as text after a UI round trip.
    matches = df[pd.to_numeric(df["pid"], errors="coerce") == pid]
    return matches.reset_index(drop=True)


# -----------------------------
# Main pipeline
# -----------------------------
def pipeline(source, file, sheet_url, series_choice, dual_axis, rolling_window, cmap_choice, tiles_choice):
    """Load the data and produce every output shown in the UI.

    Returns a 10-tuple in the order expected by the event wiring: line figure,
    bar/dual figure, rolling figure, map HTML, dashboard JSON text, dashboard
    JSON file path, data preview (with rolling column), demo CSV file path,
    point-selector update, and the detail rows for the default point.

    Raises:
        gr.Error: if the data cannot be loaded or has no numeric column.
    """
    try:
        df = load_data(source, file, sheet_url)
    except ValueError as err:
        raise gr.Error(str(err)) from err

    numeric_cols = _numeric_columns(df)
    if not numeric_cols:
        raise gr.Error("沒有可用的數值欄位")
    chosen = [c for c in (series_choice or numeric_cols[:2]) if c in numeric_cols]
    if not chosen:
        chosen = numeric_cols[:2]

    window = int(rolling_window)
    use_dual = bool(dual_axis)

    dash_json = build_grafanalib_dashboard(chosen, use_dual, window)
    dash_json_str = json.dumps(
        dash_json, ensure_ascii=False, indent=2, default=_grafana_json_default
    )
    # delete=False: the file must outlive this call so it can be downloaded.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".json", mode="w", encoding="utf-8") as f:
        f.write(dash_json_str)
        json_path = f.name

    fig1 = render_line(df, chosen[0])
    if len(chosen) > 1:
        fig2 = render_bar_or_dual(df, chosen[1], chosen[0], use_dual)
    else:
        fig2 = Figure(figsize=_FIGSIZE)  # empty placeholder panel
    fig3, df_with_roll = render_rolling(df.copy(), chosen[0], window)

    map_html = render_map_folium(
        df, value_col=chosen[0], size_col="count",
        cmap_name=cmap_choice, tiles=tiles_choice,
    )

    point_choices = make_point_choices(df)
    default_choice = point_choices[0] if point_choices else ""
    detail_df = pick_detail(df_with_roll, default_choice)

    # In demo mode export exactly the data being displayed.
    is_demo = source not in ("drive", "upload")
    demo_df = df if is_demo else make_demo_dataframe()
    with tempfile.NamedTemporaryFile(
        delete=False, suffix=".csv", mode="w", encoding="utf-8", newline=""
    ) as f:
        demo_df.to_csv(f, index=False)
        demo_csv_path = f.name

    return (
        fig1, fig2, fig3,
        map_html,
        dash_json_str, json_path,
        df_with_roll, demo_csv_path,
        gr.Dropdown(choices=point_choices, value=default_choice or None),
        detail_df,
    )


def regenerate_demo(series_choice, dual_axis, rolling_window, cmap_choice, tiles_choice, current_choice):
    """Run the pipeline on fresh demo data; ``current_choice`` is accepted but unused."""
    return pipeline("demo", None, "", series_choice, dual_axis, rolling_window, cmap_choice, tiles_choice)


def update_detail(df: pd.DataFrame, choice: str):
    """Return the detail rows for the selected point."""
    return pick_detail(df, choice)


def probe_columns(source, file, preset_url):
    """Load the selected source and list its numeric columns.

    The preset URL is only used when ``source`` is ``"drive"``. Selecting
    "upload" before a file exists yields empty choices instead of an error.
    """
    if source == "upload" and file is None:
        return gr.CheckboxGroup(choices=[], value=[]), pd.DataFrame()

    sheet_url = preset_url if source == "drive" else ""
    try:
        df = load_data(source, file, sheet_url)
    except ValueError as err:
        raise gr.Error(str(err)) from err

    numeric_cols = _numeric_columns(df)
    return gr.CheckboxGroup(choices=numeric_cols, value=numeric_cols[:2]), df


def _initial_load():
    """First page load: try the first Google preset, fall back to demo data."""
    defaults = ([], False, "5", "viridis", "OpenStreetMap")
    try:
        return pipeline("drive", None, DRIVE_PRESETS[0], *defaults)
    except Exception as err:  # startup boundary: network/parse failures must not blank the page
        gr.Warning(f"無法載入 Google 資料,改用示範資料:{err}")
        return pipeline("demo", None, "", *defaults)


# -----------------------------
# UI (the Google source is a dropdown only)
# -----------------------------
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("## Grafana-like Demo + Folium Map(支援 Google Drive / Sheets,下拉選單選擇來源)")

    source_radio = gr.Radio(["upload", "drive", "demo"], label="資料來源", value="demo")
    file_in = gr.File(label="上傳 CSV(選 upload 時使用)", file_types=[".csv"])

    # Presets only; there is no free-text URL box.
    preset_dd = gr.Dropdown(
        label="Google 預設來源(3 個連結)",
        choices=DRIVE_PRESETS,
        value=DRIVE_PRESETS[0]
    )

    series_multiselect = gr.CheckboxGroup(label="數值欄位", choices=[])
    dual_axis_chk = gr.Checkbox(label="第二面板啟用雙軸", value=False)
    rolling_dd = gr.Dropdown(label="Rolling window", choices=["3", "5", "10", "20"], value="5")

    cmap_dd = gr.Dropdown(
        label="地圖配色 (colormap)",
        choices=["viridis", "plasma", "inferno", "magma", "cividis", "coolwarm"],
        value="viridis",
    )
    tiles_dd = gr.Dropdown(
        label="地圖底圖 (tiles)",
        choices=["OpenStreetMap", "Stamen Terrain", "Stamen Toner", "CartoDB positron", "CartoDB dark_matter"],
        value="OpenStreetMap",
    )

    with gr.Row():
        run_btn = gr.Button("產生 Dashboard", scale=1)
        regen_btn = gr.Button("🔁 重新產生示範資料", scale=1)

    plot1 = gr.Plot(label="1:Line")
    plot2 = gr.Plot(label="2:Bar / Dual Axis")
    plot3 = gr.Plot(label="3:Rolling Mean")
    map_out = gr.HTML(label="4:Geo Map (Interactive + Legend)")

    json_box = gr.Code(label="grafanalib Dashboard JSON", language="json")
    json_file = gr.File(label="下載 dashboard.json")
    demo_csv_file = gr.File(label="下載示範資料 demo.csv")
    df_view = gr.Dataframe(label="資料預覽(含 rolling)", wrap=True)

    gr.Markdown("### 🔎 點位詳情(對應地圖彈窗中的 #ID)")
    point_selector = gr.Dropdown(label="選擇點位(#ID | 時間 | 值)", choices=[], value=None)
    detail_view = gr.Dataframe(label="選取點詳細資料", wrap=True)

    probe_inputs = [source_radio, file_in, preset_dd]
    probe_outputs = [series_multiselect, df_view]
    pipeline_outputs = [
        plot1, plot2, plot3, map_out,
        json_box, json_file, df_view, demo_csv_file,
        point_selector, detail_view,
    ]

    # Re-probe the columns whenever the source selection changes.
    source_radio.change(probe_columns, inputs=probe_inputs, outputs=probe_outputs)
    file_in.change(probe_columns, inputs=probe_inputs, outputs=probe_outputs)
    preset_dd.change(probe_columns, inputs=probe_inputs, outputs=probe_outputs)

    # Initial load: defaults to the first Google link.
    demo.load(_initial_load, inputs=None, outputs=pipeline_outputs)

    # Generate / regenerate
    run_btn.click(
        pipeline,
        inputs=[source_radio, file_in, preset_dd, series_multiselect,
                dual_axis_chk, rolling_dd, cmap_dd, tiles_dd],
        outputs=pipeline_outputs,
    )
    regen_btn.click(
        regenerate_demo,
        inputs=[series_multiselect, dual_axis_chk, rolling_dd, cmap_dd, tiles_dd, point_selector],
        outputs=pipeline_outputs,
    )

    point_selector.change(
        update_detail,
        inputs=[df_view, point_selector],
        outputs=[detail_view]
    )

if __name__ == "__main__":
    demo.launch()