Spaces:
Runtime error
Runtime error
| import os | |
| import re | |
| import gradio as gr | |
| import pandas as pd | |
| import requests | |
| import json | |
| import faiss | |
| import nest_asyncio | |
| import sys | |
| import boto3 | |
| from pathlib import Path | |
| from bs4 import BeautifulSoup | |
| from typing import Union, List | |
| import asyncio | |
| from anthropic import Anthropic | |
| from openai import OpenAI | |
| from llama_index.core import ( | |
| StorageContext, | |
| ServiceContext, | |
| VectorStoreIndex, | |
| Settings, | |
| load_index_from_storage | |
| ) | |
| from llama_index.llms.openai import OpenAI | |
| from llama_index.core.llms import ChatMessage | |
| from llama_index.core.schema import IndexNode | |
| from llama_index.core.storage.docstore import SimpleDocumentStore | |
| from llama_index.retrievers.bm25 import BM25Retriever | |
| from llama_index.embeddings.openai import OpenAIEmbedding | |
| # from llama_index.vector_stores.faiss import FaissVectorStore | |
| from llama_index.core.retrievers import QueryFusionRetriever | |
| from llama_index.core.workflow import Event, Context, Workflow, StartEvent, StopEvent, step | |
| from llama_index.core.schema import NodeWithScore | |
| from llama_index.core.prompts import PromptTemplate | |
| from llama_index.core.response_synthesizers import ResponseMode, get_response_synthesizer | |
| from prompts import SYSTEM_PROMPT, LEGAL_POSITION_PROMPT, PRECEDENT_ANALYSIS_TEMPLATE | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| aws_access_key_id = os.getenv("AWS_ACCESS_KEY_ID") | |
| aws_secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY") | |
| openai_api_key = os.getenv("OPENAI_API_KEY") | |
| anthropic_api_key=os.getenv("ANTHROPIC_API_KEY") | |
| embed_model = OpenAIEmbedding(model_name="text-embedding-3-small") | |
| Settings.embed_model = embed_model | |
| Settings.context_window = 20000 | |
| Settings.chunk_size = 2048 | |
| Settings.similarity_top_k = 20 | |
| # Параметри S3 | |
| BUCKET_NAME = "legal-position" | |
| PREFIX_RETRIEVER = "Save_Index/" # Префікс для всього вмісту, який потрібно завантажити | |
| LOCAL_DIR = Path("Save_Index_Local") # Локальна директорія для збереження даних з S3 | |
| # Ініціалізація клієнта S3 | |
| s3_client = boto3.client( | |
| "s3", | |
| aws_access_key_id=aws_access_key_id, | |
| aws_secret_access_key=aws_secret_access_key, | |
| region_name="eu-north-1" | |
| ) | |
| # # Ініціалізація клієнта S3 | |
| # s3_client = boto3.client( | |
| # "s3", | |
| # aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"), | |
| # aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"), | |
| # region_name="eu-north-1" | |
| # ) | |
| # Створюємо локальну директорію, якщо вона не існує | |
| LOCAL_DIR.mkdir(parents=True, exist_ok=True) | |
| # Функція для завантаження файлу з S3 | |
| def download_s3_file(bucket_name, s3_key, local_path): | |
| s3_client.download_file(bucket_name, s3_key, str(local_path)) | |
| print(f"Завантажено: {s3_key} -> {local_path}") | |
| # Функція для завантаження всієї папки з S3 у локальну директорію | |
| def download_s3_folder(bucket_name, prefix, local_dir): | |
| response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=prefix) | |
| if 'Contents' in response: | |
| for obj in response['Contents']: | |
| s3_key = obj['Key'] | |
| # Пропускаємо "папку" (кореневий префікс) у S3 | |
| if s3_key.endswith('/'): | |
| continue | |
| # Визначаємо локальний шлях, де буде збережений файл | |
| local_file_path = local_dir / Path(s3_key).relative_to(prefix) | |
| local_file_path.parent.mkdir(parents=True, exist_ok=True) # створення підкаталогів, якщо потрібно | |
| # Завантажуємо файл | |
| s3_client.download_file(bucket_name, s3_key, str(local_file_path)) | |
| print(f"Завантажено: {s3_key} -> {local_file_path}") | |
| # Завантаження всього вмісту папки `Save_Index` з S3 у локальну директорію `Save_Index_Local` | |
| # download_s3_folder(BUCKET_NAME, PREFIX_RETRIEVER, LOCAL_DIR) # !!! тимчасово відключено | |
| # PERSIST_DIR = "/home/docsa/Legal_Position/Save_index" | |
| # Apply nest_asyncio to handle nested async calls | |
| nest_asyncio.apply() | |
| class RetrieverEvent(Event): | |
| nodes: list[NodeWithScore] | |
| state_lp_json = gr.State() | |
| state_nodes = gr.State() | |
| from enum import Enum | |
| class ModelProvider(str, Enum): | |
| OPENAI = "openai" | |
| ANTHROPIC = "anthropic" | |
| class ModelName(str, Enum): | |
| # OpenAI models | |
| GPT4o = "gpt-4o" | |
| GPT4o_MINI = "gpt-4o-mini" | |
| # Anthropic models | |
| CLAUDE3_5_SONNET = "claude-3-5-sonnet-latest" | |
| CLAUDE3_5_HAIKU = "claude-3-5-haiku-latest" | |
| class LLMAnalyzer: | |
| def __init__(self, provider: ModelProvider, model_name: ModelName): | |
| self.provider = provider | |
| self.model_name = model_name | |
| if provider == ModelProvider.OPENAI: | |
| self.client = OpenAI(model=model_name) | |
| elif provider == ModelProvider.ANTHROPIC: | |
| # Додаємо API ключ при ініціалізації | |
| self.client = Anthropic(api_key=anthropic_api_key) | |
| else: | |
| raise ValueError(f"Unsupported provider: {provider}") | |
| async def analyze(self, prompt: str, response_schema: dict) -> str: | |
| if self.provider == ModelProvider.OPENAI: | |
| return await self._analyze_with_openai(prompt, response_schema) | |
| else: | |
| return await self._analyze_with_anthropic(prompt, response_schema) | |
| async def _analyze_with_openai(self, prompt: str, response_schema: dict) -> str: | |
| messages = [ | |
| ChatMessage(role="system", | |
| content="Ти - кваліфікований юрист-аналітик, експерт з правових позицій Верховного Суду."), | |
| ChatMessage(role="user", content=prompt) | |
| ] | |
| # Правильний формат для response_format | |
| response_format = { | |
| "type": "json_schema", | |
| "json_schema": { | |
| "name": "relevant_positions_schema", # Додаємо обов'язкове поле name | |
| "schema": response_schema | |
| } | |
| } | |
| response = self.client.chat( | |
| messages=messages, | |
| response_format=response_format, | |
| temperature=0 | |
| ) | |
| return response.message.content | |
| async def _analyze_with_anthropic(self, prompt: str, response_schema: dict) -> str: | |
| response = self.client.messages.create( # Прибрали await | |
| model=self.model_name, | |
| max_tokens=2000, | |
| messages=[ | |
| { | |
| "role": "assistant", | |
| "content": "Ти - кваліфікований юрист-аналітик, експерт з правових позицій Верховного Суду." | |
| }, | |
| { | |
| "role": "user", | |
| "content": prompt | |
| } | |
| ] | |
| ) | |
| return response.content[0].text | |
| class PrecedentAnalysisWorkflow(Workflow): | |
| def __init__(self, provider: ModelProvider = ModelProvider.OPENAI, | |
| model_name: ModelName = ModelName.GPT4o_MINI): | |
| super().__init__() | |
| self.analyzer = LLMAnalyzer(provider, model_name) | |
| async def analyze(self, ctx: Context, ev: StartEvent) -> StopEvent: | |
| try: | |
| # Отримуємо параметри з події з дефолтними значеннями | |
| query = ev.get("query", "") | |
| question = ev.get("question", "") | |
| nodes = ev.get("nodes", []) | |
| # Перевірка на пусті значення | |
| if not query: | |
| return StopEvent(result="Помилка: Не надано текст нового рішення (query)") | |
| if not nodes: | |
| return StopEvent(result="Помилка: Не надано правові позиції для аналізу (nodes)") | |
| # Підготовка контексту | |
| context_parts = [] | |
| for i, node in enumerate(nodes, 1): | |
| node_text = node.node.text if hasattr(node, 'node') else node.text | |
| metadata = node.node.metadata if hasattr(node, 'node') else node.metadata | |
| lp_id = metadata.get('lp_id', f'unknown_{i}') | |
| context_parts.append(f"Source {i} (ID: {lp_id}):\n{node_text}") | |
| context_str = "\n\n".join(context_parts) | |
| # Схема відповіді | |
| response_schema = { | |
| "type": "object", | |
| "properties": { | |
| "relevant_positions": { | |
| "type": "array", | |
| "items": { | |
| "type": "object", | |
| "properties": { | |
| "lp_id": {"type": "string"}, | |
| "source_index": {"type": "string"}, | |
| "description": {"type": "string"} | |
| }, | |
| "required": ["lp_id", "source_index", "description"] | |
| } | |
| } | |
| }, | |
| "required": ["relevant_positions"] | |
| } | |
| # Формування промпту | |
| prompt = PRECEDENT_ANALYSIS_TEMPLATE.format( | |
| query=query, | |
| question=question if question else "Загальний аналіз релевантності", | |
| context_str=context_str | |
| ) | |
| # Отримання відповіді від моделі | |
| response_content = await self.analyzer.analyze(prompt, response_schema) | |
| try: | |
| parsed_response = json.loads(response_content) | |
| if "relevant_positions" in parsed_response: | |
| response_lines = [] | |
| for position in parsed_response["relevant_positions"]: | |
| position_text = ( | |
| f"* [{position['source_index']}] {position['description']} " | |
| ) | |
| response_lines.append(position_text) | |
| response_text = "\n".join(response_lines) | |
| return StopEvent(result=response_text) | |
| else: | |
| return StopEvent(result="Не знайдено релевантних правових позицій") | |
| except json.JSONDecodeError: | |
| return StopEvent(result="Помилка обробки відповіді від AI") | |
| except Exception as e: | |
| return StopEvent(result=f"Error during analysis: {str(e)}") | |
| # Формування промпту та отримання відповіді | |
| prompt = PRECEDENT_ANALYSIS_TEMPLATE.format( | |
| query=query, | |
| question=question if question else "Загальний аналіз релевантності", | |
| context_str=context_str | |
| ) | |
| messages = [ | |
| ChatMessage(role="system", content="Ти - кваліфікований юрист-аналітик."), | |
| ChatMessage(role="user", content=prompt) | |
| ] | |
| response = llm_analyse.chat( | |
| messages=messages, | |
| response_format=response_format | |
| ) | |
| try: | |
| parsed_response = json.loads(response.message.content) | |
| if "relevant_positions" in parsed_response: | |
| # Форматуємо результат | |
| response_lines = [] | |
| for position in parsed_response["relevant_positions"]: | |
| position_text = ( | |
| f"* [{position['source_index']}]: {position['description']} " | |
| ) | |
| response_lines.append(position_text) | |
| response_text = "\n".join(response_lines) | |
| return StopEvent(result=response_text) | |
| else: | |
| return StopEvent(result="Помилка: відповідь не містить аналізу правових позицій") | |
| except json.JSONDecodeError: | |
| return StopEvent(result="Помилка обробки відповіді від AI") | |
| def parse_doc_ids(doc_ids): | |
| if doc_ids is None: | |
| return [] | |
| if isinstance(doc_ids, list): | |
| return [str(id).strip('[]') for id in doc_ids] | |
| if isinstance(doc_ids, str): | |
| cleaned = doc_ids.strip('[]').replace(' ', '') | |
| if cleaned: | |
| return [id.strip() for id in cleaned.split(',')] | |
| return [] | |
| def get_links_html(doc_ids): | |
| parsed_ids = parse_doc_ids(doc_ids) | |
| if not parsed_ids: | |
| return "" | |
| links = [f"[Рішення ВС: {doc_id}](https://reyestr.court.gov.ua/Review/{doc_id})" | |
| for doc_id in parsed_ids] | |
| return ", ".join(links) | |
| def parse_lp_ids(lp_ids): | |
| if lp_ids is None: | |
| return [] | |
| if isinstance(lp_ids, (str, int)): | |
| cleaned = str(lp_ids).strip('[]').replace(' ', '') | |
| if cleaned: | |
| return [cleaned] | |
| return [] | |
| def get_links_html_lp(lp_ids): | |
| parsed_ids = parse_lp_ids(lp_ids) | |
| if not parsed_ids: | |
| return "" | |
| links = [f"[ПП ВС: {lp_id}](https://lpd.court.gov.ua/home/search/{lp_id})" for lp_id in parsed_ids] | |
| return ", ".join(links) | |
| def initialize_components(): | |
| try: | |
| # Використовуємо папку `Save_Index_Local`, куди завантажено файли з S3 | |
| persist_path = Path("Save_Index_Local") | |
| # Перевірка існування локальної директорії | |
| if not persist_path.exists(): | |
| raise FileNotFoundError(f"Directory not found: {persist_path}") | |
| # Перевірка наявності необхідних файлів і папок | |
| required_files = ['docstore_es_filter.json', 'bm25_retriever_es'] | |
| missing_files = [f for f in required_files if not (persist_path / f).exists()] | |
| if missing_files: | |
| raise FileNotFoundError(f"Missing required files: {', '.join(missing_files)}") | |
| # Ініціалізація компонентів | |
| global retriever_bm25 | |
| # Ініціалізація `SimpleDocumentStore` з `docstore_es_filter.json` | |
| docstore = SimpleDocumentStore.from_persist_path(str(persist_path / "docstore_es_filter.json")) | |
| # Ініціалізація `BM25Retriever` з папки `bm25_retriever_es` | |
| bm25_retriever = BM25Retriever.from_persist_dir(str(persist_path / "bm25_retriever_es")) | |
| # Ініціалізація `QueryFusionRetriever` з налаштуваннями | |
| retriever_bm25 = QueryFusionRetriever( | |
| [ | |
| bm25_retriever, | |
| ], | |
| similarity_top_k=Settings.similarity_top_k, | |
| num_queries=1, | |
| use_async=True, | |
| ) | |
| return True | |
| except Exception as e: | |
| print(f"Error initializing components: {str(e)}", file=sys.stderr) | |
| return False | |
| def extract_court_decision_text(url): | |
| response = requests.get(url) | |
| soup = BeautifulSoup(response.content, 'html.parser') | |
| unwanted_texts = [ | |
| "Доступ до Реєстру здійснюється в тестовому (обмеженому) режимі.", | |
| "З метою упередження перешкоджанню стабільній роботі Реєстру" | |
| ] | |
| decision_text = "" | |
| for paragraph in soup.find_all('p'): | |
| text = paragraph.get_text(separator="\n").strip() | |
| if not any(unwanted_text in text for unwanted_text in unwanted_texts): | |
| decision_text += text + "\n" | |
| return decision_text.strip() | |
| # Constants for JSON schema | |
| LEGAL_POSITION_SCHEMA = { | |
| "type": "json_schema", | |
| "json_schema": { | |
| "name": "lp_schema", | |
| "schema": { | |
| "type": "object", | |
| "properties": { | |
| "title": {"type": "string", "description": "Title of the legal position"}, | |
| "text": {"type": "string", "description": "Text of the legal position"}, | |
| "proceeding": {"type": "string", "description": "Type of court proceedings"}, | |
| "category": {"type": "string", "description": "Category of the legal position"}, | |
| }, | |
| "required": ["title", "text", "proceeding", "category"], | |
| "additionalProperties": False | |
| }, | |
| "strict": True | |
| } | |
| } | |
| def generate_legal_position(court_decision_text, comment_input): | |
| """ | |
| Генерує правову позицію на основі тексту судового рішення. | |
| Args: | |
| court_decision_text (str): Текст судового рішення для аналізу | |
| user_question (str): Питання користувача (наразі не використовується) | |
| Returns: | |
| dict: Словник з правовою позицією або повідомленням про помилку | |
| """ | |
| try: | |
| # Ініціалізація моделі | |
| llm_lp = OpenAI( | |
| model="ft:gpt-4o-mini-2024-07-18:personal:legal-position-400:AT3wvKsU", | |
| temperature=0 | |
| ) | |
| # Формування повідомлень для чату | |
| # Формуємо контент з урахуванням коментаря | |
| content = LEGAL_POSITION_PROMPT.format( | |
| court_decision_text=court_decision_text, | |
| comment=comment_input if comment_input else "Коментар відсутній" | |
| ) | |
| # Формування повідомлень для чату | |
| messages = [ | |
| ChatMessage(role="system", content=SYSTEM_PROMPT), | |
| ChatMessage(role="user", content=content), | |
| ] | |
| # Отримання відповіді від моделі | |
| response = llm_lp.chat(messages, response_format=LEGAL_POSITION_SCHEMA) | |
| # Обробка відповіді | |
| parsed_response = json.loads(response.message.content) | |
| # Перевірка наявності обов'язкових полів | |
| if all(field in parsed_response for field in ["title", "text", "proceeding", "category"]): | |
| return parsed_response | |
| return { | |
| "title": "Error: Missing required fields in response", | |
| "text": response.message.content, | |
| "proceeding": "Unknown", | |
| "category": "Error" | |
| } | |
| except json.JSONDecodeError: | |
| return { | |
| "title": "Error parsing response", | |
| "text": response.message.content, | |
| "proceeding": "Unknown", | |
| "category": "Error" | |
| } | |
| except Exception as e: | |
| return { | |
| "title": "Unexpected error", | |
| "text": str(e), | |
| "proceeding": "Unknown", | |
| "category": "Error" | |
| } | |
| def create_gradio_interface(): | |
| async def generate_position_action(url): | |
| try: | |
| court_decision_text = extract_court_decision_text(url) | |
| legal_position_json = generate_legal_position(court_decision_text, comment_input) | |
| position_output_content = f"**Короткий зміст позиції суду за введеним рішенням:**\n *{legal_position_json['title']}*: \n{legal_position_json['text']} **Категорія:** \n{legal_position_json['category']} ({legal_position_json['proceeding']})\n\n" | |
| return position_output_content, legal_position_json | |
| except Exception as e: | |
| return f"Error during position generation: {str(e)}", None | |
| async def search_with_ai_action(legal_position_json): | |
| try: | |
| query_text = legal_position_json["title"] + ': ' + legal_position_json["text"] + ': ' + legal_position_json["proceeding"] + ': ' + legal_position_json["category"] | |
| nodes = await retriever_bm25.aretrieve(query_text) | |
| sources_output = "\n **Результати пошуку (наявні правові позиції ВСУ):** \n\n" | |
| for index, node in enumerate(nodes, start=1): | |
| source_title = node.node.metadata.get('title') | |
| doc_ids = node.node.metadata.get('doc_id') | |
| lp_ids = node.node.metadata.get('lp_id') | |
| links = get_links_html(doc_ids) | |
| links_lp = get_links_html_lp(lp_ids) | |
| sources_output += f"\n[{index}] *{source_title}* {links_lp} 👉 Score: {node.score} {links}\n" | |
| return sources_output, nodes | |
| except Exception as e: | |
| return f"Error during search: {str(e)}", None | |
| async def analyze_action(legal_position_json, question, nodes, provider, model_name): | |
| try: | |
| workflow = PrecedentAnalysisWorkflow( | |
| provider=ModelProvider(provider), | |
| model_name=ModelName(model_name) | |
| ) | |
| query = ( | |
| f"{legal_position_json['title']}: " | |
| f"{legal_position_json['text']}: " | |
| f"{legal_position_json['proceeding']}: " | |
| f"{legal_position_json['category']}" | |
| ) | |
| response_text = await workflow.run( | |
| query=query, | |
| question=question, | |
| nodes=nodes | |
| ) | |
| output = f"**Аналіз ШІ (модель: {model_name}):**\n{response_text}\n\n" | |
| output += "**Наявні в базі Правові Позицій Верховного Суду:**\n\n" | |
| analysis_lines = response_text.split('\n') | |
| for line in analysis_lines: | |
| if line.startswith('* ['): | |
| index = line[3:line.index(']')] | |
| node = nodes[int(index) - 1] | |
| source_node = node.node | |
| source_title = source_node.metadata.get('title', 'Невідомий заголовок') | |
| source_text_lp = node.text | |
| doc_ids = source_node.metadata.get('doc_id') | |
| lp_id = source_node.metadata.get('lp_id') | |
| links = get_links_html(doc_ids) | |
| links_lp = get_links_html_lp(lp_id) | |
| output += f"[{index}]: *{source_title}* | {source_text_lp} | {links_lp} | {links}\n\n" | |
| return output | |
| except Exception as e: | |
| return f"Error during analysis: {str(e)}" | |
| def update_model_choices(provider): | |
| if provider == ModelProvider.OPENAI.value: | |
| return gr.Dropdown(choices=[m.value for m in ModelName if m.value.startswith("gpt")]) | |
| else: | |
| return gr.Dropdown(choices=[m.value for m in ModelName if m.value.startswith("claude")]) | |
| with gr.Blocks() as app: | |
| # Далі ваш код інтерфейсу... | |
| gr.Markdown("# Аналізатор релевантних Правових Позицій Верховного Суду для нового судового рішення") | |
| with gr.Row(): | |
| comment_input = gr.Textbox(label="Коментар до формування короткого змісту судового рішення:") | |
| url_input = gr.Textbox(label="URL судового рішення:") | |
| question_input = gr.Textbox(label="Уточнююче питання для аналізу:") | |
| with gr.Row(): | |
| provider_dropdown = gr.Dropdown( | |
| choices=[p.value for p in ModelProvider], | |
| value=ModelProvider.OPENAI.value, | |
| label="Провайдер AI", | |
| ) | |
| model_dropdown = gr.Dropdown( | |
| choices=[m.value for m in ModelName if m.value.startswith("gpt")], | |
| value=ModelName.GPT4o_MINI.value, | |
| label="Модель", | |
| ) | |
| with gr.Row(): | |
| generate_position_button = gr.Button("Генерувати короткий зміст позиції суду") | |
| search_with_ai_button = gr.Button("Пошук із ШІ", interactive=False) | |
| analyze_button = gr.Button("Аналіз", interactive=False) | |
| position_output = gr.Markdown(label="Короткий зміст позиції суду за введеним рішенням") | |
| search_output = gr.Markdown(label="Результат пошуку") | |
| analysis_output = gr.Markdown(label="Результат аналізу") | |
| state_lp_json = gr.State() | |
| state_nodes = gr.State() | |
| # Підключення функцій до кнопок | |
| generate_position_button.click( | |
| fn=generate_position_action, | |
| inputs=url_input, | |
| outputs=[position_output, state_lp_json] | |
| ).then( | |
| fn=lambda: gr.update(interactive=True), | |
| inputs=None, | |
| outputs=search_with_ai_button | |
| ) | |
| search_with_ai_button.click( | |
| fn=search_with_ai_action, | |
| inputs=state_lp_json, | |
| outputs=[search_output, state_nodes] | |
| ).then( | |
| fn=lambda: gr.update(interactive=True), | |
| inputs=None, | |
| outputs=analyze_button | |
| ) | |
| analyze_button.click( | |
| fn=analyze_action, | |
| inputs=[state_lp_json, question_input, state_nodes, provider_dropdown, model_dropdown], | |
| outputs=analysis_output | |
| ) | |
| provider_dropdown.change( | |
| fn=update_model_choices, | |
| inputs=provider_dropdown, | |
| outputs=model_dropdown | |
| ) | |
| return app | |
| if __name__ == "__main__": | |
| if initialize_components(): | |
| print("Components initialized successfully!") | |
| app = create_gradio_interface() | |
| app.launch(share=True) | |
| else: | |
| print("Failed to initialize components. Please check the paths and try again.", file=sys.stderr) | |
| sys.exit(1) | |