import sys
import gradio as gr
from huggingface_hub import InferenceClient
import json
import os
from datetime import datetime
import requests
from typing import Optional
from docx import Document

# Local files used to persist state between sessions.
HISTORY_FILE = "conversation_history.json"
KNOWLEDGE_FILE = "knowledge_base.txt"

# GitHub configuration (supplied through environment variables).
GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN", "")
GITHUB_REPO = os.environ.get("GITHUB_REPO", "")  # format: "username/repository"
GITHUB_BRANCH = os.environ.get("GITHUB_BRANCH", "main")

# Webhook configuration.
WEBHOOK_URL = os.environ.get("WEBHOOK_URL", "")        # webhook endpoint URL
WEBHOOK_SECRET = os.environ.get("WEBHOOK_SECRET", "")  # optional shared secret

# Default system prompt prepended to every chat request.
DEFAULT_SYSTEM_MESSAGE = "你是一个简洁的中文聊天助手,回答要简明扼要,直接回应用户问题,避免冗余解释。"


def load_history():
    """Load the conversation history list from HISTORY_FILE.

    Returns an empty list when the file is missing or unreadable.
    """
    if os.path.exists(HISTORY_FILE):
        try:
            with open(HISTORY_FILE, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            print(f"加载历史记录出错: {e}")
            return []
    return []


def save_history(history):
    """Persist the conversation history list to HISTORY_FILE as pretty JSON."""
    try:
        with open(HISTORY_FILE, 'w', encoding='utf-8') as f:
            json.dump(history, f, ensure_ascii=False, indent=2)
    except Exception as e:
        print(f"保存历史记录出错: {e}")


def send_webhook_notification(event_type, data):
    """POST an event payload to WEBHOOK_URL.

    Returns a human-readable status string; never raises.
    """
    if not WEBHOOK_URL:
        return "Webhook URL未配置"
    try:
        payload = {
            "event": event_type,
            "timestamp": datetime.now().isoformat(),
            "data": data,
        }
        headers = {"Content-Type": "application/json"}
        # Attach the shared-secret header when one is configured.
        if WEBHOOK_SECRET:
            headers["X-Webhook-Secret"] = WEBHOOK_SECRET
        response = requests.post(WEBHOOK_URL, json=payload, headers=headers, timeout=10)
        if response.status_code in [200, 201, 204]:
            return f"Webhook通知发送成功: {event_type}"
        else:
            return f"Webhook通知发送失败: {response.status_code} - {response.text}"
    except Exception as e:
        return f"Webhook通知发送出错: {str(e)}"


def analyze_image(image, hf_token: gr.OAuthToken):
    """Caption an uploaded image (filepath) with the BLIP image-to-text model."""
    if not hf_token or not hf_token.token:
        return "错误:需要Hugging Face访问令牌。请登录您的Hugging Face账户。"
    if image is None:
        return "请上传一张图片"
    try:
        client = InferenceClient(token=hf_token.token, model="Salesforce/blip-image-captioning-base")
        response = client.image_to_text(image)
        if response:
            return f"图片内容分析结果:{response}"
        else:
            return "无法识别图片内容"
    except Exception as e:
        return f"图片分析出错: {str(e)}"


def _upload_file_to_github(filepath, commit_prefix):
    """Upload a local text file to GITHUB_REPO via the contents API.

    Shared helper for the two sync functions (the original code duplicated
    this logic). Returns (ok: bool, detail: str); `detail` holds the error
    text on failure. May raise on I/O errors — callers wrap in try/except.
    """
    import base64

    url = f"https://api.github.com/repos/{GITHUB_REPO}/contents/{filepath}"
    headers = {
        "Authorization": f"token {GITHUB_TOKEN}",
        "Accept": "application/vnd.github.v3+json",
    }

    with open(filepath, 'r', encoding='utf-8') as f:
        content = f.read()

    # Fetch the current file SHA — required by the API when updating an
    # existing file. Best effort: a failure here means "treat as new file".
    sha = None
    try:
        response = requests.get(url, headers=headers, timeout=10)
        if response.status_code == 200:
            sha = response.json().get("sha")
    except Exception:
        pass

    data = {
        "message": f"{commit_prefix} {datetime.now().isoformat()}",
        "content": base64.b64encode(content.encode('utf-8')).decode('utf-8'),
        "branch": GITHUB_BRANCH,
    }
    if sha:
        data["sha"] = sha

    response = requests.put(url, headers=headers, json=data, timeout=30)
    if response.status_code in [200, 201]:
        return True, ""
    return False, f"{response.status_code} - {response.text}"


def sync_history_to_github():
    """Sync the conversation history file to GitHub; returns a status string."""
    if not GITHUB_TOKEN or not GITHUB_REPO:
        return "GitHub配置未设置,请在环境变量中设置GITHUB_TOKEN和GITHUB_REPO"
    try:
        if not os.path.exists(HISTORY_FILE):
            return "本地没有历史记录文件"
        ok, detail = _upload_file_to_github(HISTORY_FILE, "Update conversation history")
        if ok:
            return "成功同步对话历史到GitHub"
        return f"同步失败: {detail}"
    except Exception as e:
        return f"同步出错: {str(e)}"


def load_knowledge():
    """Load the knowledge base text from KNOWLEDGE_FILE ("" on error/missing)."""
    if os.path.exists(KNOWLEDGE_FILE):
        try:
            with open(KNOWLEDGE_FILE, 'r', encoding='utf-8') as f:
                return f.read()
        except Exception as e:
            print(f"加载知识库出错: {e}")
            return ""
    return ""


def save_knowledge(content):
    """Persist the knowledge base text to KNOWLEDGE_FILE."""
    try:
        with open(KNOWLEDGE_FILE, 'w', encoding='utf-8') as f:
            f.write(content)
    except Exception as e:
        print(f"保存知识库出错: {e}")


def sync_knowledge_to_github():
    """Sync the knowledge base file to GitHub; returns a status string."""
    if not GITHUB_TOKEN or not GITHUB_REPO:
        return "GitHub配置未设置,请在环境变量中设置GITHUB_TOKEN和GITHUB_REPO"
    try:
        if not os.path.exists(KNOWLEDGE_FILE):
            return "本地没有知识库文件"
        ok, detail = _upload_file_to_github(KNOWLEDGE_FILE, "Update knowledge base")
        if ok:
            return "成功同步知识库到GitHub"
        return f"同步失败: {detail}"
    except Exception as e:
        return f"同步出错: {str(e)}"


def respond(
    message,
    history: list[dict[str, str]],
    system_message,
    max_tokens,
    temperature,
    top_p,
    hf_token: gr.OAuthToken = None,  # optional: falls back to HF_TOKEN env var
):
    """Stream a chat completion for `message`, yielding the growing reply.

    Also persists every exchange (including error replies) to the local
    history file and fires webhook notifications on errors / long replies.
    For details on the `huggingface_hub` inference API see:
    https://huggingface.co/docs/huggingface_hub/v0.22.2/en/guides/inference
    """
    # Prefer the logged-in user's token, fall back to the HF_TOKEN env var.
    token_to_use = None
    if hf_token and hf_token.token:
        token_to_use = hf_token.token
    else:
        token_to_use = os.environ.get("HF_TOKEN", "")

    if not token_to_use:
        error_msg = "错误:需要Hugging Face访问令牌。请在环境变量中设置HF_TOKEN。"
        # Record the error in the history file as well.
        conversation_data = {
            "timestamp": datetime.now().isoformat(),
            "message": message,
            "history": history,
            "response": error_msg,
        }
        history_list = load_history()
        history_list.append(conversation_data)
        save_history(history_list)
        send_webhook_notification("error", {
            "type": "missing_hf_token",
            "message": "未提供HF_TOKEN",
            "user_message": message,
        })
        yield error_msg
        return

    # Record for the conversation history; response is filled in later.
    conversation_data = {
        "timestamp": datetime.now().isoformat(),
        "message": message,
        "history": history,
        "response": "",
    }

    try:
        effective_system_message = DEFAULT_SYSTEM_MESSAGE

        # Candidate chat models, tried in order until one answers a probe.
        models_to_try = [
            "THUDM/chatglm3-6b",
            "google/gemma-2b-it",
            "mistralai/Mistral-7B-Instruct-v0.2",
        ]

        client = None
        model_used = None
        for model in models_to_try:
            try:
                candidate = InferenceClient(token=token_to_use, model=model)
                # Probe with a tiny NON-streaming request.
                # BUG FIX: the original wrapped this call in next(), which
                # raises TypeError on the non-iterable ChatCompletionOutput,
                # making every model look unavailable.
                test_messages = [{"role": "user", "content": "Hello"}]
                candidate.chat_completion(test_messages, max_tokens=10, stream=False)
                # BUG FIX: only commit client/model_used after a successful
                # probe, so a failed model is never used by accident.
                client = candidate
                model_used = model
                break
            except Exception as model_error:
                print(f"模型 {model} 连接失败: {str(model_error)}")
                continue

        if client is None:
            error_msg = "错误:无法连接到任何可用的模型。请检查您的Hugging Face访问令牌权限。"
            conversation_data["response"] = error_msg
            history_list = load_history()
            history_list.append(conversation_data)
            save_history(history_list)
            send_webhook_notification("error", {
                "type": "model_connection_failed",
                "message": "无法连接到任何可用模型",
                "user_message": message,
            })
            yield error_msg
            return

        messages = [{"role": "system", "content": effective_system_message}]
        messages.extend(history)
        messages.append({"role": "user", "content": message})

        response = ""
        # BUG FIX: honor the UI sliders — the original hard-coded
        # temperature=0.7 / top_p=0.9 and silently ignored the parameters.
        for message_chunk in client.chat_completion(
            messages,
            max_tokens=max_tokens,
            stream=True,
            temperature=temperature,
            top_p=top_p,
        ):
            choices = message_chunk.choices
            token = ""
            if len(choices) and choices[0].delta.content:
                token = choices[0].delta.content
            response += token
            yield response

        # Persist the completed reply.
        conversation_data["response"] = response
        history_list = load_history()
        history_list.append(conversation_data)
        save_history(history_list)

        # Only notify the webhook for substantial replies.
        if len(response) > 50:
            send_webhook_notification("new_conversation", {
                "user_message": message,
                "ai_response": response,
                "model_used": model_used,
                "character_role": "assistant",  # default role
            })
    except Exception as e:
        error_msg = f"处理请求时出错: {str(e)}"
        yield error_msg
        # Still record the failure in the history file.
        conversation_data["response"] = error_msg
        history_list = load_history()
        history_list.append(conversation_data)
        save_history(history_list)
        send_webhook_notification("error", {
            "type": "processing_error",
            "message": str(e),
            "user_message": message,
        })


def upload_chat_history(file):
    """Store an uploaded chat-log file in the local history; returns status text."""
    if file is None:
        return "请上传文件"
    try:
        with open(file.name, 'r', encoding='utf-8') as f:
            content = f.read()

        history_list = load_history()
        history_entry = {
            "timestamp": datetime.now().isoformat(),
            "uploaded_chat": content,
            "type": "uploaded_history",
        }
        history_list.append(history_entry)
        save_history(history_list)

        send_webhook_notification("chat_history_uploaded", {
            "file_size": len(content),
            "characters": len(content),
        })
        return f"成功上传聊天记录,共{len(content)}个字符"
    except Exception as e:
        error_msg = f"上传失败: {str(e)}"
        send_webhook_notification("error", {
            "type": "upload_failed",
            "message": str(e),
        })
        return error_msg


def upload_knowledge_document(file):
    """Append an uploaded document (.docx or plain text) to the knowledge base."""
    if file is None:
        return "请上传文件"
    try:
        file_extension = os.path.splitext(file.name)[1].lower()

        # DOCX needs python-docx; everything else is read as UTF-8 text.
        if file_extension == ".docx":
            doc = Document(file.name)
            content = "\n".join([paragraph.text for paragraph in doc.paragraphs])
        else:
            with open(file.name, 'r', encoding='utf-8') as f:
                content = f.read()

        existing_knowledge = load_knowledge()
        updated_knowledge = existing_knowledge + "\n\n" + content
        save_knowledge(updated_knowledge)

        send_webhook_notification("knowledge_document_uploaded", {
            "file_size": len(content),
            "characters": len(content),
        })
        return f"成功添加知识文档,共{len(content)}个字符"
    except Exception as e:
        error_msg = f"上传失败: {str(e)}"
        send_webhook_notification("error", {
            "type": "knowledge_upload_failed",
            "message": str(e),
        })
        return error_msg


def sync_all_to_github():
    """Sync both the history and the knowledge base to GitHub."""
    history_result = sync_history_to_github()
    knowledge_result = sync_knowledge_to_github()
    send_webhook_notification("github_sync_completed", {
        "history_sync_result": history_result,
        "knowledge_sync_result": knowledge_result,
    })
    return f"对话历史: {history_result}\n知识库: {knowledge_result}"


def test_webhook():
    """Send a test event to the configured webhook and return the status text."""
    result = send_webhook_notification("test", {
        "message": "Webhook连接测试",
        "timestamp": datetime.now().isoformat(),
    })
    return result


# For information on customizing the chat interface, see the gradio docs:
# https://www.gradio.app/docs/chatinterface
with gr.Blocks() as demo:
    with gr.Tab("聊天"):
        gr.Markdown("""
# AI助手使用指南

## 基本使用
1. 在输入框中输入您的问题或指令
2. 点击回车或"Submit"按钮发送消息
3. 系统将自动使用配置的HF_TOKEN访问模型(无需登录)
""")
        with gr.Group():
            chatbot = gr.ChatInterface(
                respond,
                title="我的AI助手",
                description="与AI助手进行中文对话",
                additional_inputs=[
                    # Hidden system_message input — respond() currently uses
                    # DEFAULT_SYSTEM_MESSAGE regardless.
                    gr.Textbox("", label="System Message", visible=False),
                    gr.Slider(minimum=1, maximum=2048, value=512, step=1, label="最大新令牌数"),
                    gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="温度"),
                    gr.Slider(
                        minimum=0.1,
                        maximum=1.0,
                        value=0.9,
                        step=0.05,
                        label="Top-p (核采样)",
                    ),
                ],
            )

    with gr.Tab("图片理解"):
        gr.Markdown("## 图片内容分析")
        gr.Markdown("""
### 使用指南
1. 点击"Browse files"按钮上传一张图片
2. 点击"分析图片"按钮
3. 等待AI分析图片内容并提供描述

### 支持的图片格式
- JPG/JPEG
- PNG
- BMP
- 其他常见图片格式
""")
        with gr.Row():
            with gr.Column():
                image_input = gr.Image(type="filepath", label="上传图片")
                analyze_btn = gr.Button("分析图片")
            with gr.Column():
                image_output = gr.Textbox(label="分析结果", max_lines=10)
        analyze_btn.click(
            analyze_image,
            inputs=[image_input],
            outputs=[image_output]
        )

    with gr.Tab("学习中心"):
        gr.Markdown("## 上传学习材料")
        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("### 上传聊天记录")
                gr.Markdown("支持格式:.txt、.md、.json")
                chat_file = gr.File(label="选择聊天记录文件")
                upload_chat_btn = gr.Button("处理聊天记录")
                chat_output = gr.Textbox(label="处理结果", interactive=False)
            with gr.Column(scale=1):
                gr.Markdown("### 上传知识文档")
                gr.Markdown("支持格式:.txt、.md、.pdf、.docx")
                knowledge_file = gr.File(label="选择知识文档文件")
                upload_knowledge_btn = gr.Button("处理知识文档")
                knowledge_output = gr.Textbox(label="处理结果", interactive=False)

        upload_chat_btn.click(
            upload_chat_history,
            inputs=[chat_file],
            outputs=[chat_output]
        )
        upload_knowledge_btn.click(
            upload_knowledge_document,
            inputs=[knowledge_file],
            outputs=[knowledge_output]
        )

        # Knowledge-base viewer.
        gr.Markdown("## 当前知识状态")
        knowledge_display = gr.Textbox(label="知识库内容", max_lines=10)
        refresh_btn = gr.Button("刷新知识库")

        def refresh_knowledge():
            """Reload the knowledge base text for display."""
            return load_knowledge()

        refresh_btn.click(
            refresh_knowledge,
            outputs=[knowledge_display]
        )

        # GitHub sync controls.
        gr.Markdown("## GitHub同步")
        sync_btn = gr.Button("同步所有数据到GitHub")
        sync_output = gr.Textbox(label="同步结果", interactive=False)
        sync_btn.click(
            sync_all_to_github,
            outputs=[sync_output]
        )

        # Webhook test controls.
        gr.Markdown("## Webhook测试")
        test_webhook_btn = gr.Button("测试Webhook连接")
        webhook_output = gr.Textbox(label="测试结果", interactive=False)
        test_webhook_btn.click(
            test_webhook,
            outputs=[webhook_output]
        )

        gr.Markdown("""
## 配置说明

### GitHub配置:
1. 在Hugging Face Space的Settings → Variables中添加以下环境变量:
   - `GITHUB_TOKEN`: 你的GitHub个人访问令牌
   - `GITHUB_REPO`: 仓库名称(格式:username/repository)
   - `GITHUB_BRANCH`: 分支名称(默认:main)

### Webhook配置:
1. 在Hugging Face Space的Settings → Variables中添加以下环境变量:
   - `WEBHOOK_URL`: 你的webhook接收URL
   - `WEBHOOK_SECRET`: webhook密钥(可选,用于验证)
""")

    # Weather assistant tab (self-contained mock, no model calls).
    with gr.Tab("天气助手"):
        gr.Markdown("## 🌤️ 天气助手")
        gr.Markdown("我可以帮你查询天气和时间信息。试试问我:'北京的天气怎么样?' 或 '现在几点了?'")

        weather_chatbot = gr.Chatbot(label="对话记录")
        with gr.Row():
            weather_input = gr.Textbox(
                label="输入消息",
                placeholder="例如:北京的天气怎么样?现在几点了?",
                lines=2
            )
            weather_send_btn = gr.Button("发送 📤", variant="primary")
        with gr.Row():
            weather_clear_btn = gr.Button("清空聊天记录 🗑️")
            weather_greet_btn = gr.Button("打招呼 👋")
        weather_status = gr.Textbox(label="状态", interactive=False)

        def weather_respond(message, history):
            """Mock weather/time assistant: answers from canned data, no API."""
            if not message:
                return "", history, "请输入消息"

            # Append the user turn; the reply is filled in below.
            history.append((message, None))

            if any(keyword in message.lower() for keyword in ["天气", "weather", "温度", "气温"]):
                # Very simple city detection against a fixed list.
                cities = ["北京", "上海", "广州", "深圳", "纽约", "伦敦", "东京", "巴黎"]
                city = "北京"  # default city
                for c in cities:
                    if c in message:
                        city = c
                        break

                # Canned weather per city.
                weather_data = {
                    "北京": "🌤️ 晴天,22°C,空气质量良好",
                    "上海": "⛅ 多云,18°C,湿度适中",
                    "广州": "🌦️ 小雨,25°C,注意防潮",
                    "深圳": "☀️ 晴天,28°C,紫外线较强",
                    "纽约": "🌧️ 阵雨,15°C,建议带伞",
                    "伦敦": "🌫️ 雾天,12°C,能见度较低",
                    "东京": "🌸 晴天,20°C,樱花盛开",
                    "巴黎": "☁️ 阴天,16°C,适合漫步"
                }
                weather_info = weather_data.get(city, f"🌤️ 晴天,22°C in {city}")
                response = f"{weather_info}"

                # Add a suggestion matching the conditions.
                if "晴" in response or "☀️" in response:
                    response += "\n\n👕 今天天气很好,适合户外活动!记得涂防晒霜。"
                elif "雨" in response or "🌧️" in response or "🌦️" in response:
                    response += "\n\n☔ 今天有雨,出门记得带伞。"
                elif "云" in response or "⛅" in response or "☁️" in response:
                    response += "\n\n☁️ 今天多云,温度适中,适合外出。"
                elif "雾" in response or "🌫️" in response:
                    response += "\n\n🌫️ 今天有雾,开车请注意安全。"
                elif "🌸" in response:
                    response += "\n\n🌸 樱花盛开的季节,适合赏花拍照。"
            elif any(keyword in message.lower() for keyword in ["时间", "日期", "datetime", "time", "几点", "现在"]):
                # datetime is already imported at module level.
                current_time = datetime.now().strftime("%Y年%m月%d日 %H:%M:%S")
                response = f"⏰ 当前时间: {current_time}"
            else:
                responses = [
                    "你好!我可以帮你查询天气和时间信息。你可以问我'北京的天气怎么样?'或'现在几点了?'",
                    "我是天气助手,可以为你提供天气和时间信息。有什么我可以帮你的吗?",
                    "想了解某个城市的天气情况吗?或者需要知道当前时间?尽管问我吧!"
                ]
                import random
                response = random.choice(responses)

            # Fill in the assistant half of the last turn.
            history[-1] = (history[-1][0], response)
            return "", history, f"已发送: {message}"

        def weather_clear_history():
            """Reset the weather chat."""
            return [], "聊天记录已清空"

        def weather_greet():
            """Seed the chat with a greeting turn."""
            greeting = "你好!我是天气助手,可以帮你查询天气和时间信息。有什么我可以帮你的吗?"
            # NOTE(review): this places "助手" on the user side of the tuple —
            # confirm that is the intended display.
            return "", [("助手", greeting)], f"助手说: {greeting}"

        weather_send_btn.click(
            weather_respond,
            inputs=[weather_input, weather_chatbot],
            outputs=[weather_input, weather_chatbot, weather_status]
        )
        weather_input.submit(
            weather_respond,
            inputs=[weather_input, weather_chatbot],
            outputs=[weather_input, weather_chatbot, weather_status]
        )
        weather_clear_btn.click(
            weather_clear_history,
            outputs=[weather_chatbot, weather_status]
        )
        weather_greet_btn.click(
            weather_greet,
            outputs=[weather_input, weather_chatbot, weather_status]
        )

    # Document Q&A tab.
    with gr.Tab("文档问答"):
        gr.Markdown("## 📄 文档问答助手")
        gr.Markdown("上传PDF文档并提出问题,AI将为您解答文档中的内容")

        # Extra notice when running on Hugging Face Spaces.
        if "SPACE_ID" in os.environ:
            gr.Markdown("""
### 注意:此功能在 Hugging Face Spaces 免费方案中运行
由于资源限制,首次使用时需要约5分钟加载模型,请耐心等待。

**优化策略:**
- 使用4位量化技术,将模型内存占用从4GB降低到2GB
- 仅处理PDF前3页内容,每页限制600字符
- 回答长度限制在150字以内以提高响应速度
- 启用排队机制处理并发请求
""")

        with gr.Row():
            # Left: PDF upload and question input.
            with gr.Column(scale=1):
                pdf_input = gr.File(label="上传PDF文档", file_types=[".pdf"])
                question_input = gr.Textbox(label="您的问题", placeholder="例如:文档的主要观点是什么?")
                answer_btn = gr.Button("获取答案", variant="primary")
                download_btn = gr.DownloadButton("下载答案", visible=False)
            # Right: result display.
            with gr.Column(scale=1):
                answer_output = gr.Textbox(label="AI回答", interactive=False, max_lines=15)

        gr.Markdown("""
### 使用方法
1. 点击"上传PDF文档"选择您的文件
2. 在问题框中输入您想了解的内容
3. 点击"获取答案"按钮等待AI分析
4. 答案生成后可点击"下载答案"保存结果

### 注意事项
- 为保证响应速度,仅分析文档前3页
- 为节省资源,每页内容限制在600字符以内
- 答案长度限制在150字以内
""")

        def process_document_qa(pdf_file, question):
            """Answer a question about an uploaded PDF (first 3 pages only)."""
            if not pdf_file:
                return "请先上传PDF文档", gr.update(visible=False)
            if not question:
                return "请输入您的问题", gr.update(visible=False)

            try:
                from PyPDF2 import PdfReader

                # Read at most 3 pages, 600 characters each, to stay cheap.
                reader = PdfReader(pdf_file.name)
                text_content = []
                for i, page in enumerate(reader.pages[:3]):
                    text_content.append(page.extract_text()[:600])
                doc_text = "\n".join(text_content)

                prompt = f"基于以下文档内容回答问题,回答长度不超过150字:\n\n问题:{question}\n\n文档内容:{doc_text}\n\n回答:"

                # On HF Spaces with a token, use the inference API.
                if "SPACE_ID" in os.environ and os.environ.get("HF_TOKEN"):
                    models_to_try = [
                        "THUDM/chatglm3-6b",
                        "google/gemma-2b-it",
                        "mistralai/Mistral-7B-Instruct-v0.2"
                    ]
                    response = ""
                    for model_name in models_to_try:
                        try:
                            client = InferenceClient(token=os.environ.get("HF_TOKEN"), model=model_name)
                            # Probe the model. BUG FIX: the original wrapped
                            # this non-streaming call in next(), which raises
                            # TypeError and disabled every model.
                            test_messages = [{"role": "user", "content": "Hello"}]
                            client.chat_completion(test_messages, max_tokens=10, stream=False)

                            # Real request. BUG FIX: a non-streaming response
                            # is a single object — read choices[0].message,
                            # not streaming .delta chunks.
                            messages = [{"role": "user", "content": prompt}]
                            result = client.chat_completion(messages, max_tokens=150, stream=False)
                            if result.choices and result.choices[0].message.content:
                                response = result.choices[0].message.content
                            break
                        except Exception as e:
                            print(f"模型 {model_name} 连接失败: {str(e)}")
                            continue
                    if not response:
                        response = "抱歉,无法连接到AI模型,请稍后重试。"
                else:
                    # Local/dev fallback: canned response.
                    response = f"基于您提供的文档,关于问题'{question}'的回答如下:\n\n这是模拟的回答内容。在Hugging Face Spaces环境中,这里会显示AI模型的实际分析结果。"

                # BUG FIX: DownloadButton expects a file path as its value,
                # not an (name, content) tuple — write the answer to a file.
                answer_path = "answer.txt"
                with open(answer_path, "w", encoding="utf-8") as f:
                    f.write(response)
                download_btn_update = gr.update(visible=True, value=answer_path)
                return response, download_btn_update
            except Exception as e:
                error_msg = f"处理文档时出错: {str(e)}"
                return error_msg, gr.update(visible=False)

        answer_btn.click(
            process_document_qa,
            inputs=[pdf_input, question_input],
            outputs=[answer_output, download_btn]
        )

if __name__ == "__main__":
    # On Hugging Face Spaces, run with share=False and server_name="0.0.0.0".
    demo.launch(share=False, server_name="0.0.0.0")