Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
| import os | |
| import sys | |
| import time | |
| import uuid | |
| import asyncio | |
| from pathlib import Path | |
| from loguru import logger | |
| import gradio as gr | |
| from datasets import load_dataset | |
| from huggingface_hub import HfApi, whoami | |
| from yourbench_space import PATH | |
| from yourbench_space.utils import ( | |
| STAGES, | |
| SubprocessManagerGroup, | |
| save_files, | |
| update_dataset, | |
| map_stage_names, | |
| is_running_locally, | |
| on_generation_succsess, | |
| ) | |
| from yourbench_space.config import generate_and_save_config | |
| from yourbench_space.evaluation import run_evaluations, create_eval_file | |
# Markdown header for the app; passed to gr.Markdown when the UI is built.
# Assembled line by line so each line of the header can be edited on its own.
project_description = "\n".join(
    [
        "",
        "# YourBench 🚀",
        "**Dynamic Benchmark Generation for Language Models**",
        "Quickly create zero-shot benchmarks from your documents – keeping models accurate and adaptable",
        "- 💻 [Yourbench GitHub](https://github.com/huggingface/yourbench)",
        "",
    ]
)
# Logging: drop the sinks loguru already has and log to stderr at INFO level.
logger.remove()
logger.add(sink=sys.stderr, level="INFO")

# Maps a username to the session uuid most recently assigned to it.
USER_ID_SESSION_MAP: dict[str, str] = {}

# Global to store all managers per session
MANAGERS = SubprocessManagerGroup()
# Citation text taken from the docs.md file that sits next to this module.
docs_path = Path(__file__).parent / "docs.md"

# Everything after the last "# Citation" marker in docs.md (the whole file if
# the marker is absent), or a placeholder when the file does not exist.
# The file is decoded explicitly as UTF-8 so the result does not depend on the
# host's locale encoding.
citation_content = (
    docs_path.read_text(encoding="utf-8").split("# Citation")[-1].strip()
    if docs_path.exists()
    else "# Citation\n\nDocumentation file not found."
)
def generate_and_return(hf_org, hf_dataset_name, session_state: gr.State):
    """Generate the config file for this session and expose it for download.

    Args:
        hf_org: Organization name, forwarded to ``generate_and_save_config``.
        hf_dataset_name: Dataset name, forwarded to ``generate_and_save_config``.
        session_state: Session state; its ``.value`` is used as the session uid.

    Returns:
        A ``(log message, component update)`` pair. On success the update makes
        the download component visible and points it at the config path; on
        failure the update hides it.
    """
    failure = (
        "❌ Config generation failed",
        gr.update(visible=False, interactive=False),
    )

    # NOTE(review): other handlers in this file look managers up with
    # `session_state.value`; here the state object itself is passed — confirm
    # that `MANAGERS.get` accepts both forms.
    manager = MANAGERS.get(session_state)
    if manager is None:  # should not be possible
        return failure

    session_uid = session_state.value
    config_path = generate_and_save_config(hf_org, hf_dataset_name, session_uid, manager.config_path)

    # Poll for the file for up to 2.5 seconds (5 checks, 0.5 s apart).
    for _ in range(5):
        time.sleep(0.5)
        if config_path.exists():
            gr.Success("✅ Config generated successfully!")
            return (
                "✅ Config saved successfully!",
                gr.update(value=str(config_path), visible=True, interactive=True),
            )

    # `gr.Error` is an exception class: constructing it without `raise` shows
    # nothing to the user. `gr.Warning` displays a toast while still letting
    # this handler return the failure message and hide the download component.
    gr.Warning("Failed to generate config")
    return failure
# Module-level placeholder.
# NOTE(review): not referenced anywhere else in the visible code — confirm
# whether it is still needed.
| final_dataset = None | |
def update_process_status(session_state: gr.State):
    """Build the status-checkbox update for the session's process.

    Returns an unchecked "Not running" update when there is no session state or
    no manager for it, a checked "Running" update while the process runs, and
    otherwise an unchecked "Stopped" update that includes the exit reason and
    exit code when a reason is available.
    """
    manager = None if session_state is None else MANAGERS.get(session_state.value)
    if manager is None:
        return gr.update(value=False, label="Not running")

    if manager.is_running():
        return gr.update(value=True, label="Process Status: Running")

    exit_code, exit_reason = manager.get_exit_details()
    if exit_reason:
        label = f"Process Status: Stopped - {exit_reason}, exit code - {exit_code}"
    else:
        label = "Process Status: Stopped"
    return gr.update(value=False, label=label)
