Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
import io
import os
import pathlib
import re
import shutil
import subprocess
from collections import defaultdict
from itertools import islice
from typing import List, Optional, Union

import gradio as gr
import pandas as pd
from datasets import load_dataset
from loguru import logger
# Names of the pipeline stages this app works with, in the order listed.
# Stages that are currently disabled are kept as comments so they can be
# re-enabled later.
STAGES = [
    "ingestion",
    "upload_ingest_to_hub",
    "summarization",
    "chunking",
    "single_shot_question_generation",
    "answer_generation",
    # "evaluate_models",
    # "create_leaderboard",
    # "judge_answers",  # to uncomment when fixed
]
def is_running_locally() -> bool:
    """Tell whether the app runs outside a Hugging Face Space.

    Hugging Face Spaces set the ``SPACE_ID`` environment variable, so the
    variable being absent is treated as a local run. Any value, including
    an empty string, counts as running in a Space.
    """
    space_id = os.environ.get("SPACE_ID")
    return space_id is None
def save_files(oauth_token: gr.OAuthToken | None, session_state: gr.State, files: List[pathlib.Path]) -> str | None:
    """Move uploaded files into this session's upload directory.

    Files are moved (not copied) into ``/app/<uuid>/uploaded_files``, where
    ``<uuid>`` is read from ``session_state.value``. Missing files and
    per-file move errors are logged and skipped.

    Args:
        oauth_token: Login token. When it is None and the app is not running
            locally, a warning is shown and nothing is saved.
        session_state: Gradio state whose ``value`` is the session uuid.
        files: Uploaded files. Each item may be a path (``str`` or
            ``os.PathLike``) or an object exposing its full path as ``.name``.
            May be None or empty.

    Returns:
        A summary of the saved destination paths, "No files were saved" when
        nothing was moved, or None when the user is not logged in.
    """
    if oauth_token is None and not is_running_locally():
        gr.Warning('You need to log in to use this Space')
        return None

    if not files:
        return "No files were saved"

    upload_directory_uuid = pathlib.Path(f"/app/{session_state.value}/uploaded_files")
    try:
        # Ensure the upload directory exists (once, not per file)
        upload_directory_uuid.mkdir(parents=True, exist_ok=True)
    except Exception as e:
        logger.error(f"Could not create upload directory {upload_directory_uuid}: {e}")
        return "No files were saved"

    saved_paths = []
    for file in files:
        # Use plain paths directly: for a pathlib.Path, ``.name`` is only the
        # basename and would not point at the uploaded file.
        raw_path = file if isinstance(file, (str, os.PathLike)) else file.name
        try:
            source_path = pathlib.Path(raw_path)
            if not source_path.exists():
                logger.warning(f"File not found: {source_path}")
                continue  # Skip missing files
            destination_path = upload_directory_uuid / source_path.name
            shutil.move(str(source_path), str(destination_path))
            saved_paths.append(str(destination_path))
        except Exception as e:
            logger.error(f"Error moving file {raw_path}: {e}")

    return (
        f"Files saved to: {', '.join(saved_paths)}"
        if saved_paths
        else "No files were saved"
    )
def _load_preview(
    dataset_name: str,
    config_name: str,
    token: Optional[str],
    n_rows: int,
    columns: Union[str, List[str], None] = None,
) -> pd.DataFrame:
    """Stream at most ``n_rows`` rows of one config's train split into a DataFrame."""
    dataset = load_dataset(dataset_name, name=config_name, split="train", streaming=True, token=token)
    if columns is not None:
        dataset = dataset.select_columns(columns)
    # Consume a single iterator: calling next(iter(dataset)) repeatedly restarts
    # the stream each time and yields the first row over and over. islice also
    # stops cleanly when the split has fewer than n_rows rows.
    return pd.DataFrame(list(islice(dataset, n_rows)))


