# enhanced_search_v2.py
# ---------------------
# Hybrid, layered search engine for medical procedures.
# Final consolidated version with all optimizations.
#
# Main features:
# 1. PRE-SEARCH spelling correction: fixes invalid terms BEFORE any search runs.
# 2. Layer 0 for robust literal search: finds exact matches of the phrase,
#    ignoring case, accents, punctuation, and spacing.
# 3. Optimized early exit: stops the search with a clear, correct log message.
# 4. Multi-layer search: from the most restrictive (literal) to the broadest (weighted).
# 5. Relevance scoring (IDF): rare words carry more weight.
# 6. Data cleanup: blanks the Rol fields for procedures that are not part of the Rol.
# 7. Semantic reranking: uses MiniLM-L6-v2 to capture meaning and reorder results.
# 8. User feedback: prioritizes results validated by the community.
# 9. Handles multiple procedures (DB rows) that share the same TUSS code.

import math
import re
import time
from collections import defaultdict

import pandas as pd
import torch
from sentence_transformers import util
from thefuzz import process, fuzz
from unidecode import unidecode

# --- NORMALIZATION HELPER FUNCTIONS ---

### CHANGED ###
def literal_normalize_text(text):
    """
    Normalizes text for the literal search (Layer 0): lowercase, no accents,
    punctuation replaced by spaces, and whitespace collapsed.
    """
    if pd.isna(text):
        return ""
    normalized = unidecode(str(text).lower())
    # FIX: replace non-alphanumeric characters with a space instead of removing them.
    # This prevents words such as "sangue." and "O" from being fused into "sangueO".
    normalized = re.sub(r'[^\w\s]', ' ', normalized)
    return re.sub(r'\s+', ' ', normalized).strip()


def normalize_text(text):
    """Normalizes text for token (word) search: lowercase, no accents, no extra whitespace."""
    if pd.isna(text):
        return ""
    return unidecode(str(text).lower().strip())


def get_longest_word(query_text):
    """Extracts the longest word from a query (used by the fallback layer)."""
    words = re.findall(r'\b\w{4,}\b', query_text)
    if not words:
        return ""
    return max(words, key=len)
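
# Illustrative sketch only (never called by the engine): shows how the two
# normalizers above differ on a sample phrase. The sample string and the helper
# name `_normalization_demo` are assumptions made purely for demonstration.
def _normalization_demo():
    sample = "Hemograma completo, c/ contagem de plaquetas (sangue.O)"
    # literal_normalize_text turns punctuation into spaces, so "sangue." and "O"
    # remain separate tokens; normalize_text only lowercases and strips accents.
    print(literal_normalize_text(sample))  # e.g. "hemograma completo c contagem de plaquetas sangue o"
    print(normalize_text(sample))          # e.g. "hemograma completo, c/ contagem de plaquetas (sangue.o)"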

# --- FORMATTING AND HIGHLIGHTING FUNCTIONS ---

def format_result(row_data, match_type="", score=0):
    """
    Formats a DataFrame row into a standard result dictionary.
    Applies the business rule that blanks Rol data when the procedure is not part of the Rol.
    """
    data = row_data.copy()
    if data.get('Correlacao_Rol', '').strip().lower() != 'sim':
        # Clear the Rol-specific fields (the column names are uppercase in the base).
        data['GRUPO'], data['SUBGRUPO'], data['Vigencia'], data['Resolucao_Normativa'] = '', '', '', ''
        data['PAC'], data['DUT'] = '---', '---'
    else:
        data['PAC'] = 'Sim' if data.get('PAC', '').strip().lower() == 'pac' else 'Não'
        # DUT logic fixed here.
        original_dut_value = data.get('DUT', '').strip()
        # FIX: the check now accepts numbers with a decimal point (e.g. "65.1").
        # Rule: if the value, after removing the first '.', consists only of digits, it is valid.
        if original_dut_value and original_dut_value.replace('.', '', 1).isdigit():
            data['DUT'] = f'Sim, DUT nº {original_dut_value}'
        else:
            data['DUT'] = 'Não'

    standard_columns = [
        'Codigo_TUSS', 'Descricao_TUSS', 'Correlacao_Rol', 'Procedimento_Rol',
        'Resolucao_Normativa', 'Vigencia', 'OD', 'AMB', 'HCO', 'HSO', 'PAC', 'DUT',
        'SUBGRUPO', 'GRUPO', 'CAPITULO',
        'Sinonimo_1', 'Sinonimo_2', 'Sinonimo_3', 'Sinonimo_4', 'Semantico'
    ]
    formatted_data = {col: data.get(col, '') for col in standard_columns}
    result = {"score": round(score), "match_type": match_type}
    result.update(formatted_data)
    return result


def _highlight_matches(results, query):
    """Wraps the query words found in the result fields in highlight tags."""
    if not query or not results:
        return results
    stopwords = {'de', 'do', 'da', 'dos', 'das', 'a', 'o', 'e', 'em', 'um', 'uma', 'para', 'com'}
    query_words = {word for word in normalize_text(query).split() if len(word) > 2 and word not in stopwords}
    cols_to_highlight = ['Descricao_TUSS', 'Procedimento_Rol', 'Sinonimo_1', 'Sinonimo_2',
                         'Sinonimo_3', 'Sinonimo_4', 'Semantico']
    for result in results:
        for col in cols_to_highlight:
            original_text = result.get(col, '')
            highlighted_text = original_text
            if original_text and query_words:
                for word in sorted(list(query_words), key=len, reverse=True):
                    pattern = r'\b(' + re.escape(word) + r')\b'
                    # Wraps each matched word in a <mark> tag (tag name assumed; adjust to the UI if needed).
                    highlighted_text = re.sub(pattern, r'<mark>\1</mark>', highlighted_text, flags=re.IGNORECASE)
            result[f"{col}_highlighted"] = highlighted_text
    return results


# --- DATA LOADING FUNCTIONS ---

def load_and_prepare_database(db_path):
    """
    Loads the CSV and builds the normalized columns, the combined text field,
    the IDF weights, and the set of all unique words in the database.
    """
    try:
        print(f"Carregando e preparando a base de dados de: {db_path}...")
        df_original = pd.read_csv(db_path, dtype=str).fillna('')
        search_cols = ['Descricao_TUSS', 'Procedimento_Rol', 'Sinonimo_1', 'Sinonimo_2',
                       'Sinonimo_3', 'Sinonimo_4', 'Semantico']
        df_normalized = df_original.copy()
        df_normalized['Codigo_TUSS_literal'] = df_normalized['Codigo_TUSS'].apply(literal_normalize_text)
        df_normalized['Codigo_TUSS_norm'] = df_normalized['Codigo_TUSS'].apply(normalize_text)
        df_normalized['full_text_norm'] = ""
        for col in search_cols:
            if col in df_normalized.columns:
                df_normalized[f'{col}_literal'] = df_normalized[col].apply(literal_normalize_text)
                df_normalized[f'{col}_norm'] = df_normalized[col].apply(normalize_text)
                df_normalized['full_text_norm'] += ' ' + df_normalized[f'{col}_norm']

        print("Calculando pesos IDF e dicionário da base...")
        num_documents = len(df_normalized)
        doc_freq = defaultdict(int)
        db_word_set = set()
        for text in df_normalized['full_text_norm']:
            words = set(text.split())
            db_word_set.update(words)
            for word in words:
                if word:
                    doc_freq[word] += 1
        db_word_set.discard('')
        print(f"Dicionário da base de dados criado com {len(db_word_set)} palavras únicas.")
        idf_scores = {word: math.log(num_documents / (freq + 1)) for word, freq in doc_freq.items()}
        print(f"Pesos IDF calculados para {len(idf_scores)} palavras.")

        print("Criando corpus para busca fuzzy...")
        fuzzy_search_corpus = []
        for index, row in df_normalized.iterrows():
            for col in search_cols:
                if col in df_original.columns and f'{col}_norm' in row and pd.notna(row[f'{col}_norm']):
                    val = row[f'{col}_norm']
                    if val:
                        fuzzy_search_corpus.append((val, index, f'{col}_norm'))

        print(f"Base de dados pronta com {len(df_original)} procedimentos.")
        return df_original, df_normalized, fuzzy_search_corpus, idf_scores, db_word_set
    except Exception as e:
        print(f"Erro crítico ao carregar/preparar a base de dados: {e}")
        raise
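
# Illustrative sketch of the IDF weighting computed above (not used by the engine).
# The toy corpus and the helper name `_idf_weighting_example` are assumptions made
# only to show why rare words score higher in the weighted layer (Layer 5).
def _idf_weighting_example():
    toy_corpus = [
        "consulta medica",
        "consulta cardiologica",
        "hemograma completo",
    ]
    num_documents = len(toy_corpus)
    doc_freq = defaultdict(int)
    for text in toy_corpus:
        for word in set(text.split()):
            doc_freq[word] += 1
    # Same formula as load_and_prepare_database: log(N / (doc_freq + 1)).
    idf = {word: math.log(num_documents / (freq + 1)) for word, freq in doc_freq.items()}
    # "consulta" appears in 2 of 3 documents -> weight log(3/3) = 0.0;
    # "hemograma" appears in 1 of 3 -> weight log(3/2) ≈ 0.405.
    for word in ("consulta", "hemograma"):
        print(word, round(idf[word], 3))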

def load_general_dictionary(path):
    """Loads a plain-text Portuguese word list (one word per line) as a normalized set."""
    try:
        print(f"Carregando dicionário geral de português de: {path}...")
        with open(path, 'r', encoding='utf-8') as f:
            words = {normalize_text(line.strip()) for line in f if line.strip()}
        print(f"Dicionário geral carregado com {len(words)} palavras.")
        return words
    except FileNotFoundError:
        return set()
    except Exception:
        return set()


def load_correction_corpus(dict_path, column_name='Termo_Correto'):
    """Loads the spelling-correction corpus from a CSV, returning original and normalized term lists."""
    try:
        print(f"Carregando corpus de correção de: {dict_path}...")
        df_dict = pd.read_csv(dict_path, dtype=str).fillna('')
        if column_name not in df_dict.columns:
            return [], []
        original_corpus = df_dict[column_name].dropna().astype(str).tolist()
        normalized_corpus = [normalize_text(term) for term in original_corpus]
        print(f"Corpus de correção carregado com {len(original_corpus)} termos.")
        return original_corpus, normalized_corpus
    except FileNotFoundError:
        return [], []
    except Exception:
        return [], []


# --- SEMANTIC RERANKING FUNCTION ---

def rerank_with_semantic_model(original_query, results_list, model):
    """Reranks the candidate list with a sentence-transformer model, combining lexical and semantic scores."""
    if not model or not results_list:
        return results_list
    semantic_columns = ['Descricao_TUSS', 'Procedimento_Rol', 'SUBGRUPO', 'Sinonimo_1',
                        'Sinonimo_2', 'Sinonimo_3', 'Sinonimo_4', 'Semantico']
    corpus_texts = [
        ". ".join(sorted(list({res.get(col) for col in semantic_columns
                               if res.get(col) and isinstance(res.get(col), str)})))
        for res in results_list
    ]
    try:
        query_embedding = model.encode(original_query, convert_to_tensor=True, show_progress_bar=False)
        corpus_embeddings = model.encode(corpus_texts, convert_to_tensor=True, show_progress_bar=False)
        cosine_scores = util.cos_sim(query_embedding, corpus_embeddings)
    except Exception:
        return results_list
    for i, result in enumerate(results_list):
        result['semantic_score'] = round(max(0, cosine_scores[0][i].item()) * 100)
        result['hybrid_score'] = result['semantic_score'] + result.get('score', 0)
    # Perfect lexical matches (score == 100) stay on top; the rest are ordered by the hybrid score.
    return sorted(results_list, key=lambda x: (x.get('score', 0) == 100, x.get('hybrid_score', 0)), reverse=True)
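
# Minimal sketch of the ordering used above (not part of the engine). The toy
# dictionaries are assumptions; they only show that an exact lexical hit
# (score == 100) still outranks a candidate with a higher hybrid score.
def _rerank_ordering_example():
    candidates = [
        {"Codigo_TUSS": "A", "score": 100, "hybrid_score": 150},
        {"Codigo_TUSS": "B", "score": 90, "hybrid_score": 185},
    ]
    ordered = sorted(candidates, key=lambda x: (x.get('score', 0) == 100, x.get('hybrid_score', 0)), reverse=True)
    print([c["Codigo_TUSS"] for c in ordered])  # expected: ['A', 'B']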

# --- INTERNAL LAYERED SEARCH FUNCTION ---

def _run_search_layers(literal_query, normalized_query, response, df_original, df_normalized,
                       fuzzy_search_corpus, idf_scores, limit_per_layer):
    """Runs the search layers and returns the name of the exit layer when an early exit happens."""
    matched_indices = set()
    stopwords = {'de', 'do', 'da', 'dos', 'das', 'a', 'o', 'e', 'em', 'um', 'uma', 'para', 'com'}
    query_words = [word for word in normalized_query.split() if word not in stopwords and len(word) > 1]

    ### CHANGED ###
    # --- LAYER 0: Literal (contained) search ---
    # This is the correct logic for finding a whole phrase inside a longer text.
    if literal_query:
        temp_results = []
        literal_cols = ['Codigo_TUSS_literal', 'Descricao_TUSS_literal', 'Procedimento_Rol_literal']
        for col in literal_cols:
            if col in df_normalized.columns:
                # Restored logic: look for the whole phrase/word contained in the text.
                # The '\b' (word boundary) ensures "sulta" does not match "consulta",
                # while "consulta" still matches "consulta com especialista".
                # This now works correctly thanks to the fix in literal_normalize_text.
                mask = df_normalized[col].str.contains(r'\b' + re.escape(literal_query) + r'\b', na=False)
                matches = df_normalized[mask]
                for index, _ in matches.iterrows():
                    if index not in matched_indices:
                        match_type = "Código Literal" if "Codigo" in col else "Texto Literal"
                        temp_results.append(format_result(df_original.loc[index], match_type, 100))
                        matched_indices.add(index)
        if temp_results:
            response["results_by_layer"]["literal_matches"] = sorted(temp_results, key=lambda x: x['Codigo_TUSS'])[:limit_per_layer]
            return "Busca Literal"

    # --- LAYER 1: Exact normalized search ---
    # This layer now covers the case where the query text is *exactly* equal to the cell text.
    temp_results = []
    if normalized_query:
        exact_code_matches = df_normalized[df_normalized['Codigo_TUSS_norm'] == normalized_query]
        for index, _ in exact_code_matches.iterrows():
            if index not in matched_indices:
                temp_results.append(format_result(df_original.loc[index], "Código Exato (Normalizado)", 100))
                matched_indices.add(index)
        for col in ['Descricao_TUSS_norm', 'Procedimento_Rol_norm']:
            if col in df_normalized.columns:
                exact_text_matches = df_normalized[df_normalized[col] == normalized_query]
                for index, _ in exact_text_matches.iterrows():
                    if index not in matched_indices:
                        temp_results.append(format_result(df_original.loc[index], "Exato (Normalizado)", 100))
                        matched_indices.add(index)
    if temp_results:
        response["results_by_layer"]["exact_matches"] = sorted(temp_results, key=lambda x: x['Codigo_TUSS'])[:limit_per_layer]
        return "Normalizada Exata"

    # --- LAYER 2: Logical 'AND' search ---
    temp_results = []
    if query_words:
        mask = pd.Series(True, index=df_normalized.index)
        for word in query_words:
            mask &= df_normalized['full_text_norm'].str.contains(r'\b' + re.escape(word) + r'\b', na=False)
        for index, row in df_normalized[mask & ~df_normalized.index.isin(matched_indices)].iterrows():
            score = fuzz.WRatio(normalized_query, row.get('Descricao_TUSS_norm', ''))
            if score > 85:
                temp_results.append(format_result(df_original.loc[index], "Busca Lógica (E)", score))
                matched_indices.add(index)
    response["results_by_layer"]["logical_matches"] = sorted(temp_results, key=lambda x: x.get('score', 0), reverse=True)[:limit_per_layer]

    # --- LAYER 3: 'Almost exact' (fuzzy) search ---
    temp_results = []
    processed_indices_layer3 = set()
    if fuzzy_search_corpus:
        # --- START OF DEBUG CODE ---
        print("\n" + "=" * 20 + " DEBUG CAMADA 3 " + "=" * 20)
        print(f"Query para Fuzzy: '{normalized_query}'")
        print(f"Tamanho do Fuzzy Corpus: {len(fuzzy_search_corpus)}")
        # --- END OF DEBUG CODE ---
        almost_exact_matches = process.extractBests(normalized_query,
                                                    [item[0] for item in fuzzy_search_corpus],
                                                    scorer=fuzz.token_set_ratio,
                                                    limit=limit_per_layer * 3,
                                                    score_cutoff=90)
        for match_text, score in almost_exact_matches:
            if score == 100 and match_text == normalized_query:
                continue
            corpus_items = [item for item in fuzzy_search_corpus if item[0] == match_text]
            for _, original_index, _ in corpus_items:
                if original_index not in matched_indices and original_index not in processed_indices_layer3:
                    temp_results.append(format_result(df_original.loc[original_index], "Quase Exato", 98))
                    matched_indices.add(original_index)
                    processed_indices_layer3.add(original_index)
    response["results_by_layer"]["almost_exact_matches"] = sorted(temp_results, key=lambda x: x['Codigo_TUSS'])[:limit_per_layer]

    # --- LAYER 4: Validated terms search ---
    temp_results = []
    if query_words:
        mask_c4 = pd.Series(True, index=df_normalized.index)
        for word in query_words:
            mask_c4 &= df_normalized['full_text_norm'].str.contains(r'\b' + re.escape(word) + r'\b', na=False)
        for index, row in df_normalized[mask_c4 & ~df_normalized.index.isin(matched_indices)].iterrows():
            score = fuzz.WRatio(normalized_query, row.get('Descricao_TUSS_norm', ''))
            if score > 75:
                temp_results.append(format_result(df_original.loc[index], "Termos Validados (E)", score))
                matched_indices.add(index)
    response["results_by_layer"]["contains_matches"] = sorted(temp_results, key=lambda x: x.get('score', 0), reverse=True)[:limit_per_layer]

    # --- LAYER 5: Weighted (IDF) search ---
    temp_results = []
    if query_words and idf_scores:
        max_idf = max(idf_scores.values()) if idf_scores else 1.0
        total_query_idf = sum(idf_scores.get(word, max_idf) for word in query_words)
        regex_pattern = '|'.join(re.escape(word) for word in query_words)
        mask = df_normalized['full_text_norm'].str.contains(regex_pattern, na=False)
        candidate_df = df_normalized[mask & ~df_normalized.index.isin(matched_indices)]
        for index, row in candidate_df.iterrows():
            weighted_score = sum(idf_scores.get(word, 0) for word in query_words if word in row['full_text_norm'].split())
            normalized_score = (weighted_score / total_query_idf) * 90 if total_query_idf > 0 else 0
            if query_words and row.get('full_text_norm', '').strip().startswith(query_words[0]):
                normalized_score = min(normalized_score + 10, 95)
            temp_results.append(format_result(df_original.loc[index], "Busca Ponderada (IDF)", normalized_score))
            matched_indices.add(index)
    response["results_by_layer"]["term_matches"] = sorted(temp_results, key=lambda x: x.get('score', 0), reverse=True)[:limit_per_layer * 4]

    # --- LAYER 6: Fallback ---
    total_found_primary = sum(len(v) for v in response["results_by_layer"].values())
    if total_found_primary == 0 and normalized_query:
        temp_results = []
        longest_word = get_longest_word(normalized_query)
        if longest_word:
            mask_c6 = df_normalized['full_text_norm'].str.contains(r'\b' + re.escape(longest_word) + r'\b', na=False)
            for index, row in df_normalized[mask_c6 & ~df_normalized.index.isin(matched_indices)].iterrows():
                temp_results.append(format_result(df_original.loc[index], f"Palavra-Chave '{longest_word}'", 80))
        response["results_by_layer"]["keyword_matches"] = sorted(temp_results, key=lambda x: x['Codigo_TUSS'])[:limit_per_layer]

    return None
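
# Minimal sketch of the fuzzy scorers used by Layers 2-4 (not called by the engine).
# The sample strings are assumptions; the point is that token_set_ratio ignores word
# order and extra tokens, which is why Layer 3 pairs it with a high score_cutoff (90).
def _fuzzy_scoring_example():
    query = "hemograma contagem plaquetas"
    candidate = "hemograma completo com contagem de plaquetas"
    print("token_set_ratio:", fuzz.token_set_ratio(query, candidate))
    print("WRatio:", fuzz.WRatio(query, candidate))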

# --- MAIN FUNCTION THAT ORCHESTRATES THE SEARCH ---

def search_procedure_with_log(query, df_original, df_normalized, fuzzy_search_corpus, correction_corpus,
                              portuguese_word_set, idf_scores, db_word_set, limit_per_layer=10,
                              semantic_model=None, user_best_matches_counts=None, user_feedback_threshold=10):
    """
    Orchestrates the full pipeline: spelling correction, layered search,
    user-feedback prioritization, highlighting, and semantic reranking.
    Returns a response dict with the search log and the final result list.
    """
    SEMANTIC_RERANK_LIMIT = 40
    start_time = time.time()
    original_query = str(query).strip()
    response = {
        "search_log": [],
        "results_by_layer": {"literal_matches": [], "exact_matches": [], "logical_matches": [],
                             "almost_exact_matches": [], "contains_matches": [], "term_matches": [],
                             "keyword_matches": []},
        "final_semantic_results": [],
        "was_corrected": False,
        "original_query": original_query,
        "corrected_query": ""
    }
    if not original_query:
        response["search_log"].append("Query vazia, busca não realizada.")
        return response
    response["search_log"].append(f"Buscando por: '{original_query}'")

    # STEP 1: PRE-SEARCH SPELLING CORRECTION
    stopwords = {'de', 'do', 'da', 'dos', 'das', 'a', 'o', 'e', 'em', 'um', 'uma', 'para', 'com'}
    query_after_correction = original_query
    original_correction_corpus, normalized_correction_corpus = correction_corpus
    valid_words = portuguese_word_set.union(db_word_set)
    if valid_words and original_correction_corpus:
        words_from_query, corrected_words, made_correction = original_query.split(), [], False
        for word in words_from_query:
            norm_word = normalize_text(word)
            if norm_word in stopwords or len(norm_word) < 4:
                corrected_words.append(word)
                continue
            if norm_word not in valid_words:
                match_norm, score = process.extractOne(norm_word, normalized_correction_corpus, scorer=fuzz.token_set_ratio)
                if score >= 85:
                    match_index = normalized_correction_corpus.index(match_norm)
                    corrected_word_original = original_correction_corpus[match_index]
                    corrected_word = corrected_word_original
                    if word.istitle():
                        corrected_word = corrected_word.title()
                    elif word.isupper():
                        corrected_word = corrected_word.upper()
                    corrected_words.append(corrected_word)
                    made_correction = True
                else:
                    corrected_words.append(word)
            else:
                corrected_words.append(word)
        if made_correction:
            query_after_correction = " ".join(corrected_words)
            response["was_corrected"] = True
            response["corrected_query"] = query_after_correction
            response["search_log"].append(f"Query corrigida para: '{query_after_correction}'.")

    # STEP 2: PREPARE THE QUERIES FOR THE LAYERS
    literal_query = literal_normalize_text(query_after_correction)
    cleaned_query = " ".join([word for word in query_after_correction.split() if normalize_text(word) not in stopwords])
    normalized_query = normalize_text(cleaned_query)
    if not cleaned_query.strip() and not literal_query.strip():
        response["search_log"].append("Query resultante é vazia. Busca não realizada.")
        return response
    if cleaned_query != query_after_correction:
        response["search_log"].append(f"Query limpa (sem stop words): '{cleaned_query}'")

    # Reject searches with a single character only.
    if len(cleaned_query.strip()) <= 1:
        # Grab the term that would have been searched, for the log. It may be empty.
        term_to_log = cleaned_query.strip()
        response["search_log"].append(f"Busca por '{term_to_log}' ignorada. A busca deve conter no mínimo 2 caracteres.")
        response["final_semantic_results"] = []
        return response

    # STEP 3: RUN THE SEARCH
    exit_layer_name = _run_search_layers(literal_query, normalized_query, response, df_original, df_normalized,
                                         fuzzy_search_corpus, idf_scores, limit_per_layer)

    # STEP 4: AGGREGATE AND RERANK THE RESULTS
    all_candidates = []
    layer_order = ["literal_matches", "exact_matches", "logical_matches", "almost_exact_matches",
                   "contains_matches", "term_matches", "keyword_matches"]
    layer_names_map = {"literal_matches": "0. Busca Literal", "exact_matches": "1. Normalizada Exata",
                       "logical_matches": "2. Lógica 'E'", "almost_exact_matches": "3. Quase Exatos (Fuzzy)",
                       "contains_matches": "4. Termos Validados", "term_matches": "5. Busca Ponderada (IDF)",
                       "keyword_matches": "6. Fallback (Palavra-Chave)"}
    if exit_layer_name:
        response["search_log"].append(f"--- [OTIMIZAÇÃO] Resultado de alta confiança encontrado na camada '{exit_layer_name}'. Busca interrompida. ---")

    response["search_log"].append("\n--- Detalhamento da Busca por Camadas ---")
    for layer_key in layer_order:
        layer_results = response["results_by_layer"].get(layer_key, [])
        num_results = len(layer_results)
        response["search_log"].append(f"[{layer_names_map.get(layer_key, layer_key)}]: {num_results} resultado(s)")
        all_candidates.extend(layer_results)

    # User feedback logic
    feedback_prioritized_tuss_votes = {}
    if user_best_matches_counts and all_candidates:
        query_norm_for_feedback = normalize_text(response.get("corrected_query") or original_query)
        feedback_for_query = user_best_matches_counts.get(query_norm_for_feedback, {})
        for tuss_code, votes in feedback_for_query.items():
            if votes >= user_feedback_threshold:
                feedback_prioritized_tuss_votes[tuss_code] = votes
        if feedback_prioritized_tuss_votes:
            response["search_log"].append("\nFeedback de usuários qualificado encontrado.")
            for result in all_candidates:
                if result.get('Codigo_TUSS') in feedback_prioritized_tuss_votes:
                    result['is_user_best_match'] = True
                    result['feedback_votes'] = feedback_prioritized_tuss_votes[result.get('Codigo_TUSS')]

    response["search_log"].append(f"\n--- Análise e Reordenação ---\nTotal de candidatos encontrados: {len(all_candidates)}")
    query_for_highlight = response.get("corrected_query") or cleaned_query
    all_candidates = _highlight_matches(all_candidates, query_for_highlight)

    final_list = []
    if all_candidates:
        query_for_semantic = response.get("corrected_query") or cleaned_query
        prioritized_candidates = [res for res in all_candidates if res.get('is_user_best_match')]
        non_prioritized_candidates = [res for res in all_candidates if not res.get('is_user_best_match')]

        if semantic_model and prioritized_candidates:
            reranked_prioritized = rerank_with_semantic_model(query_for_semantic, prioritized_candidates, semantic_model)
            prioritized_results_sorted = sorted(reranked_prioritized,
                                                key=lambda x: (x.get('feedback_votes', 0), x.get('semantic_score', 0)),
                                                reverse=True)
        else:
            prioritized_results_sorted = sorted(prioritized_candidates,
                                                key=lambda x: (x.get('feedback_votes', 0), x.get('score', 0)),
                                                reverse=True)
        final_list.extend(prioritized_results_sorted)

        if semantic_model and non_prioritized_candidates:
            candidates_for_rerank = non_prioritized_candidates[:SEMANTIC_RERANK_LIMIT]
            reranked_non_prioritized = rerank_with_semantic_model(query_for_semantic, candidates_for_rerank, semantic_model)
            final_list.extend(reranked_non_prioritized)
            seen_reranked_codes = {r.get('Codigo_TUSS') for r in reranked_non_prioritized}
            for candidate in non_prioritized_candidates:
                if candidate.get('Codigo_TUSS') not in seen_reranked_codes:
                    final_list.append(candidate)
        else:
            final_list.extend(sorted(non_prioritized_candidates, key=lambda x: x.get('score', 0), reverse=True))

        response["search_log"].append(f"Lista final de resultados combinada: {len(final_list)} itens antes do limite.")
        response["final_semantic_results"] = final_list[:10]
    else:
        response["search_log"].append("Nenhum resultado final para exibir.")
        response["final_semantic_results"] = []

    end_time = time.time()
    response["search_duration_seconds"] = round(end_time - start_time, 4)
    response["search_log"].append(f"\nBusca completa em {response['search_duration_seconds']} segundos.")
    print(f"\n\n==================== LOG DE DEPURAÇÃO (QUERY: '{original_query}') ====================")
    return response
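
# Minimal usage sketch, assuming hypothetical file paths and that sentence-transformers
# is installed; the real application wires these pieces up elsewhere. The paths
# 'procedimentos.csv', 'dicionario_portugues.txt' and 'termos_corretos.csv' are
# placeholders, not files shipped with this module.
if __name__ == "__main__":
    from sentence_transformers import SentenceTransformer

    df_original, df_normalized, fuzzy_corpus, idf_scores, db_words = load_and_prepare_database("procedimentos.csv")
    pt_words = load_general_dictionary("dicionario_portugues.txt")
    correction_corpus = load_correction_corpus("termos_corretos.csv")
    # The module header mentions MiniLM-L6-v2; pass None instead to skip the semantic rerank.
    model = SentenceTransformer("all-MiniLM-L6-v2")

    response = search_procedure_with_log(
        "hemograma completo",
        df_original, df_normalized, fuzzy_corpus,
        correction_corpus, pt_words, idf_scores, db_words,
        limit_per_layer=10,
        semantic_model=model,
    )
    for line in response["search_log"]:
        print(line)
    for item in response["final_semantic_results"]:
        print(item["Codigo_TUSS"], item["match_type"], item["score"])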