#!/usr/bin/env python3
"""
Complete Pipeline Test
Tests the full pipeline including Langfuse transcription download
"""
import os
import sys
import time
from pathlib import Path
from datetime import datetime

# Add the current directory to Python path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))


def test_complete_pipeline():
    """Test the complete pipeline including Langfuse transcription download."""
    print("🏥 Complete Medical Document Pipeline Test")
    print("=" * 70)
    print("This test will:")
    print("1. Download transcriptions from Langfuse")
    print("2. Run the complete document processing pipeline")
    print("3. Validate the results")
    print("=" * 70)

    # Step 1: Download transcriptions from Langfuse
    print("\n📥 Step 1: Downloading transcriptions from Langfuse...")
    try:
        from medical_transcription_retriever import MedicalTranscriptionRetriever
        retriever = MedicalTranscriptionRetriever()
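        # Assumption: `limit` caps how many transcriptions are fetched and
        # `save_by_user` groups the saved files per user; adjust these keyword
        # arguments to match the actual MedicalTranscriptionRetriever API.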
        saved_files = retriever.run(limit=5, save_to_file=True, save_by_user=True)
        if not saved_files:
            print("❌ No transcriptions downloaded from Langfuse")
            print("Please check your Langfuse configuration and try again")
            return None
        print(f"✅ Successfully downloaded transcriptions: {len(saved_files)} files")
    except Exception as e:
        print(f"❌ Error downloading transcriptions: {e}")
        print("Continuing with existing transcriptions if available...")

    # Step 2: Check if we have transcription files
    transcriptions_dir = "transcriptions"
    if not os.path.exists(transcriptions_dir):
        print(f"❌ Transcriptions directory not found: {transcriptions_dir}")
        return None
    transcription_files = list(Path(transcriptions_dir).glob("*.json"))
    if not transcription_files:
        print(f"❌ No transcription files found in {transcriptions_dir}")
        return None
    print(f"📁 Found {len(transcription_files)} transcription files")

    # Step 3: Test with the first transcription file
    first_transcription = transcription_files[0]
    print(f"📄 Using transcription file: {first_transcription.name}")

    try:
        # Step 4: Initialize the orchestrator
        print("\n🚀 Step 2: Initializing orchestrator with automatic SFTP model detection...")
        from langchain_medical_agents_refactored import MedicalDocumentOrchestrator
        orchestrator = MedicalDocumentOrchestrator(
            template_path=None,  # Let the SFTP agent find the template
            transcription_path=str(first_transcription),
            transcriptions_dir=transcriptions_dir
        )
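        # The orchestrator is expected to record SFTP results in
        # `downloaded_models` (used in the summary below, where each entry is
        # treated as a dict with 'status', 'model_id' and 'local_filename' keys).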

        # Step 5: Run the complete pipeline
        print("\n🚀 Step 3: Running complete pipeline...")
        print("This will include:")
        print(" 📥 Step 0: SFTP Download (.rtf → .doc) - AUTOMATIC MODEL DETECTION")
        print(" 📋 Step 1: Template Analysis")
        print(" ✏️ Step 2: Transcription Correction")
        print(" 🔬 Step 3: Medical Data Analysis")
        print(" 📝 Step 4: Title Generation")
        print(" 📝 Step 5: Section Generation")
        print(" 📄 Step 6: Document Assembly")
        print(" 🔍 Step 7: Validation")

        start_time = time.time()
        output_file = orchestrator.run_full_pipeline()
        end_time = time.time()
        execution_time = end_time - start_time
        print(f"\n⏱️ Pipeline execution time: {execution_time:.2f} seconds")
        print("\n🎉 Pipeline completed successfully!")
        print(f"📄 Output file: {output_file}")

        # Step 6: Show SFTP download summary
        if orchestrator.downloaded_models:
            successful_downloads = [
                m for m in orchestrator.downloaded_models if m['status'] == 'success']
            failed_downloads = [
                m for m in orchestrator.downloaded_models if m['status'] == 'error']
            print("\n📥 SFTP Download Summary:")
            print(f" ✅ Successfully downloaded: {len(successful_downloads)} models")
            print(f" ❌ Failed downloads: {len(failed_downloads)} models")
            if successful_downloads:
                print(" 📋 Downloaded models:")
                for model in successful_downloads[:5]:  # Show first 5
                    print(f" - {model['model_id']}: {model['local_filename']}")
                if len(successful_downloads) > 5:
                    print(f" ... and {len(successful_downloads) - 5} more")

        # Step 7: Verify output file exists
        if os.path.exists(output_file):
            file_size = os.path.getsize(output_file)
            print("\n✅ Output file verified:")
            print(f" 📄 File: {output_file}")
            print(f" 📊 Size: {file_size} bytes")
            # Check if file is readable
            try:
                from docx import Document
                doc = Document(output_file)
                paragraph_count = len(doc.paragraphs)
                print(f" 📝 Paragraphs: {paragraph_count}")
                print(" ✅ Document is readable and valid")
            except Exception as e:
                print(f" ⚠️ Document validation failed: {e}")
        else:
            print(f"\n❌ Output file not found: {output_file}")
        return output_file
    except Exception as e:
        print(f"❌ Error running pipeline: {str(e)}")
        import traceback
        traceback.print_exc()
        return None


def cleanup_test_files():
    """Clean up test files after testing."""
    print("\n🧹 Cleaning up test files...")
    # Remove downloaded transcriptions
    for file in Path("./transcriptions").glob("*.json"):
        try:
            os.remove(file)
            print(f"🗑️ Removed: {file}")
        except Exception as e:
            print(f"⚠️ Could not remove {file}: {e}")
    # Remove generated documents
    for file in Path("./").glob("*.docx"):
        try:
            os.remove(file)
            print(f"🗑️ Removed: {file}")
        except Exception as e:
            print(f"⚠️ Could not remove {file}: {e}")
    for file in Path("./").glob("*.json"):
        try:
            os.remove(file)
            print(f"🗑️ Removed: {file}")
        except Exception as e:
            print(f"⚠️ Could not remove {file}: {e}")
    # Remove downloaded models
    models_dir = "models"
    if os.path.exists(models_dir):
        for file in Path(models_dir).glob("*.doc"):
            try:
                os.remove(file)
                print(f"🗑️ Removed: {file}")
            except Exception as e:
                print(f"⚠️ Could not remove {file}: {e}")


def main():
    """Main test function."""
    print("🧪 Complete Pipeline Test with Langfuse Integration")
    print("=" * 70)

    # Check if we're in the right directory
    if not os.path.exists("transcriptions"):
        print("❌ Please run this script from the project root directory")
        print(" (where the 'transcriptions' folder is located)")
        return

    # Show current configuration
    try:
        from sftp_config import print_sftp_config
        print_sftp_config()
    except ImportError:
        print("⚠️ SFTP config not available")

    # Run the complete pipeline test
    result = test_complete_pipeline()
    if result:
        print("\n🎉 Complete pipeline test completed successfully!")
        print(f"📄 Generated document: {result}")
        # Ask if user wants to clean up
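        # Note: input() raises EOFError when stdin is not interactive (e.g. in
        # CI), so run this script from a terminal if cleanup is wanted.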
        cleanup = input("\n🧹 Do you want to clean up test files? (y/n): ").lower().strip()
        if cleanup in ['y', 'yes']:
            cleanup_test_files()
    else:
        print("\n❌ Complete pipeline test failed")


if __name__ == "__main__":
    main()