Spaces:
Sleeping
Sleeping
| # enhanced_search_v2.py | |
| # --------------------- | |
| # Motor de busca híbrido e em camadas para procedimentos médicos. | |
| # Versão final consolidada com todas as otimizações. | |
| # | |
| # Funcionalidades Principais: | |
| # 1. Correção Ortográfica PRÉ-BUSCA: Corrige termos inválidos ANTES de qualquer busca. | |
| # 2. Camada 0 para Busca Literal Robusta: Encontra correspondências exatas da frase, | |
| # ignorando caixa, acentos, pontuação e espaçamento. | |
| # 3. Early Exit Otimizado: Interrompe a busca com log claro e correto. | |
| # 4. Busca em Múltiplas Camadas: Da mais restrita (literal) à mais abrangente (ponderada). | |
| # 5. Pontuação por Relevância (IDF): Palavras raras têm mais peso. | |
| # 6. Limpeza de Dados: Zera campos do Rol para procedimentos que não são do Rol. | |
| # 7. Reordenação Semântica: Usa o MiniLM-L6-v2 para entender o significado e reordenar. | |
| # 8. Feedback de Usuário: Prioriza resultados validados pela comunidade. | |
| # 9. Lida com múltiplos procedimentos (linhas do DB) para um mesmo código TUSS. | |
| import pandas as pd | |
| import re | |
| from thefuzz import process, fuzz | |
| from unidecode import unidecode | |
| import time | |
| from sentence_transformers import util | |
| import torch | |
| import math | |
| from collections import defaultdict | |
| # --- FUNÇÕES AUXILIARES DE NORMALIZAÇÃO --- | |
| ### ALTERAÇÃO ### | |
| def literal_normalize_text(text): | |
| """ | |
| Normaliza o texto para busca literal (Camada 0): minúsculas, sem acentos, | |
| substitui pontuação por espaço e padroniza espaços em branco. | |
| """ | |
| if pd.isna(text): return "" | |
| normalized = unidecode(str(text).lower()) | |
| # CORREÇÃO: Substitui caracteres não-alfanuméricos por um espaço em vez de removê-los. | |
| # Isso impede que palavras como "sangue." e "O" se fundam em "sangueO". | |
| normalized = re.sub(r'[^\w\s]', ' ', normalized) | |
| return re.sub(r'\s+', ' ', normalized).strip() | |
| def normalize_text(text): | |
| """Normaliza o texto para busca por tokens (palavras): minúsculas, sem acentos e espaços extras.""" | |
| if pd.isna(text): return "" | |
| return unidecode(str(text).lower().strip()) | |
| def get_longest_word(query_text): | |
| """Extrai a palavra mais longa de uma query (usado no fallback).""" | |
| words = re.findall(r'\b\w{4,}\b', query_text) | |
| if not words: return "" | |
| return max(words, key=len) | |
| # --- FUNÇÕES DE FORMATAÇÃO E DESTAQUE --- | |
| # No arquivo enhanced_search_v2.py | |
| def format_result(row_data, match_type="", score=0): | |
| """ | |
| Formata uma linha do DataFrame em um dicionário de resultado padrão. | |
| Aplica a regra de negócio para limpar dados se o procedimento não for do Rol. | |
| """ | |
| data = row_data.copy() | |
| if data.get('Correlacao_Rol', '').strip().lower() != 'sim': | |
| data['Grupo'], data['Subgrupo'], data['Vigencia'], data['Resolucao_Normativa'] = '', '', '', '' | |
| data['PAC'], data['DUT'] = '---', '---' | |
| else: | |
| data['PAC'] = 'Sim' if data.get('PAC', '').strip().lower() == 'pac' else 'Não' | |
| # Lógica da DUT corrigida aqui | |
| original_dut_value = data.get('DUT', '').strip() | |
| # CORREÇÃO: A verificação agora aceita números com ponto decimal (ex: "65.1") | |
| # A lógica é: se o valor, após remover o primeiro '.', for composto apenas de dígitos, é válido. | |
| if original_dut_value and original_dut_value.replace('.', '', 1).isdigit(): | |
| data['DUT'] = f'Sim, DUT nº {original_dut_value}' | |
| else: | |
| data['DUT'] = 'Não' | |
| standard_columns = [ | |
| 'Codigo_TUSS', 'Descricao_TUSS', 'Correlacao_Rol', 'Procedimento_Rol', | |
| 'Resolucao_Normativa', 'Vigencia', 'OD', 'AMB', 'HCO', 'HSO', 'PAC', | |
| 'DUT', 'SUBGRUPO', 'GRUPO', 'CAPITULO', 'Sinonimo_1', 'Sinonimo_2', | |
| 'Sinonimo_3', 'Sinonimo_4', 'Semantico' | |
| ] | |
| formatted_data = {col: data.get(col, '') for col in standard_columns} | |
| result = {"score": round(score), "match_type": match_type} | |
| result.update(formatted_data) | |
| return result | |
| def _highlight_matches(results, query): | |
| """Adiciona tags <b></b> em volta das palavras da query nos resultados.""" | |
| if not query or not results: return results | |
| stopwords = {'de', 'do', 'da', 'dos', 'das', 'a', 'o', 'e', 'em', 'um', 'uma', 'para', 'com'} | |
| query_words = {word for word in normalize_text(query).split() if len(word) > 2 and word not in stopwords} | |
| cols_to_highlight = ['Descricao_TUSS', 'Procedimento_Rol', 'Sinonimo_1', 'Sinonimo_2', 'Sinonimo_3', 'Sinonimo_4', 'Semantico'] | |
| for result in results: | |
| for col in cols_to_highlight: | |
| original_text = result.get(col, '') | |
| highlighted_text = original_text | |
| if original_text and query_words: | |
| for word in sorted(list(query_words), key=len, reverse=True): | |
| pattern = r'\b(' + re.escape(word) + r')\b' | |
| highlighted_text = re.sub(pattern, r'<b>\1</b>', highlighted_text, flags=re.IGNORECASE) | |
| result[f"{col}_highlighted"] = highlighted_text | |
| return results | |
| # --- FUNÇÕES DE CARREGAMENTO DE DADOS --- | |
| def load_and_prepare_database(db_path): | |
| """ | |
| Carrega o CSV, cria colunas normalizadas, campo de texto único, pesos IDF e um conjunto | |
| de todas as palavras únicas da base de dados. | |
| """ | |
| try: | |
| print(f"Carregando e preparando a base de dados de: {db_path}...") | |
| df_original = pd.read_csv(db_path, dtype=str).fillna('') | |
| search_cols = ['Descricao_TUSS', 'Procedimento_Rol', 'Sinonimo_1', 'Sinonimo_2', 'Sinonimo_3', 'Sinonimo_4', 'Semantico'] | |
| df_normalized = df_original.copy() | |
| df_normalized['Codigo_TUSS_literal'] = df_normalized['Codigo_TUSS'].apply(literal_normalize_text) | |
| df_normalized['Codigo_TUSS_norm'] = df_normalized['Codigo_TUSS'].apply(normalize_text) | |
| df_normalized['full_text_norm'] = "" | |
| for col in search_cols: | |
| if col in df_normalized.columns: | |
| df_normalized[f'{col}_literal'] = df_normalized[col].apply(literal_normalize_text) | |
| df_normalized[f'{col}_norm'] = df_normalized[col].apply(normalize_text) | |
| df_normalized['full_text_norm'] += ' ' + df_normalized[f'{col}_norm'] | |
| print("Calculando pesos IDF e dicionário da base...") | |
| num_documents = len(df_normalized) | |
| doc_freq = defaultdict(int) | |
| db_word_set = set() | |
| for text in df_normalized['full_text_norm']: | |
| words = set(text.split()) | |
| db_word_set.update(words) | |
| for word in words: | |
| if word: doc_freq[word] += 1 | |
| db_word_set.discard('') | |
| print(f"Dicionário da base de dados criado com {len(db_word_set)} palavras únicas.") | |
| idf_scores = {word: math.log(num_documents / (freq + 1)) for word, freq in doc_freq.items()} | |
| print(f"Pesos IDF calculados para {len(idf_scores)} palavras.") | |
| print("Criando corpus para busca fuzzy...") | |
| fuzzy_search_corpus = [] | |
| for index, row in df_normalized.iterrows(): | |
| for col in search_cols: | |
| if col in df_original.columns and f'{col}_norm' in row and pd.notna(row[f'{col}_norm']): | |
| val = row[f'{col}_norm'] | |
| if val: fuzzy_search_corpus.append((val, index, f'{col}_norm')) | |
| print(f"Base de dados pronta com {len(df_original)} procedimentos.") | |
| return df_original, df_normalized, fuzzy_search_corpus, idf_scores, db_word_set | |
| except Exception as e: | |
| print(f"Erro crítico ao carregar/preparar a base de dados: {e}"); raise | |
| def load_general_dictionary(path): | |
| try: | |
| print(f"Carregando dicionário geral de português de: {path}...") | |
| with open(path, 'r', encoding='utf-8') as f: | |
| words = {normalize_text(line.strip()) for line in f if line.strip()} | |
| print(f"Dicionário geral carregado com {len(words)} palavras.") | |
| return words | |
| except FileNotFoundError: return set() | |
| except Exception as e: return set() | |
| def load_correction_corpus(dict_path, column_name='Termo_Correto'): | |
| try: | |
| print(f"Carregando corpus de correção de: {dict_path}...") | |
| df_dict = pd.read_csv(dict_path, dtype=str).fillna('') | |
| if column_name not in df_dict.columns: return [], [] | |
| original_corpus = df_dict[column_name].dropna().astype(str).tolist() | |
| normalized_corpus = [normalize_text(term) for term in original_corpus] | |
| print(f"Corpus de correção carregado com {len(original_corpus)} termos.") | |
| return original_corpus, normalized_corpus | |
| except FileNotFoundError: return [], [] | |
| except Exception as e: return [], [] | |
| # --- FUNÇÃO DE RECLASSIFICAÇÃO SEMÂNTICA --- | |
| def rerank_with_semantic_model(original_query, results_list, model): | |
| if not model or not results_list: return results_list | |
| semantic_columns = ['Descricao_TUSS', 'Procedimento_Rol', 'SUBGRUPO', 'Sinonimo_1', 'Sinonimo_2', 'Sinonimo_3', 'Sinonimo_4', 'Semantico'] | |
| corpus_texts = [". ".join(sorted(list({res.get(col) for col in semantic_columns if res.get(col) and isinstance(res.get(col), str)}))) for res in results_list] | |
| try: | |
| query_embedding = model.encode(original_query, convert_to_tensor=True, show_progress_bar=False) | |
| corpus_embeddings = model.encode(corpus_texts, convert_to_tensor=True, show_progress_bar=False) | |
| cosine_scores = util.cos_sim(query_embedding, corpus_embeddings) | |
| except Exception as e: return results_list | |
| for i, result in enumerate(results_list): | |
| result['semantic_score'] = round(max(0, cosine_scores[0][i].item()) * 100) | |
| result['hybrid_score'] = result['semantic_score'] + result.get('score', 0) | |
| return sorted(results_list, key=lambda x: (x.get('score', 0) == 100, x.get('hybrid_score', 0)), reverse=True) | |
| # --- FUNÇÃO INTERNA DE BUSCA COM CAMADAS --- | |
| def _run_search_layers(literal_query, normalized_query, response, df_original, df_normalized, fuzzy_search_corpus, idf_scores, limit_per_layer): | |
| """Executa as camadas de busca e retorna o nome da camada de saída em caso de early exit.""" | |
| matched_indices = set() | |
| stopwords = {'de', 'do', 'da', 'dos', 'das', 'a', 'o', 'e', 'em', 'um', 'uma', 'para', 'com'} | |
| query_words = [word for word in normalized_query.split() if word not in stopwords and len(word) > 1] | |
| ### ALTERAÇÃO ### | |
| # --- CAMADA 0: Busca Literal (Contida) --- | |
| # Esta é a lógica correta para encontrar uma frase inteira dentro de um texto maior. | |
| if literal_query: | |
| temp_results = [] | |
| literal_cols = ['Codigo_TUSS_literal', 'Descricao_TUSS_literal', 'Procedimento_Rol_literal'] | |
| for col in literal_cols: | |
| if col in df_normalized.columns: | |
| # Lógica restaurada: Busca pela frase/palavra inteira contida no texto. | |
| # O '\b' (word boundary) garante que "sulta" não encontre "consulta", | |
| # mas "consulta" encontre "consulta com especialista". | |
| # Isso agora funciona corretamente graças à correção em literal_normalize_text. | |
| mask = df_normalized[col].str.contains(r'\b' + re.escape(literal_query) + r'\b', na=False) | |
| matches = df_normalized[mask] | |
| for index, _ in matches.iterrows(): | |
| if index not in matched_indices: | |
| match_type = "Código Literal" if "Codigo" in col else "Texto Literal" | |
| temp_results.append(format_result(df_original.loc[index], match_type, 100)) | |
| matched_indices.add(index) | |
| if temp_results: | |
| response["results_by_layer"]["literal_matches"] = sorted(temp_results, key=lambda x: x['Codigo_TUSS'])[:limit_per_layer] | |
| return "Busca Literal" | |
| # --- CAMADA 1: Busca Normalizada Exata --- | |
| # Esta camada agora serve para quando o texto da busca é *exatamente* igual ao da célula. | |
| temp_results = [] | |
| if normalized_query: | |
| exact_code_matches = df_normalized[df_normalized['Codigo_TUSS_norm'] == normalized_query] | |
| for index, _ in exact_code_matches.iterrows(): | |
| if index not in matched_indices: | |
| temp_results.append(format_result(df_original.loc[index], "Código Exato (Normalizado)", 100)) | |
| matched_indices.add(index) | |
| for col in ['Descricao_TUSS_norm', 'Procedimento_Rol_norm']: | |
| if col in df_normalized.columns: | |
| exact_text_matches = df_normalized[df_normalized[col] == normalized_query] | |
| for index, _ in exact_text_matches.iterrows(): | |
| if index not in matched_indices: | |
| temp_results.append(format_result(df_original.loc[index], "Exato (Normalizado)", 100)) | |
| matched_indices.add(index) | |
| if temp_results: | |
| response["results_by_layer"]["exact_matches"] = sorted(temp_results, key=lambda x: x['Codigo_TUSS'])[:limit_per_layer] | |
| return "Normalizada Exata" | |
| # --- CAMADA 2: Busca Lógica 'E' --- | |
| temp_results = [] | |
| if query_words: | |
| mask = pd.Series(True, index=df_normalized.index) | |
| for word in query_words: | |
| mask &= df_normalized['full_text_norm'].str.contains(r'\b' + re.escape(word) + r'\b', na=False) | |
| for index, row in df_normalized[mask & ~df_normalized.index.isin(matched_indices)].iterrows(): | |
| score = fuzz.WRatio(normalized_query, row.get('Descricao_TUSS_norm', '')) | |
| if score > 85: | |
| temp_results.append(format_result(df_original.loc[index], "Busca Lógica (E)", score)) | |
| matched_indices.add(index) | |
| response["results_by_layer"]["logical_matches"] = sorted(temp_results, key=lambda x: x.get('score', 0), reverse=True)[:limit_per_layer] | |
| # --- CAMADA 3: Busca 'Quase Exata' (Fuzzy) --- | |
| temp_results = [] | |
| processed_indices_layer3 = set() | |
| if fuzzy_search_corpus: | |
| # --- INÍCIO DO CÓDIGO DE DEPURAÇÃO --- | |
| print("\n" + "="*20 + " DEBUG CAMADA 3 " + "="*20) | |
| print(f"Query para Fuzzy: '{normalized_query}'") | |
| print(f"Tamanho do Fuzzy Corpus: {len(fuzzy_search_corpus)}") | |
| # --- FIM DO CÓDIGO DE DEPURAÇÃO --- | |
| almost_exact_matches = process.extractBests(normalized_query, [item[0] for item in fuzzy_search_corpus], | |
| scorer=fuzz.token_set_ratio, limit=limit_per_layer * 3, score_cutoff=90) | |
| # --- INÍCIO DO CÓDIGO DE DEPURAÇÃO --- | |
| print("\n" + "="*20 + " DEBUG CAMADA 3 " + "="*20) | |
| print(f"Query para Fuzzy: '{normalized_query}'") | |
| print(f"Tamanho do Fuzzy Corpus: {len(fuzzy_search_corpus)}") | |
| # --- FIM DO CÓDIGO DE DEPURAÇÃO --- | |
| for match_text, score in almost_exact_matches: | |
| if score == 100 and match_text == normalized_query: continue | |
| corpus_items = [item for item in fuzzy_search_corpus if item[0] == match_text] | |
| for _, original_index, _ in corpus_items: | |
| if original_index not in matched_indices and original_index not in processed_indices_layer3: | |
| temp_results.append(format_result(df_original.loc[original_index], "Quase Exato", 98)) | |
| matched_indices.add(original_index) | |
| processed_indices_layer3.add(original_index) | |
| response["results_by_layer"]["almost_exact_matches"] = sorted(temp_results, key=lambda x: x['Codigo_TUSS'])[:limit_per_layer] | |
| # --- CAMADA 4: Busca por Termos Validados --- | |
| temp_results = [] | |
| if query_words: | |
| mask_c4 = pd.Series(True, index=df_normalized.index) | |
| for word in query_words: | |
| mask_c4 &= df_normalized['full_text_norm'].str.contains(r'\b' + re.escape(word) + r'\b', na=False) | |
| for index, row in df_normalized[mask_c4 & ~df_normalized.index.isin(matched_indices)].iterrows(): | |
| score = fuzz.WRatio(normalized_query, row.get('Descricao_TUSS_norm', '')) | |
| if score > 75: | |
| temp_results.append(format_result(df_original.iloc[index], "Termos Validados (E)", score)) | |
| matched_indices.add(index) | |
| response["results_by_layer"]["contains_matches"] = sorted(temp_results, key=lambda x: x.get('score', 0), reverse=True)[:limit_per_layer] | |
| # --- CAMADA 5: Busca Ponderada (IDF) --- | |
| temp_results = [] | |
| if query_words and idf_scores: | |
| max_idf = max(idf_scores.values()) if idf_scores else 1.0 | |
| total_query_idf = sum(idf_scores.get(word, max_idf) for word in query_words) | |
| regex_pattern = '|'.join(re.escape(word) for word in query_words) | |
| mask = df_normalized['full_text_norm'].str.contains(regex_pattern, na=False) | |
| candidate_df = df_normalized[mask & ~df_normalized.index.isin(matched_indices)] | |
| for index, row in candidate_df.iterrows(): | |
| weighted_score = sum(idf_scores.get(word, 0) for word in query_words if word in row['full_text_norm'].split()) | |
| normalized_score = (weighted_score / total_query_idf) * 90 if total_query_idf > 0 else 0 | |
| if query_words and row.get('full_text_norm', '').strip().startswith(query_words[0]): | |
| normalized_score = min(normalized_score + 10, 95) | |
| temp_results.append(format_result(df_original.loc[index], "Busca Ponderada (IDF)", normalized_score)) | |
| matched_indices.add(index) | |
| response["results_by_layer"]["term_matches"] = sorted(temp_results, key=lambda x: x.get('score', 0), reverse=True)[:limit_per_layer * 4] | |
| # --- CAMADA 6: Fallback --- | |
| total_found_primary = sum(len(v) for v in response["results_by_layer"].values()) | |
| if total_found_primary == 0 and normalized_query: | |
| temp_results = [] | |
| longest_word = get_longest_word(normalized_query) | |
| if longest_word: | |
| mask_c6 = df_normalized['full_text_norm'].str.contains(r'\b' + re.escape(longest_word) + r'\b', na=False) | |
| for index, row in df_normalized[mask_c6 & ~df_normalized.index.isin(matched_indices)].iterrows(): | |
| temp_results.append(format_result(df_original.loc[index], f"Palavra-Chave '{longest_word}'", 80)) | |
| response["results_by_layer"]["keyword_matches"] = sorted(temp_results, key=lambda x: x['Codigo_TUSS'])[:limit_per_layer] | |
| return None | |
| # --- FUNÇÃO PRINCIPAL QUE ORQUESTRA A BUSCA --- | |
| def search_procedure_with_log(query, df_original, df_normalized, fuzzy_search_corpus, correction_corpus, | |
| portuguese_word_set, idf_scores, db_word_set, | |
| limit_per_layer=10, semantic_model=None, | |
| user_best_matches_counts=None, user_feedback_threshold=10): | |
| SEMANTIC_RERANK_LIMIT = 40 | |
| start_time = time.time() | |
| original_query = str(query).strip() | |
| response = {"search_log": [], | |
| "results_by_layer": {"literal_matches": [], "exact_matches": [], "logical_matches": [], | |
| "almost_exact_matches": [], "contains_matches": [], | |
| "term_matches": [], "keyword_matches": []}, | |
| "final_semantic_results": [], "was_corrected": False, "original_query": original_query, | |
| "corrected_query": ""} | |
| if not original_query: | |
| response["search_log"].append("Query vazia, busca não realizada.") | |
| return response | |
| response["search_log"].append(f"Buscando por: '{original_query}'") | |
| # ETAPA 1: CORREÇÃO ORTOGRÁFICA PRÉ-BUSCA | |
| stopwords = {'de', 'do', 'da', 'dos', 'das', 'a', 'o', 'e', 'em', 'um', 'uma', 'para', 'com'} | |
| query_after_correction = original_query | |
| original_correction_corpus, normalized_correction_corpus = correction_corpus | |
| valid_words = portuguese_word_set.union(db_word_set) | |
| if valid_words and original_correction_corpus: | |
| words_from_query, corrected_words, made_correction = original_query.split(), [], False | |
| for word in words_from_query: | |
| norm_word = normalize_text(word) | |
| if norm_word in stopwords or len(norm_word) < 4: | |
| corrected_words.append(word) | |
| continue | |
| if norm_word not in valid_words: | |
| match_norm, score = process.extractOne(norm_word, normalized_correction_corpus, scorer=fuzz.token_set_ratio) | |
| if score >= 85: | |
| match_index = normalized_correction_corpus.index(match_norm) | |
| corrected_word_original = original_correction_corpus[match_index] | |
| corrected_word = corrected_word_original | |
| if word.istitle(): corrected_word = corrected_word.title() | |
| elif word.isupper(): corrected_word = corrected_word.upper() | |
| corrected_words.append(corrected_word) | |
| made_correction = True | |
| else: | |
| corrected_words.append(word) | |
| else: | |
| corrected_words.append(word) | |
| if made_correction: | |
| query_after_correction = " ".join(corrected_words) | |
| response["was_corrected"] = True | |
| response["corrected_query"] = query_after_correction | |
| response["search_log"].append(f"Query corrigida para: '{query_after_correction}'.") | |
| # ETAPA 2: PREPARAÇÃO DAS QUERIES PARA AS CAMADAS | |
| literal_query = literal_normalize_text(query_after_correction) | |
| cleaned_query = " ".join([word for word in query_after_correction.split() if normalize_text(word) not in stopwords]) | |
| normalized_query = normalize_text(cleaned_query) | |
| if not cleaned_query.strip() and not literal_query.strip(): | |
| response["search_log"].append("Query resultante é vazia. Busca não realizada.") | |
| return response | |
| if cleaned_query != query_after_correction: | |
| response["search_log"].append(f"Query limpa (sem stop words): '{cleaned_query}'") | |
| # Para proibir buscas com apenas um caractere | |
| if len(cleaned_query.strip()) <= 1: | |
| # Pega o termo que seria buscado para exibir no log. Pode ser vazio. | |
| term_to_log = cleaned_query.strip() | |
| response["search_log"].append(f"Busca por '{term_to_log}' ignorada. A busca deve conter no mínimo 2 caracteres.") | |
| response["final_semantic_results"] = [] | |
| return response | |
| # ETAPA 3: EXECUÇÃO DA BUSCA | |
| exit_layer_name = _run_search_layers(literal_query, normalized_query, response, df_original, df_normalized, fuzzy_search_corpus, | |
| idf_scores, limit_per_layer) | |
| # ETAPA 4: AGREGAÇÃO E REORDENAÇÃO DOS RESULTADOS | |
| all_candidates = [] | |
| layer_order = ["literal_matches", "exact_matches", "logical_matches", "almost_exact_matches", "contains_matches", "term_matches", "keyword_matches"] | |
| layer_names_map = {"literal_matches": "0. Busca Literal", "exact_matches": "1. Normalizada Exata", "logical_matches": "2. Lógica 'E'", | |
| "almost_exact_matches": "3. Quase Exatos (Fuzzy)", "contains_matches": "4. Termos Validados", | |
| "term_matches": "5. Busca Ponderada (IDF)", "keyword_matches": "6. Fallback (Palavra-Chave)"} | |
| if exit_layer_name: | |
| response["search_log"].append(f"--- [OTIMIZAÇÃO] Resultado de alta confiança encontrado na camada '{exit_layer_name}'. Busca interrompida. ---") | |
| response["search_log"].append("\n--- Detalhamento da Busca por Camadas ---") | |
| for layer_key in layer_order: | |
| layer_results = response["results_by_layer"].get(layer_key, []) | |
| num_results = len(layer_results) | |
| response["search_log"].append(f"[{layer_names_map.get(layer_key, layer_key)}]: {num_results} resultado(s)") | |
| all_candidates.extend(layer_results) | |
| # Lógica de feedback do usuário | |
| feedback_prioritized_tuss_votes = {} | |
| if user_best_matches_counts and all_candidates: | |
| query_norm_for_feedback = normalize_text(response.get("corrected_query") or original_query) | |
| feedback_for_query = user_best_matches_counts.get(query_norm_for_feedback, {}) | |
| for tuss_code, votes in feedback_for_query.items(): | |
| if votes >= user_feedback_threshold: | |
| feedback_prioritized_tuss_votes[tuss_code] = votes | |
| if feedback_prioritized_tuss_votes: | |
| response["search_log"].append(f"\nFeedback de usuários qualificado encontrado.") | |
| for result in all_candidates: | |
| if result.get('Codigo_TUSS') in feedback_prioritized_tuss_votes: | |
| result['is_user_best_match'] = True | |
| result['feedback_votes'] = feedback_prioritized_tuss_votes[result.get('Codigo_TUSS')] | |
| response["search_log"].append(f"\n--- Análise e Reordenação ---\nTotal de candidatos encontrados: {len(all_candidates)}") | |
| query_for_highlight = response.get("corrected_query") or cleaned_query | |
| all_candidates = _highlight_matches(all_candidates, query_for_highlight) | |
| final_list = [] | |
| if all_candidates: | |
| query_for_semantic = response.get("corrected_query") or cleaned_query | |
| prioritized_candidates = [res for res in all_candidates if res.get('is_user_best_match')] | |
| non_prioritized_candidates = [res for res in all_candidates if not res.get('is_user_best_match')] | |
| if semantic_model and prioritized_candidates: | |
| reranked_prioritized = rerank_with_semantic_model(query_for_semantic, prioritized_candidates, semantic_model) | |
| prioritized_results_sorted = sorted(reranked_prioritized, key=lambda x: (x.get('feedback_votes', 0), x.get('semantic_score', 0)), reverse=True) | |
| else: | |
| prioritized_results_sorted = sorted(prioritized_candidates, key=lambda x: (x.get('feedback_votes', 0), x.get('score', 0)), reverse=True) | |
| final_list.extend(prioritized_results_sorted) | |
| if semantic_model and non_prioritized_candidates: | |
| candidates_for_rerank = non_prioritized_candidates[:SEMANTIC_RERANK_LIMIT] | |
| reranked_non_prioritized = rerank_with_semantic_model(query_for_semantic, candidates_for_rerank, semantic_model) | |
| final_list.extend(reranked_non_prioritized) | |
| seen_reranked_codes = {r.get('Codigo_TUSS') for r in reranked_non_prioritized} | |
| for candidate in non_prioritized_candidates: | |
| if candidate.get('Codigo_TUSS') not in seen_reranked_codes: | |
| final_list.append(candidate) | |
| else: | |
| final_list.extend(sorted(non_prioritized_candidates, key=lambda x: x.get('score', 0), reverse=True)) | |
| response["search_log"].append(f"Lista final de resultados combinada: {len(final_list)} itens antes do limite.") | |
| response["final_semantic_results"] = final_list[:10] | |
| else: | |
| response["search_log"].append("Nenhum resultado final para exibir.") | |
| response["final_semantic_results"] = [] | |
| end_time = time.time() | |
| response["search_duration_seconds"] = round(end_time - start_time, 4) | |
| response["search_log"].append(f"\nBusca completa em {response['search_duration_seconds']} segundos.") | |
| print(f"\n\n==================== LOG DE DEPURAÇÃO (QUERY: '{original_query}') ====================") | |
| return response |