Spaces:
Sleeping
Sleeping
| # enhanced_search_v2.py (Versão Final com Normalização Literal Corrigida) | |
| ################################################################################################### | |
| # | |
| # RESUMO DAS CORREÇÕES E MELHORIAS: | |
| # | |
| # 1. NORMALIZAÇÃO LITERAL INTELIGENTE (CORREÇÃO PRINCIPAL): | |
| # - A função `literal_normalize_text` foi aprimorada para remover símbolos e espaços | |
| # do INÍCIO e do FIM da string, tanto na query quanto nos dados do banco. | |
| # - Isso corrige o bug em que o "Early Exit" falhava com queries que continham lixo | |
| # de copiar/colar (como '/////'), restaurando a inteligência da busca literal. | |
| # | |
| # 2. PRÉ-PROCESSAMENTO ROBUSTO MANTIDO: | |
| # - A lógica de correção ortográfica criteriosa, que preserva a ordem e só corrige | |
| # palavras fora dos dicionários, permanece. | |
| # | |
| # 3. ARQUITETURA FINAL MANTIDA: | |
| # - O fluxo com Early Exit, Blocos Protegido/Amplo e Otimização com IA continua sendo | |
| # a base do buscador. | |
| # | |
| ################################################################################################### | |
| import pandas as pd | |
| import re | |
| from thefuzz import process, fuzz | |
| from unidecode import unidecode | |
| import time | |
| from sentence_transformers import util | |
| import torch | |
| import math | |
| from collections import defaultdict | |
| from rank_bm25 import BM25Okapi | |
| # --- Bloco 1: Funções Auxiliares de Normalização e Limpeza --- # | |
def literal_normalize_text(text):
    """Normalize a value for the literal (early-exit) comparison.

    Values that ``pd.isna`` reports as missing become ``""``. Anything else
    is converted to ``str``, lower-cased and transliterated with
    ``unidecode``; runs of non-word characters are then removed from the
    start and the end of the string only. Symbols in the middle (for example
    the ``/`` in ``multissitio/ressincronizador``) are left in place.
    """
    if pd.isna(text):
        return ""
    lowered = str(text).lower()
    ascii_text = unidecode(lowered)
    # Trim junk at both edges only; interior symbols must survive.
    trimmed = re.sub(r'^\W+|\W+$', '', ascii_text)
    return trimmed.strip()
def clean_symbols_from_query(text):
    """Remove quote-like characters, ``/`` and ``*`` from a query.

    Missing values (per ``pd.isna``) yield ``""``. Otherwise the ``str`` form
    of *text* is returned with the characters acute accent, backtick, single
    quote, double quote, slash and asterisk deleted and with surrounding
    whitespace trimmed.
    """
    if pd.isna(text):
        return ""
    as_text = str(text)
    without_symbols = re.sub(r"[´`'\"/*]", "", as_text)
    return without_symbols.strip()
def normalize_text(text):
    """Fully normalize a value for the general search.

    Missing values (per ``pd.isna``) yield ``""``. Otherwise the value is
    converted to ``str``, lower-cased and transliterated with ``unidecode``;
    every character that is neither a word character nor whitespace is
    replaced by a space, whitespace runs are collapsed to a single space and
    the result is trimmed.
    """
    if pd.isna(text):
        return ""
    ascii_lower = unidecode(str(text).lower())
    # Punctuation becomes a separator rather than being deleted outright.
    spaced = re.sub(r'[^\w\s]', ' ', ascii_lower)
    collapsed = re.sub(r'\s+', ' ', spaced)
    return collapsed.strip()
def get_longest_word(query_text):
    """Return the longest word of at least four characters in *query_text*.

    Words are runs of word characters delimited by word boundaries. When
    several words share the maximum length the first one wins; when no word
    qualifies an empty string is returned.
    """
    candidates = re.findall(r'\b\w{4,}\b', query_text)
    return max(candidates, key=len, default="")
