# enhanced_search_v2.py (Versão Final com Normalização Literal Corrigida)
###################################################################################################
#
# RESUMO DAS CORREÇÕES E MELHORIAS:
#
# 1. NORMALIZAÇÃO LITERAL INTELIGENTE (CORREÇÃO PRINCIPAL):
# - A função `literal_normalize_text` foi aprimorada para remover símbolos e espaços
# do INÍCIO e do FIM da string, tanto na query quanto nos dados do banco.
# - Isso corrige o bug em que o "Early Exit" falhava com queries que continham lixo
# de copiar/colar (como '/////'), restaurando a inteligência da busca literal.
#
# 2. PRÉ-PROCESSAMENTO ROBUSTO MANTIDO:
# - A lógica de correção ortográfica criteriosa, que preserva a ordem e só corrige
# palavras fora dos dicionários, permanece.
#
# 3. ARQUITETURA FINAL MANTIDA:
# - O fluxo com Early Exit, Blocos Protegido/Amplo e Otimização com IA continua sendo
# a base do buscador.
#
###################################################################################################
import pandas as pd
import re
from thefuzz import process, fuzz
from unidecode import unidecode
import time
from sentence_transformers import util
import torch
import math
from collections import defaultdict
from rank_bm25 import BM25Okapi
# --- Bloco 1: Funções Auxiliares de Normalização e Limpeza --- #
def literal_normalize_text(text):
    """
    Normalize a value for the literal (exact-match) search.

    Missing values (as detected by ``pd.isna``) become an empty string.
    Otherwise the value is converted to ``str``, lowercased, transliterated
    with ``unidecode`` and stripped of any run of non-word characters
    (symbols, whitespace) at the beginning and at the end.
    """
    if pd.isna(text):
        return ""
    transliterated = unidecode(str(text).lower())
    # Only the edges are trimmed, so symbols inside the text (for example the
    # "/" in "multissitio/ressincronizador") are preserved.
    edge_trimmed = re.sub(r'^\W+|\W+$', '', transliterated)
    return edge_trimmed.strip()
def clean_symbols_from_query(text):
    """
    Strip quote-like characters and a few symbols from a query.

    The characters removed are the acute accent mark, backtick, single quote,
    double quote, slash and asterisk. They are deleted (not replaced by a
    space) and the result is trimmed of surrounding whitespace. Missing
    values yield an empty string.
    """
    if pd.isna(text):
        return ""
    without_symbols = re.sub(r"[´`'\"/*]", "", str(text))
    return without_symbols.strip()
def normalize_text(text):
    """
    Fully normalize a value for the general search.

    The value is converted to ``str``, lowercased and transliterated with
    ``unidecode``; every character that is neither a word character nor
    whitespace becomes a space, and runs of whitespace collapse to a single
    space. Missing values yield an empty string.
    """
    if pd.isna(text):
        return ""
    lowered_ascii = unidecode(str(text).lower())
    punctuation_as_spaces = re.sub(r'[^\w\s]', ' ', lowered_ascii)
    single_spaced = re.sub(r'\s+', ' ', punctuation_as_spaces)
    return single_spaced.strip()
def get_longest_word(query_text):
    """
    Return the longest word of at least four characters found in the query.

    When several words share the maximum length the first one wins; when no
    word has four or more characters an empty string is returned.
    """
    candidates = re.findall(r'\b\w{4,}\b', query_text)
    return max(candidates, key=len, default="")
