import gradio as gr import os import random import time from datetime import datetime from functools import partial import json import io from huggingface_hub import HfApi from huggingface_hub.hf_api import HfHubHTTPError import traceback from itertools import combinations # ==== 全局配置 (部分保持不变) ==== # ... (BASE_IMAGE_DIR, TARGET_DIR, METHOD_ROOTS, SUBJECTS, etc. 保持不变) ... BASE_IMAGE_DIR = "/data/images/images" TARGET_DIR_BASENAME = "gt" TARGET_DIR = os.path.join(BASE_IMAGE_DIR, TARGET_DIR_BASENAME) METHOD_ROOTS = [] if os.path.exists(BASE_IMAGE_DIR): try: METHOD_ROOTS = [ os.path.join(BASE_IMAGE_DIR, d) for d in os.listdir(BASE_IMAGE_DIR) if os.path.isdir(os.path.join(BASE_IMAGE_DIR, d)) and \ d != TARGET_DIR_BASENAME and \ not d.startswith('.') ] if not METHOD_ROOTS: print(f"警告:在 '{BASE_IMAGE_DIR}' 中没有找到有效的方法目录 (除了 '{TARGET_DIR_BASENAME}')。") else: print(f"已识别的方法根目录: {METHOD_ROOTS}") except Exception as e: print(f"错误:在扫描 '{BASE_IMAGE_DIR}' 时发生错误: {e}"); METHOD_ROOTS = [] else: print(f"警告:基础目录 '{BASE_IMAGE_DIR}' 不存在。将无法加载候选图片。") SUBJECTS = ["subj01", "subj02", "subj05", "subj07"] SENTINEL_TRIAL_INTERVAL = 20 NUM_TRIALS_PER_RUN = 100 LOG_BATCH_SIZE = 5 DATASET_REPO_ID = "YanmHa/image-aligned-experiment-data" INDIVIDUAL_LOGS_FOLDER = "individual_choice_logs" BATCH_LOG_FOLDER = "run_logs_batch" CSS = ".gr-block {margin-top: 4px !important; margin-bottom: 4px !important;} .compact_button { padding: 4px 8px; min-width: auto; }" # ==== 全局持久化历史记录 ==== DATA_SUBDIR_NAME = "experiment_data_storage" # 子目录名 # 确保脚本有权限获取CWD并创建目录 # (在Hugging Face Spaces中,通常应用有权限在其工作目录下创建子目录) base_path = os.getcwd() # 获取当前工作目录,例如 /home/user/app full_subdir_path = os.path.join(base_path, DATA_SUBDIR_NAME) if not os.path.exists(full_subdir_path): try: os.makedirs(full_subdir_path) print(f"成功创建子目录: {full_subdir_path}") except Exception as e: print(f"错误:创建子目录 '{full_subdir_path}' 失败: {e}") # 如果创建失败,这里应该有一个处理,比如退出或使用根目录作为后备 # 为了简单起见,如果失败,后续路径仍然会尝试用这个subdir,可能会出错,或者您可以设置一个标志 GLOBAL_HISTORY_FILE = os.path.join(full_subdir_path, "global_experiment_shown_pairs.json") # 检查子目录是否真的创建成功了 if not os.path.isdir(full_subdir_path): print(f"警告:子目录 '{full_subdir_path}' 未能成功创建或不是一个目录。全局历史文件将尝试保存到根目录。") GLOBAL_HISTORY_FILE = os.path.join(base_path, "global_experiment_shown_pairs.json") # 退回到之前的逻辑(根目录) print(f"全局历史文件将被加载/保存到: {GLOBAL_HISTORY_FILE}") # 打印最终确定的绝对路径 global_shown_pairs_cache = {} global_history_has_unsaved_changes = False print(f"调试:检查 '{base_path}' (CWD) 的内容:") try: for item in os.listdir(base_path): item_path_full = os.path.join(base_path, item) item_type = "DIR" if os.path.isdir(item_path_full) else "FILE" print(f" - CWD Item: {item} ({item_type})") except Exception as e: print(f" 错误:列出CWD '{base_path}' 内容失败: {e}") print(f"调试:检查子目录 '{full_subdir_path}' 是否真的存在且为目录...") if os.path.exists(full_subdir_path) and os.path.isdir(full_subdir_path): print(f" 确认:'{full_subdir_path}' 存在并且是一个目录。") print(f" 调试:'{full_subdir_path}' 的内容:") try: for item in os.listdir(full_subdir_path): item_path_in_subdir = os.path.join(full_subdir_path, item) item_type_in_subdir = "DIR" if os.path.isdir(item_path_in_subdir) else "FILE" print(f" - Subdir Item: {item} ({item_type_in_subdir})") except Exception as e: print(f" 错误:列出子目录 '{full_subdir_path}' 内容失败: {e}") else: print(f" 警告:'{full_subdir_path}' 不存在或不是一个目录 (检查结果)。") def load_global_shown_pairs(): global global_shown_pairs_cache, global_history_has_unsaved_changes if os.path.exists(GLOBAL_HISTORY_FILE): try: with open(GLOBAL_HISTORY_FILE, 'r', encoding='utf-8') as f: data_from_file = json.load(f) global_shown_pairs_cache = { target_img: {frozenset(pair) for pair in pairs_list} for target_img, pairs_list in data_from_file.items() } print(f"已成功从 '{GLOBAL_HISTORY_FILE}' 加载全局已展示图片对历史。") except Exception as e: print(f"错误:加载全局历史文件 '{GLOBAL_HISTORY_FILE}' 失败: {e}。将使用空历史记录。") global_shown_pairs_cache = {} else: print(f"信息:全局历史文件 '{GLOBAL_HISTORY_FILE}' 未找到。将创建新的空历史记录。") global_shown_pairs_cache = {} global_history_has_unsaved_changes = False # 初始化或加载后,标记为无未保存更改 def save_global_shown_pairs(): global global_shown_pairs_cache, global_history_has_unsaved_changes # 确保可以修改这个flag print(f"--- save_global_shown_pairs --- 当前工作目录: {os.getcwd()}") # 调试信息 print(f"--- save_global_shown_pairs --- 尝试保存到文件: {os.path.abspath(GLOBAL_HISTORY_FILE)}") # 调试信息 # print("尝试保存全局图片对历史...") # 调试信息 try: data_to_save = { target_img: [sorted(list(pair_fset)) for pair_fset in pairs_set] for target_img, pairs_set in global_shown_pairs_cache.items() } temp_file = GLOBAL_HISTORY_FILE + ".tmp" with open(temp_file, 'w', encoding='utf-8') as f: json.dump(data_to_save, f, ensure_ascii=False, indent=2) os.replace(temp_file, GLOBAL_HISTORY_FILE) print(f"已成功将全局已展示图片对历史保存到 '{GLOBAL_HISTORY_FILE}'。") global_history_has_unsaved_changes = False # 保存成功后重置标志 return True except Exception as e: print(f"错误:保存全局历史文件 '{GLOBAL_HISTORY_FILE}' 失败: {e}") return False load_global_shown_pairs() # ==== 加载所有可用的目标图片 (保持不变) ==== master_image_list = [] if os.path.exists(TARGET_DIR): try: master_image_list = sorted( [f for f in os.listdir(TARGET_DIR) if f.lower().endswith((".jpg", ".png", ".jpeg"))], key=lambda x: int(os.path.splitext(x)[0]) ) except ValueError: master_image_list = sorted([f for f in os.listdir(TARGET_DIR) if f.lower().endswith((".jpg", ".png", ".jpeg"))]) if master_image_list: print(f"警告: '{TARGET_DIR}' 文件名非纯数字,按字母排序。") if not master_image_list: print(f"警告:在 '{TARGET_DIR}' 中无有效图片。") elif not os.path.exists(TARGET_DIR) and os.path.exists(BASE_IMAGE_DIR): print(f"错误:目标目录 '{TARGET_DIR}' 未找到。") if not master_image_list: print(f"关键错误:由于 '{TARGET_DIR}' 问题,无目标图片,实验无法进行。") # ==== 辅助函数 ==== def get_next_trial_info(current_trial_idx_in_run, current_run_image_list_for_trial, num_trials_in_this_run_for_trial): global TARGET_DIR, METHOD_ROOTS, SUBJECTS, SENTINEL_TRIAL_INTERVAL, global_shown_pairs_cache, global_history_has_unsaved_changes # ... (函数开始部分与上一版相同,直到哨兵试验逻辑结束) ... if not current_run_image_list_for_trial or current_trial_idx_in_run >= num_trials_in_this_run_for_trial: return None, current_trial_idx_in_run img_filename_original = current_run_image_list_for_trial[current_trial_idx_in_run] target_full_path = os.path.join(TARGET_DIR, img_filename_original) trial_number_for_display = current_trial_idx_in_run + 1 pool = [] for m_root_path in METHOD_ROOTS: method_name = os.path.basename(m_root_path) for s_id in SUBJECTS: base, ext = os.path.splitext(img_filename_original) reconstructed_filename = f"{base}_0{ext}" candidate_path = os.path.join(m_root_path, s_id, reconstructed_filename) if os.path.exists(candidate_path): internal_label = f"{method_name}/{s_id}/{reconstructed_filename}" pool.append((internal_label, candidate_path)) trial_info = {"image_id": img_filename_original, "target_path": target_full_path, "cur_no": trial_number_for_display, "is_sentinel": False, "left_display_label": "N/A", "left_internal_label": "N/A", "left_path": None, "right_display_label": "N/A", "right_internal_label": "N/A", "right_path": None} is_potential_sentinel_trial = (trial_number_for_display > 0 and trial_number_for_display % SENTINEL_TRIAL_INTERVAL == 0) if is_potential_sentinel_trial: # 哨兵试验逻辑不变 if not pool: print(f"警告:哨兵图 '{img_filename_original}' (trial {trial_number_for_display}) 无候选。") else: print(f"生成哨兵试验 for '{img_filename_original}' (trial {trial_number_for_display})") trial_info["is_sentinel"] = True sentinel_candidate_target_tuple = ("目标图像", target_full_path) random_reconstruction_candidate_tuple = random.choice(pool) candidates_for_sentinel = [ (("目标图像", target_full_path), sentinel_candidate_target_tuple[0]), (("重建图", random_reconstruction_candidate_tuple[1]), random_reconstruction_candidate_tuple[0]) ] random.shuffle(candidates_for_sentinel) trial_info.update({ "left_display_label": candidates_for_sentinel[0][0][0], "left_path": candidates_for_sentinel[0][0][1], "left_internal_label": candidates_for_sentinel[0][1], "right_display_label": candidates_for_sentinel[1][0][0], "right_path": candidates_for_sentinel[1][0][1], "right_internal_label": candidates_for_sentinel[1][1], }) else: # 常规试验 if len(pool) < 2: print(f"警告:常规图 '{img_filename_original}' (trial {trial_number_for_display}) 候选少于2 (找到 {len(pool)})。此试验无法进行。") return None, current_trial_idx_in_run target_global_history_set = global_shown_pairs_cache.setdefault(img_filename_original, set()) all_possible_pairs_in_pool = [] for c1, c2 in combinations(pool, 2): pair_labels_fset = frozenset({c1[0], c2[0]}) all_possible_pairs_in_pool.append( ((c1, c2), pair_labels_fset) ) unseen_globally_pairs_with_data = [ item for item in all_possible_pairs_in_pool if item[1] not in target_global_history_set ] selected_candidates_tuples = None if unseen_globally_pairs_with_data: chosen_pair_data_and_labels = random.choice(unseen_globally_pairs_with_data) selected_candidates_tuples = chosen_pair_data_and_labels[0] chosen_pair_frozenset = chosen_pair_data_and_labels[1] target_global_history_set.add(chosen_pair_frozenset) global_history_has_unsaved_changes = True # <--- 标记全局历史已更新 # 不再在此处调用 save_global_shown_pairs() # print(f"调试:目标 '{img_filename_original}': 新全局唯一对 {chosen_pair_frozenset} 已添加至缓存。未保存更改标志: {global_history_has_unsaved_changes}") else: # ... (处理所有对都已展示过的情况,与上一版相同) ... print(f"警告:目标图 '{img_filename_original}': 来自当前池的所有 ({len(all_possible_pairs_in_pool)}) 个候选对均已在全局展示过。") if all_possible_pairs_in_pool: print("将随机选择一个重复的对(全局重复)。") chosen_pair_data_and_labels = random.choice(all_possible_pairs_in_pool) selected_candidates_tuples = chosen_pair_data_and_labels[0] else: print(f"错误:即使允许全局重复,也无法从池中选择图片对(池大小 {len(pool)})。") return None, current_trial_idx_in_run display_order_candidates = list(selected_candidates_tuples) if random.random() > 0.5: display_order_candidates = display_order_candidates[::-1] trial_info.update({ "left_display_label": "候选图 1", "left_path": display_order_candidates[0][1], "left_internal_label": display_order_candidates[0][0], "right_display_label": "候选图 2", "right_path": display_order_candidates[1][1], "right_internal_label": display_order_candidates[1][0], }) return trial_info, current_trial_idx_in_run + 1 # ==== 批量保存用户选择日志函数 (保持不变) ==== def save_collected_logs_batch(list_of_log_entries, user_identifier_str, batch_identifier): global DATASET_REPO_ID, BATCH_LOG_FOLDER if not list_of_log_entries: print("批量保存用户日志:没有累积的日志。") return True # 认为无日志即成功 identifier_safe = str(user_identifier_str if user_identifier_str else "unknown_user_session").replace('.', '_').replace(':', '_').replace('/', '_').replace(' ', '_') print(f"用户 {identifier_safe} - 准备批量保存 {len(list_of_log_entries)} 条选择日志 (批次标识: {batch_identifier})...") try: token = os.getenv("HF_TOKEN") if not token: print("错误:HF_TOKEN 未设置。无法批量保存选择日志。"); return False if not DATASET_REPO_ID: print("错误:DATASET_REPO_ID 未配置。无法批量保存选择日志。"); return False api = HfApi(token=token) timestamp_str = datetime.now().strftime('%Y%m%d_%H%M%S_%f') batch_filename = f"batch_user-{identifier_safe}_id-{batch_identifier}_{timestamp_str}_logs-{len(list_of_log_entries)}.jsonl" path_in_repo = f"{BATCH_LOG_FOLDER}/{identifier_safe}/{batch_filename}" jsonl_content = "" for log_entry in list_of_log_entries: try: if isinstance(log_entry, dict): jsonl_content += json.dumps(log_entry, ensure_ascii=False) + "\n" else: print(f"警告:批量保存选择日志时,条目非字典:{log_entry}") except Exception as json_err: print(f"错误:批量保存选择日志序列化单条时出错: {log_entry}. 错误: {json_err}") jsonl_content += json.dumps({"error": "serialization_failed_in_batch_user_log", "original_data_preview": str(log_entry)[:100],"timestamp": datetime.now().isoformat()}, ensure_ascii=False) + "\n" if not jsonl_content.strip(): print(f"用户 {identifier_safe} (批次 {batch_identifier}) 无可序列化选择日志。"); return True log_bytes = jsonl_content.encode('utf-8') file_like_object = io.BytesIO(log_bytes) print(f"准备批量上传选择日志文件: {path_in_repo} ({len(log_bytes)} bytes)") api.upload_file( path_or_fileobj=file_like_object, path_in_repo=path_in_repo, repo_id=DATASET_REPO_ID, repo_type="dataset", commit_message=f"Batch user choice logs for {identifier_safe}, batch_id {batch_identifier} ({len(list_of_log_entries)} entries)" ) print(f"批量选择日志已成功保存到 HF Dataset: {DATASET_REPO_ID}/{path_in_repo}") return True except Exception as e: print(f"批量保存选择日志 (user {identifier_safe}, batch_id {batch_identifier}) 失败: {e}"); traceback.print_exc() return False # ==== 主要的 Gradio 事件处理函数 ==== def process_experiment_step( s_trial_idx_val, s_run_no_val, s_user_logs_val, s_current_trial_data_val, s_user_session_id_val, s_current_run_image_list_val, s_num_trials_this_run_val, action_type=None, choice_value=None, request: gr.Request = None ): global master_image_list, NUM_TRIALS_PER_RUN, outputs_ui_components_definition, LOG_BATCH_SIZE, global_history_has_unsaved_changes # ... (函数开始部分与上一版类似) ... output_s_trial_idx = s_trial_idx_val; output_s_run_no = s_run_no_val output_s_user_logs = list(s_user_logs_val); output_s_current_trial_data = dict(s_current_trial_data_val) if s_current_trial_data_val else {} output_s_user_session_id = s_user_session_id_val; output_s_current_run_image_list = list(s_current_run_image_list_val) output_s_num_trials_this_run = s_num_trials_this_run_val user_ip_fallback = request.client.host if request else "unknown_ip" user_identifier_for_logging = output_s_user_session_id if output_s_user_session_id else user_ip_fallback len_ui_outputs = len(outputs_ui_components_definition) def create_ui_error_tuple(message, progress_msg_text): return (gr.update(visible=False),) * 3 + ("", "", message, progress_msg_text) + (gr.update(interactive=True), gr.update(interactive=False), gr.update(interactive=False)) + (gr.update(visible=False),) def create_no_change_tuple(): return (gr.update(),) * len_ui_outputs user_id_display_text = output_s_user_session_id if output_s_user_session_id else "用户ID待分配" if action_type == "record_choice": # ... (日志记录逻辑与上一版相同) ... if output_s_current_trial_data.get("data") and output_s_current_trial_data["data"].get("left_internal_label"): chosen_internal_label = (output_s_current_trial_data["data"]["left_internal_label"] if choice_value == "left" else output_s_current_trial_data["data"]["right_internal_label"]) parsed_chosen_method, parsed_chosen_subject, parsed_chosen_filename = "N/A", "N/A", "N/A" if chosen_internal_label == "目标图像": parsed_chosen_method, parsed_chosen_subject, parsed_chosen_filename = "TARGET", "GT", output_s_current_trial_data["data"]["image_id"] else: parts = chosen_internal_label.split('/'); if len(parts) == 3: parsed_chosen_method, parsed_chosen_subject, parsed_chosen_filename = parts[0].strip(), parts[1].strip(), parts[2].strip() elif len(parts) == 2: parsed_chosen_method, parsed_chosen_subject = parts[0].strip(), parts[1].strip() elif len(parts) == 1: parsed_chosen_method = parts[0].strip() log_entry = { "timestamp": datetime.now().isoformat(), "user_identifier": user_identifier_for_logging, "run_no": output_s_run_no, "image_id": output_s_current_trial_data["data"]["image_id"], "left_internal_label": output_s_current_trial_data["data"]["left_internal_label"], "right_internal_label": output_s_current_trial_data["data"]["right_internal_label"], "chosen_side": choice_value, "chosen_internal_label": chosen_internal_label, "chosen_method": parsed_chosen_method, "chosen_subject": parsed_chosen_subject, "chosen_filename": parsed_chosen_filename, "trial_sequence_in_run": output_s_current_trial_data["data"]["cur_no"], "is_sentinel": output_s_current_trial_data["data"]["is_sentinel"] } output_s_user_logs.append(log_entry) print(f"用户 {user_identifier_for_logging} 记录选择 (img: {log_entry['image_id']})。当前批次日志数: {len(output_s_user_logs)}") # !!! 修改:当用户日志达到批量大小时,同时尝试保存全局历史(如果它有更改)!!! if len(output_s_user_logs) >= LOG_BATCH_SIZE: print(f"累积用户选择日志达到 {LOG_BATCH_SIZE} 条,准备批量保存...") batch_id_for_filename = f"run{output_s_run_no}_trialidx{output_s_trial_idx}_logcount{len(output_s_user_logs)}" # 1. 保存用户选择日志 user_logs_save_success = save_collected_logs_batch(list(output_s_user_logs), user_identifier_for_logging, batch_id_for_filename) if user_logs_save_success: print("批量用户选择日志已成功(或尝试)保存,将清空累积的用户选择日志列表。") output_s_user_logs = [] # 清空已保存的用户日志 else: print("警告:批量用户选择日志保存失败。选择日志将继续累积,下次达到阈值时重试。") # 2. 检查并保存全局图片对历史(如果自上次保存后有更改) if global_history_has_unsaved_changes: print("检测到全局图片对历史自上次保存后有更新,将一并保存...") save_global_shown_pairs() # 此函数内部会在成功保存后将 global_history_has_unsaved_changes 置为 False else: print("全局图片对历史自上次保存后无更新,无需保存。") else: # 处理记录选择时数据为空的错误 print(f"用户 {user_identifier_for_logging} 错误:记录选择时数据为空!") error_ui_updates = create_ui_error_tuple("记录选择时内部错误。", f"用户ID: {user_id_display_text} | 进度:{output_s_trial_idx}/{output_s_num_trials_this_run}") # 返回所有状态变量的当前值以及错误UI更新 return output_s_trial_idx, output_s_run_no, output_s_user_logs, output_s_current_trial_data, output_s_user_session_id, output_s_current_run_image_list, output_s_num_trials_this_run, *error_ui_updates # ... (start_experiment 逻辑与上一版相同) ... if action_type == "start_experiment": is_first = (output_s_num_trials_this_run == 0 and output_s_trial_idx == 0 and output_s_run_no == 1) is_completed_for_restart = (output_s_num_trials_this_run > 0 and output_s_trial_idx >= output_s_num_trials_this_run) if is_first or is_completed_for_restart: if not master_image_list: error_ui = create_ui_error_tuple("错误: 无可用目标图片!", f"用户ID: {user_id_display_text} | 进度: 0/0"); return 0, output_s_run_no, output_s_user_logs, {}, output_s_user_session_id, [], 0, *error_ui if is_completed_for_restart: output_s_run_no += 1 num_avail = len(master_image_list); run_size = min(num_avail, NUM_TRIALS_PER_RUN) if run_size == 0: error_ui = create_ui_error_tuple("错误: 采样图片数为0!", f"用户ID: {user_id_display_text} | 进度: 0/0"); return 0, output_s_run_no, output_s_user_logs, {}, output_s_user_session_id, [], 0, *error_ui output_s_current_run_image_list = random.sample(master_image_list, run_size) output_s_num_trials_this_run = run_size output_s_trial_idx = 0 output_s_current_trial_data = {} print(f"开始/继续轮次 {output_s_run_no} (用户ID: {output_s_user_session_id}). 随机选择 {output_s_num_trials_this_run} 张图片.") else: print(f"用户 {user_identifier_for_logging} 在第 {output_s_run_no} 轮,试验 {output_s_trial_idx} 点击开始,但轮次未完成。忽略。") no_change_ui = create_no_change_tuple() return output_s_trial_idx, output_s_run_no, output_s_user_logs, output_s_current_trial_data, output_s_user_session_id, output_s_current_run_image_list, output_s_num_trials_this_run, *no_change_ui # ... (轮次结束处理与上一版相同) ... if output_s_trial_idx >= output_s_num_trials_this_run and output_s_num_trials_this_run > 0: print(f"用户 {output_s_user_session_id} 已完成第 {output_s_run_no} 轮。等待下一批或下一轮开始。") # 检查是否有未保存的全局历史,即使日志批次未满,也可能在轮次结束时考虑保存 # 但当前逻辑是仅在日志批次满时保存全局历史,这里可以保持一致或添加额外逻辑 if global_history_has_unsaved_changes: print(f"提示:轮次 {output_s_run_no} 结束,仍有未保存的全局图片对历史更改。将在下次日志批量保存时一并处理。") prog_text = f"用户ID: {output_s_user_session_id} | 进度:{output_s_num_trials_this_run}/{output_s_num_trials_this_run} | 第 {output_s_run_no} 轮 🎉" ui_updates = list(create_ui_error_tuple(f"🎉 第 {output_s_run_no} 轮完成!请点击“开始试验 / 下一轮”继续或开始新批次。", prog_text)) ui_updates[7]=gr.update(interactive=True); ui_updates[8]=gr.update(interactive=False); ui_updates[9]=gr.update(interactive=False) ui_updates[0]=gr.update(value=None,visible=False); ui_updates[1]=gr.update(value=None,visible=False); ui_updates[2]=gr.update(value=None,visible=False) yield output_s_trial_idx, output_s_run_no, output_s_user_logs, output_s_current_trial_data, output_s_user_session_id, output_s_current_run_image_list, output_s_num_trials_this_run, *ui_updates; return # ... (获取并显示下一个试验的逻辑,与上一版相同,调用 get_next_trial_info 不需要 user_id for history) ... if not output_s_current_run_image_list or output_s_num_trials_this_run == 0: error_ui = create_ui_error_tuple("错误: 无法加载试验图片 (列表为空)", f"用户ID: {user_id_display_text} | 进度: N/A") return output_s_trial_idx, output_s_run_no, output_s_user_logs, {"data": None}, output_s_user_session_id, [], 0, *error_ui trial_info, next_s_trial_idx_for_state = get_next_trial_info( output_s_trial_idx, output_s_current_run_image_list, output_s_num_trials_this_run ) if trial_info is None: print(f"错误:用户 {user_identifier_for_logging},轮次 {output_s_run_no},试验 {output_s_trial_idx}: get_next_trial_info 返回 None。") error_msg_display = "无法加载下一个试验,可能是因为候选图片不足或所有唯一组合已用尽。" if len(METHOD_ROOTS) * len(SUBJECTS) < 2 : error_msg_display = "候选图片来源不足,无法形成对比试验。" error_ui_updates = create_ui_error_tuple(error_msg_display, f"用户ID: {user_id_display_text} | 进度:{output_s_trial_idx}/{output_s_num_trials_this_run}") output_s_current_trial_data = {"data": None} return output_s_trial_idx, output_s_run_no, output_s_user_logs, output_s_current_trial_data, output_s_user_session_id, output_s_current_run_image_list, output_s_num_trials_this_run, *error_ui_updates output_s_current_trial_data = {"data": trial_info} prog_text = f"用户ID: {output_s_user_session_id} | 进度:{trial_info['cur_no']}/{output_s_num_trials_this_run} | 第 {output_s_run_no} 轮" ui_show_target_updates = list(create_no_change_tuple()) ui_show_target_updates[0]=gr.update(value=trial_info["target_path"],visible=True); ui_show_target_updates[1]=gr.update(value=None,visible=False); ui_show_target_updates[2]=gr.update(value=None,visible=False) ui_show_target_updates[3]=""; ui_show_target_updates[4]=""; ui_show_target_updates[5]="请观察原图…"; ui_show_target_updates[6]=prog_text ui_show_target_updates[7]=gr.update(interactive=False); ui_show_target_updates[8]=gr.update(interactive=False); ui_show_target_updates[9]=gr.update(interactive=False) yield next_s_trial_idx_for_state, output_s_run_no, output_s_user_logs, output_s_current_trial_data, output_s_user_session_id, output_s_current_run_image_list, output_s_num_trials_this_run, *ui_show_target_updates time.sleep(3) ui_show_candidates_updates = list(create_no_change_tuple()) ui_show_candidates_updates[0]=gr.update(value=None,visible=False); ui_show_candidates_updates[1]=gr.update(value=trial_info["left_path"],visible=True); ui_show_candidates_updates[2]=gr.update(value=trial_info["right_path"],visible=True) ui_show_candidates_updates[3]=gr.update(value=trial_info["left_display_label"], visible=False); ui_show_candidates_updates[4]=gr.update(value=trial_info["right_display_label"], visible=False) ui_show_candidates_updates[5]="请选择更像原图的一张"; ui_show_candidates_updates[6]=prog_text ui_show_candidates_updates[7]=gr.update(interactive=False); ui_show_candidates_updates[8]=gr.update(interactive=True); ui_show_candidates_updates[9]=gr.update(interactive=True) yield next_s_trial_idx_for_state, output_s_run_no, output_s_user_logs, output_s_current_trial_data, output_s_user_session_id, output_s_current_run_image_list, output_s_num_trials_this_run, *ui_show_candidates_updates # ==== Gradio UI 定义 和 程序入口 (保持不变) ==== # ... (welcome_page_markdown, handle_agree_and_start, gr.Blocks, if __name__ == "__main__": etc.) ... welcome_page_markdown = """ ## 欢迎加入实验! 您好!非常感谢您抽出宝贵时间参与我们的视觉偏好评估实验。您的选择将帮助我们改进重建算法,让机器生成的图像更贴近人类视觉体验! 1. **实验目的** 通过比较两幅 重建图像 与原始 目标图像 的相似度。 2. **操作流程** * 点击下方的「我已阅读并同意开始实验」按钮。 * 然后点击主实验界面的「开始试验 / 下一轮」按钮。 * 系统先展示一张 **目标图像**,持续 3 秒。 * 随后自动切换到 **两张重建图像**。 * 根据刚才的观察记忆,选出您认为与目标图像最相似的一张。 * 选择后系统会自动进入下一轮比较。 3. **温馨提示** * 请勿刷新或关闭页面,以免中断实验。 * 若图片加载稍有延迟,请耐心等待;持续异常可联系邮箱 yangminghan@bupt.edu.cn。 * 本实验将保护您的任何个人隐私信息,所有数据仅用于学术研究,请您认真选择和填写。 4. **奖励说明** * 完成全部轮次后,请截图记录您所完成的实验总数(可累积,页面左下角将显示进度,请保证截取到为您分配的ID,轮次)。 * 将截图发送至邮箱 yangminghan@bupt.edu.cn,我们将在核验后发放奖励。 再次感谢您的参与与支持!您每一次认真选择都对我们的研究意义重大。祝您一切顺利,实验愉快! """ def handle_agree_and_start(name, gender, age, education, request: gr.Request): error_messages_list = [] if not name or str(name).strip() == "": error_messages_list.append("姓名 不能为空。") if gender is None or str(gender).strip() == "": error_messages_list.append("性别 必须选择。") if age is None: error_messages_list.append("年龄 不能为空。") elif not (isinstance(age, (int, float)) and 1 <= age <= 120): try: num_age = float(age); except (ValueError, TypeError): error_messages_list.append("年龄必须是一个有效的数字。") else: if not (1 <= num_age <= 120): error_messages_list.append("年龄必须在 1 到 120 之间。") if education is None or str(education).strip() == "其他": error_messages_list.append("学历 必须选择。") if error_messages_list: full_error_message = "请修正以下错误:\n" + "\n".join([f"- {msg}" for msg in error_messages_list]) print(f"用户输入验证失败: {full_error_message}") return gr.update(), False, gr.update(visible=True), gr.update(visible=False), full_error_message s_name = str(name).strip().replace(" ","_").replace("/","_").replace("\\","_") s_gender = str(gender).strip().replace(" ","_").replace("/","_").replace("\\","_") s_age = str(int(float(age))) s_education = str(education).strip().replace(" ","_").replace("/","_").replace("\\","_") user_id_str = f"N-{s_name}_G-{s_gender}_A-{s_age}_E-{s_education}" print(f"用户信息收集完毕,生成用户ID: {user_id_str}") return user_id_str, True, gr.update(visible=False), gr.update(visible=True), "" with gr.Blocks(css=CSS, title="图像重建主观评估") as demo: s_show_experiment_ui = gr.State(False); s_trial_index = gr.State(0); s_run_no = gr.State(1) s_user_logs = gr.State([]); s_current_trial_data = gr.State({}); s_user_session_id = gr.State(None) s_current_run_image_list = gr.State([]); s_num_trials_this_run = gr.State(0) welcome_container = gr.Column(visible=True) experiment_container = gr.Column(visible=False) with welcome_container: gr.Markdown(welcome_page_markdown) with gr.Row(): user_name_input = gr.Textbox(label="请输入您的姓名或代号 (例如 张三 或 User001)", placeholder="例如:张三 -> ZS"); user_gender_input = gr.Radio(label="性别", choices=["男", "女"]) with gr.Row(): user_age_input = gr.Number(label="年龄 (请输入1-120的整数)", minimum=1, maximum=120, step=1); user_education_input = gr.Dropdown(label="学历", choices=["其他","初中及以下","高中(含中专)", "大专(含在读)", "本科(含在读)", "硕士(含在读)", "博士(含在读)"]) welcome_error_msg = gr.Markdown(value="") btn_agree_and_start = gr.Button("我已阅读上述说明并同意参与实验") with experiment_container: gr.Markdown("## 🧠 图像重建主观评估实验"); gr.Markdown(f"每轮实验大约有 {NUM_TRIALS_PER_RUN} 次比较。") with gr.Row(): with gr.Column(scale=1, min_width=300): left_img = gr.Image(label="左候选图", visible=False, height=400, interactive=False); left_lbl = gr.Textbox(label="左图信息", visible=False, interactive=False, max_lines=1); btn_left = gr.Button("选择左图 (更相似)", interactive=False, elem_classes="compact_button") with gr.Column(scale=1, min_width=300): right_img = gr.Image(label="右候选图", visible=False, height=400, interactive=False); right_lbl = gr.Textbox(label="右图信息", visible=False, interactive=False, max_lines=1); btn_right = gr.Button("选择右图 (更相似)", interactive=False, elem_classes="compact_button") with gr.Row(): target_img = gr.Image(label="目标图像 (观察3秒后消失)", visible=False, height=400, interactive=False) with gr.Row(): status_text = gr.Markdown(value="请点击“开始试验 / 下一轮”按钮。") with gr.Row(): progress_text = gr.Markdown() with gr.Row(): btn_start = gr.Button("开始试验 / 下一轮") file_out_placeholder = gr.File(label=" ", visible=False, interactive=False) outputs_ui_components_definition = [ target_img, left_img, right_img, left_lbl, right_lbl, status_text, progress_text, btn_start, btn_left, btn_right, file_out_placeholder ] click_inputs_base = [ s_trial_index, s_run_no, s_user_logs, s_current_trial_data, s_user_session_id, s_current_run_image_list, s_num_trials_this_run ] event_outputs = [ s_trial_index, s_run_no, s_user_logs, s_current_trial_data, s_user_session_id, s_current_run_image_list, s_num_trials_this_run, *outputs_ui_components_definition ] btn_agree_and_start.click(fn=handle_agree_and_start, inputs=[user_name_input, user_gender_input, user_age_input, user_education_input], outputs=[s_user_session_id, s_show_experiment_ui, welcome_container, experiment_container, welcome_error_msg]) btn_start.click(fn=partial(process_experiment_step, action_type="start_experiment"), inputs=click_inputs_base, outputs=event_outputs, queue=True) btn_left.click(fn=partial(process_experiment_step, action_type="record_choice", choice_value="left"), inputs=click_inputs_base, outputs=event_outputs, queue=True) btn_right.click(fn=partial(process_experiment_step, action_type="record_choice", choice_value="right"), inputs=click_inputs_base, outputs=event_outputs, queue=True) if __name__ == "__main__": if not master_image_list: print("\n关键错误:程序无法启动,因无目标图片。"); exit() else: print(f"从 '{TARGET_DIR}' 加载 {len(master_image_list)} 张目标图片。每轮选 {NUM_TRIALS_PER_RUN} 张。") if not METHOD_ROOTS: print(f"警告: '{BASE_IMAGE_DIR}' 无候选方法子目录。") else: print(f"方法根目录: {METHOD_ROOTS}") if not SUBJECTS: print("警告: SUBJECTS 列表为空。") else: print(f"Subjects: {SUBJECTS}") print(f"用户选择日志保存到 Dataset: '{DATASET_REPO_ID}' 的 '{BATCH_LOG_FOLDER}/' 文件夹") if not os.getenv("HF_TOKEN"): print("警告: HF_TOKEN 未设置。日志无法保存到Hugging Face Dataset。\n 请在 Space Secrets 中设置 HF_TOKEN。") else: print("HF_TOKEN 已找到。") print(f"全局图片对历史将从 '{GLOBAL_HISTORY_FILE}' 加载/保存到此文件。") path_to_allow_serving_from = BASE_IMAGE_DIR allowed_paths_list = [] if os.path.exists(path_to_allow_serving_from) and os.path.isdir(path_to_allow_serving_from): allowed_paths_list.append(os.path.abspath(path_to_allow_serving_from)) print(f"Gradio `demo.launch()` 配置 allowed_paths: {allowed_paths_list}") else: print(f"关键警告:图片基础目录 '{path_to_allow_serving_from}' ({os.path.abspath(path_to_allow_serving_from) if path_to_allow_serving_from else 'N/A'}) 不存在或非目录。") print("启动 Gradio 应用...") if allowed_paths_list: demo.launch(allowed_paths=allowed_paths_list) else: demo.launch()