| # --- Bloco 2: Funções de Formatação e Destaque de Resultados --- # | |
def format_result(row_data, row_index, match_type="", score=0, **kwargs):
    """Build the standardized result dictionary for one database row.

    Args:
        row_data: Mapping-like row (must support ``copy()``, ``get`` and item
            assignment); it is copied, so the caller's object is not modified.
        row_index: Row label stored under ``"row_index"``.
        match_type: Label describing how the row was matched.
        score: Textual score; stored rounded as both ``"score"`` and
            ``"text_score"``.
        **kwargs: Extra entries merged last, overriding any earlier key.

    Returns:
        A dict with the scoring metadata (``semantic_score`` starts at 0 and
        ``is_rol_procedure`` reflects ``Correlacao_Rol``), followed by the
        standard columns (missing ones default to ``''``) and the extras.
    """
    data = row_data.copy()
    is_rol = data.get('Correlacao_Rol', '').strip().lower() == 'sim'

    if is_rol:
        pac_flag = data.get('PAC', '').strip().lower()
        data['PAC'] = 'Sim' if pac_flag == 'pac' else 'Não'
        dut_raw = data.get('DUT', '').strip()
        # A DUT counts as present when it is digits with at most one dot.
        has_dut_number = bool(dut_raw) and dut_raw.replace('.', '', 1).isdigit()
        data['DUT'] = f'Sim, DUT nº {dut_raw}' if has_dut_number else 'Não'
    else:
        # NOTE(review): 'Grupo'/'Subgrupo' are blanked here, but the columns
        # exported below are 'GRUPO'/'SUBGRUPO', so only 'Vigencia' and
        # 'Resolucao_Normativa' are actually cleared in the output. Confirm
        # whether the upper-case columns were meant to be blanked as well.
        for blanked_key in ('Grupo', 'Subgrupo', 'Vigencia', 'Resolucao_Normativa'):
            data[blanked_key] = ''
        data['PAC'] = data['DUT'] = '---'

    standard_columns = [ 'Codigo_TUSS', 'Descricao_TUSS', 'Correlacao_Rol', 'Procedimento_Rol', 'Resolucao_Normativa', 'Vigencia', 'OD', 'AMB', 'HCO', 'HSO', 'PAC', 'DUT', 'SUBGRUPO', 'GRUPO', 'CAPITULO', 'Sinonimo_1', 'Sinonimo_2', 'Sinonimo_3', 'Sinonimo_4', 'Semantico' ]

    rounded_score = round(score)
    result = {
        "row_index": row_index,
        "score": rounded_score,
        "text_score": rounded_score,
        "semantic_score": 0,
        "match_type": match_type,
        "is_rol_procedure": is_rol,
    }
    for column in standard_columns:
        result[column] = data.get(column, '')
    result.update(kwargs)
    return result