# --- Bloco 2: Funções de Formatação e Destaque de Resultados --- #
def format_result(row_data, row_index, match_type="", score=0, **kwargs):
    """
    Build the standardized result dictionary for one database row.

    Args:
        row_data: Mapping-like row (dict or pandas Series) exposing ``copy()``
            and ``get()``. It is copied, never mutated.
        row_index: Index label of the row; stored as ``row_index``.
        match_type: Label describing how the row was matched.
        score: Textual score; stored rounded as ``score`` and ``text_score``.
        **kwargs: Extra key/value pairs merged last into the result (they
            override any standard field of the same name).

    Returns:
        dict with the bookkeeping fields (``row_index``, ``score``,
        ``text_score``, ``semantic_score``, ``match_type``,
        ``is_rol_procedure``) plus every standard column, defaulting to ``''``
        when the row does not have it.

    Business rules:
        * A row is a "Rol" procedure when ``Correlacao_Rol`` is "sim"
          (case/whitespace-insensitive).
        * Non-Rol rows get their group, subgroup, validity and normative
          resolution blanked and ``PAC``/``DUT`` set to ``'---'``.
        * Rol rows get ``PAC`` as ``'Sim'``/``'Não'`` and ``DUT`` as
          ``'Sim, DUT nº <value>'`` when the value is numeric, else ``'Não'``.
    """
    data = row_data.copy()
    # str() keeps the checks working if a cell is not a string (e.g. NaN).
    is_rol = str(data.get('Correlacao_Rol', '')).strip().lower() == 'sim'

    if is_rol:
        pac_flag = str(data.get('PAC', '')).strip().lower()
        data['PAC'] = 'Sim' if pac_flag == 'pac' else 'Não'
        original_dut_value = str(data.get('DUT', '')).strip()
        # Accept integers and decimals with a single dot (e.g. "12" or "12.1").
        if original_dut_value and original_dut_value.replace('.', '', 1).isdigit():
            data['DUT'] = f'Sim, DUT nº {original_dut_value}'
        else:
            data['DUT'] = 'Não'
    else:
        # The emitted columns are 'GRUPO' and 'SUBGRUPO' (upper case, see
        # standard_columns below); blanking differently-cased keys would never
        # reach the output.
        for column in ('GRUPO', 'SUBGRUPO', 'Vigencia', 'Resolucao_Normativa'):
            data[column] = ''
        data['PAC'] = '---'
        data['DUT'] = '---'

    standard_columns = [
        'Codigo_TUSS', 'Descricao_TUSS', 'Correlacao_Rol', 'Procedimento_Rol',
        'Resolucao_Normativa', 'Vigencia', 'OD', 'AMB', 'HCO', 'HSO', 'PAC', 'DUT',
        'SUBGRUPO', 'GRUPO', 'CAPITULO',
        'Sinonimo_1', 'Sinonimo_2', 'Sinonimo_3', 'Sinonimo_4', 'Semantico',
    ]
    result = {
        "row_index": row_index,
        "score": round(score),
        "text_score": round(score),
        "semantic_score": 0,
        "match_type": match_type,
        "is_rol_procedure": is_rol,
    }
    result.update({col: data.get(col, '') for col in standard_columns})
    result.update(kwargs)
    return result
