import asyncio
import os
import sys
import time
import gradio as gr
import uuid
from datasets import load_dataset
from huggingface_hub import whoami
from loguru import logger
from pathlib import Path

from yourbench_space.config import generate_and_save_config
from yourbench_space.utils import (
    SubprocessManagerGroup,
    save_files,
    update_dataset,
    STAGES,
    is_running_locally,
)
from yourbench_space.evaluation import create_eval_file, run_evaluations
from yourbench_space.leaderboard_space.env import HF_TOKEN

project_description = """
# YourBench 🚀
**Dynamic Benchmark Generation for Language Models**
Quickly create zero-shot benchmarks from your documents – keeping models accurate and adaptable
- 📖 [FAQ](#)
- 💻 [GitHub](https://github.com/huggingface/yourbench/tree/v0.2-alpha-space)
"""

logger.remove()
logger.add(sys.stderr, level="INFO")

# Global to store all managers per session
MANAGERS = SubprocessManagerGroup()
USER_ID_SESSION_MAP: dict[str, str] = dict()

docs_path = Path(__file__).parent / "docs.md"
citation_content = (
    docs_path.read_text().split("# Citation")[-1].strip()
    if docs_path.exists()
    else "# Citation\n\nDocumentation file not found."
)
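

# Writes the YourBench pipeline config for the selected org/dataset, then briefly
# polls the filesystem so the download button is only revealed once the config
# file actually exists on disk.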
def generate_and_return(hf_org, hf_dataset_name, session_state: gr.State):
    manager = MANAGERS.get(session_state)
    if manager is None:  # should not be possible
        return (
            "❌ Config generation failed.",
            gr.update(visible=False, interactive=False),
        )
    session_uid = session_state.value
    config_path = generate_and_save_config(hf_org, hf_dataset_name, session_uid, manager.config_path)
    for _ in range(5):
        time.sleep(0.5)
        if config_path.exists():
            return (
                "✅ Config saved!",
                gr.update(value=str(config_path), visible=True, interactive=True),
            )
    return (
        "❌ Config generation failed.",
        gr.update(visible=False, interactive=False),
    )


final_dataset = None


def update_process_status(session_state: gr.State):
    """Update process status and include exit details if process has terminated"""
    if session_state is None:
        return gr.update(value=False, label="Not running")
    manager = MANAGERS.get(session_state.value)
    if manager is None:
        return gr.update(value=False, label="Not running")
    is_running = manager.is_running()
    if not is_running:
        exit_code, exit_reason = manager.get_exit_details()
        status_text = (
            f"Process Status: Stopped - {exit_reason}, exit code - {exit_code}"
            if exit_reason
            else "Process Status: Stopped"
        )
        return gr.update(value=False, label=status_text)
    return gr.update(value=True, label="Process Status: Running")
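

# Starts the per-session generation subprocess with a copy of the environment,
# injecting the user's HF token and the chosen dataset name prefix.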
def prepare_task(session_uid: str, oauth_token: gr.OAuthToken | None, hf_dataset_name: str, _=None):
    if oauth_token is None and not is_running_locally():
        gr.Warning("You need to log in to use this Space")
        return
    new_env = os.environ.copy()
    if oauth_token:
        new_env["HF_TOKEN"] = oauth_token.token
    new_env["DATASET_PREFIX"] = hf_dataset_name
    MANAGERS.start_process(session_uid, custom_env=new_env)
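

# Fills the organization dropdown with the logged-in user's name followed by the
# organizations they belong to, as reported by `whoami`.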
def update_hf_org_dropdown(oauth_token: gr.OAuthToken | None):
    if oauth_token is None:
        return gr.Dropdown([], label="Organization")
    try:
        user_info = whoami(oauth_token.token)
        org_names = [org["name"] for org in user_info.get("orgs", [])]
        user_name = user_info.get("name", "Unknown User")
        org_names.insert(0, user_name)
        return gr.Dropdown(org_names, value=user_name, label="Organization")
    except Exception as e:
        logger.warning(f"Failed to fetch user organizations: {e}")
        return gr.Dropdown([], label="Organization")


def switch_to_run_generation_tab():
    return gr.Tabs(selected=1)


def enable_button(files):
    return gr.update(interactive=bool(files))
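

# End-to-end evaluation flow: verify that the generated dataset exists, write the
# evaluation task file, run the evaluations, then publish a leaderboard Space
# configured with the required secret and variables.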
def run_evaluation_pipeline(oauth_token: gr.OAuthToken | None, org_name, eval_name):
    eval_ds_name = f"{org_name}/{eval_name}"
    # Test dataset existence
    try:
        load_dataset(eval_ds_name, streaming=True, token=oauth_token.token)
    except Exception as e:
        print(f"Error while loading the dataset: {e}")
        return
    # Run evaluations
    create_eval_file(eval_ds_name)
    status = asyncio.run(run_evaluations(eval_ds_name=eval_ds_name, org=org_name))
    # Create space
    from huggingface_hub import HfApi

    repo_id = f"{org_name}/leaderboard_yourbench_{eval_ds_name.replace('/', '_')}"
    api = HfApi()
    try:
        api.create_repo(repo_id=repo_id, repo_type="space", space_sdk="gradio", token=oauth_token.token)
        api.upload_folder(repo_id=repo_id, repo_type="space", folder_path="src/", token=oauth_token.token)
        api.add_space_secret(repo_id=repo_id, key="HF_TOKEN", value=oauth_token.token, token=oauth_token.token)
        api.add_space_variable(repo_id=repo_id, key="TASK", value=eval_ds_name, token=oauth_token.token)
        api.add_space_variable(repo_id=repo_id, key="ORG_NAME", value=org_name, token=oauth_token.token)
    except Exception as e:
        status = "Evaluation: " + status + "\nLeaderboard creation failed: " + str(e)
    return status
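

# Creates or restores a per-user session: reuse the existing manager if its
# pipeline is still running, otherwise discard the stale one and start fresh
# with a new uuid.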
def init_session(profile: gr.OAuthProfile | None):
    """Update session on load"""
    if is_running_locally():
        username = "local"
    elif profile:
        username = profile.username
    else:
        username = None
    local_uuid = USER_ID_SESSION_MAP.get(username, str(uuid.uuid4()))

    if manager := MANAGERS.get(local_uuid):
        if manager.is_running():
            logger.info(f"Found existing running session for {local_uuid}, restoring")
            return gr.State(local_uuid, delete_callback=lambda uid: MANAGERS.remove(uid))
        else:
            logger.info(f"Found existing stale session for {local_uuid}, starting new")
            MANAGERS.remove(local_uuid)
            local_uuid = str(uuid.uuid4())

    if username:
        USER_ID_SESSION_MAP[username] = local_uuid
    MANAGERS.create(local_uuid)
    logger.info(f"Started session for {local_uuid}")
    return gr.State(local_uuid, delete_callback=lambda uid: MANAGERS.remove(uid))
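

# UI layout: a "Setup" tab (login, dataset name, document upload, config generation),
# a "Run Generation" tab (start/stop the pipeline and watch its progress), and a
# hidden "Evaluate" tab for launching evaluations and building a leaderboard Space.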
with gr.Blocks(theme=gr.themes.Default()) as app:
    # We initialize the session state with the user's randomly generated uuid
    # Using uuid4 makes collision cases extremely unlikely even for concurrent users
    session_state = gr.State()
    gr.Markdown(project_description)

    with gr.Tabs() as tabs:
        with gr.Tab("Setup", id=0):
            with gr.Row():
                with gr.Accordion("Hugging Face Settings"):
                    login_btn = gr.LoginButton()
                    hf_org_dropdown = gr.Dropdown(
                        choices=[], label="Organization", allow_custom_value=True
                    )
                    app.load(
                        update_hf_org_dropdown, inputs=None, outputs=hf_org_dropdown
                    )
                    hf_dataset_name = gr.Textbox(
                        label="Dataset name",
                        value="yourbench",
                        info="Name of your new evaluation dataset",
                    )
                with gr.Accordion("Upload documents"):
                    file_input = gr.File(
                        label="Upload text files",
                        file_count="multiple",
                        file_types=[".txt", ".md", ".html", ".pdf"],
                    )
                    output = gr.Textbox(label="Log")
                    file_input.upload(
                        save_files,
                        inputs=[session_state, file_input],
                        outputs=output,
                    )
            preview_button = gr.Button("Generate New Config", interactive=False)
            log_message = gr.Textbox(label="Log Message", visible=True)
            download_button = gr.File(
                label="Download Config", visible=False, interactive=False
            )
            file_input.change(enable_button, inputs=file_input, outputs=preview_button)
            preview_button.click(
                generate_and_return,
                inputs=[hf_org_dropdown, hf_dataset_name, session_state],
                outputs=[log_message, download_button],
            )
            preview_button.click(
                switch_to_run_generation_tab,
                inputs=None,
                outputs=tabs,
            )
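
        # "Run Generation" tab: buttons drive the per-session subprocess; timers
        # periodically refresh the log output, stage checklist, and process status.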
| with gr.Tab("Run Generation", id=1): | |
| with gr.Row(): | |
| start_button = gr.Button("Start Task") | |
| start_button.click(prepare_task, inputs=[session_state, login_btn, hf_dataset_name]) | |
| stop_button = gr.Button("Stop Task") | |
| stop_button.click(MANAGERS.stop_process, inputs=session_state) | |
| kill_button = gr.Button("Kill Task") | |
| kill_button.click(MANAGERS.kill_process, inputs=session_state) | |
| with gr.Row(): | |
| with gr.Column(): | |
| with gr.Accordion("Log Output", open=True): | |
| log_output = gr.Code(language=None, lines=20, interactive=False) | |
| process_status = gr.Checkbox(label="Process Status", interactive=False) | |
| status_timer = gr.Timer(2.0, active=True) | |
| status_timer.tick(update_process_status, inputs=session_state, outputs=process_status) | |
| with gr.Column(): | |
| with gr.Accordion("Stages", open=True): | |
| stages_table = gr.CheckboxGroup( | |
| choices=STAGES, | |
| value=[], | |
| label="Pipeline Stages Completed", | |
| interactive=False, | |
| ) | |
| with gr.Accordion("Ingestion"): | |
| ingestion_df = gr.DataFrame() | |
| with gr.Accordion("Summarization"): | |
| summarization_df = gr.DataFrame() | |
| with gr.Accordion("Single-Hop"): | |
| single_hop = gr.DataFrame() | |
| with gr.Accordion("Answer Generation"): | |
| answers_df = gr.DataFrame() | |
| stages_table.change( | |
| update_dataset, inputs=[stages_table, hf_org_dropdown, hf_dataset_name], outputs=[ingestion_df, summarization_df, single_hop, answers_df] | |
| ) | |
| # TODO: this timer should only be active when the second tab is passed to active for the first time | |
| log_timer = gr.Timer(1.0, active=True) | |
| log_timer.tick( | |
| MANAGERS.read_and_get_output, inputs=session_state, outputs=[log_output, stages_table] | |
| ) | |
| with gr.Tab("Evaluate", id=2, visible=False): | |
| with gr.Row(): | |
| btn_launch_evals = gr.Button("Launch evaluations") | |
| status = gr.Textbox(label="Status") | |
| btn_launch_evals.click(run_evaluation_pipeline, [hf_org_dropdown, hf_dataset_name], status) | |
| app.load(init_session, outputs=session_state) | |
| app.launch(allowed_paths=["/app"]) | |