# enhanced_search_v2.py (Versão Final com Normalização Literal Corrigida) ################################################################################################### # # RESUMO DAS CORREÇÕES E MELHORIAS: # # 1. NORMALIZAÇÃO LITERAL INTELIGENTE (CORREÇÃO PRINCIPAL): # - A função `literal_normalize_text` foi aprimorada para remover símbolos e espaços # do INÍCIO e do FIM da string, tanto na query quanto nos dados do banco. # - Isso corrige o bug em que o "Early Exit" falhava com queries que continham lixo # de copiar/colar (como '/////'), restaurando a inteligência da busca literal. # # 2. PRÉ-PROCESSAMENTO ROBUSTO MANTIDO: # - A lógica de correção ortográfica criteriosa, que preserva a ordem e só corrige # palavras fora dos dicionários, permanece. # # 3. ARQUITETURA FINAL MANTIDA: # - O fluxo com Early Exit, Blocos Protegido/Amplo e Otimização com IA continua sendo # a base do buscador. # ################################################################################################### import pandas as pd import re from thefuzz import process, fuzz from unidecode import unidecode import time from sentence_transformers import util import torch import math from collections import defaultdict from rank_bm25 import BM25Okapi # --- Bloco 1: Funções Auxiliares de Normalização e Limpeza --- # def literal_normalize_text(text): """ Normalização para a busca literal. Remove acentos, converte para minúsculas e remove quaisquer símbolos/espaços do início e do fim da string. """ if pd.isna(text): return "" normalized = unidecode(str(text).lower()) # CORREÇÃO: Remove lixo do início e do fim, mas preserva símbolos internos (ex: / em multissitio/ressincronizador) return re.sub(r'^\W+|\W+$', '', normalized).strip() def clean_symbols_from_query(text): """Remove uma gama completa de aspas e símbolos da query para a busca geral.""" if pd.isna(text): return "" return re.sub(r"[´`'\"/*]", "", str(text)).strip() def normalize_text(text): """Normalização completa, limpando tudo exceto letras, números e espaços.""" if pd.isna(text): return "" normalized = unidecode(str(text).lower()) normalized = re.sub(r'[^\w\s]', ' ', normalized) return re.sub(r'\s+', ' ', normalized).strip() def get_longest_word(query_text): """Função de fallback para encontrar a palavra mais longa em uma query.""" words = re.findall(r'\b\w{4,}\b', query_text) if not words: return "" return max(words, key=len) # --- Bloco 2: Funções de Formatação e Destaque de Resultados --- # def format_result(row_data, row_index, match_type="", score=0, **kwargs): """Formata uma linha do DataFrame em um dicionário de resultado padronizado, aplicando regras de negócio.""" data = row_data.copy() is_rol = data.get('Correlacao_Rol', '').strip().lower() == 'sim' if not is_rol: data['Grupo'], data['Subgrupo'], data['Vigencia'], data['Resolucao_Normativa'] = '', '', '', '' data['PAC'], data['DUT'] = '---', '---' else: data['PAC'] = 'Sim' if data.get('PAC', '').strip().lower() == 'pac' else 'Não' original_dut_value = data.get('DUT', '').strip() if original_dut_value and original_dut_value.replace('.', '', 1).isdigit(): data['DUT'] = f'Sim, DUT nº {original_dut_value}' else: data['DUT'] = 'Não' standard_columns = [ 'Codigo_TUSS', 'Descricao_TUSS', 'Correlacao_Rol', 'Procedimento_Rol', 'Resolucao_Normativa', 'Vigencia', 'OD', 'AMB', 'HCO', 'HSO', 'PAC', 'DUT', 'SUBGRUPO', 'GRUPO', 'CAPITULO', 'Sinonimo_1', 'Sinonimo_2', 'Sinonimo_3', 'Sinonimo_4', 'Semantico' ] formatted_data = {col: data.get(col, '') for col in standard_columns} result = { "row_index": row_index, "score": round(score), "text_score": round(score), "semantic_score": 0, "match_type": match_type, "is_rol_procedure": is_rol } result.update(formatted_data) result.update(kwargs) return result def _highlight_matches(results, query): """Adiciona tags ao redor das palavras da query nos resultados para destaque no frontend.""" if not query or not results: return results stopwords = {'de', 'do', 'da', 'dos', 'das', 'a', 'o', 'e', 'em', 'um', 'uma', 'para', 'com'} query_words = {word for word in normalize_text(query).split() if len(word) > 2 and word not in stopwords} cols_to_highlight = ['Descricao_TUSS', 'Procedimento_Rol', 'Sinonimo_1', 'Sinonimo_2', 'Sinonimo_3', 'Sinonimo_4', 'Semantico'] for result in results: for col in cols_to_highlight: original_text = result.get(col, '') if original_text and query_words: highlighted_text = original_text for word in sorted(list(query_words), key=len, reverse=True): pattern = r'\b(' + re.escape(word) + r')\b' highlighted_text = re.sub(pattern, r'\1', highlighted_text, flags=re.IGNORECASE) result[f"{col}_highlighted"] = highlighted_text else: result[f"{col}_highlighted"] = original_text return results # --- Bloco 3: Funções de Carregamento de Dados e Modelos --- # def load_and_prepare_database(db_path): """Carrega a base de dados principal, a normaliza e pré-calcula todas as estruturas de dados para a busca.""" try: print(f"Carregando e preparando a base de dados de: {db_path}...") df_original = pd.read_csv(db_path, dtype=str).fillna('') search_cols = ['Descricao_TUSS', 'Procedimento_Rol', 'Sinonimo_1', 'Sinonimo_2', 'Sinonimo_3', 'Sinonimo_4', 'Semantico', 'SUBGRUPO', 'GRUPO', 'CAPITULO'] df_normalized = df_original.copy() df_normalized['Codigo_TUSS_literal'] = df_normalized['Codigo_TUSS'].apply(literal_normalize_text) df_normalized['Procedimento_Rol_literal'] = df_normalized['Procedimento_Rol'].apply(literal_normalize_text) df_normalized['Descricao_TUSS_literal'] = df_normalized['Descricao_TUSS'].apply(literal_normalize_text) df_normalized['full_text_norm'] = "" for col in search_cols: if col in df_normalized.columns: df_normalized[f'{col}_norm'] = df_normalized[col].apply(normalize_text) df_normalized['full_text_norm'] += ' ' + df_normalized[f'{col}_norm'] print("Criando modelo BM25 e calculando frequência de palavras...") tokenized_corpus = [doc.split() for doc in df_normalized['full_text_norm']] bm25_model = BM25Okapi(tokenized_corpus, k1=1.2) print(f"Modelo BM25 otimizado com k1={bm25_model.k1}.") doc_freq = defaultdict(int) for doc_words in tokenized_corpus: for word in set(doc_words): doc_freq[word] += 1 print("Combinando frequências de palavras (masculino/feminino)...") combined_doc_freq = {} processed_words = set() for word, freq in doc_freq.items(): if word in processed_words: continue pair_word = None if word.endswith('o'): pair_word = word[:-1] + 'a' elif word.endswith('a'): pair_word = word[:-1] + 'o' if pair_word and pair_word in doc_freq: combined_freq = freq + doc_freq[pair_word] combined_doc_freq[word] = combined_freq combined_doc_freq[pair_word] = combined_freq processed_words.add(word); processed_words.add(pair_word) else: combined_doc_freq[word] = freq print("Criando corpus para busca fuzzy...") fuzzy_search_corpus = [] for index, row in df_normalized.iterrows(): for col in search_cols: if f'{col}_norm' in row and pd.notna(row[f'{col}_norm']): if val := row[f'{col}_norm']: fuzzy_search_corpus.append((val, index)) print(f"Base de dados pronta com {len(df_original)} procedimentos.") return df_original, df_normalized, fuzzy_search_corpus, bm25_model, combined_doc_freq except Exception as e: print(f"Erro crítico ao carregar/preparar a base de dados: {e}"); raise def load_correction_corpus(dict_path, column_name='Termo_Correto'): """Carrega um corpus para correção ortográfica, retornando termos originais, normalizados e um set de palavras.""" try: df_dict = pd.read_csv(dict_path, dtype=str).fillna('') if column_name not in df_dict.columns: return [], [], set() original_corpus = df_dict[column_name].dropna().astype(str).tolist() normalized_corpus = [normalize_text(term) for term in original_corpus] db_word_set = {word for term in normalized_corpus for word in term.split()} return original_corpus, normalized_corpus, db_word_set except (FileNotFoundError, Exception): return [], [], set() def load_general_dictionary(path): """Carrega o dicionário geral de português.""" try: with open(path, 'r', encoding='utf-8') as f: words = {normalize_text(line.strip()) for line in f if line.strip()} print(f"Dicionário geral carregado com {len(words)} palavras.") return words except (FileNotFoundError, Exception): return set() # --- Bloco 4: Funções de Reclassificação, Boosts e IA --- # def create_unified_document_text(result_dict): """Cria um único texto representativo de um procedimento para ser usado pelo Cross-Encoder da IA.""" text_parts = { result_dict.get(key, '') for key in ['Descricao_TUSS', 'Procedimento_Rol', 'Semantico', 'SUBGRUPO', 'GRUPO', 'CAPITULO', 'Sinonimo_1', 'Sinonimo_2', 'Sinonimo_3', 'Sinonimo_4']} return ". ".join(sorted([part for part in text_parts if part and str(part).strip()])) def rerank_with_cross_encoder(query, results_list, model): """Usa a IA (Cross-Encoder) para calcular o score semântico. Para o Bloco Amplo, também reordena a lista.""" if not model or not results_list or not query: return results_list, "Cross-Encoder não fornecido ou lista de candidatos vazia." sentence_pairs = [[query, create_unified_document_text(result)] for result in results_list] if not sentence_pairs: return results_list, "Não foram encontrados pares para reordenar." try: raw_scores = model.predict(sentence_pairs, show_progress_bar=False) semantic_scores = torch.sigmoid(torch.tensor(raw_scores)).numpy() * 100 for i, result in enumerate(results_list): result['semantic_score'] = round(semantic_scores[i]) def conditional_hybrid_sort_key(result): sem_score = result.get('semantic_score', 0); txt_score = result.get('text_score', 0) if sem_score >= 85: return (1, sem_score, txt_score) else: return (0, (sem_score * 0.6) + (txt_score * 0.4), sem_score) reranked_results = sorted(results_list, key=conditional_hybrid_sort_key, reverse=True) log_message = f"Scores semânticos calculados para {len(reranked_results)} candidatos." return reranked_results, log_message except Exception as e: log_message = f"Erro no Cross-Encoder: {e}"; print(log_message) return results_list, log_message def apply_boosts(results, query_words, doc_freq): """Aplica boosts de relevância para jargões e termos ultra-raros ao Bloco Amplo.""" if not results or not query_words: return results, [] log_messages = [] jargon_terms = {w for w in query_words if doc_freq.get(w, 0) <= 15 and len(w) > 4} if jargon_terms: boosted_count = 0 for r in results: text = normalize_text(create_unified_document_text(r)) if any(re.search(r'\b' + re.escape(term) + r'\b', text) for term in jargon_terms): r['text_score'] = min(r['text_score'] * 1.2, 97) boosted_count += 1 if boosted_count > 0: log_messages.append(f"Boost de Jargão para '{list(jargon_terms)}' ({boosted_count} afetados).") rare_terms = {w for w in query_words if doc_freq.get(w, 0) <= 5 and len(w) > 4} if rare_terms: boosted_count = 0 for r in results: text = normalize_text(create_unified_document_text(r)) if any(re.search(r'\b' + re.escape(term) + r'\b', text) for term in rare_terms): r['text_score'] = min(r['text_score'] * 1.3, 97) boosted_count += 1 if boosted_count > 0: log_messages.append(f"Boost Ultra-Raro para '{list(rare_terms)}' ({boosted_count} afetados).") for r in results: r['score'] = r['text_score'] return results, log_messages # --- Bloco 5: Função Principal de Orquestração da Busca --- # def search_procedure_with_log(query, df_original, df_normalized, fuzzy_search_corpus, correction_corpus, valid_words_set, bm25_model, doc_freq, cross_encoder_model=None, user_best_matches_counts=None, user_feedback_threshold=10): start_time = time.time(); original_query = str(query).strip() response = { "search_log": [], "final_semantic_results": [], "original_query": original_query, "corrected_query": "", "was_corrected": False } if not original_query: response["search_log"].append("Query vazia."); return response response["search_log"].append(f"Buscando por: '{original_query}'") # --- Etapa 1: Pré-processamento inicial --- query_para_literal = literal_normalize_text(original_query) response["search_log"].append(f"\n--- Etapa 1: Pré-processamento da Query ---") response["search_log"].append(f"Query para busca literal (após limpeza de bordas): '{query_para_literal}'") # --- Etapa 2: Camada de Early Exit --- response["search_log"].append("\n--- Etapa 2: Camada de Early Exit (Literal) ---") literal_results = [] seen_indices = set() literal_fields = ['Codigo_TUSS_literal', 'Procedimento_Rol_literal', 'Descricao_TUSS_literal'] for field in literal_fields: mask = df_normalized[field] == query_para_literal for index in df_normalized.index[mask & ~df_normalized.index.isin(seen_indices)]: literal_results.append(format_result(df_original.loc[index], index, "Literal Exata", 100)) seen_indices.add(index) if literal_results: response["search_log"].append(f"Encontrado(s) {len(literal_results)} resultado(s) literal(is). ATIVANDO EARLY EXIT.") results_with_semantic, log_msg = rerank_with_cross_encoder(normalize_text(original_query), literal_results, cross_encoder_model) response["search_log"].append(f"Cálculo de Score Semântico: {log_msg}") final_list = sorted(results_with_semantic, key=lambda x: x['text_score'], reverse=True) response["final_semantic_results"] = _highlight_matches(final_list, original_query) end_time = time.time(); response["search_duration_seconds"] = round(end_time - start_time, 4) response["search_log"].append(f"\nBusca concluída em {response['search_duration_seconds']} segundos.") print("\n".join(response["search_log"])) return response else: response["search_log"].append("Nenhum resultado literal encontrado. Prosseguindo para a busca completa.") # --- Etapa 3: Busca Completa (se não houve Early Exit) --- query_para_geral = clean_symbols_from_query(original_query) response["search_log"].append(f"Query para busca geral (após limpeza de símbolos): '{query_para_geral}'") original_correction_corpus, normalized_correction_corpus = correction_corpus stopwords = {'de', 'do', 'da', 'dos', 'das', 'a', 'o', 'e', 'em', 'um', 'uma', 'para', 'com'} final_query_words = [] words_were_corrected = False for word in query_para_geral.split(): norm_word = normalize_text(word) is_candidate_for_correction = ( len(norm_word) > 2 and norm_word not in stopwords and not norm_word.isdigit() and norm_word not in valid_words_set ) if is_candidate_for_correction: best_match, score = process.extractOne(norm_word, normalized_correction_corpus, scorer=fuzz.ratio) if score > 85: original_term_index = normalized_correction_corpus.index(best_match) corrected_word = original_correction_corpus[original_term_index] final_query_words.append(corrected_word) words_were_corrected = True else: final_query_words.append(word) else: final_query_words.append(word) query_corrigida = " ".join(final_query_words) if words_were_corrected and query_corrigida.strip() != query_para_geral.strip(): response["was_corrected"] = True response["corrected_query"] = query_corrigida response["search_log"].append(f"Correção ortográfica aplicada: '{query_para_geral}' -> '{query_corrigida}'") normalized_query = normalize_text(query_corrigida) query_words = [word for word in normalized_query.split() if word not in stopwords and len(word) > 1] final_list = [] response["search_log"].append("\n--- Etapa 3.1: Verificação de Feedback de Usuário ---") query_norm_fb = normalize_text(response.get("corrected_query") or original_query) if user_best_matches_counts and query_norm_fb in user_best_matches_counts: tuss_codes_by_votes = sorted(user_best_matches_counts[query_norm_fb].items(), key=lambda item: item[1], reverse=True) for tuss_code, votes in tuss_codes_by_votes: if votes >= user_feedback_threshold: matching_rows = df_original[df_original['Codigo_TUSS'] == tuss_code] for index, row in matching_rows.iterrows(): if index not in seen_indices: result = format_result(row, index, "Prioridade por Feedback", 101) result['feedback_votes'] = votes final_list.append(result) seen_indices.add(index) response["search_log"].append(f"{len(final_list)} resultado(s) adicionado(s) por feedback de usuário.") protected_candidates = [] response["search_log"].append("\n--- Etapa 3.2: Coleta de Candidatos do Bloco Protegido ---") exact_fields = [f"{col}_norm" for col in ['Codigo_TUSS', 'Descricao_TUSS', 'Procedimento_Rol', 'Sinonimo_1', 'Sinonimo_2', 'Sinonimo_3', 'Sinonimo_4', 'Semantico']] for field in exact_fields: if field in df_normalized.columns: for index in df_normalized.index[(df_normalized[field] == normalized_query) & (~df_normalized.index.isin(seen_indices))]: protected_candidates.append(format_result(df_original.loc[index], index, "Exato", 100)); seen_indices.add(index) if len(query_words) > 1: phrase_pattern = r'\b' + re.escape(normalized_query) + r'\b' for index in df_normalized.index[df_normalized['full_text_norm'].str.contains(phrase_pattern, na=False, regex=True) & ~df_normalized.index.isin(seen_indices)]: protected_candidates.append(format_result(df_original.loc[index], index, "Frase Exata", 99)); seen_indices.add(index) search_fields_norm = [f"{col}_norm" for col in ['Descricao_TUSS', 'Procedimento_Rol', 'Sinonimo_1', 'Sinonimo_2', 'Sinonimo_3', 'Sinonimo_4', 'Semantico']] for field in search_fields_norm: if field in df_normalized.columns: mask = pd.Series(True, index=df_normalized.index) for word in query_words: mask &= df_normalized[field].str.contains(r'\b' + re.escape(word) + r'\b', na=False) for index in df_normalized.index[mask & ~df_normalized.index.isin(seen_indices)]: protected_candidates.append(format_result(df_original.loc[index], index, "Lógica (E)", 98)); seen_indices.add(index) broad_candidates = [] response["search_log"].append("\n--- Etapa 3.3: Coleta de Candidatos do Bloco Amplo ---") for match_text, score in process.extractBests(normalized_query, [item[0] for item in fuzzy_search_corpus], scorer=fuzz.token_set_ratio, limit=20, score_cutoff=88): for _, index in [item for item in fuzzy_search_corpus if item[0] == match_text]: if index not in seen_indices: broad_candidates.append(format_result(df_original.loc[index], index, "Aproximação", score)); seen_indices.add(index) if query_words: bm25_scores = bm25_model.get_scores(query_words) max_score = max(bm25_scores) if any(s > 0 for s in bm25_scores) else 1.0 top_n_indices = sorted(range(len(bm25_scores)), key=lambda i: bm25_scores[i], reverse=True)[:50] for i in top_n_indices: if bm25_scores[i] > 0 and df_normalized.index[i] not in seen_indices: index = df_normalized.index[i] broad_candidates.append(format_result(df_original.loc[index], index, "Relevância (BM25)", (bm25_scores[i]/max_score)*95)); seen_indices.add(index) response["search_log"].append("\n--- Etapa 4: Processamento dos Blocos ---") if protected_candidates: unique_protected = list({r['row_index']: r for r in protected_candidates}.values()) protected_with_scores, log_msg = rerank_with_cross_encoder(normalized_query, unique_protected, cross_encoder_model) response["search_log"].append(f"Bloco Protegido: {log_msg}. Ordem textual mantida.") final_list.extend(sorted(protected_with_scores, key=lambda x: x['text_score'], reverse=True)) if broad_candidates: unique_broad = list({r['row_index']: r for r in broad_candidates}.values()) unique_broad_sorted = sorted(unique_broad, key=lambda x: x['text_score'], reverse=True)[:30] response["search_log"].append(f"Bloco Amplo: {len(unique_broad_sorted)} candidatos selecionados para otimização.") boosted_broad, boost_logs = apply_boosts(unique_broad_sorted, query_words, doc_freq) for log in boost_logs: response["search_log"].append(log) reranked_broad, log_msg = rerank_with_cross_encoder(normalized_query, boosted_broad, cross_encoder_model) response["search_log"].append(f"Bloco Amplo: {log_msg}") final_list.extend(reranked_broad) response["final_semantic_results"] = _highlight_matches(final_list[:20], query_corrigida) end_time = time.time(); response["search_duration_seconds"] = round(end_time - start_time, 4) response["search_log"].append(f"\nBusca completa em {response['search_duration_seconds']} segundos.") print("\n\n" + "="*20 + " LOG DE BUSCA FINAL " + "="*20) print("\n".join(response["search_log"])) return response