def _highlight_matches(results, query):
"""Adiciona tags ao redor das palavras da query nos resultados para destaque no frontend."""
if not query or not results: return results
stopwords = {'de', 'do', 'da', 'dos', 'das', 'a', 'o', 'e', 'em', 'um', 'uma', 'para', 'com'}
query_words = {word for word in normalize_text(query).split() if len(word) > 2 and word not in stopwords}
cols_to_highlight = ['Descricao_TUSS', 'Procedimento_Rol', 'Sinonimo_1', 'Sinonimo_2', 'Sinonimo_3', 'Sinonimo_4', 'Semantico']
for result in results:
for col in cols_to_highlight:
original_text = result.get(col, '')
if original_text and query_words:
highlighted_text = original_text
for word in sorted(list(query_words), key=len, reverse=True):
pattern = r'\b(' + re.escape(word) + r')\b'
highlighted_text = re.sub(pattern, r'\1', highlighted_text, flags=re.IGNORECASE)
result[f"{col}_highlighted"] = highlighted_text
else:
result[f"{col}_highlighted"] = original_text
return results
# --- Bloco 3: Funções de Carregamento de Dados e Modelos --- #
def load_and_prepare_database(db_path):
    """
    Load the main CSV database and precompute every structure used by the search.

    Args:
        db_path: Path of the CSV file; all columns are read as strings and
            missing cells become ``''``.

    Returns:
        A 5-tuple ``(df_original, df_normalized, fuzzy_search_corpus,
        bm25_model, combined_doc_freq)`` where:

        * ``df_original`` is the data as loaded;
        * ``df_normalized`` is a copy with ``*_literal`` columns, one
          ``<col>_norm`` column per searchable column that exists, and
          ``full_text_norm`` (the space-joined normalized columns);
        * ``fuzzy_search_corpus`` is a list of ``(normalized_text, row_index)``
          pairs for every non-empty normalized cell;
        * ``bm25_model`` is a ``BM25Okapi`` built over ``full_text_norm``;
        * ``combined_doc_freq`` maps each word to its document frequency,
          summing the counts of words that differ only by a final "o"/"a".

    Raises:
        Any exception raised while loading or preparing is printed and
        re-raised.
    """
    try:
        print(f"Carregando e preparando a base de dados de: {db_path}...")
        df_original = pd.read_csv(db_path, dtype=str).fillna('')
        search_cols = ['Descricao_TUSS', 'Procedimento_Rol', 'Sinonimo_1', 'Sinonimo_2', 'Sinonimo_3',
                       'Sinonimo_4', 'Semantico', 'SUBGRUPO', 'GRUPO', 'CAPITULO']

        df_normalized = df_original.copy()
        literal_sources = {
            'Codigo_TUSS_literal': 'Codigo_TUSS',
            'Procedimento_Rol_literal': 'Procedimento_Rol',
            'Descricao_TUSS_literal': 'Descricao_TUSS',
        }
        for literal_col, source_col in literal_sources.items():
            df_normalized[literal_col] = df_normalized[source_col].apply(literal_normalize_text)

        # Normalized copy of each searchable column, plus their concatenation.
        df_normalized['full_text_norm'] = ""
        present_cols = [col for col in search_cols if col in df_normalized.columns]
        for col in present_cols:
            norm_col = f'{col}_norm'
            df_normalized[norm_col] = df_normalized[col].apply(normalize_text)
            df_normalized['full_text_norm'] = df_normalized['full_text_norm'] + (' ' + df_normalized[norm_col])

        print("Criando modelo BM25 e calculando frequência de palavras...")
        tokenized_corpus = [full_text.split() for full_text in df_normalized['full_text_norm']]
        bm25_model = BM25Okapi(tokenized_corpus, k1=1.2)
        print(f"Modelo BM25 otimizado com k1={bm25_model.k1}.")

        # Document frequency: each word counts once per document.
        doc_freq = {}
        for tokens in tokenized_corpus:
            for token in set(tokens):
                doc_freq[token] = doc_freq.get(token, 0) + 1

        print("Combinando frequências de palavras (masculino/feminino)...")
        ending_swap = {'o': 'a', 'a': 'o'}
        combined_doc_freq = {}
        already_paired = set()
        for word, freq in doc_freq.items():
            if word in already_paired:
                continue
            swapped_ending = ending_swap.get(word[-1:])
            counterpart = (word[:-1] + swapped_ending) if swapped_ending else None
            if counterpart and counterpart in doc_freq:
                pair_total = freq + doc_freq[counterpart]
                combined_doc_freq[word] = pair_total
                combined_doc_freq[counterpart] = pair_total
                already_paired.update((word, counterpart))
            else:
                combined_doc_freq[word] = freq

        print("Criando corpus para busca fuzzy...")
        norm_cols = [f'{col}_norm' for col in search_cols if f'{col}_norm' in df_normalized.columns]
        fuzzy_search_corpus = []
        row_values = df_normalized[norm_cols].itertuples(index=False, name=None)
        for index, values in zip(df_normalized.index, row_values):
            for val in values:
                if pd.notna(val) and val:
                    fuzzy_search_corpus.append((val, index))

        print(f"Base de dados pronta com {len(df_original)} procedimentos.")
        return df_original, df_normalized, fuzzy_search_corpus, bm25_model, combined_doc_freq
    except Exception as e:
        print(f"Erro crítico ao carregar/preparar a base de dados: {e}")
        raise
