import gradio as gr import time import json import os import subprocess from datetime import datetime, timedelta from typing import List, Tuple, Optional from app_core import ( ALLOWED_LANGS, AUDIO_FORMATS, transcription_manager, allowed_file, User ) def format_status(status): """Convert status to user-friendly format""" status_map = { 'pending': 'ā³ Queued', 'processing': 'šŸ”„ Processing', 'completed': 'āœ… Done', 'failed': 'āŒ Failed' } return status_map.get(status, status) def format_processing_time(created_at, completed_at=None): """Calculate and format processing time""" try: start_time = datetime.fromisoformat(created_at) if completed_at: end_time = datetime.fromisoformat(completed_at) duration = end_time - start_time else: duration = datetime.now() - start_time total_seconds = int(duration.total_seconds()) if total_seconds < 60: return f"{total_seconds}s" elif total_seconds < 3600: minutes = total_seconds // 60 seconds = total_seconds % 60 return f"{minutes}m {seconds}s" else: hours = total_seconds // 3600 minutes = (total_seconds % 3600) // 60 return f"{hours}h {minutes}m" except: return "Unknown" def get_user_stats_display(user: User): """Get user statistics for display""" if not user: return "šŸ‘¤ Please log in to view statistics" try: stats = transcription_manager.get_user_stats(user.user_id) total = stats.get('total_jobs', 0) recent = stats.get('recent_jobs', 0) by_status = stats.get('by_status', {}) completed = by_status.get('completed', 0) processing = by_status.get('processing', 0) pending = by_status.get('pending', 0) failed = by_status.get('failed', 0) stats_text = f"šŸ‘¤ {user.username} | šŸ“Š Total: {total} | āœ… Completed: {completed}" if processing > 0: stats_text += f" | šŸ”„ Processing: {processing}" if pending > 0: stats_text += f" | ā³ Pending: {pending}" if failed > 0: stats_text += f" | āŒ Failed: {failed}" if recent > 0: stats_text += f" | šŸ“… Last 7 days: {recent}" return stats_text except Exception as e: return f"šŸ‘¤ 
{user.username} | Stats error: {str(e)}" # Authentication Functions def register_user(email, username, password, confirm_password, gdpr_consent, data_retention_consent, marketing_consent): """Register new user account""" try: print(f"šŸ“ Registration attempt for: {username} ({email})") # Validate inputs if not email or not username or not password: return "āŒ All fields are required", gr.update(visible=False) if password != confirm_password: return "āŒ Passwords do not match", gr.update(visible=False) if not gdpr_consent: return "āŒ GDPR consent is required to create an account", gr.update(visible=False) if not data_retention_consent: return "āŒ Data retention agreement is required", gr.update(visible=False) # Attempt registration success, message, user_id = transcription_manager.register_user( email, username, password, gdpr_consent, data_retention_consent, marketing_consent ) print(f"šŸ“ Registration result: success={success}, message={message}") if success: print(f"āœ… User registered successfully: {username}") return f"āœ… {message}! Please log in with your credentials.", gr.update(visible=True) else: print(f"āŒ Registration failed: {message}") return f"āŒ {message}", gr.update(visible=False) except Exception as e: print(f"āŒ Registration error: {str(e)}") return f"āŒ Registration error: {str(e)}", gr.update(visible=False) def login_user(login, password): """Login user""" try: print(f"šŸ” Login attempt for: {login}") if not login or not password: return "āŒ Please enter both username/email and password", None, gr.update(visible=True), gr.update(visible=False), "šŸ‘¤ Please log in to view your statistics..." 
success, message, user = transcription_manager.login_user(login, password) print(f"šŸ” Login result: success={success}, message={message}") if success and user: print(f"āœ… User logged in successfully: {user.username}") stats_display = get_user_stats_display(user) return f"āœ… Welcome back, {user.username}!", user, gr.update(visible=False), gr.update(visible=True), stats_display else: print(f"āŒ Login failed: {message}") return f"āŒ {message}", None, gr.update(visible=True), gr.update(visible=False), "šŸ‘¤ Please log in to view your statistics..." except Exception as e: print(f"āŒ Login error: {str(e)}") return f"āŒ Login error: {str(e)}", None, gr.update(visible=True), gr.update(visible=False), "šŸ‘¤ Please log in to view your statistics..." def logout_user(): """Logout user""" print("šŸ‘‹ User logged out") return None, "šŸ‘‹ You have been logged out. Please log in to continue.", gr.update(visible=True), gr.update(visible=False), "šŸ‘¤ Please log in to view your statistics..." # Transcription Functions (require authentication) def submit_transcription(file, language, audio_format, diarization_enabled, speakers, profanity, punctuation, timestamps, lexical, user): """Submit transcription job - requires authenticated user""" if not user: return ( "āŒ Please log in to submit transcriptions", "", gr.update(visible=False), "", {}, gr.update(visible=False), gr.update() ) if file is None: return ( "Please upload an audio or video file first.", "", gr.update(visible=False), "", {}, gr.update(visible=False), gr.update() ) try: # Get file data try: if isinstance(file, str): if os.path.exists(file): with open(file, 'rb') as f: file_bytes = f.read() original_filename = os.path.basename(file) else: return ( "File not found. 
Please try uploading again.", "", gr.update(visible=False), "", {}, gr.update(visible=False), gr.update() ) else: file_path = str(file) if os.path.exists(file_path): with open(file_path, 'rb') as f: file_bytes = f.read() original_filename = os.path.basename(file_path) else: return ( "Unable to process file. Please try again.", "", gr.update(visible=False), "", {}, gr.update(visible=False), gr.update() ) except Exception as e: return ( f"Error reading file: {str(e)}", "", gr.update(visible=False), "", {}, gr.update(visible=False), gr.update() ) # Validate file file_extension = original_filename.split('.')[-1].lower() if '.' in original_filename else "" supported_extensions = set(AUDIO_FORMATS) | { 'mp4', 'mov', 'avi', 'mkv', 'webm', 'm4a', '3gp', 'f4v', 'wmv', 'asf', 'rm', 'rmvb', 'flv', 'mpg', 'mpeg', 'mts', 'vob' } if file_extension not in supported_extensions and file_extension != "": return ( f"Unsupported file format: .{file_extension}", "", gr.update(visible=False), "", {}, gr.update(visible=False), gr.update() ) # Basic file size check if len(file_bytes) > 500 * 1024 * 1024: # 500MB limit return ( "File too large. 
Please upload files smaller than 500MB.", "", gr.update(visible=False), "", {}, gr.update(visible=False), gr.update() ) # Prepare settings settings = { 'audio_format': audio_format, 'diarization_enabled': diarization_enabled, 'speakers': speakers, 'profanity': profanity, 'punctuation': punctuation, 'timestamps': timestamps, 'lexical': lexical } # Submit job (logging happens in app_core) job_id = transcription_manager.submit_transcription( file_bytes, original_filename, user.user_id, language, settings ) # Update job state job_state = { 'current_job_id': job_id, 'start_time': datetime.now().isoformat(), 'auto_refresh_active': True, 'last_status': 'pending' } # Get updated user stats stats_display = get_user_stats_display(user) return ( f"šŸš€ Transcription started for: {original_filename}\nšŸ“” Auto-refreshing every 10 seconds...", "", gr.update(visible=False), f"Job ID: {job_id}", job_state, gr.update(visible=True, value="šŸ”„ Auto-refresh active"), stats_display ) except Exception as e: print(f"āŒ Error submitting transcription: {str(e)}") return ( f"Error: {str(e)}", "", gr.update(visible=False), "", {}, gr.update(visible=False), gr.update() ) def check_current_job_status(job_state, user): """Check status of current job with improved transcript handling""" if not user: return ( "āŒ Please log in to check status", "", gr.update(visible=False), "", gr.update(visible=False), gr.update() ) if not job_state or 'current_job_id' not in job_state: return ( "No active job", "", gr.update(visible=False), "", gr.update(visible=False), gr.update() ) job_id = job_state['current_job_id'] try: job = transcription_manager.get_job_status(job_id) if not job or job.user_id != user.user_id: return ( "Job not found or access denied", "", gr.update(visible=False), "", gr.update(visible=False), gr.update() ) # Calculate processing time processing_time = format_processing_time(job.created_at, job.completed_at) # Enhanced status change logging last_status = job_state.get('last_status', 
'') if job.status != last_status: print(f"šŸ”„ [{user.username}] Job status change: {last_status} → {job.status} ({job.original_filename})") job_state['last_status'] = job.status # Also log transcript availability for completed jobs if job.status == 'completed': has_text = bool(job.transcript_text and job.transcript_text.strip()) text_length = len(job.transcript_text) if job.transcript_text else 0 print(f"šŸ“ [{user.username}] Transcript status: available={has_text}, length={text_length}") # Get updated user stats stats_display = get_user_stats_display(user) # Handle completed status with better transcript detection if job.status == 'completed' and job.transcript_text and job.transcript_text.strip(): # Job is complete and transcript is available, stop auto-refresh job_state['auto_refresh_active'] = False # Create downloadable file try: transcript_file = create_transcript_file(job.transcript_text, job_id) print(f"āœ… [{user.username}] Transcription ready: {len(job.transcript_text)} characters") except Exception as e: print(f"āš ļø [{user.username}] Error creating transcript file: {str(e)}") transcript_file = None return ( f"āœ… Transcription completed in {processing_time}", job.transcript_text, gr.update(visible=True, value=transcript_file) if transcript_file else gr.update(visible=False), f"Processed: {job.original_filename}", gr.update(visible=False), # Hide auto-refresh status stats_display ) elif job.status == 'failed': # Job failed, stop auto-refresh job_state['auto_refresh_active'] = False error_msg = job.error_message[:100] + "..." if job.error_message and len(job.error_message) > 100 else job.error_message or "Unknown error" return ( f"āŒ Transcription failed after {processing_time}", "", gr.update(visible=False), f"Error: {error_msg}", gr.update(visible=False), stats_display ) elif job.status == 'processing': # Still processing, continue auto-refresh auto_refresh_active = job_state.get('auto_refresh_active', False) return ( f"šŸ”„ Processing... 
({processing_time} elapsed)\nšŸ“” Auto-refreshing every 10 seconds...", "", gr.update(visible=False), f"Converting and analyzing: {job.original_filename}", gr.update(visible=True, value="šŸ”„ Auto-refresh active") if auto_refresh_active else gr.update(visible=False), stats_display ) elif job.status == 'completed' and (not job.transcript_text or not job.transcript_text.strip()): # Job marked as completed but transcript not yet available - keep refreshing auto_refresh_active = job_state.get('auto_refresh_active', False) print(f"ā³ [{user.username}] Job completed but transcript not ready yet - continuing refresh") return ( f"šŸ”„ Finalizing transcript... ({processing_time} elapsed)\nšŸ“” Auto-refreshing every 10 seconds...", "", gr.update(visible=False), f"Retrieving results: {job.original_filename}", gr.update(visible=True, value="šŸ”„ Auto-refresh active") if auto_refresh_active else gr.update(visible=False), stats_display ) else: # pending # Still pending, continue auto-refresh auto_refresh_active = job_state.get('auto_refresh_active', False) return ( f"ā³ Queued for processing... 
({processing_time} waiting)\nšŸ“” Auto-refreshing every 10 seconds...", "", gr.update(visible=False), f"Waiting: {job.original_filename}", gr.update(visible=True, value="šŸ”„ Auto-refresh active") if auto_refresh_active else gr.update(visible=False), stats_display ) except Exception as e: print(f"āŒ Error checking job status: {str(e)}") return ( f"Error checking status: {str(e)}", "", gr.update(visible=False), "", gr.update(visible=False), gr.update() ) def force_refresh_completed_jobs(job_state, user): """Force refresh for completed jobs that might have missed transcript update""" if not user or not job_state or 'current_job_id' not in job_state: return check_current_job_status(job_state, user) job_id = job_state['current_job_id'] try: job = transcription_manager.get_job_status(job_id) if job and job.user_id == user.user_id and job.status == 'completed': # Force a fresh check for completed jobs print(f"šŸ”„ [{user.username}] Force refreshing completed job: {job_id[:8]}...") return check_current_job_status(job_state, user) except Exception as e: print(f"āŒ Error in force refresh: {str(e)}") return check_current_job_status(job_state, user) # Update the refresh button click handler to be more aggressive: def manual_refresh_with_force(job_state, user): """Manual refresh with force update for completed jobs""" if user: print(f"šŸ”„ [{user.username}] Manual status check requested") # If job is completed, do a more thorough refresh if job_state and 'current_job_id' in job_state: try: job = transcription_manager.get_job_status(job_state['current_job_id']) if job and job.status == 'completed': print(f"šŸŽÆ [{user.username}] Forcing refresh of completed job") return force_refresh_completed_jobs(job_state, user) except Exception as e: print(f"āŒ Error in manual refresh check: {str(e)}") return check_current_job_status(job_state, user) def should_auto_refresh(job_state, user): """Check if auto-refresh should be active - improved logic""" if not user or not job_state or not 
job_state.get('auto_refresh_active', False): return False if 'current_job_id' not in job_state: return False job_id = job_state['current_job_id'] try: job = transcription_manager.get_job_status(job_id) if not job or job.user_id != user.user_id: return False # Continue auto-refresh until: # 1. Job failed permanently # 2. Job completed AND transcript text is available AND non-empty if job.status == 'failed': return False elif job.status == 'completed': # Only stop if transcript is actually available and has content if job.transcript_text and job.transcript_text.strip(): print(f"šŸ [{user.username}] Auto-refresh stopping - transcript ready ({len(job.transcript_text)} chars)") return False else: # Job marked complete but transcript not yet available - keep refreshing print(f"ā³ [{user.username}] Job complete but transcript not ready - continuing refresh") return True else: # Job still pending or processing - continue refreshing return True except Exception as e: print(f"āŒ Error in should_auto_refresh: {str(e)}") return True # Continue refreshing on error to be safe def auto_refresh_status(job_state, user): """Auto-refresh function with improved transcript detection""" if not user: return ( gr.update(), # No change to status_display gr.update(), # No change to transcript_output gr.update(), # No change to download_file gr.update(), # No change to job_info gr.update(visible=False), # Hide auto-refresh indicator gr.update() # No change to user stats ) # Always check if we should continue auto-refreshing if should_auto_refresh(job_state, user): return check_current_job_status(job_state, user) else: # Auto-refresh should stop, but do one final status check to ensure UI is updated if job_state and 'current_job_id' in job_state: job_id = job_state['current_job_id'] try: job = transcription_manager.get_job_status(job_id) if job and job.user_id == user.user_id: if job.status == 'completed' and job.transcript_text and job.transcript_text.strip(): # Do final update to show 
completed transcript print(f"šŸŽÆ [{user.username}] Final refresh - displaying completed transcript") job_state['auto_refresh_active'] = False return check_current_job_status(job_state, user) except Exception as e: print(f"āŒ Error in final refresh check: {str(e)}") return ( gr.update(), # No change to status_display gr.update(), # No change to transcript_output gr.update(), # No change to download_file gr.update(), # No change to job_info gr.update(visible=False), # Hide auto-refresh indicator gr.update() # No change to user stats ) def stop_auto_refresh(job_state, user): """Manually stop auto-refresh""" if job_state: job_state['auto_refresh_active'] = False if user: print(f"ā¹ļø [{user.username}] Auto-refresh stopped by user") return gr.update(visible=False) # History Functions def get_user_history_table(user, show_all_user_transcriptions=False): """Get user history as a formatted table - PDPA compliant""" if not user: return [] try: if show_all_user_transcriptions: # Show ALL transcriptions for current user jobs = transcription_manager.get_user_history(user.user_id, limit=100) else: # Show recent transcriptions for current user jobs = transcription_manager.get_user_history(user.user_id, limit=20) if not jobs: return [] # Create table data table_data = [] for job in jobs: # Format datetime try: created_time = datetime.fromisoformat(job.created_at) formatted_date = created_time.strftime("%Y-%m-%d %H:%M") except: formatted_date = job.created_at[:16] # Status with emoji status_display = format_status(job.status) # Processing time time_display = format_processing_time(job.created_at, job.completed_at) # Job ID display (shortened for table) job_id_display = job.job_id[:8] + "..." 
if len(job.job_id) > 8 else job.job_id # Language language_display = ALLOWED_LANGS.get(job.language, job.language) # Create download status if job.status == 'completed' and job.transcript_text: download_link = f"šŸ“„ Available" elif job.status == 'processing': download_link = "šŸ”„ Processing..." elif job.status == 'pending': download_link = "ā³ Queued..." elif job.status == 'failed': download_link = "āŒ Failed" else: download_link = "āš ļø Unknown" table_data.append([ formatted_date, job.original_filename, language_display, status_display, time_display, job_id_display, download_link ]) return table_data except Exception as e: print(f"āŒ Error loading user history: {str(e)}") return [] def refresh_history_and_downloads(user, show_all_user_transcriptions=False): """Refresh history table and create download files""" if not user: return ( [], gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update() ) try: if show_all_user_transcriptions: jobs = transcription_manager.get_user_history(user.user_id, limit=100) else: jobs = transcription_manager.get_user_history(user.user_id, limit=20) # Get table data table_data = get_user_history_table(user, show_all_user_transcriptions) # Create download files for completed jobs download_updates = [] completed_jobs = [job for job in jobs if job.status == 'completed' and job.transcript_text] for i in range(10): # We have 10 download file components if i < len(completed_jobs): job = completed_jobs[i] # Create transcript file transcript_file = create_transcript_file(job.transcript_text, job.job_id) # Create label with filename and job info label = f"šŸ“„ {job.original_filename} ({job.created_at[:10]})" download_updates.append( gr.update(visible=True, value=transcript_file, label=label) ) else: 
download_updates.append(gr.update(visible=False)) # Get updated user stats stats_display = get_user_stats_display(user) return [table_data] + download_updates + [stats_display] except Exception as e: print(f"āŒ Error refreshing user history: {str(e)}") return ( [], gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update() ) def on_history_refresh_click(user, show_all_user_transcriptions): """Manual history refresh""" if user: print(f"šŸ”„ [{user.username}] User refreshed history (show_all: {show_all_user_transcriptions})") return refresh_history_and_downloads(user, show_all_user_transcriptions) def on_history_tab_select(user): """Auto-refresh history when history tab is selected""" if user: print(f"šŸ“š [{user.username}] History tab opened, refreshing data...") return refresh_history_and_downloads(user, show_all_user_transcriptions=False) # PDPA Compliance Functions def export_user_data(user): """Export user data for GDPR compliance""" if not user: return "āŒ Please log in to export your data", gr.update(visible=False) try: export_data = transcription_manager.export_user_data(user.user_id) # Create export file os.makedirs("temp", exist_ok=True) filename = f"temp/user_data_export_{user.user_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" with open(filename, "w", encoding="utf-8") as f: json.dump(export_data, f, indent=2, ensure_ascii=False) print(f"šŸ“¦ [{user.username}] Data export created") return "āœ… Your data has been exported successfully", gr.update(visible=True, value=filename, label="Download Your Data Export") except Exception as e: print(f"āŒ Error exporting user data: {str(e)}") return f"āŒ Export failed: {str(e)}", gr.update(visible=False) def update_marketing_consent(user, marketing_consent): """Update user's marketing consent""" if 
not user: return "āŒ Please log in to update consent" try: success = transcription_manager.update_user_consent(user.user_id, marketing_consent) if success: user.marketing_consent = marketing_consent print(f"šŸ“§ [{user.username}] Marketing consent updated: {marketing_consent}") return f"āœ… Marketing consent updated successfully" else: return "āŒ Failed to update consent" except Exception as e: return f"āŒ Error: {str(e)}" def delete_user_account(user, confirmation_text): """Delete user account and all data""" if not user: return "āŒ Please log in to delete account", None, gr.update(visible=True), gr.update(visible=False) if confirmation_text != "DELETE MY ACCOUNT": return "āŒ Please type 'DELETE MY ACCOUNT' to confirm", user, gr.update(visible=False), gr.update(visible=True) try: success = transcription_manager.delete_user_account(user.user_id) if success: print(f"šŸ—‘ļø [{user.username}] Account deleted successfully") return "āœ… Your account and all data have been permanently deleted", None, gr.update(visible=True), gr.update(visible=False) else: return "āŒ Failed to delete account", user, gr.update(visible=False), gr.update(visible=True) except Exception as e: return f"āŒ Error: {str(e)}", user, gr.update(visible=False), gr.update(visible=True) def on_user_login(user): """Update UI components when user logs in""" if user: return gr.update(value=user.marketing_consent) else: return gr.update(value=False) def check_system_status(): """Check if the system is properly initialized""" try: # Test database connection if transcription_manager and transcription_manager.db: # Try a simple database operation test_stats = transcription_manager.db.get_user_stats("test_user_id") print("āœ… System initialization successful") return "šŸ‘¤ Please log in to view your statistics..." 
else: print("āŒ System initialization failed - transcription manager not available") return "āŒ System initialization failed - please check configuration" except Exception as e: print(f"āŒ System initialization error: {str(e)}") return f"āŒ System error: {str(e)}" def create_transcript_file(transcript_text, job_id): """Create a downloadable transcript file""" os.makedirs("temp", exist_ok=True) filename = f"temp/transcript_{job_id}.txt" with open(filename, "w", encoding="utf-8") as f: f.write(transcript_text) return filename # Enhanced CSS with authentication styling enhanced_css = """ /* Main container styling */ .gradio-container { background: linear-gradient(135deg, #f8f9fa 0%, #e9ecef 100%); font-family: 'Segoe UI', system-ui, sans-serif; color: #212529; } /* Card styling */ .gr-box { background: white; border: 1px solid #dee2e6; border-radius: 12px; box-shadow: 0 2px 8px rgba(0,0,0,0.08); padding: 20px; margin: 10px 0; } /* Button styling */ .gr-button { background: linear-gradient(135deg, #007bff, #0056b3); border: none; border-radius: 8px; color: white; font-weight: 500; padding: 12px 24px; transition: all 0.2s ease; box-shadow: 0 2px 4px rgba(0,123,255,0.2); } .gr-button:hover { background: linear-gradient(135deg, #0056b3, #004085); transform: translateY(-1px); box-shadow: 0 4px 8px rgba(0,123,255,0.3); } .gr-button[variant="secondary"] { background: linear-gradient(135deg, #6c757d, #495057); } .gr-button[variant="secondary"]:hover { background: linear-gradient(135deg, #495057, #343a40); } /* Login/Register button styling */ .auth-button { background: linear-gradient(135deg, #28a745, #1e7e34); min-width: 120px; } .auth-button:hover { background: linear-gradient(135deg, #1e7e34, #155724); } .danger-button { background: linear-gradient(135deg, #dc3545, #c82333); } .danger-button:hover { background: linear-gradient(135deg, #c82333, #a71e2a); } /* Input styling */ .gr-textbox, .gr-dropdown, .gr-file { border: 2px solid #e9ecef; border-radius: 8px; 
background: white; color: #212529; transition: border-color 0.2s ease; } .gr-textbox:focus, .gr-dropdown:focus { border-color: #007bff; box-shadow: 0 0 0 3px rgba(0,123,255,0.1); } /* Status styling */ .status-display { background: linear-gradient(135deg, #e3f2fd, #bbdefb); border-left: 4px solid #2196f3; padding: 15px; border-radius: 0 8px 8px 0; margin: 10px 0; } .success-status { background: linear-gradient(135deg, #e8f5e8, #c8e6c9); border-left-color: #4caf50; } .error-status { background: linear-gradient(135deg, #ffebee, #ffcdd2); border-left-color: #f44336; } /* Auto-refresh indicator styling */ .auto-refresh-indicator { background: linear-gradient(135deg, #fff3cd, #ffeaa7); border: 1px solid #ffeaa7; border-radius: 6px; padding: 8px 12px; font-size: 12px; color: #856404; text-align: center; animation: pulse 2s infinite; } @keyframes pulse { 0% { opacity: 1; } 50% { opacity: 0.7; } 100% { opacity: 1; } } /* User stats styling */ .user-stats { background: linear-gradient(135deg, #e8f5e8, #c8e6c9); border: 1px solid #c8e6c9; border-radius: 6px; padding: 8px 12px; font-size: 12px; color: #2e7d32; text-align: center; font-weight: 500; } /* Authentication form styling */ .auth-form { background: white; border: 2px solid #007bff; border-radius: 12px; padding: 25px; box-shadow: 0 4px 12px rgba(0,123,255,0.15); } /* Privacy notice styling */ .privacy-notice { background: linear-gradient(135deg, #e3f2fd, #bbdefb); border: 1px solid #2196f3; border-radius: 8px; padding: 15px; font-size: 14px; color: #1976d2; margin: 10px 0; } /* PDPA section styling */ .pdpa-section { background: linear-gradient(135deg, #fff3cd, #ffeaa7); border: 1px solid #ffc107; border-radius: 8px; padding: 15px; margin: 10px 0; } /* History table styling */ .history-table { background: white; border: 1px solid #dee2e6; border-radius: 8px; font-size: 14px; } .history-table thead th { background: linear-gradient(135deg, #f8f9fa, #e9ecef); color: #495057; font-weight: 600; padding: 12px; 
border-bottom: 2px solid #dee2e6; } .history-table tbody tr { cursor: pointer; transition: background-color 0.2s ease; } .history-table tbody tr:hover { background: linear-gradient(135deg, #f8f9fa, #e9ecef); } .history-table tbody tr:nth-child(even) { background: #f8f9fa; } .history-table tbody tr:nth-child(even):hover { background: linear-gradient(135deg, #e9ecef, #dee2e6); } .history-table tbody td { padding: 10px; border-bottom: 1px solid #dee2e6; vertical-align: middle; } /* Tab styling */ .tab-nav { background: white; border-bottom: 2px solid #dee2e6; border-radius: 8px 8px 0 0; } /* Header styling */ .main-header { background: white; border: 1px solid #dee2e6; border-radius: 12px; padding: 25px; text-align: center; margin-bottom: 20px; box-shadow: 0 2px 8px rgba(0,0,0,0.05); } .main-header h1 { color: #007bff; margin-bottom: 10px; font-size: 2.2em; font-weight: 600; } .main-header p { color: #6c757d; font-size: 1.1em; margin: 0; } """ # Create the main interface with gr.Blocks( theme=gr.themes.Soft( primary_hue="blue", secondary_hue="gray", neutral_hue="gray", font=["system-ui", "sans-serif"] ), css=enhanced_css, title="šŸŽ™ļø Azure Transcipt Service - Secure & PDPA Compliant" ) as demo: # Global state current_user = gr.State(None) job_state = gr.State({}) # Header with gr.Row(): gr.HTML("""

