Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Medical Transcription Retriever from Langfuse | |
| Retrieves medical transcriptions from Langfuse traces and saves them locally. | |
| """ | |
| import os | |
| import json | |
| import time | |
| from datetime import datetime, timedelta | |
| from dotenv import load_dotenv | |
| from langfuse import Langfuse | |
| # Load environment variables | |
| load_dotenv() | |
class MedicalTranscriptionRetriever:
    """Retrieve medical transcriptions from Langfuse traces and save them as JSON.

    A transcription is the text that follows ``DOCUMENT_MARKER`` inside a
    trace's input. Progress and errors are reported with ``print``.
    """

    # Text that precedes the transcription inside a trace input.
    DOCUMENT_MARKER = "Voici le document:"
    # Directory (relative to the working directory) for the per-user files.
    TRANSCRIPTIONS_DIR = "transcriptions"
    # Pause, in seconds, between two analyzed traces (0 disables it).
    # NOTE(review): the per-trace loop only reads attributes of trace objects
    # that were already fetched, so this delay is presumably unnecessary --
    # confirm against the Langfuse client before lowering the default.
    TRACE_DELAY_SECONDS = 1

    def __init__(self):
        """Build the Langfuse client from environment variables.

        Reads ``LANGFUSE_PUBLIC_KEY``, ``LANGFUSE_SECRET_KEY`` and
        ``LANGFUSE_HOST`` (default ``https://cloud.langfuse.com``).

        Raises:
            ValueError: if the public or the secret key is missing or empty.
        """
        self.public_key = os.getenv('LANGFUSE_PUBLIC_KEY')
        self.secret_key = os.getenv('LANGFUSE_SECRET_KEY')
        self.host = os.getenv('LANGFUSE_HOST', 'https://cloud.langfuse.com')

        if not self.public_key or not self.secret_key:
            raise ValueError("Missing Langfuse keys in .env file")

        self.client = Langfuse(
            public_key=self.public_key,
            secret_key=self.secret_key,
            host=self.host
        )

    @classmethod
    def _text_after_marker(cls, text):
        """Return the stripped text after the first marker in *text*, else None."""
        if not isinstance(text, str):
            return None
        # partition() splits on the FIRST occurrence only, so a document that
        # itself contains the marker is not truncated.
        _, marker, tail = text.partition(cls.DOCUMENT_MARKER)
        return tail.strip() if marker else None

    @staticmethod
    def _user_key(trans):
        """Return the grouping key for a record: its user id, or 'unknown'."""
        # The 'user_id' key may be present with a None value, so a plain
        # dict.get() default is not enough.
        user_id = trans.get('user_id')
        if user_id is None or user_id == '':
            return 'unknown'
        return str(user_id)

    @staticmethod
    def _safe_filename_part(value):
        """Return *value* with path separators replaced so it stays one path component."""
        separators = {'/', '\\', os.sep}
        if os.altsep:
            separators.add(os.altsep)
        for separator in separators:
            value = value.replace(separator, '_')
        return value

    @staticmethod
    def _write_json(path, payload):
        """Write *payload* to *path* as indented UTF-8 JSON."""
        with open(path, 'w', encoding='utf-8') as f:
            json.dump(payload, f, ensure_ascii=False, indent=2)

    def extract_transcription_from_input(self, input_data):
        """Extract the transcription that follows the document marker.

        Args:
            input_data: a string, a dict (optionally holding a ``messages``
                list), or a list of message dicts.

        Returns:
            The stripped text after the first marker found, or None when no
            marker is present or the input type is not supported. For a dict,
            ``user`` messages are searched first, then every string value.
        """
        if isinstance(input_data, str):
            return self._text_after_marker(input_data)

        if isinstance(input_data, dict):
            messages = input_data.get('messages')
            # Ignore a missing or non-sequence 'messages' value instead of
            # failing while iterating it.
            if isinstance(messages, (list, tuple)):
                for message in messages:
                    if isinstance(message, dict) and message.get('role') == 'user':
                        found = self._text_after_marker(message.get('content', ''))
                        if found is not None:
                            return found

            # Fall back to any string value stored directly in the dict.
            for value in input_data.values():
                found = self._text_after_marker(value)
                if found is not None:
                    return found
            return None

        if isinstance(input_data, list):
            for message in input_data:
                if isinstance(message, dict):
                    found = self._text_after_marker(message.get('content', ''))
                    if found is not None:
                        return found

        return None

    def get_traces_with_transcriptions(self, limit=50, days_back=7):
        """Retrieve traces and collect those whose input holds a transcription.

        Args:
            limit: maximum number of traces requested from the client.
            days_back: currently not applied to the query; kept so existing
                callers that pass it keep working.

        Returns:
            A list of dicts with the keys ``trace_id``, ``trace_name``,
            ``user_id``, ``trace_timestamp``, ``transcription`` and
            ``extracted_at``. An empty list is returned when retrieval fails.
        """
        print(f"π Searching for transcriptions in the last {limit} traces...")

        try:
            traces = self.client.get_traces(limit=limit)
            fetched = traces.data
            total = len(fetched)
            print(f"β {total} traces retrieved")

            transcriptions = []
            for index, trace in enumerate(fetched):
                # Pause between traces (never before the first one), also
                # after a trace that failed.
                if index and self.TRACE_DELAY_SECONDS > 0:
                    time.sleep(self.TRACE_DELAY_SECONDS)

                print(f"π Analyzing trace {index + 1}/{total}: {trace.id}")

                try:
                    trace_input = getattr(trace, 'input', None)
                    if trace_input is None:
                        print(" β οΈ No input available for this trace")
                        continue

                    transcription = self.extract_transcription_from_input(
                        trace_input)
                    if not transcription:
                        print(" β No transcription found in trace.input")
                        continue

                    timestamp = trace.timestamp
                    transcriptions.append({
                        'trace_id': trace.id,
                        'trace_name': trace.name,
                        'user_id': trace.user_id,
                        'trace_timestamp': timestamp.isoformat() if timestamp else None,
                        'transcription': transcription,
                        'extracted_at': datetime.now().isoformat()
                    })
                    print(" β Transcription found and extracted!")
                except Exception as e:
                    # One malformed trace must not abort the whole scan.
                    print(f" β οΈ Error analyzing trace {trace.id}: {e}")

            print(f"\nπ Summary: {len(transcriptions)} transcriptions found")
            return transcriptions

        except Exception as e:
            print(f"β Error retrieving traces: {e}")
            return []

    def save_transcriptions(self, transcriptions, filename=None):
        """Save all transcriptions, concatenated, to a single JSON file.

        Args:
            transcriptions: records as returned by
                ``get_traces_with_transcriptions``.
            filename: target path; a timestamped
                ``medical_transcriptions_<ts>.json`` name is used when omitted.

        Returns:
            The filename written, or None when saving failed.
        """
        if not filename:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"medical_transcriptions_{timestamp}.json"

        try:
            # All texts are stored as one string, separated by blank lines.
            payload = {
                "extracted_at": datetime.now().isoformat(),
                "total_transcriptions": len(transcriptions),
                "transcription": "\n\n".join(
                    trans['transcription'] for trans in transcriptions)
            }
            self._write_json(filename, payload)
            print(f"πΎ Transcriptions saved to {filename}")
            return filename
        except Exception as e:
            print(f"β Error during save: {e}")
            return None

    def save_transcriptions_by_user(self, transcriptions):
        """Save one JSON file per user whose id contains ``.rtf``.

        Files are written to ``TRANSCRIPTIONS_DIR``, which is created when
        missing. Records without a user id are grouped under ``'unknown'``
        (and therefore skipped by the ``.rtf`` filter).

        Returns:
            The list of file paths written (empty when there is nothing to
            save).
        """
        if not transcriptions:
            print("π No transcriptions to save")
            return []

        transcriptions_dir = self.TRANSCRIPTIONS_DIR
        if not os.path.exists(transcriptions_dir):
            # exist_ok guards against the directory appearing in between.
            os.makedirs(transcriptions_dir, exist_ok=True)
            print(f"π Directory '{transcriptions_dir}' created")

        # Group records by user, keeping first-seen order.
        user_transcriptions = {}
        for trans in transcriptions:
            user_transcriptions.setdefault(
                self._user_key(trans), []).append(trans)

        saved_files = []
        for user_id, user_trans in user_transcriptions.items():
            if '.rtf' not in user_id:
                print(f"βοΈ Skipped {user_id} (no .rtf)")
                continue

            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            # The user id is external data: keep it from escaping the
            # output directory through path separators.
            safe_id = self._safe_filename_part(user_id)
            filename = f"transcriptions_{safe_id}_{timestamp}.json"
            filepath = os.path.join(transcriptions_dir, filename)

            try:
                payload = {
                    "user_id": user_id,
                    "extracted_at": datetime.now().isoformat(),
                    "total_transcriptions": len(user_trans),
                    "transcription": "\n\n".join(
                        trans['transcription'] for trans in user_trans)
                }
                self._write_json(filepath, payload)
                saved_files.append(filepath)
                print(f"πΎ Saved transcriptions for {user_id}: {filename}")
            except Exception as e:
                print(f"β Error saving transcriptions for {user_id}: {e}")

        print(f"\nπ Summary: {len(saved_files)} files saved")
        return saved_files

    def display_transcriptions_summary(self, transcriptions):
        """Print the total number of transcriptions and the count per user."""
        if not transcriptions:
            print("π No transcriptions to display")
            return

        print("\nπ TRANSCRIPTIONS SUMMARY")
        print("=" * 50)
        print(f"Total transcriptions: {len(transcriptions)}")

        # Count records per user (missing ids are reported as 'unknown').
        user_counts = {}
        for trans in transcriptions:
            user_id = self._user_key(trans)
            user_counts[user_id] = user_counts.get(user_id, 0) + 1

        print(f"Unique users: {len(user_counts)}")
        for user_id, count in user_counts.items():
            print(f" - {user_id}: {count} transcriptions")

    def run(self, limit=50, save_to_file=True, save_by_user=True):
        """Run the complete retrieval: fetch, summarize, then save.

        Args:
            limit: maximum number of traces to request.
            save_to_file: also write the combined JSON file.
            save_by_user: also write the per-user JSON files.

        Returns:
            The list of files written, or None when no transcription was
            found.
        """
        print("π Starting medical transcription retrieval...")
        print("=" * 60)

        transcriptions = self.get_traces_with_transcriptions(limit=limit)
        if not transcriptions:
            print("β No transcriptions found")
            return None

        self.display_transcriptions_summary(transcriptions)

        saved_files = []
        if save_to_file:
            saved_file = self.save_transcriptions(transcriptions)
            if saved_file:
                saved_files.append(saved_file)

        if save_by_user:
            saved_files.extend(
                self.save_transcriptions_by_user(transcriptions) or [])

        print(f"\nβ Retrieval completed! {len(saved_files)} files saved")
        return saved_files
def main():
    """Script entry point: build a retriever, run it, and report the outcome.

    Any exception is reported on stdout together with its traceback instead
    of propagating.
    """
    print("π₯ Medical Transcription Retriever")
    print("=" * 40)

    try:
        outputs = MedicalTranscriptionRetriever().run(
            limit=50,
            save_to_file=True,
            save_by_user=True,
        )
        if not outputs:
            print("\nβ No files were saved")
        else:
            print(f"\nπ Success! Files saved: {len(outputs)}")
    except Exception as exc:
        import traceback

        print(f"β Error: {exc}")
        traceback.print_exc()


if __name__ == "__main__":
    main()