def load_correction_corpus(dict_path, column_name='Termo_Correto'):
    """
    Load the spelling-correction corpus from a CSV file.

    Loading is best-effort: any failure is reported with ``print`` and an
    empty corpus is returned instead of raising.

    Args:
        dict_path: Path of the CSV file.
        column_name: Column holding the correct terms.

    Returns:
        ``(original_corpus, normalized_corpus, db_word_set)``: the terms as
        written, their normalized forms (same order and length as the first
        list) and the set of individual normalized words. All three are empty
        when the file cannot be read or the column is missing.
    """
    try:
        df_dict = pd.read_csv(dict_path, dtype=str).fillna('')
    except FileNotFoundError:
        print(f"Aviso: corpus de correção não encontrado em '{dict_path}'.")
        return [], [], set()
    except Exception as e:
        # Deliberately broad: a broken dictionary must not stop the service,
        # but the reason is reported instead of being swallowed silently.
        print(f"Aviso: falha ao carregar o corpus de correção '{dict_path}': {e}")
        return [], [], set()

    if column_name not in df_dict.columns:
        print(f"Aviso: coluna '{column_name}' ausente no corpus de correção '{dict_path}'.")
        return [], [], set()

    # Missing cells were already turned into '' above, so blank terms are
    # filtered explicitly (a dropna() here would not remove anything).
    original_corpus = [term for term in df_dict[column_name].astype(str) if term.strip()]
    normalized_corpus = [normalize_text(term) for term in original_corpus]
    db_word_set = {word for term in normalized_corpus for word in term.split()}
    return original_corpus, normalized_corpus, db_word_set
def load_general_dictionary(path):
    """
    Load the general Portuguese dictionary as a set of normalized words.

    The file is read as UTF-8, one entry per line; blank lines are skipped and
    each remaining line is normalized with ``normalize_text``. Loading is
    best-effort: on any failure the reason is printed and an empty set is
    returned.

    Args:
        path: Path of the dictionary text file.

    Returns:
        Set of normalized entries (empty on failure).
    """
    try:
        with open(path, 'r', encoding='utf-8') as f:
            words = {normalize_text(line.strip()) for line in f if line.strip()}
    except FileNotFoundError:
        print(f"Aviso: dicionário geral não encontrado em '{path}'.")
        return set()
    except Exception as e:
        # Deliberately broad so a bad dictionary never stops the service, but
        # the cause is reported instead of being hidden.
        print(f"Aviso: falha ao carregar o dicionário geral '{path}': {e}")
        return set()
    print(f"Dicionário geral carregado com {len(words)} palavras.")
    return words
# --- Bloco 4: Funções de Reclassificação, Boosts e IA --- #
def create_unified_document_text(result_dict):
    """
    Build the single text that represents a procedure for the Cross-Encoder.

    The values of the descriptive fields are de-duplicated, blank values are
    discarded, and the remaining ones are sorted and joined with ". ".
    """
    descriptive_fields = ('Descricao_TUSS', 'Procedimento_Rol', 'Semantico', 'SUBGRUPO', 'GRUPO',
                          'CAPITULO', 'Sinonimo_1', 'Sinonimo_2', 'Sinonimo_3', 'Sinonimo_4')
    distinct_values = set()
    for field in descriptive_fields:
        distinct_values.add(result_dict.get(field, ''))
    meaningful = [value for value in distinct_values if value and str(value).strip()]
    meaningful.sort()
    return ". ".join(meaningful)