def prepare_task(session_uid: str, oauth_token: gr.OAuthToken | None, hf_dataset_name: str, _=None):
    """Start the session's process with a copy of the current environment.

    Without a token, and when not running locally, a warning is shown and
    nothing is started. Otherwise the child environment is the current
    environment plus ``DATASET_PREFIX`` (the dataset name) and, when a token is
    present, ``HF_TOKEN``. The trailing ``_`` parameter is accepted and ignored.
    """
    logged_in = oauth_token is not None
    if not logged_in and not is_running_locally():
        gr.Warning("You need to log in to use this Space")
        return

    child_env = dict(os.environ)
    if oauth_token:
        child_env["HF_TOKEN"] = oauth_token.token
    child_env["DATASET_PREFIX"] = hf_dataset_name

    MANAGERS.start_process(session_uid, custom_env=child_env)
def update_hf_org_dropdown(oauth_token: gr.OAuthToken | None):
    """Return the "Organization" dropdown populated for the logged-in user.

    The choices are the user's own name followed by the names of their
    organizations, with the user's name preselected. An empty dropdown is
    returned when there is no token or when the lookup fails.
    """
    if oauth_token is None:
        return gr.Dropdown([], label="Organization")

    try:
        user_info = whoami(oauth_token.token)
        org_names = [org["name"] for org in user_info.get("orgs", [])]
        user_name = user_info.get("name", "Unknown User")
        org_names.insert(0, user_name)
        return gr.Dropdown(org_names, value=user_name, label="Organization")
    except Exception as e:
        # Best effort: the UI still gets an (empty) dropdown, but the cause is
        # logged instead of being silently discarded.
        logger.error(f"Failed to fetch organizations for the logged-in user: {e}")
        return gr.Dropdown([], label="Organization")
def switch_to_run_generation_tab():
    """Return a Tabs update that selects the tab with id 1."""
    run_tab_id = 1
    return gr.Tabs(selected=run_tab_id)
def enable_button(files):
    """Return an update that makes a component interactive only when `files` is truthy."""
    has_files = bool(files)
    return gr.update(interactive=has_files)
def _create_leaderboard_space(api, repo_id, token):
    """Create the Gradio Space repository `repo_id` on the Hub using `token`."""
    api.create_repo(
        repo_id=repo_id,
        repo_type="space",
        space_sdk="gradio",
        token=token,
    )