šŸŽ™ļø Azure Transcipt Service

Secure, PDPA-compliant transcription service with user authentication and privacy protection

""") # User stats display user_stats_display = gr.Textbox( label="", lines=1, interactive=False, show_label=False, placeholder="šŸ‘¤ Please log in to view your statistics...", elem_classes=["user-stats"] ) # Authentication Section with gr.Column(visible=True, elem_classes=["auth-form"]) as auth_section: gr.Markdown("## šŸ” Authentication Required") gr.Markdown("Please log in or create an account to use the transcription service.") with gr.Tabs() as auth_tabs: # Login Tab with gr.Tab("šŸ”‘ Login") as login_tab: with gr.Column(): login_email = gr.Textbox( label="Email or Username", placeholder="Enter your email or username" ) login_password = gr.Textbox( label="Password", type="password", placeholder="Enter your password" ) with gr.Row(): login_btn = gr.Button("šŸ”‘ Login", variant="primary", elem_classes=["auth-button"]) login_status = gr.Textbox( label="", show_label=False, interactive=False, placeholder="Enter your credentials and click Login" ) # Register Tab with gr.Tab("šŸ“ Register") as register_tab: with gr.Column(): reg_email = gr.Textbox( label="Email", placeholder="Enter your email address" ) reg_username = gr.Textbox( label="Username", placeholder="Choose a username (3-30 characters, alphanumeric and underscore)" ) reg_password = gr.Textbox( label="Password", type="password", placeholder="Create a strong password (min 8 chars, mixed case, numbers)" ) reg_confirm_password = gr.Textbox( label="Confirm Password", type="password", placeholder="Confirm your password" ) gr.Markdown("### šŸ“‹ Privacy & Data Consent") with gr.Column(elem_classes=["privacy-notice"]): gr.Markdown(""" **Privacy Notice**: By creating an account, you acknowledge that: - Your data will be stored securely in user-separated Azure Blob Storage - Transcriptions are processed using Azure Speech Services - You can export or delete your data at any time - We comply with GDPR and data protection regulations """) gdpr_consent = gr.Checkbox( label="I consent to the processing of my personal 
data as described in the Privacy Notice (Required)", value=False ) data_retention_consent = gr.Checkbox( label="I agree to data retention for transcription service functionality (Required)", value=False ) marketing_consent = gr.Checkbox( label="I consent to receiving marketing communications (Optional)", value=False ) with gr.Row(): register_btn = gr.Button("šŸ“ Create Account", variant="primary", elem_classes=["auth-button"]) register_status = gr.Textbox( label="", show_label=False, interactive=False, placeholder="Fill out the form and agree to the required consents to create your account" ) login_after_register = gr.Button( "šŸ”‘ Go to Login", visible=False, variant="secondary" ) # Main Application (visible only when logged in) with gr.Column(visible=False) as main_app: # Logout button with gr.Row(): with gr.Column(scale=3): pass with gr.Column(scale=1): logout_btn = gr.Button("šŸ‘‹ Logout", variant="secondary") # Main transcription interface with gr.Tab("šŸŽ™ļø Transcribe"): with gr.Row(): # Left column - Input settings with gr.Column(scale=1): gr.Markdown("### šŸ“ Upload File") file_upload = gr.File( label="Audio or Video File", type="filepath", file_types=[ ".wav", ".mp3", ".ogg", ".opus", ".flac", ".wma", ".aac", ".m4a", ".amr", ".webm", ".speex", ".mp4", ".mov", ".avi", ".mkv", ".wmv", ".flv", ".3gp" ] ) with gr.Row(): language = gr.Dropdown( choices=[(v, k) for k, v in ALLOWED_LANGS.items()], label="Language", value="en-US" ) audio_format = gr.Dropdown( choices=AUDIO_FORMATS, value="wav", label="Output Format" ) gr.Markdown("### āš™ļø Settings") with gr.Row(): diarization_enabled = gr.Checkbox( label="Speaker Identification", value=True ) speakers = gr.Slider( minimum=1, maximum=10, step=1, value=2, label="Max Speakers" ) with gr.Row(): timestamps = gr.Checkbox( label="Timestamps", value=True ) profanity = gr.Dropdown( choices=["masked", "removed", "raw"], value="masked", label="Profanity Filter" ) with gr.Row(): punctuation = gr.Dropdown( 
choices=["automatic", "dictated", "none"], value="automatic", label="Punctuation" ) lexical = gr.Checkbox( label="Lexical Form", value=False ) submit_btn = gr.Button( "šŸš€ Start Transcription", variant="primary", size="lg" ) # Right column - Results with gr.Column(scale=1): gr.Markdown("### šŸ“Š Status") # Auto-refresh indicator auto_refresh_status_display = gr.Textbox( label="", lines=1, interactive=False, show_label=False, visible=False, elem_classes=["auto-refresh-indicator"] ) status_display = gr.Textbox( label="", lines=3, interactive=False, show_label=False, placeholder="Upload a file and click 'Start Transcription' to begin...\nStatus will auto-refresh every 10 seconds during processing.\nYour data is stored in your private user folder for PDPA compliance." ) job_info = gr.Textbox( label="", lines=1, interactive=False, show_label=False, placeholder="" ) with gr.Row(): refresh_btn = gr.Button( "šŸ”„ Check Status", variant="secondary" ) stop_refresh_btn = gr.Button( "ā¹ļø Stop Auto-Refresh", variant="secondary" ) gr.Markdown("### šŸ“„ Results") transcript_output = gr.Textbox( label="Transcript", lines=12, interactive=False, placeholder="Your transcript with speaker identification and precise timestamps (HH:MM:SS) will appear here..." 
) download_file = gr.File( label="Download", interactive=False, visible=False ) # History tab with gr.Tab("šŸ“š My History"): gr.Markdown("### šŸ“‹ Your Transcription History & Downloads") gr.Markdown("*View your personal transcription history and download completed transcripts (PDPA compliant - only your data)*") with gr.Row(): refresh_history_btn = gr.Button( "šŸ”„ Refresh My History & Downloads", variant="primary" ) show_all_user_checkbox = gr.Checkbox( label="Show All My Transcriptions (not just recent 20)", value=False ) history_table = gr.Dataframe( headers=["Date", "Filename", "Language", "Status", "Duration", "Job ID", "Download"], datatype=["str", "str", "str", "str", "str", "str", "str"], col_count=(7, "fixed"), row_count=(15, "dynamic"), wrap=True, interactive=False, elem_classes=["history-table"] ) # Download Files Section gr.Markdown("### šŸ“„ Download Your Completed Transcripts") gr.Markdown("*Your available transcript downloads will appear below after refreshing*") # Container for dynamic download files with gr.Column(): download_file_1 = gr.File(label="", visible=False, interactive=False) download_file_2 = gr.File(label="", visible=False, interactive=False) download_file_3 = gr.File(label="", visible=False, interactive=False) download_file_4 = gr.File(label="", visible=False, interactive=False) download_file_5 = gr.File(label="", visible=False, interactive=False) download_file_6 = gr.File(label="", visible=False, interactive=False) download_file_7 = gr.File(label="", visible=False, interactive=False) download_file_8 = gr.File(label="", visible=False, interactive=False) download_file_9 = gr.File(label="", visible=False, interactive=False) download_file_10 = gr.File(label="", visible=False, interactive=False) # PDPA Compliance Tab with gr.Tab("šŸ”’ Privacy & Data"): gr.Markdown("### šŸ”’ GDPR & Data Protection") gr.Markdown("Manage your personal data and privacy settings in compliance with data protection regulations.") with 
gr.Column(elem_classes=["pdpa-section"]): gr.Markdown("#### šŸ“Š Data Export") gr.Markdown("Download all your personal data including transcriptions, account info, and usage statistics.") export_btn = gr.Button("šŸ“¦ Export My Data", variant="primary") export_status = gr.Textbox( label="", show_label=False, interactive=False, placeholder="Click 'Export My Data' to download your complete data archive" ) export_file = gr.File( label="Your Data Export", visible=False, interactive=False ) with gr.Column(elem_classes=["pdpa-section"]): gr.Markdown("#### šŸ“§ Marketing Consent") gr.Markdown("Update your preferences for receiving marketing communications.") marketing_consent_checkbox = gr.Checkbox( label="I consent to receiving marketing communications", value=False ) update_consent_btn = gr.Button("āœ… Update Consent", variant="secondary") consent_status = gr.Textbox( label="", show_label=False, interactive=False, placeholder="Update your marketing consent preferences" ) with gr.Column(elem_classes=["pdpa-section"]): gr.Markdown("#### āš ļø Account Deletion") gr.Markdown(""" **Warning**: This action is irreversible and will permanently delete: - Your user account and profile - All transcription history and files - All data stored in Azure Blob Storage - Usage statistics and preferences """) deletion_confirmation = gr.Textbox( label="Type 'DELETE MY ACCOUNT' to confirm", placeholder="Type the exact phrase to confirm account deletion" ) delete_account_btn = gr.Button( "šŸ—‘ļø Delete My Account", variant="stop", elem_classes=["danger-button"] ) deletion_status = gr.Textbox( label="", show_label=False, interactive=False, placeholder="Account deletion requires confirmation text" ) # Auto-refresh timer timer = gr.Timer(10.0) # Event handlers # Authentication events login_btn.click( login_user, inputs=[login_email, login_password], outputs=[login_status, current_user, auth_section, main_app, user_stats_display] ).then( on_user_login, inputs=[current_user], 
outputs=[marketing_consent_checkbox] ).then( lambda user: ("", "") if user else (gr.update(), gr.update()), # Clear login fields on success inputs=[current_user], outputs=[login_email, login_password] ) register_btn.click( register_user, inputs=[reg_email, reg_username, reg_password, reg_confirm_password, gdpr_consent, data_retention_consent, marketing_consent], outputs=[register_status, login_after_register] ).then( lambda status: ("", "", "", "", False, False, False) if "āœ…" in status else (gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update()), inputs=[register_status], outputs=[reg_email, reg_username, reg_password, reg_confirm_password, gdpr_consent, data_retention_consent, marketing_consent] ) login_after_register.click( lambda: (gr.update(selected=0), ""), # Switch to login tab (index 0) and clear status outputs=[auth_tabs, register_status] ) logout_btn.click( logout_user, outputs=[current_user, login_status, auth_section, main_app, user_stats_display] ) # Transcription events submit_btn.click( submit_transcription, inputs=[ file_upload, language, audio_format, diarization_enabled, speakers, profanity, punctuation, timestamps, lexical, current_user ], outputs=[status_display, transcript_output, download_file, job_info, job_state, auto_refresh_status_display, user_stats_display] ) refresh_btn.click( lambda job_state, user: ( print("šŸ”„ User manually checked status") if user else None, check_current_job_status(job_state, user) )[1], inputs=[job_state, current_user], outputs=[status_display, transcript_output, download_file, job_info, auto_refresh_status_display, user_stats_display] ) stop_refresh_btn.click( stop_auto_refresh, inputs=[job_state, current_user], outputs=[auto_refresh_status_display] ) # Auto-refresh timer event timer.tick( auto_refresh_status, inputs=[job_state, current_user], outputs=[status_display, transcript_output, download_file, job_info, auto_refresh_status_display, user_stats_display] ) # History 
events refresh_history_btn.click( on_history_refresh_click, inputs=[current_user, show_all_user_checkbox], outputs=[ history_table, download_file_1, download_file_2, download_file_3, download_file_4, download_file_5, download_file_6, download_file_7, download_file_8, download_file_9, download_file_10, user_stats_display ] ) show_all_user_checkbox.change( lambda user, show_all: ( print(f"šŸ‘ļø User toggled show all personal transcriptions: {show_all}") if user else None, refresh_history_and_downloads(user, show_all) )[1], inputs=[current_user, show_all_user_checkbox], outputs=[ history_table, download_file_1, download_file_2, download_file_3, download_file_4, download_file_5, download_file_6, download_file_7, download_file_8, download_file_9, download_file_10, user_stats_display ] ) # PDPA compliance events export_btn.click( export_user_data, inputs=[current_user], outputs=[export_status, export_file] ) update_consent_btn.click( update_marketing_consent, inputs=[current_user, marketing_consent_checkbox], outputs=[consent_status] ) delete_account_btn.click( delete_user_account, inputs=[current_user, deletion_confirmation], outputs=[deletion_status, current_user, auth_section, main_app] ) # Auto-hide/show speakers slider diarization_enabled.change( lambda enabled: gr.update(visible=enabled), inputs=[diarization_enabled], outputs=[speakers] ) # Load user stats on app start and verify system is ready demo.load( lambda: ( print("šŸš€ PDPA-Compliant Azure Transcipt Service Service Started..."), check_system_status() )[1], outputs=[user_stats_display] ) # Info section with demo: gr.HTML("""

📋 How to Use

  1. Register/Login: Create an account or log in with existing credentials
  2. Upload: Select your audio or video file
  3. Configure: Choose language and enable speaker identification
  4. Start: Click "Start Transcription" - status will auto-update every 10 seconds
  5. Download: Get your transcript with speaker identification and timestamps
  6. Manage: Use Privacy & Data tab to export or delete your data

🎵 Supported Formats

Audio: WAV, MP3, OGG, OPUS, FLAC, WMA, AAC, M4A, AMR, WebM, Speex

Video: MP4, MOV, AVI, MKV, WMV, FLV, 3GP

🔒 Security & Privacy Features

🎯 Enhanced Features

šŸ’” Tips

""") if __name__ == "__main__": print("šŸš€ Starting Secure PDPA-Compliant Azure Transcipt Service Service...") demo.launch( server_name="0.0.0.0", server_port=7860, share=False, show_error=True )