def rerank_with_cross_encoder(query, results_list, model):
    """
    Score candidates with the Cross-Encoder and return them re-ordered.

    Each candidate dict receives a ``semantic_score`` (sigmoid of the model
    output scaled to 0-100 and rounded), written in place. The returned list
    is sorted so that candidates with ``semantic_score >= 85`` come first
    (by semantic then text score) and the rest follow by a 60/40 blend of
    semantic and text score.

    Args:
        query: Query text paired with each candidate document.
        results_list: Candidate result dicts.
        model: Object exposing ``predict(pairs, show_progress_bar=False)``.

    Returns:
        ``(results, log_message)``. When the model, the list or the query is
        missing, or when prediction fails, the original list is returned
        together with an explanatory message.
    """
    def hybrid_rank_key(candidate):
        semantic = candidate.get('semantic_score', 0)
        textual = candidate.get('text_score', 0)
        if semantic >= 85:
            return (1, semantic, textual)
        return (0, (semantic * 0.6) + (textual * 0.4), semantic)

    if not (model and results_list and query):
        return results_list, "Cross-Encoder não fornecido ou lista de candidatos vazia."

    sentence_pairs = []
    for candidate in results_list:
        sentence_pairs.append([query, create_unified_document_text(candidate)])
    if not sentence_pairs:
        return results_list, "Não foram encontrados pares para reordenar."

    try:
        raw_scores = model.predict(sentence_pairs, show_progress_bar=False)
        semantic_scores = torch.sigmoid(torch.tensor(raw_scores)).numpy() * 100
        for position, candidate in enumerate(results_list):
            candidate['semantic_score'] = round(semantic_scores[position])
        reranked_results = list(results_list)
        reranked_results.sort(key=hybrid_rank_key, reverse=True)
        return reranked_results, f"Scores semânticos calculados para {len(reranked_results)} candidatos."
    except Exception as e:
        log_message = f"Erro no Cross-Encoder: {e}"
        print(log_message)
        return results_list, log_message