def run_evaluation_pipeline(oauth_token: gr.OAuthToken | None, org_name, eval_name, config_name="lighteval"):
    """Evaluate on the generated dataset, then publish a leaderboard Space.

    Steps, each with its own failure message:
      1. Check that ``{org_name}/{eval_name}`` loads with config ``config_name``.
      2. Create the eval file and run the evaluations.
      3. Create the Space ``{org_name}/leaderboard_yourbench_{org_name}_{eval_name}``;
         if the Hub reports it already exists, delete and recreate it.
      4. Upload the leaderboard folder and set the Space's ``HF_TOKEN`` secret
         and its ``TASK`` / ``ORG_NAME`` variables.

    Args:
        oauth_token: The user's OAuth token, or None when not logged in.
        org_name: Organization (or user) that owns the dataset and the Space.
        eval_name: Name of the evaluation dataset inside ``org_name``.
        config_name: Dataset config to load for the availability check.

    Returns:
        A status string describing success or the first step that failed.
    """
    if oauth_token is None:
        # Every Hub call below needs the token. Without this guard the missing
        # token surfaced as an AttributeError inside the dataset check and was
        # reported as a dataset loading error.
        logger.error("Evaluation requested without an OAuth token")
        return "❌ Failed: You need to log in to run the evaluation"
    token = oauth_token.token

    eval_ds_name = f"{org_name}/{eval_name}"
    repo_id = f"{org_name}/leaderboard_yourbench_{eval_ds_name.replace('/', '_')}"
    folder_path = str(Path(PATH) / "yourbench_space" / "leaderboard_space")

    # Step 1: the dataset must be loadable before anything expensive is started.
    try:
        load_dataset(eval_ds_name, name=config_name, streaming=True, token=token)
    except Exception as e:
        logger.error(f"Failed to load dataset '{eval_ds_name}': {e}")
        return "❌ Failed: Dataset loading error"

    # Step 2: run the evaluations.
    try:
        create_eval_file(eval_ds_name)
        asyncio.run(run_evaluations(eval_ds_name=eval_ds_name, org=org_name))
    except Exception as e:
        logger.error(f"Evaluation error: {e}")
        return f"❌ Failed: Evaluation error\n{e}"

    # Step 3: create the Space, replacing an existing one of the same name.
    api = HfApi()
    space_was_regenerated = False
    try:
        _create_leaderboard_space(api, repo_id, token)
    except Exception as e:
        # The "already exists" case is recognised from the error text.
        if "409" in str(e) and "already created this space repo" in str(e):
            logger.info(f"Space '{repo_id}' already exists. Deleting and regenerating it.")
            try:
                api.delete_repo(repo_id=repo_id, repo_type="space", token=token)
                _create_leaderboard_space(api, repo_id, token)
                space_was_regenerated = True
            except Exception as delete_err:
                logger.error(f"Failed to delete and recreate space '{repo_id}': {delete_err}")
                return f"✅ Evaluation succeeded\n❌ Failed: Could not recreate space\n{delete_err}"
        else:
            logger.error(f"Space creation error: {e}")
            return f"✅ Evaluation succeeded\n❌ Failed: Space creation error\n{e}"

    # Step 4: upload the leaderboard app and configure the Space.
    try:
        api.upload_folder(
            repo_id=repo_id,
            repo_type="space",
            folder_path=folder_path,
            token=token,
        )
        # The user's own token is stored as the Space's HF_TOKEN secret.
        api.add_space_secret(
            repo_id=repo_id,
            key="HF_TOKEN",
            value=token,
            token=token,
        )
        api.add_space_variable(repo_id=repo_id, key="TASK", value=eval_ds_name, token=token)
        api.add_space_variable(repo_id=repo_id, key="ORG_NAME", value=org_name, token=token)
    except Exception as e:
        logger.error(f"Failed during space setup: {e}")
        return f"✅ Evaluation succeeded\n❌ Failed: Space setup error\n{e}"

    if space_was_regenerated:
        return f"✅ Evaluation succeeded\n🔁 Space '{repo_id}' was regenerated successfully"
    return f"✅ Evaluation and Space creation completed successfully for: {repo_id}"
def init_session(profile: gr.OAuthProfile | None):
    """Create or restore the session for the current user on page load.

    The username is "local" when running locally, the profile's username when
    a profile is present, and None otherwise. If the username already maps to
    a session whose manager is still running, that session is restored. A
    stale manager is removed and a fresh uuid is used instead. The returned
    state removes its manager from ``MANAGERS`` through its delete callback.
    """

    def as_state(session_id):
        # State holding the session id; its manager is removed on deletion.
        return gr.State(session_id, delete_callback=lambda uid: MANAGERS.remove(uid))

    if is_running_locally():
        username = "local"
    else:
        username = profile.username if profile else None

    # Reuse the uuid recorded for this username, or start from a new one.
    local_uuid = USER_ID_SESSION_MAP.get(username, str(uuid.uuid4()))

    existing = MANAGERS.get(local_uuid)
    if existing:
        if existing.is_running():
            logger.info(f"Found existing running session for {local_uuid}, restoring")
            return as_state(local_uuid)

        logger.info(f"Found existing stale session for {local_uuid}, starting new")
        MANAGERS.remove(local_uuid)
        local_uuid = str(uuid.uuid4())

    if username:
        USER_ID_SESSION_MAP[username] = local_uuid

    MANAGERS.create(local_uuid)
    logger.info(f"Started session for {local_uuid}")
    return as_state(local_uuid)
