Spaces:
Build error
Build error
| import os | |
| import numpy as np | |
| import pandas as pd | |
| import json | |
| from typing import Dict, List, Optional, Union | |
| from openai import OpenAI | |
| from pathlib import Path | |
| from embedding_cache import EmbeddingCache | |
| class SDCClassifier: | |
| def __init__(self, | |
| openai_api_key: str = None, | |
| cache_path: str = "embeddings_cache.db", | |
| local_model: str = "cambridgeltl/SapBERT-from-PubMedBERT-fulltext", | |
| device: str = None): | |
| """ | |
| Ініціалізація класифікатора SDC | |
| Args: | |
| openai_api_key: API ключ для OpenAI (опціонально, можна взяти з env) | |
| cache_path: шлях до файлу кешу ембедінгів | |
| local_model: назва або шлях до локальної моделі | |
| device: пристрій для локальної моделі ('cuda', 'cpu' або None) | |
| """ | |
| self.client = OpenAI(api_key=openai_api_key or os.getenv("OPENAI_API_KEY")) | |
| self.local_embedder = None | |
| self.using_local = False | |
| if local_model: | |
| from local_embedder import LocalEmbedder | |
| self.local_embedder = LocalEmbedder(local_model, device) | |
| self.using_local = True | |
| self.classes_json = {} | |
| self.class_signatures = None | |
| self.df = None | |
| self.embeddings = None | |
| self.embeddings_mean = None | |
| self.embeddings_std = None | |
| # Створення директорії для кешу | |
| cache_dir = os.path.dirname(cache_path) | |
| if cache_dir and not os.path.exists(cache_dir): | |
| os.makedirs(cache_dir) | |
| # Ініціалізація кешу | |
| self.cache = EmbeddingCache(cache_path) | |
| # Базовий стан | |
| self.base_classes_json = {} | |
| self.base_signatures = None | |
| def load_initial_state(self, classes_file: str, signatures_file: str) -> str: | |
| """ | |
| Завантаження початкового стану при старті застосунку | |
| Args: | |
| classes_file: шлях до файлу з класами | |
| signatures_file: шлях до файлу з signatures | |
| Returns: | |
| str: повідомлення про результат завантаження | |
| """ | |
| try: | |
| self.base_classes_json = self.load_classes(classes_file) | |
| if os.path.exists(signatures_file): | |
| self.base_signatures = self.load_signatures(signatures_file) | |
| # Встановлюємо поточний стан як базовий | |
| self.classes_json = self.base_classes_json.copy() | |
| self.class_signatures = self.base_signatures.copy() if self.base_signatures else None | |
| return f"Завантажено {len(self.base_classes_json)} базових класів" | |
| except Exception as e: | |
| return f"Помилка при завантаженні базового стану: {str(e)}" | |
| def restore_base_state(self) -> None: | |
| """Відновлення базового стану""" | |
| self.classes_json = self.base_classes_json.copy() | |
| self.class_signatures = self.base_signatures.copy() if self.base_signatures else None | |
| def load_classes(self, json_path: Union[str, dict]) -> dict: | |
| """ | |
| Завантаження класів та їх хінтів з JSON файлу або словника | |
| Args: | |
| json_path: шлях до JSON файлу або словник з класами | |
| Returns: | |
| dict: словник класів та їх хінтів | |
| """ | |
| try: | |
| if isinstance(json_path, dict): | |
| self.classes_json = json_path | |
| else: | |
| with open(json_path, 'r', encoding='utf-8') as f: | |
| self.classes_json = json.load(f) | |
| if not all(isinstance(hints, list) for hints in self.classes_json.values()): | |
| raise ValueError("Кожен клас повинен мати список хінтів") | |
| return self.classes_json | |
| except FileNotFoundError: | |
| print(f"Файл {json_path} не знайдено!") | |
| return {} | |
| except json.JSONDecodeError: | |
| print(f"Помилка читання JSON з файлу {json_path}!") | |
| return {} | |
| def save_signatures(self, filename: str = "signatures.npz") -> None: | |
| """ | |
| Зберігає signatures у NPZ файл | |
| Args: | |
| filename: шлях до файлу для збереження | |
| """ | |
| if self.class_signatures: | |
| np.savez(filename, **self.class_signatures) | |
| def load_signatures(self, filename: str = "signatures.npz") -> Dict[str, np.ndarray]: | |
| """ | |
| Завантажує signatures з NPZ файлу | |
| Args: | |
| filename: шлях до файлу з signatures | |
| Returns: | |
| Dict[str, np.ndarray]: словник signatures | |
| """ | |
| try: | |
| with np.load(filename) as data: | |
| self.class_signatures = {key: data[key] for key in data.files} | |
| return self.class_signatures | |
| except (FileNotFoundError, IOError): | |
| return None | |
| def get_embedding(self, text: str, model_name: str = None) -> list: | |
| """ | |
| Отримання ембедінгу тексту | |
| Args: | |
| text: текст для ембедінгу | |
| model_name: назва моделі (OpenAI) або None для локальної | |
| Returns: | |
| list: ембедінг тексту | |
| """ | |
| # Перевіряємо кеш | |
| cached_embedding = self.cache.get(text, model_name or "local") | |
| if cached_embedding is not None: | |
| return cached_embedding.tolist() | |
| # Отримуємо ембедінг | |
| if self.using_local and model_name is None: | |
| embedding = self.local_embedder.get_embeddings(text)[0] | |
| else: | |
| response = self.client.embeddings.create( | |
| input=text, | |
| model=model_name or "text-embedding-3-large" | |
| ) | |
| embedding = response.data[0].embedding | |
| # Зберігаємо в кеш | |
| self.cache.put(text, model_name or "local", embedding) | |
| return embedding | |
| def get_cache_stats(self) -> dict: | |
| """Отримання статистики кешування""" | |
| return self.cache.get_stats() | |
| def clear_old_cache(self, days: int = 30) -> int: | |
| """Очищення старих записів з кешу""" | |
| return self.cache.clear_old(days) | |
| def embed_hints(self, hint_list: List[str], model_name: str = None) -> np.ndarray: | |
| """ | |
| Створення ембедінгів для списку хінтів | |
| Args: | |
| hint_list: список хінтів | |
| model_name: назва моделі для ембедінгів | |
| Returns: | |
| np.ndarray: матриця ембедінгів | |
| """ | |
| emb_list = [] | |
| total_hints = len(hint_list) | |
| for idx, hint in enumerate(hint_list, 1): | |
| try: | |
| print(f" Отримання embedding {idx}/{total_hints}: '{hint}'") | |
| emb = self.get_embedding(hint, model_name=model_name) | |
| emb_list.append(emb) | |
| except Exception as e: | |
| print(f" Помилка при отриманні embedding для '{hint}': {str(e)}") | |
| continue | |
| if not emb_list: | |
| raise ValueError("Не вдалося отримати жодного embedding") | |
| return np.array(emb_list, dtype=np.float32) | |
| def initialize_signatures(self, | |
| model_name: str = None, | |
| signatures_file: str = "signatures.npz", | |
| force_rebuild: bool = False) -> str: | |
| """ | |
| Ініціалізує signatures: завантажує існуючі або створює нові | |
| Args: | |
| model_name: назва моделі для ембедінгів | |
| signatures_file: шлях до файлу для збереження (None - не зберігати) | |
| force_rebuild: примусово перебудувати signatures | |
| """ | |
| if not self.classes_json: | |
| return "Помилка: Не знайдено жодного класу в classes.json" | |
| print(f"Знайдено {len(self.classes_json)} класів") | |
| # Завантажуємо існуючі signatures | |
| if not force_rebuild and signatures_file and os.path.exists(signatures_file): | |
| try: | |
| loaded_signatures = self.load_signatures(signatures_file) | |
| if loaded_signatures and all(cls in loaded_signatures for cls in self.classes_json): | |
| self.class_signatures = loaded_signatures | |
| print("Успішно завантажено збережені signatures") | |
| return f"Завантажено існуючі signatures для {len(self.class_signatures)} класів" | |
| except Exception as e: | |
| print(f"Помилка при завантаженні signatures: {str(e)}") | |
| try: | |
| self.class_signatures = {} | |
| total_classes = len(self.classes_json) | |
| print(f"Починаємо створення нових signatures для {total_classes} класів...") | |
| for idx, (cls_name, hints) in enumerate(self.classes_json.items(), 1): | |
| if not hints: | |
| print(f"Пропускаємо клас {cls_name} - немає хінтів") | |
| continue | |
| print(f"Обробка класу {cls_name} ({idx}/{total_classes})...") | |
| try: | |
| # Отримуємо ембедінги для всіх хінтів класу | |
| arr = self.embed_hints(hints, model_name=model_name) | |
| # Нормалізуємо кожен ембедінг | |
| norms = np.linalg.norm(arr, axis=1, keepdims=True) | |
| arr = arr / norms | |
| # Обчислюємо середній нормалізований ембедінг | |
| self.class_signatures[cls_name] = np.mean(arr, axis=0) | |
| print(f"Успішно створено signature для {cls_name}") | |
| except Exception as e: | |
| print(f"Помилка при створенні signature для {cls_name}: {str(e)}") | |
| continue | |
| if not self.class_signatures: | |
| return "Помилка: Не вдалося створити жодного signature" | |
| # Зберігаємо signatures | |
| if signatures_file: | |
| try: | |
| self.save_signatures(signatures_file) | |
| print("Signatures збережено у файл") | |
| except Exception as e: | |
| print(f"Помилка при збереженні signatures: {str(e)}") | |
| return f"Створено нові signatures для {len(self.class_signatures)} класів" | |
| except Exception as e: | |
| return f"Помилка при створенні signatures: {str(e)}" | |
| def load_data(self, csv_path: str = "messages.csv", emb_path: str = "embeddings.npy") -> str: | |
| """ | |
| Завантаження даних з CSV та NPY файлів | |
| Args: | |
| csv_path: шлях до CSV файлу | |
| emb_path: шлях до NPY файлу з ембедінгами | |
| Returns: | |
| str: повідомлення про результат | |
| """ | |
| self.df = pd.read_csv(csv_path) | |
| emb_local = np.load(emb_path) | |
| assert len(self.df) == len(emb_local), "CSV і embeddings різної довжини!" | |
| self.df["Target"] = "Unlabeled" | |
| self.embeddings_mean = emb_local.mean(axis=0) | |
| self.embeddings_std = emb_local.std(axis=0) | |
| self.embeddings = (emb_local - self.embeddings_mean) / self.embeddings_std | |
| return f"Завантажено {len(self.df)} рядків" | |
| def predict_classes(self, text_embedding: np.ndarray, threshold: float = 0.0) -> Dict[str, float]: | |
| """ | |
| Передбачення класів для одного тексту | |
| Args: | |
| text_embedding: ембедінг тексту | |
| threshold: поріг впевненості | |
| Returns: | |
| Dict[str, float]: словник класів та їх scores | |
| """ | |
| results = {} | |
| for cls, sign in self.class_signatures.items(): | |
| score = float(np.dot(text_embedding, sign)) | |
| if score > threshold: | |
| results[cls] = score | |
| return dict(sorted(results.items(), key=lambda x: x[1], reverse=True)) | |
| def process_single_text(self, text: str, threshold: float = 0.3) -> dict: | |
| """ | |
| Обробка одного тексту | |
| Args: | |
| text: текст для класифікації | |
| threshold: поріг впевненості | |
| Returns: | |
| dict: результати класифікації | |
| """ | |
| if self.class_signatures is None: | |
| return {"error": "Спочатку збудуйте signatures!"} | |
| # Отримуємо ембедінг | |
| emb = np.array(self.get_embedding(text)) | |
| # Нормалізуємо відносно даних навчання, якщо вони доступні | |
| if self.embeddings_mean is not None and self.embeddings_std is not None and not self.using_local: | |
| emb = (emb - self.embeddings_mean) / self.embeddings_std | |
| # Отримуємо передбачення | |
| predictions = self.predict_classes(emb, threshold) | |
| if not predictions: | |
| return {"message": text, "result": "Жодного класу не знайдено"} | |
| # Форматуємо результати | |
| formatted_results = [] | |
| for cls, score in predictions.items(): | |
| # Конвертуємо score в відсотки, обмежуємо до 100% | |
| score_percent = min(abs(float(score)) * 100, 100) | |
| formatted_results.append(f"{cls}: {score_percent:.2f}%") | |
| return { | |
| "message": text, | |
| "result": " ".join(formatted_results) | |
| } | |
| def classify_rows(self, filter_substring: str = "", threshold: float = 0.3) -> pd.DataFrame: | |
| """ | |
| Класифікація всіх або відфільтрованих рядків | |
| Args: | |
| filter_substring: підрядок для фільтрації | |
| threshold: поріг впевненості | |
| Returns: | |
| pd.DataFrame: результати класифікації | |
| """ | |
| if self.class_signatures is None: | |
| raise ValueError("Спочатку збудуйте signatures!") | |
| if self.df is None or self.embeddings is None: | |
| raise ValueError("Дані не завантажені! Спочатку викличте load_data.") | |
| if filter_substring: | |
| filtered_idx = self.df[self.df["Message"].str.contains(filter_substring, | |
| case=False, | |
| na=False)].index | |
| else: | |
| filtered_idx = self.df.index | |
| for cls in self.class_signatures.keys(): | |
| self.df[f"Score_{cls}"] = 0.0 | |
| for i in filtered_idx: | |
| emb_vec = self.embeddings[i] | |
| predictions = self.predict_classes(emb_vec, threshold=threshold) | |
| for cls, score in predictions.items(): | |
| self.df.at[i, f"Score_{cls}"] = score | |
| main_classes = [cls for cls, score in predictions.items() | |
| if score > threshold] | |
| self.df.at[i, "Target"] = "|".join(main_classes) if main_classes else "None" | |
| result_columns = ["Message", "Target"] + [f"Score_{cls}" | |
| for cls in self.class_signatures.keys()] | |
| result_df = self.df.loc[filtered_idx, result_columns].copy() | |
| return result_df.reset_index(drop=True) | |
| def save_results(self, output_path: str = "messages_with_labels.csv") -> str: | |
| """ | |
| Зберігання результатів класифікації | |
| Args: | |
| output_path: шлях для збереження результатів | |
| Returns: | |
| str: повідомлення про результат | |
| """ | |
| if self.df is None: | |
| return "Дані відсутні!" | |
| self.df.to_csv(output_path, index=False) | |
| return f"Дані збережено у файл {output_path}" | |
| def save_model_info(self, path: str = "model_info.json") -> None: | |
| """ | |
| Зберігання інформації про поточний стан моделі | |
| Args: | |
| path: шлях для збереження | |
| """ | |
| info = { | |
| "using_local": self.using_local, | |
| "classes_count": len(self.classes_json), | |
| "signatures_count": len(self.class_signatures) if self.class_signatures else 0, | |
| "cache_stats": self.get_cache_stats(), | |
| } | |
| if self.using_local: | |
| info["local_model"] = self.local_embedder.get_model_info() | |
| with open(path, 'w', encoding='utf-8') as f: | |
| json.dump(info, f, indent=2) | |
| def evaluate_classification(self, csv_path: str, threshold: float = 0.3) -> pd.DataFrame: | |
| """ | |
| Оцінка класифікації текстів з CSV файлу | |
| Args: | |
| csv_path: шлях до CSV файлу з колонками Category та Question | |
| threshold: поріг впевненості для класифікації | |
| Returns: | |
| pd.DataFrame: результати класифікації з додатковими метриками | |
| """ | |
| if self.class_signatures is None: | |
| raise ValueError("Спочатку збудуйте signatures!") | |
| # Завантаження даних | |
| df = pd.read_csv(csv_path) | |
| if not {'Category', 'Question'}.issubset(df.columns): | |
| raise ValueError("CSV повинен містити колонки 'Category' та 'Question'") | |
| # Підготовка результатів | |
| results = [] | |
| for idx, row in df.iterrows(): | |
| # Отримуємо ембедінг для питання | |
| emb = np.array(self.get_embedding(row['Question'])) | |
| # Нормалізуємо якщо потрібно | |
| if self.embeddings_mean is not None and self.embeddings_std is not None and not self.using_local: | |
| emb = (emb - self.embeddings_mean) / self.embeddings_std | |
| # Отримуємо всі передбачення | |
| predictions = self.predict_classes(emb, threshold) | |
| # Формуємо список класів за рівнем впевненості | |
| sorted_classes = list(predictions.keys()) | |
| # Знаходимо позицію очікуваного класу | |
| expected_class = row['Category'] | |
| expected_position = sorted_classes.index(expected_class) + 1 if expected_class in sorted_classes else -1 | |
| # Отримуємо рівень впевненості для очікуваного класу | |
| expected_confidence = predictions.get(expected_class, 0.0) | |
| # Додаємо результат | |
| results.append({ | |
| 'Category': row['Category'], | |
| 'Question': row['Question'], | |
| 'ExpectedClassPosition': expected_position, | |
| 'ExpectedClassConfidence': expected_confidence, | |
| 'ClassificationResults': json.dumps(predictions) | |
| }) | |
| return pd.DataFrame(results) | |
| def save_evaluation_results(self, df: pd.DataFrame, output_path: str = "evaluation_results.csv") -> str: | |
| """ | |
| Зберігає результати оцінки класифікації | |
| Args: | |
| df: DataFrame з результатами | |
| output_path: шлях для збереження файлу | |
| Returns: | |
| str: повідомлення про результат | |
| """ | |
| try: | |
| df.to_csv(output_path, index=False) | |
| return f"Результати збережено у файл {output_path}" | |
| except Exception as e: | |
| return f"Помилка при збереженні результатів: {str(e)}" | |
| def get_evaluation_statistics(self, df: pd.DataFrame) -> dict: | |
| """ | |
| Розраховує статистику по результатам класифікації | |
| Args: | |
| df: DataFrame з результатами класифікації | |
| Returns: | |
| dict: статистика класифікації | |
| """ | |
| total = len(df) | |
| found_mask = df['ExpectedClassPosition'] != -1 | |
| correct_first = (df['ExpectedClassPosition'] == 1).sum() | |
| in_top3 = (df['ExpectedClassPosition'].between(1, 3)).sum() | |
| not_found = (~found_mask).sum() | |
| # Середня впевненість для коректних класифікацій | |
| mean_confidence = df[df['ExpectedClassPosition'] == 1]['ExpectedClassConfidence'].mean() | |
| # Підрахунок по діапазонах впевненості | |
| confidence_ranges = { | |
| "90-100%": ((df['ExpectedClassConfidence'] >= 0.9) & found_mask).sum(), | |
| "70-90%": ((df['ExpectedClassConfidence'].between(0.7, 0.9)) & found_mask).sum(), | |
| "50-70%": ((df['ExpectedClassConfidence'].between(0.5, 0.7)) & found_mask).sum(), | |
| "<50%": ((df['ExpectedClassConfidence'] < 0.5) & found_mask).sum() | |
| } | |
| return { | |
| "total_samples": total, | |
| "correct_first_place": { | |
| "count": int(correct_first), | |
| "percentage": round(correct_first/total * 100, 1) | |
| }, | |
| "in_top3": { | |
| "count": int(in_top3), | |
| "percentage": round(in_top3/total * 100, 1) | |
| }, | |
| "not_found": { | |
| "count": int(not_found), | |
| "percentage": round(not_found/total * 100, 1) | |
| }, | |
| "mean_confidence_correct": round(mean_confidence * 100, 1) if not np.isnan(mean_confidence) else 0, | |
| "confidence_distribution": { | |
| k: { | |
| "count": int(v), | |
| "percentage": round(v/total * 100, 1) | |
| } | |
| for k, v in confidence_ranges.items() | |
| } | |
| } | |
| def evaluate_classification(self, csv_path: str, threshold: float = 0.3) -> tuple[pd.DataFrame, dict]: | |
| """ | |
| Оцінка класифікації текстів з CSV файлу | |
| Args: | |
| csv_path: шлях до CSV файлу з колонками Category та Question | |
| threshold: поріг впевненості для класифікації | |
| Returns: | |
| tuple[pd.DataFrame, dict]: результати класифікації та статистика | |
| """ | |
| if self.class_signatures is None: | |
| raise ValueError("Спочатку збудуйте signatures!") | |
| # Завантаження даних | |
| df = pd.read_csv(csv_path) | |
| if not {'Category', 'Question'}.issubset(df.columns): | |
| raise ValueError("CSV повинен містити колонки 'Category' та 'Question'") | |
| # Підготовка результатів | |
| results = [] | |
| for idx, row in df.iterrows(): | |
| # Отримуємо ембедінг для питання | |
| emb = np.array(self.get_embedding(row['Question'])) | |
| # Нормалізуємо якщо потрібно | |
| if self.embeddings_mean is not None and self.embeddings_std is not None and not self.using_local: | |
| emb = (emb - self.embeddings_mean) / self.embeddings_std | |
| # Отримуємо всі передбачення | |
| predictions = self.predict_classes(emb, threshold) | |
| # Формуємо список класів за рівнем впевненості | |
| sorted_classes = list(predictions.keys()) | |
| # Знаходимо позицію очікуваного класу | |
| expected_class = row['Category'] | |
| expected_position = sorted_classes.index(expected_class) + 1 if expected_class in sorted_classes else -1 | |
| # Отримуємо рівень впевненості для очікуваного класу | |
| expected_confidence = predictions.get(expected_class, 0.0) | |
| # Додаємо результат | |
| results.append({ | |
| 'Category': row['Category'], | |
| 'Question': row['Question'], | |
| 'ExpectedClassPosition': expected_position, | |
| 'ExpectedClassConfidence': expected_confidence, | |
| 'ClassificationResults': json.dumps(predictions) | |
| }) | |
| results_df = pd.DataFrame(results) | |
| statistics = self.get_evaluation_statistics(results_df) | |
| return results_df, statistics | |
| def load_model_info(path: str) -> dict: | |
| """ | |
| Завантаження інформації про модель | |
| Args: | |
| path: шлях до файлу з інформацією | |
| Returns: | |
| dict: інформація про модель | |
| """ | |
| with open(path, 'r', encoding='utf-8') as f: | |
| return json.load(f) |