| def _highlight_matches(results, query): | |
| """Adiciona tags <b></b> ao redor das palavras da query nos resultados para destaque no frontend.""" | |
| if not query or not results: return results | |
| stopwords = {'de', 'do', 'da', 'dos', 'das', 'a', 'o', 'e', 'em', 'um', 'uma', 'para', 'com'} | |
| query_words = {word for word in normalize_text(query).split() if len(word) > 2 and word not in stopwords} | |
| cols_to_highlight = ['Descricao_TUSS', 'Procedimento_Rol', 'Sinonimo_1', 'Sinonimo_2', 'Sinonimo_3', 'Sinonimo_4', 'Semantico'] | |
| for result in results: | |
| for col in cols_to_highlight: | |
| original_text = result.get(col, '') | |
| if original_text and query_words: | |
| highlighted_text = original_text | |
| for word in sorted(list(query_words), key=len, reverse=True): | |
| pattern = r'\b(' + re.escape(word) + r')\b' | |
| highlighted_text = re.sub(pattern, r'<b>\1</b>', highlighted_text, flags=re.IGNORECASE) | |
| result[f"{col}_highlighted"] = highlighted_text | |
| else: | |
| result[f"{col}_highlighted"] = original_text | |
| return results | |
| # --- Bloco 3: Funções de Carregamento de Dados e Modelos --- # | |
def load_and_prepare_database(db_path):
    """Load the main CSV database and precompute every search structure.

    Args:
        db_path: Path of the CSV file; every column is read as ``str`` and
            missing cells become ``''``.

    Returns:
        A 5-tuple ``(df_original, df_normalized, fuzzy_search_corpus,
        bm25_model, combined_doc_freq)`` where:

        * ``df_normalized`` is a copy of the original with ``*_literal``
          columns for the code, Rol procedure and TUSS description, one
          ``*_norm`` column per searchable column present in the file, and
          ``full_text_norm`` (the space-joined normalized columns);
        * ``fuzzy_search_corpus`` is a list of ``(normalized_text, row_label)``
          pairs, one per non-empty normalized cell, in row order;
        * ``bm25_model`` is a ``BM25Okapi`` built over ``full_text_norm``;
        * ``combined_doc_freq`` maps each word to its document frequency,
          where a word ending in ``o``/``a`` shares a summed frequency with
          its ``a``/``o`` counterpart when both occur.

    Raises:
        Exception: Any failure is printed and re-raised unchanged.
    """
    try:
        print(f"Carregando e preparando a base de dados de: {db_path}...")
        df_original = pd.read_csv(db_path, dtype=str).fillna('')
        search_cols = ['Descricao_TUSS', 'Procedimento_Rol', 'Sinonimo_1', 'Sinonimo_2', 'Sinonimo_3', 'Sinonimo_4', 'Semantico', 'SUBGRUPO', 'GRUPO', 'CAPITULO']
        df_normalized = df_original.copy()

        # Literal columns feed the early-exit comparison.
        for literal_source in ('Codigo_TUSS', 'Procedimento_Rol', 'Descricao_TUSS'):
            df_normalized[f'{literal_source}_literal'] = df_normalized[literal_source].apply(literal_normalize_text)

        df_normalized['full_text_norm'] = ""
        for col in search_cols:
            if col not in df_normalized.columns:
                continue
            norm_col = f'{col}_norm'
            df_normalized[norm_col] = df_normalized[col].apply(normalize_text)
            df_normalized['full_text_norm'] = df_normalized['full_text_norm'] + ' ' + df_normalized[norm_col]

        print("Criando modelo BM25 e calculando frequência de palavras...")
        tokenized_corpus = [document.split() for document in df_normalized['full_text_norm']]
        bm25_model = BM25Okapi(tokenized_corpus, k1=1.2)
        print(f"Modelo BM25 otimizado com k1={bm25_model.k1}.")

        # Document frequency: each word counts once per document.
        doc_freq = defaultdict(int)
        for tokens in tokenized_corpus:
            for token in set(tokens):
                doc_freq[token] += 1

        print("Combinando frequências de palavras (masculino/feminino)...")
        gender_swap = {'o': 'a', 'a': 'o'}
        combined_doc_freq = {}
        already_paired = set()
        for word, freq in doc_freq.items():
            if word in already_paired:
                continue
            swapped_ending = gender_swap.get(word[-1:])
            counterpart = word[:-1] + swapped_ending if swapped_ending else None
            if counterpart and counterpart in doc_freq:
                shared_freq = freq + doc_freq[counterpart]
                combined_doc_freq[word] = shared_freq
                combined_doc_freq[counterpart] = shared_freq
                already_paired.update((word, counterpart))
            else:
                combined_doc_freq[word] = freq

        print("Criando corpus para busca fuzzy...")
        norm_cols = [f'{col}_norm' for col in search_cols if f'{col}_norm' in df_normalized.columns]
        fuzzy_search_corpus = []
        for index, row in df_normalized.iterrows():
            fuzzy_search_corpus.extend(
                (row[norm_col], index)
                for norm_col in norm_cols
                if pd.notna(row[norm_col]) and row[norm_col]
            )

        print(f"Base de dados pronta com {len(df_original)} procedimentos.")
        return df_original, df_normalized, fuzzy_search_corpus, bm25_model, combined_doc_freq
    except Exception as e:
        print(f"Erro crítico ao carregar/preparar a base de dados: {e}")
        raise