# Evaluation launch button. It is instantiated before the Blocks context is
# opened and placed in the layout later with `.render()`, so it can be named
# as an event output before the point where it is displayed.
# NOTE(review): the button starts interactive; an older inline comment claimed
# the opposite — confirm the intended initial state.
btn_launch_evals = gr.Button(
    "🚀 Launch Evaluation",
    variant="primary",
    visible=True,
    interactive=True,
)
# UI definition: three tabs (documents & settings, pipeline run, model
# evaluation) whose events are wired to the handler functions of this module.
| with gr.Blocks(theme=gr.themes.Default()) as app: | |
# Session state; filled in by `init_session` through the `app.load` call near
# the end of this script.
| session_state = gr.State() | |
| gr.Markdown(project_description) | |
| with gr.Tabs() as tabs: | |
# --- Tab 0: upload documents, choose org/dataset name, generate the config ---
| with gr.Tab("Choose Documents & Settings", id=0): | |
| with gr.Column(): | |
| gr.Markdown("### 📄 Choose your documents and settings") | |
| gr.Markdown( | |
| "Upload your source documents that will form the knowledge base for your benchmark. Set a Hugging Face organization and dataset name." | |
| ) | |
| gr.Markdown( | |
| "This step also generates a config file for running the benchmark pipeline. You can download it to run YourBench locally." | |
| ) | |
| with gr.Row(): | |
| with gr.Accordion("Hugging Face Settings"): | |
| login_btn = gr.LoginButton() | |
| hf_org_dropdown = gr.Dropdown(choices=[], label="Organization", allow_custom_value=True) | |
# Populate the organization dropdown when the page loads.
| app.load(update_hf_org_dropdown, inputs=None, outputs=hf_org_dropdown) | |
| hf_dataset_name = gr.Textbox( | |
| label="Dataset name", | |
| value="yourbench", | |
| info="Name of your new evaluation dataset", | |
| ) | |
| with gr.Accordion("Upload Files"): | |
| file_input = gr.File( | |
| label="Upload text files", | |
| file_count="multiple", | |
| file_types=[".txt", ".md", ".html", ".pdf"], | |
| ) | |
| output = gr.Textbox(label="Log") | |
# Hand uploads to `save_files` as they arrive; its return value is shown in
# the "Log" textbox.
| file_input.upload( | |
| save_files, | |
| inputs=[session_state, file_input], | |
| outputs=output, | |
| ) | |
| delete_button = gr.Button("Delete Uploaded Files", visible=False) | |
| preview_button = gr.Button("Generate New Config", interactive=False) | |
| log_message = gr.Textbox(label="Log Message", visible=True) | |
| download_button = gr.File(label="Download Config", visible=False, interactive=False) | |
# Show the delete button and enable config generation only while files are
# selected.
| file_input.change( | |
| lambda files: gr.update(visible=bool(files)), | |
| inputs=file_input, | |
| outputs=delete_button, | |
| ) | |
| file_input.change(enable_button, inputs=file_input, outputs=preview_button) | |
# Ask the manager group to clean the session's workdir, then report it in the
# log, clear the file picker and disable config generation.
| def clean_and_confirm(uid): | |
| MANAGERS.clean_workdir(uid) | |
| return ( | |
| "🗑️ All uploaded files have been deleted!", | |
| gr.update(value=None), | |
| gr.update(interactive=False), | |
| ) | |
| delete_button.click( | |
| clean_and_confirm, | |
| inputs=session_state, | |
| outputs=[output, file_input, preview_button], | |
| ) | |
# Two handlers on the same click: generate the config, and switch to the tab
# with id=1.
| preview_button.click( | |
| generate_and_return, | |
| inputs=[hf_org_dropdown, hf_dataset_name, session_state], | |
| outputs=[log_message, download_button], | |
| ) | |
| preview_button.click( | |
| switch_to_run_generation_tab, | |
| inputs=None, | |
| outputs=tabs, | |
| ) | |
# --- Tab 1: start/stop the pipeline process, follow logs, preview datasets ---
| with gr.Tab("Run Benchmark Pipeline", id=1): | |
| with gr.Column(): | |
| gr.Markdown("### ⚙️ Run the benchmark generation pipeline") | |
| gr.Markdown( | |
| "Start the pipeline to process documents, generate questions, and build the private evaluation dataset. Watch logs, track progress, and preview the results." | |
| ) | |
| with gr.Row(): | |
| start_button = gr.Button("Start Task") | |
| stop_button = gr.Button("Stop Task") | |
| kill_button = gr.Button("Kill Task") | |
# NOTE(review): `prepare_task` already receives its token through a
# `gr.OAuthToken`-annotated parameter. Confirm whether `login_btn` is needed in
# `inputs`, and that the three inputs line up with the handler's
# `session_uid`, `hf_dataset_name` and `_` parameters as intended.
| start_button.click(prepare_task, inputs=[session_state, login_btn, hf_dataset_name]) | |
| stop_button.click(MANAGERS.stop_process, inputs=session_state) | |
| kill_button.click(MANAGERS.kill_process, inputs=session_state) | |
| process_status = gr.Checkbox(label="Process Status", interactive=False) | |
# Refresh the status checkbox every 2 seconds.
| status_timer = gr.Timer(2.0, active=True) | |
| status_timer.tick(update_process_status, inputs=session_state, outputs=process_status) | |
| with gr.Row(): | |
| with gr.Accordion("Stages", open=True): | |
| stages_table = gr.CheckboxGroup( | |
| choices=map_stage_names(STAGES), | |
| value=[], | |
| label="Pipeline Stages Completed", | |
| container=False, | |
| interactive=False, | |
| ) | |
| with gr.Row(): | |
| with gr.Column(): | |
| with gr.Accordion("Log Output", open=True): | |
| log_output = gr.Code(language=None, lines=20, interactive=False) | |
| with gr.Column(): | |
| with gr.Accordion("Ingestion Preview"): | |
| ingestion_df = gr.DataFrame() | |
| with gr.Accordion("Summarization Preview"): | |
| summarization_df = gr.DataFrame() | |
| with gr.Accordion("Single Shot Preview"): | |
| single_shot_df = gr.DataFrame() | |
| with gr.Accordion("Multi Hop Preview"): | |
| multi_hop_df = gr.DataFrame() | |
| with gr.Accordion("Lighteval Preview"): | |
| lighteval_df = gr.DataFrame() | |
# When the checked stages change: refresh the five preview tables, and let
# `on_generation_succsess` update the tabs and the evaluation button.
| stages_table.change( | |
| update_dataset, | |
| inputs=[stages_table, hf_org_dropdown, hf_dataset_name], | |
| outputs=[ingestion_df, summarization_df, single_shot_df, multi_hop_df, lighteval_df], | |
| ) | |
| stages_table.change( | |
| on_generation_succsess, | |
| inputs=stages_table, | |
| outputs=[tabs, btn_launch_evals], | |
| ) | |
| # TODO: this timer should only be active when the second tab is passed to active for the first time | |
# Every second, feed the manager group's output for this session into the log
# view and the stages checklist.
| log_timer = gr.Timer(1.0, active=True) | |
| log_timer.tick( | |
| MANAGERS.read_and_get_output, | |
| inputs=session_state, | |
| outputs=[log_output, stages_table], | |
| ) | |
# --- Tab 2: run evaluations and deploy the leaderboard Space ---
| with gr.Tab("Evaluate Models on Benchmark", id=2): | |
| with gr.Column(): | |
| gr.Markdown("### 🧪 Evaluate models on your benchmark") | |
| gr.Markdown( | |
| "Runs the evaluation with [Lighteval](https://github.com/huggingface/lighteval) on the resulted dataset using 5+ open models, then deploys a leaderboard as a Hugging Face Space under your org." | |
| ) | |
| with gr.Row(): | |
| with gr.Column(): | |
# The button was instantiated before this Blocks context; render() places it
# here.
| btn_launch_evals.render() | |
| with gr.Column(): | |
| clear_status_btn = gr.Button("Clear", variant="secondary") | |
| with gr.Accordion("Evaluation Log", open=True): | |
| eval_status = gr.Textbox(label="", lines=6, interactive=False, show_label=False) | |
# The three inputs supply org_name, eval_name and config_name. The OAuth token
# is not listed: `run_evaluation_pipeline` declares it as a
# `gr.OAuthToken`-annotated parameter (presumably injected by Gradio — confirm).
| btn_launch_evals.click( | |
| run_evaluation_pipeline, | |
| [hf_org_dropdown, hf_dataset_name, gr.State("lighteval")], | |
| eval_status, | |
| ) | |
| clear_status_btn.click(lambda: "", outputs=eval_status) | |
# Create or restore the session on page load and store it in `session_state`.
| app.load(init_session, outputs=session_state) | |
# Start the app; `allowed_paths` is set to the project's PATH.
| app.launch(allowed_paths=[PATH]) | |