def update_dataset(stages: list, hf_org: str, hf_prefix: str, oauth_token: gr.OAuthToken):
    """Load small previews of the generated dataset for the given stages.

    Args:
        stages: Stage names; only "ingestion", "summarization",
            "single_shot_question_generation" and "answer_generation" are
            previewed.
        hf_org: Hub organisation/user owning the dataset.
        hf_prefix: Dataset name within ``hf_org``.
        oauth_token: Login token whose ``.token`` is passed to ``load_dataset``;
            may be None, in which case ``token=None`` is passed.

    Returns:
        Tuple ``(ingestion_df, summarization_df, single_hop_df, answers_df)``.
        Ingestion and summarization hold up to 1 row, the other two up to 5.
        A DataFrame is empty when its stage is not in ``stages``.
    """
    dataset_name = f"{hf_org}/{hf_prefix}"
    token = oauth_token.token if oauth_token is not None else None

    ingestion_df = pd.DataFrame()
    summarization_df = pd.DataFrame()
    single_hop_df = pd.DataFrame()
    answers_df = pd.DataFrame()

    if "ingestion" in stages:
        # TODO: why is the key "ingested" and not "ingestion"? (does not match the other splits)
        ingestion_df = _load_preview(dataset_name, "ingested", token, 1, columns="document_text")
    if "summarization" in stages:
        summarization_df = _load_preview(
            dataset_name,
            "summarization",
            token,
            1,
            columns=["raw_document_summary", "document_summary", "summarization_model"],
        )
    if "single_shot_question_generation" in stages:
        single_hop_df = _load_preview(dataset_name, "single_shot_question_generation", token, 5)
    if "answer_generation" in stages:
        answers_df = _load_preview(dataset_name, "answer_generation", token, 5)

    return (ingestion_df, summarization_df, single_hop_df, answers_df)
class SubprocessManagerGroup:
    """Registry holding one SubprocessManager per user session.

    Intended to be used as a single shared instance. Every method accepts
    either the session uuid string or a gradio ``gr.State`` whose ``value``
    is that uuid.
    """

    def __init__(self):
        # session uuid -> that session's manager
        self.managers: dict[str, SubprocessManager] = {}

    @staticmethod
    def grab_uuid(uid: Union[str, gr.State]):
        """Return ``uid.value`` for a gradio session state, else ``uid`` unchanged."""
        if isinstance(uid, gr.State):
            uid = uid.value
        return uid

    def create(self, uid: Union[str, gr.State]):
        """Create a manager for the session, replacing any existing one."""
        uid = SubprocessManagerGroup.grab_uuid(uid)
        self.managers[uid] = SubprocessManager(uid)

    def get(self, uid: Union[str, "gr.State"]) -> Optional["SubprocessManager"]:
        """Return the session's manager, or None if none was created."""
        uid = SubprocessManagerGroup.grab_uuid(uid)
        return self.managers.get(uid)

    def remove(self, uid: Union[str, gr.State]):
        """Stop the session's process (if any) and drop its manager."""
        uid = SubprocessManagerGroup.grab_uuid(uid)
        if manager := self.managers.get(uid):
            manager.stop_process()
            del self.managers[uid]

    def start_process(self, uid: Union[str, gr.State], custom_env: dict | None):
        """Start the session's process.

        Raises:
            KeyError: if ``create`` was never called for this session.
        """
        uid = SubprocessManagerGroup.grab_uuid(uid)
        self.managers[uid].start_process(custom_env=custom_env)

    def stop_process(self, uid: Union[str, gr.State]):
        """Gracefully stop the session's process; no-op for an unknown session."""
        if manager := self.get(uid):
            manager.stop_process()

    def kill_process(self, uid: Union[str, gr.State]):
        """Forcefully kill the session's process; no-op for an unknown session."""
        if manager := self.get(uid):
            manager.kill_process()

    def read_and_get_output(self, uid: Union[str, gr.State]):
        """Return ``(output, completed_stages)`` for the session.

        Returns ``("", [])`` when no uuid is given (including a state whose
        value is None) or when no manager exists for it.
        """
        if uid is None:
            return "", []
        uid = SubprocessManagerGroup.grab_uuid(uid)
        if uid is None:
            return "", []
        manager = self.managers.get(uid)
        if manager is None:
            return "", []
        return manager.read_and_get_output()

    def is_running(self, uid: Union[str, gr.State]) -> bool:
        """Return True if the session has a manager whose process is still running."""
        if manager := self.get(uid):
            return manager.is_running()
        return False