def load_correction_corpus(dict_path, column_name='Termo_Correto'):
    """Load the spelling-correction corpus from a CSV file.

    Args:
        dict_path: Path of the CSV file holding the reference terms.
        column_name: Column that contains the terms.

    Returns:
        ``(original_corpus, normalized_corpus, db_word_set)``: the terms as
        written, the same terms passed through ``normalize_text`` (parallel
        lists), and the set of individual words found in the normalized
        terms. On any failure, or when *column_name* is absent, the triple
        ``([], [], set())`` is returned so the search can run without
        spelling correction; the reason is printed instead of being
        swallowed silently.
    """
    try:
        df_dict = pd.read_csv(dict_path, dtype=str).fillna('')
        if column_name not in df_dict.columns:
            print(f"Aviso: coluna '{column_name}' não encontrada em '{dict_path}'. Corpus de correção vazio.")
            return [], [], set()
        original_corpus = df_dict[column_name].dropna().astype(str).tolist()
        normalized_corpus = [normalize_text(term) for term in original_corpus]
        db_word_set = {word for term in normalized_corpus for word in term.split()}
        return original_corpus, normalized_corpus, db_word_set
    except Exception as e:
        # Best-effort loader: keep returning empty structures, but say why.
        print(f"Aviso: não foi possível carregar o corpus de correção de '{dict_path}': {e}")
        return [], [], set()
def load_general_dictionary(path):
    """Load the general Portuguese dictionary as a set of normalized entries.

    Args:
        path: UTF-8 text file with one entry per line; blank lines are
            skipped and every entry goes through ``normalize_text``.

    Returns:
        The set of normalized entries, or an empty set when the file cannot
        be read or processed. The failure reason is printed instead of being
        swallowed silently.
    """
    try:
        with open(path, 'r', encoding='utf-8') as f:
            words = {normalize_text(line.strip()) for line in f if line.strip()}
    except Exception as e:
        # Best-effort loader: the search still works with an empty dictionary.
        print(f"Aviso: não foi possível carregar o dicionário geral de '{path}': {e}")
        return set()
    print(f"Dicionário geral carregado com {len(words)} palavras.")
    return words
| # --- Bloco 4: Funções de Reclassificação, Boosts e IA --- # | |
def create_unified_document_text(result_dict):
    """Build the single text that represents a procedure for the Cross-Encoder.

    The description, Rol procedure, semantic text, subgroup, group, chapter
    and the four synonym fields are collected, duplicates and blank values
    are dropped, and the remaining parts are sorted and joined with ``". "``.
    """
    source_keys = ('Descricao_TUSS', 'Procedimento_Rol', 'Semantico', 'SUBGRUPO', 'GRUPO', 'CAPITULO', 'Sinonimo_1', 'Sinonimo_2', 'Sinonimo_3', 'Sinonimo_4')
    distinct_parts = {result_dict.get(key, '') for key in source_keys}
    kept_parts = [part for part in distinct_parts if part and str(part).strip()]
    # Sorting makes the output independent of set iteration order.
    kept_parts.sort()
    return ". ".join(kept_parts)