def apply_boosts(results, query_words, doc_freq):
"""Aplica boosts de relevância para jargões e termos ultra-raros ao Bloco Amplo."""
if not results or not query_words: return results, []
log_messages = []
jargon_terms = {w for w in query_words if doc_freq.get(w, 0) <= 15 and len(w) > 4}
if jargon_terms:
boosted_count = 0
for r in results:
text = normalize_text(create_unified_document_text(r))
if any(re.search(r'\b' + re.escape(term) + r'\b', text) for term in jargon_terms):
r['text_score'] = min(r['text_score'] * 1.2, 97)
boosted_count += 1
if boosted_count > 0: log_messages.append(f"Boost de Jargão para '{list(jargon_terms)}' ({boosted_count} afetados).")
rare_terms = {w for w in query_words if doc_freq.get(w, 0) <= 5 and len(w) > 4}
if rare_terms:
boosted_count = 0
for r in results:
text = normalize_text(create_unified_document_text(r))
if any(re.search(r'\b' + re.escape(term) + r'\b', text) for term in rare_terms):
r['text_score'] = min(r['text_score'] * 1.3, 97)
boosted_count += 1
if boosted_count > 0: log_messages.append(f"Boost Ultra-Raro para '{list(rare_terms)}' ({boosted_count} afetados).")
for r in results: r['score'] = r['text_score']
return results, log_messages
# --- Bloco 5: Função Principal de Orquestração da Busca --- #
def search_procedure_with_log(query, df_original, df_normalized, fuzzy_search_corpus, correction_corpus,
                              valid_words_set, bm25_model, doc_freq,
                              cross_encoder_model=None, user_best_matches_counts=None, user_feedback_threshold=10):
    """
    Run the complete search pipeline for ``query`` and return results with a step log.

    Pipeline:
        1. Literal normalization of the query.
        2. Early Exit: if the literal query equals a TUSS code, Rol procedure
           or TUSS description, those rows are returned immediately.
        3. Full search: symbol cleanup, conservative spelling correction,
           user-feedback priority, the "protected" block (exact, exact phrase
           and all-words matches) and the "broad" block (fuzzy and BM25).
        4. Block processing: protected results keep their textual order,
           broad results are boosted and re-ranked by the Cross-Encoder.

    Args:
        query: Raw user query (converted with ``str`` and trimmed).
        df_original: DataFrame with the data as loaded.
        df_normalized: DataFrame with the ``*_literal``, ``*_norm`` and
            ``full_text_norm`` columns, sharing the index of ``df_original``.
        fuzzy_search_corpus: Iterable of ``(normalized_text, row_index)``.
        correction_corpus: Sequence whose first two items are the original
            correction terms and their normalized forms, aligned by position
            (extra items, such as a word set, are ignored).
        valid_words_set: Normalized words that must never be "corrected".
        bm25_model: Object exposing ``get_scores(tokens)`` with one score per
            row of ``df_normalized``.
        doc_freq: Mapping word -> document frequency, used by the boosts.
        cross_encoder_model: Optional Cross-Encoder used for semantic scores.
        user_best_matches_counts: Optional mapping normalized query ->
            ``{tuss_code: votes}``.
        user_feedback_threshold: Minimum votes for a feedback result.

    Returns:
        dict with ``search_log``, ``final_semantic_results`` (at most 20 in the
        full search), ``original_query``, ``corrected_query``,
        ``was_corrected`` and, except for an empty query,
        ``search_duration_seconds``.
    """
    start_time = time.time()
    original_query = str(query).strip()
    response = {
        "search_log": [],
        "final_semantic_results": [],
        "original_query": original_query,
        "corrected_query": "",
        "was_corrected": False,
    }
    search_log = response["search_log"]
    if not original_query:
        search_log.append("Query vazia.")
        return response
    search_log.append(f"Buscando por: '{original_query}'")

    # --- Stage 1: initial pre-processing ---
    query_para_literal = literal_normalize_text(original_query)
    search_log.append("\n--- Etapa 1: Pré-processamento da Query ---")
    search_log.append(f"Query para busca literal (após limpeza de bordas): '{query_para_literal}'")

    # --- Stage 2: Early Exit layer ---
    search_log.append("\n--- Etapa 2: Camada de Early Exit (Literal) ---")
    literal_results = []
    seen_indices = set()
    literal_fields = ['Codigo_TUSS_literal', 'Procedimento_Rol_literal', 'Descricao_TUSS_literal']
    # A query made only of symbols normalizes to ''. Comparing '' against the
    # literal columns would "match" every row whose cell is blank, so the
    # literal lookup only runs for a non-empty literal query.
    if query_para_literal:
        for field in literal_fields:
            mask = df_normalized[field] == query_para_literal
            for index in df_normalized.index[mask & ~df_normalized.index.isin(seen_indices)]:
                literal_results.append(format_result(df_original.loc[index], index, "Literal Exata", 100))
                seen_indices.add(index)

    if literal_results:
        search_log.append(f"Encontrado(s) {len(literal_results)} resultado(s) literal(is). ATIVANDO EARLY EXIT.")
        results_with_semantic, log_msg = rerank_with_cross_encoder(
            normalize_text(original_query), literal_results, cross_encoder_model)
        search_log.append(f"Cálculo de Score Semântico: {log_msg}")
        # Literal matches keep their textual order; the semantic score is informative only.
        final_list = sorted(results_with_semantic, key=lambda x: x['text_score'], reverse=True)
        response["final_semantic_results"] = _highlight_matches(final_list, original_query)
        response["search_duration_seconds"] = round(time.time() - start_time, 4)
        search_log.append(f"\nBusca concluída em {response['search_duration_seconds']} segundos.")
        print("\n".join(search_log))
        return response
    search_log.append("Nenhum resultado literal encontrado. Prosseguindo para a busca completa.")

    # --- Stage 3: full search (no Early Exit) ---
    query_para_geral = clean_symbols_from_query(original_query)
    search_log.append(f"Query para busca geral (após limpeza de símbolos): '{query_para_geral}'")

    # Indexing (instead of 2-way unpacking) also accepts the 3-item tuple
    # returned by load_correction_corpus.
    original_correction_corpus, normalized_correction_corpus = correction_corpus[0], correction_corpus[1]
    stopwords = {'de', 'do', 'da', 'dos', 'das', 'a', 'o', 'e', 'em', 'um', 'uma', 'para', 'com'}

    final_query_words = []
    words_were_corrected = False
    for word in query_para_geral.split():
        norm_word = normalize_text(word)
        is_candidate_for_correction = (
            len(norm_word) > 2
            and norm_word not in stopwords
            and not norm_word.isdigit()
            and norm_word not in valid_words_set
        )
        replacement = word
        # With an empty corpus extractOne has nothing to return, so the word
        # is simply kept as typed.
        if is_candidate_for_correction and normalized_correction_corpus:
            best = process.extractOne(norm_word, normalized_correction_corpus, scorer=fuzz.ratio)
            if best is not None and best[1] > 85:
                original_term_index = normalized_correction_corpus.index(best[0])
                replacement = original_correction_corpus[original_term_index]
                words_were_corrected = True
        final_query_words.append(replacement)

    query_corrigida = " ".join(final_query_words)
    if words_were_corrected and query_corrigida.strip() != query_para_geral.strip():
        response["was_corrected"] = True
        response["corrected_query"] = query_corrigida
        search_log.append(f"Correção ortográfica aplicada: '{query_para_geral}' -> '{query_corrigida}'")

    normalized_query = normalize_text(query_corrigida)
    query_words = [word for word in normalized_query.split() if word not in stopwords and len(word) > 1]
    final_list = []

    # --- Stage 3.1: user feedback ---
    search_log.append("\n--- Etapa 3.1: Verificação de Feedback de Usuário ---")
    query_norm_fb = normalize_text(response.get("corrected_query") or original_query)
    if user_best_matches_counts and query_norm_fb in user_best_matches_counts:
        tuss_codes_by_votes = sorted(
            user_best_matches_counts[query_norm_fb].items(), key=lambda item: item[1], reverse=True)
        for tuss_code, votes in tuss_codes_by_votes:
            if votes < user_feedback_threshold:
                continue
            matching_rows = df_original[df_original['Codigo_TUSS'] == tuss_code]
            for index, row in matching_rows.iterrows():
                if index not in seen_indices:
                    result = format_result(row, index, "Prioridade por Feedback", 101)
                    result['feedback_votes'] = votes
                    final_list.append(result)
                    seen_indices.add(index)
        search_log.append(f"{len(final_list)} resultado(s) adicionado(s) por feedback de usuário.")

    # --- Stage 3.2: protected block ---
    protected_candidates = []
    search_log.append("\n--- Etapa 3.2: Coleta de Candidatos do Bloco Protegido ---")
    exact_fields = [f"{col}_norm" for col in ['Codigo_TUSS', 'Descricao_TUSS', 'Procedimento_Rol', 'Sinonimo_1',
                                              'Sinonimo_2', 'Sinonimo_3', 'Sinonimo_4', 'Semantico']]
    # Same reasoning as the literal stage: an empty normalized query would
    # equal every blank normalized cell.
    if normalized_query:
        for field in exact_fields:
            if field not in df_normalized.columns:
                continue
            exact_mask = (df_normalized[field] == normalized_query) & (~df_normalized.index.isin(seen_indices))
            for index in df_normalized.index[exact_mask]:
                protected_candidates.append(format_result(df_original.loc[index], index, "Exato", 100))
                seen_indices.add(index)

    if len(query_words) > 1:
        phrase_pattern = r'\b' + re.escape(normalized_query) + r'\b'
        phrase_mask = df_normalized['full_text_norm'].str.contains(phrase_pattern, na=False, regex=True)
        for index in df_normalized.index[phrase_mask & ~df_normalized.index.isin(seen_indices)]:
            protected_candidates.append(format_result(df_original.loc[index], index, "Frase Exata", 99))
            seen_indices.add(index)

    search_fields_norm = [f"{col}_norm" for col in ['Descricao_TUSS', 'Procedimento_Rol', 'Sinonimo_1', 'Sinonimo_2',
                                                    'Sinonimo_3', 'Sinonimo_4', 'Semantico']]
    # Without meaningful words the all-True starting mask would select every
    # row, so the all-words search requires at least one word.
    if query_words:
        word_patterns = [r'\b' + re.escape(word) + r'\b' for word in query_words]
        for field in search_fields_norm:
            if field not in df_normalized.columns:
                continue
            mask = pd.Series(True, index=df_normalized.index)
            for word_pattern in word_patterns:
                mask &= df_normalized[field].str.contains(word_pattern, na=False)
            for index in df_normalized.index[mask & ~df_normalized.index.isin(seen_indices)]:
                protected_candidates.append(format_result(df_original.loc[index], index, "Lógica (E)", 98))
                seen_indices.add(index)

    # --- Stage 3.3: broad block ---
    broad_candidates = []
    search_log.append("\n--- Etapa 3.3: Coleta de Candidatos do Bloco Amplo ---")
    if normalized_query:
        # Group row indices by text once, instead of rescanning the whole
        # corpus for every fuzzy match.
        fuzzy_choices = []
        indices_by_text = {}
        for item in fuzzy_search_corpus:
            fuzzy_choices.append(item[0])
            indices_by_text.setdefault(item[0], []).append(item[1])
        fuzzy_matches = process.extractBests(
            normalized_query, fuzzy_choices, scorer=fuzz.token_set_ratio, limit=20, score_cutoff=88)
        for match_text, score in fuzzy_matches:
            for index in indices_by_text.get(match_text, ()):
                if index not in seen_indices:
                    broad_candidates.append(format_result(df_original.loc[index], index, "Aproximação", score))
                    seen_indices.add(index)

    if query_words:
        bm25_scores = bm25_model.get_scores(query_words)
        # Scores are scaled so that the best BM25 hit maps to 95.
        max_score = max(bm25_scores) if any(s > 0 for s in bm25_scores) else 1.0
        top_n_indices = sorted(range(len(bm25_scores)), key=lambda i: bm25_scores[i], reverse=True)[:50]
        for i in top_n_indices:
            index = df_normalized.index[i]
            if bm25_scores[i] > 0 and index not in seen_indices:
                broad_candidates.append(format_result(
                    df_original.loc[index], index, "Relevância (BM25)", (bm25_scores[i] / max_score) * 95))
                seen_indices.add(index)

    # --- Stage 4: block processing ---
    search_log.append("\n--- Etapa 4: Processamento dos Blocos ---")
    if protected_candidates:
        unique_protected = list({r['row_index']: r for r in protected_candidates}.values())
        protected_with_scores, log_msg = rerank_with_cross_encoder(
            normalized_query, unique_protected, cross_encoder_model)
        search_log.append(f"Bloco Protegido: {log_msg}. Ordem textual mantida.")
        final_list.extend(sorted(protected_with_scores, key=lambda x: x['text_score'], reverse=True))

    if broad_candidates:
        unique_broad = list({r['row_index']: r for r in broad_candidates}.values())
        unique_broad_sorted = sorted(unique_broad, key=lambda x: x['text_score'], reverse=True)[:30]
        search_log.append(f"Bloco Amplo: {len(unique_broad_sorted)} candidatos selecionados para otimização.")
        boosted_broad, boost_logs = apply_boosts(unique_broad_sorted, query_words, doc_freq)
        search_log.extend(boost_logs)
        reranked_broad, log_msg = rerank_with_cross_encoder(normalized_query, boosted_broad, cross_encoder_model)
        search_log.append(f"Bloco Amplo: {log_msg}")
        final_list.extend(reranked_broad)

    response["final_semantic_results"] = _highlight_matches(final_list[:20], query_corrigida)
    response["search_duration_seconds"] = round(time.time() - start_time, 4)
    search_log.append(f"\nBusca completa em {response['search_duration_seconds']} segundos.")
    print("\n\n" + "=" * 20 + " LOG DE BUSCA FINAL " + "=" * 20)
    print("\n".join(search_log))
    return response