| class SubprocessManager: | |
| def __init__(self, session_uid: str): | |
| self.session_uid = session_uid | |
| self.path = pathlib.Path(f"/app/{session_uid}") | |
| self.path.mkdir(parents=True, exist_ok=True) | |
| self.config_path = pathlib.Path(f"{self.path}/config.yml") | |
| self.command = ["uv", "run", "yourbench", f"--config", str(self.config_path)] | |
| self.process = None | |
| self.output_stream = io.StringIO() | |
| self.exit_code = None | |
| def start_process(self, custom_env: dict | None): | |
| """Start the subprocess.""" | |
| if self.is_running(): | |
| logger.info("Process is already running") | |
| return | |
| self.output_stream = io.StringIO() | |
| self.exit_code = None | |
| try: | |
| logger.info(f"Starting process with command: {' '.join(self.command)}") | |
| self.process = subprocess.Popen( | |
| self.command, | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.STDOUT, # Combine stderr with stdout | |
| text=True, | |
| bufsize=1, | |
| start_new_session=True, | |
| env=custom_env, | |
| ) | |
| os.set_blocking(self.process.stdout.fileno(), False) | |
| logger.info(f"Started process with PID: {self.process.pid}") | |
| except Exception as e: | |
| logger.error(f"Failed to start process: {str(e)}") | |
| return | |
| def read_and_get_output(self): | |
| """Read subprocess output, capture it, and return log and completed stages.""" | |
| current_output = "" | |
| completed_stages = [] | |
| if self.process and self.process.stdout: | |
| try: | |
| while True: | |
| line = self.process.stdout.readline() | |
| if line: | |
| self.output_stream.write(line) | |
| else: | |
| break | |
| except BlockingIOError: | |
| pass | |
| current_output = self.output_stream.getvalue() | |
| completed_stages = list(set(re.findall(r"Successfully completed stage: (\w+)", current_output))) | |
| return current_output, completed_stages | |
| def stop_process(self): | |
| """Terminate the subprocess.""" | |
| if not self.is_running(): | |
| logger.info("Process is not running") | |
| return | |
| logger.info("Sending SIGTERM to the Process") | |
| try: | |
| self.process.terminate() | |
| self.exit_code = self.process.wait(timeout=5) # Wait up to 5 seconds for process to terminate | |
| logger.info(f"Process terminated by user with exit code {self.exit_code}") | |
| except subprocess.TimeoutExpired: | |
| logger.warning("Process did not terminate within timeout, sending SIGKILL") | |
| self.kill_process() | |
| def kill_process(self): | |
| """Forcefully kill the subprocess""" | |
| if not self.is_running(): | |
| logger.info("Process is not running") | |
| return | |
| logger.info("Sending SIGKILL to the Process") | |
| try: | |
| self.process.kill() | |
| self.exit_code = self.process.wait(timeout=5) # Wait up to 5 seconds for process to be killed | |
| logger.info(f"Process killed by user with exit code {self.exit_code}") | |
| except subprocess.TimeoutExpired: | |
| logger.error("Process could not be killed within timeout") | |
| def is_running(self): | |
| """Check if the subprocess is still running""" | |
| if self.process is None: | |
| return False | |
| return self.process.poll() is None | |
| def get_exit_details(self): | |
| """Return exit code and reason if process has terminated""" | |
| if self.process is None: | |
| return None, "Process was never started" | |
| if self.is_running(): | |
| return None, "Process is still running" | |
| if not self.exit_code is None and self.exit_code != 0 : | |
| return self.exit_code, "Process exited abnormaly" | |
| return self.exit_code, "Process exited normaly" | |
| def __del__(self): | |
| """Stop the process when object is deleted""" | |
| if self.process: | |
| self.process.kill() | |