def rerank_with_cross_encoder(query, results_list, model):
    """Score candidates with the Cross-Encoder and return them re-sorted.

    Args:
        query: Query text paired with each candidate's unified document text.
        results_list: Candidate result dicts; on success each one receives a
            ``'semantic_score'`` (sigmoid of the raw model score times 100,
            rounded) in place.
        model: Object exposing ``predict(pairs, show_progress_bar=False)``.

    Returns:
        ``(results, log_message)``. On success ``results`` is a new list
        sorted in descending order: candidates with a semantic score of 85 or
        more come first (by semantic, then textual score); the rest are
        ordered by ``0.6 * semantic + 0.4 * textual``. When the model, the
        candidates or the query are missing, or when scoring raises, the
        original list is returned along with an explanatory message.
    """
    if not (model and results_list and query):
        return results_list, "Cross-Encoder não fornecido ou lista de candidatos vazia."

    sentence_pairs = [[query, create_unified_document_text(candidate)] for candidate in results_list]
    if not sentence_pairs:
        return results_list, "Não foram encontrados pares para reordenar."

    def conditional_hybrid_sort_key(candidate):
        semantic = candidate.get('semantic_score', 0)
        textual = candidate.get('text_score', 0)
        if semantic >= 85:
            # High-confidence semantic matches form the top tier.
            return (1, semantic, textual)
        return (0, (semantic * 0.6) + (textual * 0.4), semantic)

    try:
        raw_scores = model.predict(sentence_pairs, show_progress_bar=False)
        semantic_scores = torch.sigmoid(torch.tensor(raw_scores)).numpy() * 100
        for position, candidate in enumerate(results_list):
            candidate['semantic_score'] = round(semantic_scores[position])
        reranked_results = sorted(results_list, key=conditional_hybrid_sort_key, reverse=True)
        return reranked_results, f"Scores semânticos calculados para {len(reranked_results)} candidatos."
    except Exception as e:
        log_message = f"Erro no Cross-Encoder: {e}"
        print(log_message)
        return results_list, log_message
def apply_boosts(results, query_words, doc_freq):
    """Apply relevance boosts for jargon and ultra-rare query terms.

    Query words longer than four characters are "jargon" when their document
    frequency is at most 15 and "ultra-rare" when it is at most 5. A result
    whose normalized unified text contains any jargon term as a whole word
    has its ``text_score`` multiplied by 1.2; any ultra-rare term multiplies
    it by 1.3. Each boost is capped at 97. Finally every result's ``score``
    is set to its ``text_score``.

    Args:
        results: Result dicts, modified in place.
        query_words: Normalized query words.
        doc_freq: Mapping of word to document frequency (missing words
            count as 0).

    Returns:
        ``(results, log_messages)``; with empty results or query words the
        input is returned untouched together with an empty log list.
    """
    if not results or not query_words:
        return results, []

    def _terms_with_freq_up_to(max_freq):
        return {w for w in query_words if doc_freq.get(w, 0) <= max_freq and len(w) > 4}

    def _boost_matching(terms, factor):
        # Returns how many results contained at least one of the terms.
        affected = 0
        for r in results:
            text = normalize_text(create_unified_document_text(r))
            if any(re.search(r'\b' + re.escape(term) + r'\b', text) for term in terms):
                r['text_score'] = min(r['text_score'] * factor, 97)
                affected += 1
        return affected

    log_messages = []

    jargon_terms = _terms_with_freq_up_to(15)
    if jargon_terms:
        boosted_count = _boost_matching(jargon_terms, 1.2)
        if boosted_count > 0:
            log_messages.append(f"Boost de Jargão para '{list(jargon_terms)}' ({boosted_count} afetados).")

    rare_terms = _terms_with_freq_up_to(5)
    if rare_terms:
        boosted_count = _boost_matching(rare_terms, 1.3)
        if boosted_count > 0:
            log_messages.append(f"Boost Ultra-Raro para '{list(rare_terms)}' ({boosted_count} afetados).")

    for r in results:
        r['score'] = r['text_score']
    return results, log_messages
| # --- Bloco 5: Função Principal de Orquestração da Busca --- # | |
def search_procedure_with_log(query, df_original, df_normalized, fuzzy_search_corpus, correction_corpus,
                              valid_words_set, bm25_model, doc_freq,
                              cross_encoder_model=None, user_best_matches_counts=None, user_feedback_threshold=10):
    """Run the full search pipeline for *query* and return results plus a log.

    Pipeline:
      1. Literal early exit: the edge-trimmed query is compared for equality
         with the ``*_literal`` columns; any hit ends the search.
      2. Otherwise the query is cleaned of symbols, words that are not in
         ``valid_words_set`` are spell-corrected against the correction
         corpus (``fuzz.ratio`` above 85), and the result is normalized.
      3. User-feedback rows (votes >= ``user_feedback_threshold``) go first.
      4. Protected block: exact field match (100), exact phrase (99) and
         all-words-in-one-field match (98); kept in textual-score order.
      5. Broad block: fuzzy ``token_set_ratio`` matches and the top 50 BM25
         rows; the best 30 are boosted and re-ranked by the Cross-Encoder.
      6. The first 20 results are highlighted and returned.

    Args:
        query: Raw user query (converted with ``str`` and stripped).
        df_original: Original DataFrame; rows are formatted from it.
        df_normalized: DataFrame with the ``*_literal``, ``*_norm`` and
            ``full_text_norm`` columns, row-aligned with ``df_original``.
        fuzzy_search_corpus: List of ``(normalized_text, row_label)`` pairs.
        correction_corpus: Sequence whose first two items are the original
            and the normalized correction terms (parallel lists). Extra items
            are ignored, so the triple returned by ``load_correction_corpus``
            is accepted as well.
        valid_words_set: Words that must never be spell-corrected.
        bm25_model: Object exposing ``get_scores(query_words)`` with one
            score per row of ``df_normalized``.
        doc_freq: Word to document-frequency mapping used by the boosts.
        cross_encoder_model: Optional Cross-Encoder used for semantic scores.
        user_best_matches_counts: Optional mapping of normalized query to
            ``{tuss_code: votes}``.
        user_feedback_threshold: Minimum votes for a feedback result.

    Returns:
        A dict with ``search_log``, ``final_semantic_results``,
        ``original_query``, ``corrected_query``, ``was_corrected`` and, for
        non-empty queries, ``search_duration_seconds``.
    """
    start_time = time.time()
    original_query = str(query).strip()
    response = {
        "search_log": [],
        "final_semantic_results": [],
        "original_query": original_query,
        "corrected_query": "",
        "was_corrected": False,
    }
    log = response["search_log"]
    if not original_query:
        log.append("Query vazia.")
        return response
    log.append(f"Buscando por: '{original_query}'")

    seen_indices = set()

    def _unseen_labels(mask):
        # Row labels selected by *mask* that no earlier stage has produced.
        return df_normalized.index[mask & ~df_normalized.index.isin(seen_indices)]

    # --- Stage 1: initial pre-processing ---
    query_para_literal = literal_normalize_text(original_query)
    log.append("\n--- Etapa 1: Pré-processamento da Query ---")
    log.append(f"Query para busca literal (após limpeza de bordas): '{query_para_literal}'")

    # --- Stage 2: literal early exit ---
    log.append("\n--- Etapa 2: Camada de Early Exit (Literal) ---")
    literal_results = []
    literal_fields = ['Codigo_TUSS_literal', 'Procedimento_Rol_literal', 'Descricao_TUSS_literal']
    # A query made only of symbols normalizes to "" and would otherwise equal
    # every empty cell, producing a bogus early exit over many rows.
    if query_para_literal:
        for field in literal_fields:
            for index in _unseen_labels(df_normalized[field] == query_para_literal):
                literal_results.append(format_result(df_original.loc[index], index, "Literal Exata", 100))
                seen_indices.add(index)

    if literal_results:
        log.append(f"Encontrado(s) {len(literal_results)} resultado(s) literal(is). ATIVANDO EARLY EXIT.")
        results_with_semantic, log_msg = rerank_with_cross_encoder(normalize_text(original_query), literal_results, cross_encoder_model)
        log.append(f"Cálculo de Score Semântico: {log_msg}")
        final_list = sorted(results_with_semantic, key=lambda x: x['text_score'], reverse=True)
        response["final_semantic_results"] = _highlight_matches(final_list, original_query)
        response["search_duration_seconds"] = round(time.time() - start_time, 4)
        log.append(f"\nBusca concluída em {response['search_duration_seconds']} segundos.")
        print("\n".join(log))
        return response
    log.append("Nenhum resultado literal encontrado. Prosseguindo para a busca completa.")

    # --- Stage 3: full search (no early exit) ---
    query_para_geral = clean_symbols_from_query(original_query)
    log.append(f"Query para busca geral (após limpeza de símbolos): '{query_para_geral}'")

    # Only the first two items are used, so a (original, normalized, word_set)
    # triple is accepted too.
    original_correction_corpus, normalized_correction_corpus = correction_corpus[:2]
    stopwords = {'de', 'do', 'da', 'dos', 'das', 'a', 'o', 'e', 'em', 'um', 'uma', 'para', 'com'}
    final_query_words = []
    words_were_corrected = False
    for word in query_para_geral.split():
        norm_word = normalize_text(word)
        is_candidate_for_correction = (
            len(norm_word) > 2
            and norm_word not in stopwords
            and not norm_word.isdigit()
            and norm_word not in valid_words_set
        )
        # extractOne returns None when there are no choices; with an empty
        # correction corpus the word is simply kept as typed.
        best = None
        if is_candidate_for_correction and normalized_correction_corpus:
            best = process.extractOne(norm_word, normalized_correction_corpus, scorer=fuzz.ratio)
        if best is not None and best[1] > 85:
            original_term_index = normalized_correction_corpus.index(best[0])
            final_query_words.append(original_correction_corpus[original_term_index])
            words_were_corrected = True
        else:
            final_query_words.append(word)

    query_corrigida = " ".join(final_query_words)
    if words_were_corrected and query_corrigida.strip() != query_para_geral.strip():
        response["was_corrected"] = True
        response["corrected_query"] = query_corrigida
        log.append(f"Correção ortográfica aplicada: '{query_para_geral}' -> '{query_corrigida}'")

    normalized_query = normalize_text(query_corrigida)
    query_words = [word for word in normalized_query.split() if word not in stopwords and len(word) > 1]
    final_list = []

    # --- Stage 3.1: user feedback ---
    log.append("\n--- Etapa 3.1: Verificação de Feedback de Usuário ---")
    query_norm_fb = normalize_text(response.get("corrected_query") or original_query)
    if user_best_matches_counts and query_norm_fb in user_best_matches_counts:
        tuss_codes_by_votes = sorted(user_best_matches_counts[query_norm_fb].items(), key=lambda item: item[1], reverse=True)
        for tuss_code, votes in tuss_codes_by_votes:
            if votes < user_feedback_threshold:
                continue
            matching_rows = df_original[df_original['Codigo_TUSS'] == tuss_code]
            for index, row in matching_rows.iterrows():
                if index in seen_indices:
                    continue
                result = format_result(row, index, "Prioridade por Feedback", 101)
                result['feedback_votes'] = votes
                final_list.append(result)
                seen_indices.add(index)
    log.append(f"{len(final_list)} resultado(s) adicionado(s) por feedback de usuário.")

    # --- Stage 3.2: protected block ---
    protected_candidates = []
    log.append("\n--- Etapa 3.2: Coleta de Candidatos do Bloco Protegido ---")
    exact_fields = [f"{col}_norm" for col in ['Codigo_TUSS', 'Descricao_TUSS', 'Procedimento_Rol', 'Sinonimo_1', 'Sinonimo_2', 'Sinonimo_3', 'Sinonimo_4', 'Semantico']]
    # An empty normalized query would equal every empty cell.
    if normalized_query:
        for field in exact_fields:
            if field not in df_normalized.columns:
                continue
            for index in _unseen_labels(df_normalized[field] == normalized_query):
                protected_candidates.append(format_result(df_original.loc[index], index, "Exato", 100))
                seen_indices.add(index)

    if len(query_words) > 1:
        phrase_pattern = r'\b' + re.escape(normalized_query) + r'\b'
        phrase_mask = df_normalized['full_text_norm'].str.contains(phrase_pattern, na=False, regex=True)
        for index in _unseen_labels(phrase_mask):
            protected_candidates.append(format_result(df_original.loc[index], index, "Frase Exata", 99))
            seen_indices.add(index)

    search_fields_norm = [f"{col}_norm" for col in ['Descricao_TUSS', 'Procedimento_Rol', 'Sinonimo_1', 'Sinonimo_2', 'Sinonimo_3', 'Sinonimo_4', 'Semantico']]
    # With no query words the AND mask would stay all-True and select every
    # row, so the stage is skipped.
    if query_words:
        for field in search_fields_norm:
            if field not in df_normalized.columns:
                continue
            mask = pd.Series(True, index=df_normalized.index)
            for word in query_words:
                mask &= df_normalized[field].str.contains(r'\b' + re.escape(word) + r'\b', na=False)
            for index in _unseen_labels(mask):
                protected_candidates.append(format_result(df_original.loc[index], index, "Lógica (E)", 98))
                seen_indices.add(index)

    # --- Stage 3.3: broad block ---
    broad_candidates = []
    log.append("\n--- Etapa 3.3: Coleta de Candidatos do Bloco Amplo ---")
    # One pass builds both the choice list and a text -> row labels lookup,
    # instead of rescanning the whole corpus for each fuzzy match.
    fuzzy_texts = []
    labels_by_text = {}
    for corpus_text, row_label in fuzzy_search_corpus:
        fuzzy_texts.append(corpus_text)
        labels_by_text.setdefault(corpus_text, []).append(row_label)
    for match_text, score in process.extractBests(normalized_query, fuzzy_texts, scorer=fuzz.token_set_ratio, limit=20, score_cutoff=88):
        for index in labels_by_text.get(match_text, ()):
            if index not in seen_indices:
                broad_candidates.append(format_result(df_original.loc[index], index, "Aproximação", score))
                seen_indices.add(index)

    if query_words:
        bm25_scores = bm25_model.get_scores(query_words)
        max_score = max(bm25_scores) if any(s > 0 for s in bm25_scores) else 1.0
        top_n_indices = sorted(range(len(bm25_scores)), key=lambda i: bm25_scores[i], reverse=True)[:50]
        for i in top_n_indices:
            index = df_normalized.index[i]
            if bm25_scores[i] > 0 and index not in seen_indices:
                # BM25 scores are scaled so the best row gets 95.
                broad_candidates.append(format_result(df_original.loc[index], index, "Relevância (BM25)", (bm25_scores[i]/max_score)*95))
                seen_indices.add(index)

    # --- Stage 4: block processing ---
    log.append("\n--- Etapa 4: Processamento dos Blocos ---")
    if protected_candidates:
        unique_protected = list({r['row_index']: r for r in protected_candidates}.values())
        protected_with_scores, log_msg = rerank_with_cross_encoder(normalized_query, unique_protected, cross_encoder_model)
        log.append(f"Bloco Protegido: {log_msg}. Ordem textual mantida.")
        final_list.extend(sorted(protected_with_scores, key=lambda x: x['text_score'], reverse=True))

    if broad_candidates:
        unique_broad = list({r['row_index']: r for r in broad_candidates}.values())
        unique_broad_sorted = sorted(unique_broad, key=lambda x: x['text_score'], reverse=True)[:30]
        log.append(f"Bloco Amplo: {len(unique_broad_sorted)} candidatos selecionados para otimização.")
        boosted_broad, boost_logs = apply_boosts(unique_broad_sorted, query_words, doc_freq)
        log.extend(boost_logs)
        reranked_broad, log_msg = rerank_with_cross_encoder(normalized_query, boosted_broad, cross_encoder_model)
        log.append(f"Bloco Amplo: {log_msg}")
        final_list.extend(reranked_broad)

    response["final_semantic_results"] = _highlight_matches(final_list[:20], query_corrigida)
    response["search_duration_seconds"] = round(time.time() - start_time, 4)
    log.append(f"\nBusca completa em {response['search_duration_seconds']} segundos.")
    print("\n\n" + "="*20 + " LOG DE BUSCA FINAL " + "="*20)
    print("\n".join(log))
    return response