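# Residue from the hosting site's file viewer (not part of the program):
# uploader avatar caption "Chirapath's picture", commit message "Upload 8 files",
# commit d891c35 (verified).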
import gradio as gr
import time
import json
import os
import subprocess
from datetime import datetime, timedelta
from typing import List, Tuple, Optional
from app_core import (
ALLOWED_LANGS, AUDIO_FORMATS, transcription_manager,
allowed_file, User
)
def format_status(status):
    """Translate a raw job status code into its emoji-labelled display text.

    Status codes without a known label are handed back unchanged.
    """
    labels = {
        'pending': '⏳ Queued',
        'processing': 'πŸ”„ Processing',
        'completed': 'βœ… Done',
        'failed': '❌ Failed',
    }
    if status in labels:
        return labels[status]
    return status
def format_processing_time(created_at, completed_at=None):
    """Return the elapsed time of a job as a short human-readable string.

    Args:
        created_at: ISO-8601 timestamp string for when the job was created.
        completed_at: Optional ISO-8601 timestamp string for when the job
            finished. When falsy, elapsed time is measured up to "now".

    Returns:
        "<s>s" below one minute, "<m>m <s>s" below one hour, "<h>h <m>m"
        otherwise, or "Unknown" when a timestamp is missing or malformed.
    """
    try:
        start_time = datetime.fromisoformat(created_at)
        if completed_at:
            end_time = datetime.fromisoformat(completed_at)
        else:
            # Use the same timezone awareness as the start timestamp:
            # subtracting an aware datetime from a naive "now" raises
            # TypeError, which used to make every running job "Unknown".
            end_time = datetime.now(start_time.tzinfo)
        total_seconds = int((end_time - start_time).total_seconds())
    except (TypeError, ValueError, OverflowError):
        # TypeError: non-string input or naive/aware mix between the two
        # timestamps; ValueError: not ISO-8601; OverflowError: out of range.
        return "Unknown"

    if total_seconds < 60:
        return f"{total_seconds}s"
    minutes, seconds = divmod(total_seconds, 60)
    if total_seconds < 3600:
        return f"{minutes}m {seconds}s"
    hours, minutes = divmod(minutes, 60)
    return f"{hours}h {minutes}m"
def get_user_stats_display(user: User):
    """Build the one-line statistics banner for the given user.

    Returns a login prompt when no user is supplied. Otherwise the banner
    always shows the username, total and completed counts; processing,
    pending, failed and recent counts are appended only when positive.
    Any error while gathering statistics is reported inside the banner.
    """
    if not user:
        return "πŸ‘€ Please log in to view statistics"

    try:
        stats = transcription_manager.get_user_stats(user.user_id)

        total = stats.get('total_jobs', 0)
        recent = stats.get('recent_jobs', 0)
        counts = stats.get('by_status', {})
        completed = counts.get('completed', 0)
        processing = counts.get('processing', 0)
        pending = counts.get('pending', 0)
        failed = counts.get('failed', 0)

        # Collect the pieces in display order, then glue them together.
        pieces = [f"πŸ‘€ {user.username} | πŸ“Š Total: {total} | βœ… Completed: {completed}"]
        if processing > 0:
            pieces.append(f" | πŸ”„ Processing: {processing}")
        if pending > 0:
            pieces.append(f" | ⏳ Pending: {pending}")
        if failed > 0:
            pieces.append(f" | ❌ Failed: {failed}")
        if recent > 0:
            pieces.append(f" | πŸ“… Last 7 days: {recent}")
        return "".join(pieces)
    except Exception as e:
        return f"πŸ‘€ {user.username} | Stats error: {str(e)}"
# Authentication Functions
def register_user(email, username, password, confirm_password, gdpr_consent, data_retention_consent, marketing_consent):
    """Validate the registration form and create the account.

    Returns a pair: the status message, and a visibility update for the
    "go to login" button (shown only after a successful registration).
    """

    def rejected(text):
        # Failure outcome: show the message, keep the follow-up button hidden.
        return text, gr.update(visible=False)

    try:
        print(f"πŸ“ Registration attempt for: {username} ({email})")

        # Form validation, in the order the messages should be reported.
        if not (email and username and password):
            return rejected("❌ All fields are required")
        if password != confirm_password:
            return rejected("❌ Passwords do not match")
        if not gdpr_consent:
            return rejected("❌ GDPR consent is required to create an account")
        if not data_retention_consent:
            return rejected("❌ Data retention agreement is required")

        # Hand over to the manager for the actual account creation.
        outcome = transcription_manager.register_user(
            email, username, password, gdpr_consent, data_retention_consent, marketing_consent
        )
        success, message, user_id = outcome
        print(f"πŸ“ Registration result: success={success}, message={message}")

        if not success:
            print(f"❌ Registration failed: {message}")
            return rejected(f"❌ {message}")

        print(f"βœ… User registered successfully: {username}")
        return f"βœ… {message}! Please log in with your credentials.", gr.update(visible=True)
    except Exception as e:
        print(f"❌ Registration error: {str(e)}")
        return rejected(f"❌ Registration error: {str(e)}")
def login_user(login, password):
    """Authenticate a user by username/email and password.

    Returns five values: status message, the user object (None on failure),
    visibility update for the auth section, visibility update for the main
    app, and the statistics banner text.
    """

    def denied(text):
        # Failed login: stay on the auth section with the generic banner.
        return (
            text,
            None,
            gr.update(visible=True),
            gr.update(visible=False),
            "πŸ‘€ Please log in to view your statistics...",
        )

    try:
        print(f"πŸ” Login attempt for: {login}")

        if not (login and password):
            return denied("❌ Please enter both username/email and password")

        success, message, user = transcription_manager.login_user(login, password)
        print(f"πŸ” Login result: success={success}, message={message}")

        if not (success and user):
            print(f"❌ Login failed: {message}")
            return denied(f"❌ {message}")

        print(f"βœ… User logged in successfully: {user.username}")
        stats_display = get_user_stats_display(user)
        return (
            f"βœ… Welcome back, {user.username}!",
            user,
            gr.update(visible=False),
            gr.update(visible=True),
            stats_display,
        )
    except Exception as e:
        print(f"❌ Login error: {str(e)}")
        return denied(f"❌ Login error: {str(e)}")
def logout_user():
    """Clear the session and switch the UI back to the auth section.

    Returns five values: the cleared user (None), the logout notice,
    visibility update showing the auth section, visibility update hiding
    the main app, and the generic statistics banner.
    """
    print("πŸ‘‹ User logged out")

    notice = "πŸ‘‹ You have been logged out. Please log in to continue."
    show_auth = gr.update(visible=True)
    hide_app = gr.update(visible=False)
    banner = "πŸ‘€ Please log in to view your statistics..."
    return None, notice, show_auth, hide_app, banner
# Transcription Functions (require authentication)
# Uploads larger than this are rejected before being read into memory.
_MAX_UPLOAD_BYTES = 500 * 1024 * 1024  # 500MB limit

# Container/video extensions accepted in addition to AUDIO_FORMATS.
_EXTRA_MEDIA_EXTENSIONS = frozenset({
    'mp4', 'mov', 'avi', 'mkv', 'webm', 'm4a', '3gp', 'f4v',
    'wmv', 'asf', 'rm', 'rmvb', 'flv', 'mpg', 'mpeg', 'mts', 'vob',
})


def _submission_error(message):
    """Build the 7-output result shown when a submission is rejected."""
    return (
        message,
        "",
        gr.update(visible=False),
        "",
        {},
        gr.update(visible=False),
        gr.update(),
    )


def submit_transcription(file, language, audio_format, diarization_enabled, speakers,
                         profanity, punctuation, timestamps, lexical, user):
    """Submit transcription job - requires authenticated user.

    Args:
        file: Path of the uploaded file (any non-string value is converted
            with ``str()`` and treated as a path).
        language: Language code forwarded to the transcription manager.
        audio_format, diarization_enabled, speakers, profanity, punctuation,
        timestamps, lexical: Options packed into the job's settings dict.
        user: Authenticated user object; falsy means "not logged in".

    Returns:
        A 7-tuple: status text, transcript text, download-file update,
        job-info text, job-state dict, auto-refresh indicator update and
        user-stats value/update. On any rejection or error only the status
        text is populated and the job state is an empty dict.
    """
    if not user:
        return _submission_error("❌ Please log in to submit transcriptions")
    if file is None:
        return _submission_error("Please upload an audio or video file first.")

    try:
        if isinstance(file, str):
            file_path = file
            missing_message = "File not found. Please try uploading again."
        else:
            file_path = str(file)
            missing_message = "Unable to process file. Please try again."

        if not os.path.exists(file_path):
            return _submission_error(missing_message)
        original_filename = os.path.basename(file_path)

        # Validate the extension; names without an extension are let through.
        file_extension = original_filename.split('.')[-1].lower() if '.' in original_filename else ""
        supported_extensions = set(AUDIO_FORMATS) | _EXTRA_MEDIA_EXTENSIONS
        if file_extension and file_extension not in supported_extensions:
            return _submission_error(f"Unsupported file format: .{file_extension}")

        try:
            # Check the size on disk first so an oversized upload is never
            # loaded into memory just to be rejected.
            if os.path.getsize(file_path) > _MAX_UPLOAD_BYTES:
                return _submission_error("File too large. Please upload files smaller than 500MB.")
            with open(file_path, 'rb') as f:
                file_bytes = f.read()
        except OSError as e:
            return _submission_error(f"Error reading file: {str(e)}")

        # Prepare settings
        settings = {
            'audio_format': audio_format,
            'diarization_enabled': diarization_enabled,
            'speakers': speakers,
            'profanity': profanity,
            'punctuation': punctuation,
            'timestamps': timestamps,
            'lexical': lexical,
        }

        # Submit job (logging happens in app_core)
        job_id = transcription_manager.submit_transcription(
            file_bytes, original_filename, user.user_id, language, settings
        )

        # Fresh job state; auto-refresh starts active for the new job.
        job_state = {
            'current_job_id': job_id,
            'start_time': datetime.now().isoformat(),
            'auto_refresh_active': True,
            'last_status': 'pending',
        }

        # Get updated user stats
        stats_display = get_user_stats_display(user)

        return (
            f"πŸš€ Transcription started for: {original_filename}\nπŸ“‘ Auto-refreshing every 10 seconds...",
            "",
            gr.update(visible=False),
            f"Job ID: {job_id}",
            job_state,
            gr.update(visible=True, value="πŸ”„ Auto-refresh active"),
            stats_display,
        )
    except Exception as e:
        # UI boundary: report the failure in the status box instead of raising.
        print(f"❌ Error submitting transcription: {str(e)}")
        return _submission_error(f"Error: {str(e)}")
def check_current_job_status(job_state, user):
    """Report the state of the job tracked in ``job_state`` for ``user``.

    Returns six values: status text, transcript text, download-file update,
    job-info text, auto-refresh indicator update and user-stats value/update.
    Side effects: records the last seen status in ``job_state`` and switches
    ``auto_refresh_active`` off once the job has failed or its transcript
    is available.
    """

    def status_only(message):
        # Nothing to display besides the status line.
        return (
            message,
            "",
            gr.update(visible=False),
            "",
            gr.update(visible=False),
            gr.update()
        )

    if not user:
        return status_only("❌ Please log in to check status")
    if not job_state or 'current_job_id' not in job_state:
        return status_only("No active job")

    job_id = job_state['current_job_id']

    try:
        job = transcription_manager.get_job_status(job_id)
        if not job or job.user_id != user.user_id:
            return status_only("Job not found or access denied")

        processing_time = format_processing_time(job.created_at, job.completed_at)

        # Log every status transition once, remembering the newest status.
        last_status = job_state.get('last_status', '')
        if job.status != last_status:
            print(f"πŸ”„ [{user.username}] Job status change: {last_status} β†’ {job.status} ({job.original_filename})")
            job_state['last_status'] = job.status
            if job.status == 'completed':
                has_text = bool(job.transcript_text and job.transcript_text.strip())
                text_length = len(job.transcript_text) if job.transcript_text else 0
                print(f"πŸ“ [{user.username}] Transcript status: available={has_text}, length={text_length}")

        stats_display = get_user_stats_display(user)

        def still_running(headline, detail):
            # Shared shape of the "keep polling" outcomes; the indicator is
            # shown only while auto-refresh is flagged active.
            if job_state.get('auto_refresh_active', False):
                indicator = gr.update(visible=True, value="πŸ”„ Auto-refresh active")
            else:
                indicator = gr.update(visible=False)
            return (headline, "", gr.update(visible=False), detail, indicator, stats_display)

        if job.status == 'completed':
            if job.transcript_text and job.transcript_text.strip():
                # Transcript is ready: stop polling and offer the download.
                job_state['auto_refresh_active'] = False
                try:
                    transcript_file = create_transcript_file(job.transcript_text, job_id)
                    print(f"βœ… [{user.username}] Transcription ready: {len(job.transcript_text)} characters")
                except Exception as e:
                    print(f"⚠️ [{user.username}] Error creating transcript file: {str(e)}")
                    transcript_file = None

                if transcript_file:
                    download = gr.update(visible=True, value=transcript_file)
                else:
                    download = gr.update(visible=False)
                return (
                    f"βœ… Transcription completed in {processing_time}",
                    job.transcript_text,
                    download,
                    f"Processed: {job.original_filename}",
                    gr.update(visible=False),  # Hide auto-refresh status
                    stats_display
                )

            # Marked completed but the transcript text has not arrived yet.
            print(f"⏳ [{user.username}] Job completed but transcript not ready yet - continuing refresh")
            return still_running(
                f"πŸ”„ Finalizing transcript... ({processing_time} elapsed)\nπŸ“‘ Auto-refreshing every 10 seconds...",
                f"Retrieving results: {job.original_filename}",
            )

        if job.status == 'failed':
            # Terminal failure: stop polling and show a shortened error.
            job_state['auto_refresh_active'] = False
            raw_error = job.error_message
            if raw_error and len(raw_error) > 100:
                error_msg = raw_error[:100] + "..."
            else:
                error_msg = raw_error or "Unknown error"
            return (
                f"❌ Transcription failed after {processing_time}",
                "",
                gr.update(visible=False),
                f"Error: {error_msg}",
                gr.update(visible=False),
                stats_display
            )

        if job.status == 'processing':
            return still_running(
                f"πŸ”„ Processing... ({processing_time} elapsed)\nπŸ“‘ Auto-refreshing every 10 seconds...",
                f"Converting and analyzing: {job.original_filename}",
            )

        # Anything else is treated as still queued.
        return still_running(
            f"⏳ Queued for processing... ({processing_time} waiting)\nπŸ“‘ Auto-refreshing every 10 seconds...",
            f"Waiting: {job.original_filename}",
        )
    except Exception as e:
        print(f"❌ Error checking job status: {str(e)}")
        return status_only(f"Error checking status: {str(e)}")
def force_refresh_completed_jobs(job_state, user):
    """Re-check the tracked job, logging when it is a completed job of this user.

    Every path ends in ``check_current_job_status(job_state, user)``; the
    extra lookup here only adds a log line for completed jobs.
    """
    if user and job_state and 'current_job_id' in job_state:
        job_id = job_state['current_job_id']
        try:
            job = transcription_manager.get_job_status(job_id)
            owned_and_done = job and job.user_id == user.user_id and job.status == 'completed'
            if owned_and_done:
                print(f"πŸ”„ [{user.username}] Force refreshing completed job: {job_id[:8]}...")
                return check_current_job_status(job_state, user)
        except Exception as e:
            print(f"❌ Error in force refresh: {str(e)}")

    return check_current_job_status(job_state, user)
# Update the refresh button click handler to be more aggressive:
def manual_refresh_with_force(job_state, user):
    """Handle the manual "check status" action.

    For a logged-in user the request is logged and, when the tracked job is
    already completed, the check is routed through
    ``force_refresh_completed_jobs``; otherwise the regular status check runs.
    """
    if not user:
        return check_current_job_status(job_state, user)

    print(f"πŸ”„ [{user.username}] Manual status check requested")

    tracking_job = job_state and 'current_job_id' in job_state
    if tracking_job:
        try:
            job = transcription_manager.get_job_status(job_state['current_job_id'])
            if job and job.status == 'completed':
                print(f"🎯 [{user.username}] Forcing refresh of completed job")
                return force_refresh_completed_jobs(job_state, user)
        except Exception as e:
            print(f"❌ Error in manual refresh check: {str(e)}")

    return check_current_job_status(job_state, user)
def should_auto_refresh(job_state, user):
    """Decide whether periodic status polling should keep running.

    Polling stops when there is no user, no active/tracked job, the job is
    missing or belongs to someone else, the job failed, or it completed with
    non-empty transcript text. Lookup errors keep polling alive.
    """
    if not (user and job_state and job_state.get('auto_refresh_active', False)):
        return False
    if 'current_job_id' not in job_state:
        return False

    job_id = job_state['current_job_id']

    try:
        job = transcription_manager.get_job_status(job_id)
        if not job or job.user_id != user.user_id:
            return False

        if job.status == 'failed':
            # Permanent failure: nothing more to wait for.
            return False
        if job.status != 'completed':
            # Still pending or processing.
            return True

        # Completed: only stop once the transcript actually has content.
        if job.transcript_text and job.transcript_text.strip():
            print(f"🏁 [{user.username}] Auto-refresh stopping - transcript ready ({len(job.transcript_text)} chars)")
            return False

        print(f"⏳ [{user.username}] Job complete but transcript not ready - continuing refresh")
        return True
    except Exception as e:
        print(f"❌ Error in should_auto_refresh: {str(e)}")
        return True  # Continue refreshing on error to be safe
def auto_refresh_status(job_state, user):
    """Periodic tick: refresh the status outputs while polling is warranted.

    Returns the same six outputs as ``check_current_job_status``. When
    polling should not continue, a last check is made so a finished
    transcript still gets displayed; otherwise all outputs are left
    untouched and the auto-refresh indicator is hidden.
    """

    def leave_untouched():
        return (
            gr.update(),  # No change to status_display
            gr.update(),  # No change to transcript_output
            gr.update(),  # No change to download_file
            gr.update(),  # No change to job_info
            gr.update(visible=False),  # Hide auto-refresh indicator
            gr.update()  # No change to user stats
        )

    if not user:
        return leave_untouched()

    if should_auto_refresh(job_state, user):
        return check_current_job_status(job_state, user)

    # Polling is over; make sure a completed transcript reaches the UI once.
    if job_state and 'current_job_id' in job_state:
        job_id = job_state['current_job_id']
        try:
            job = transcription_manager.get_job_status(job_id)
            owned = job and job.user_id == user.user_id
            if owned and job.status == 'completed' and job.transcript_text and job.transcript_text.strip():
                print(f"🎯 [{user.username}] Final refresh - displaying completed transcript")
                job_state['auto_refresh_active'] = False
                return check_current_job_status(job_state, user)
        except Exception as e:
            print(f"❌ Error in final refresh check: {str(e)}")

    return leave_untouched()
def stop_auto_refresh(job_state, user):
    """Turn off polling on the user's request and hide the indicator.

    Clears ``auto_refresh_active`` in a non-empty ``job_state``, logs the
    action for a logged-in user, and returns the update that hides the
    auto-refresh indicator.
    """
    has_state = bool(job_state)
    if has_state:
        job_state['auto_refresh_active'] = False

    if user:
        print(f"⏹️ [{user.username}] Auto-refresh stopped by user")

    hide_indicator = gr.update(visible=False)
    return hide_indicator
# History Functions
def _format_history_date(created_at):
    """Format a job's creation timestamp as "YYYY-MM-DD HH:MM" for the table."""
    try:
        return datetime.fromisoformat(created_at).strftime("%Y-%m-%d %H:%M")
    except (TypeError, ValueError):
        # Not an ISO-8601 string: show the leading characters of whatever we
        # have instead of failing the whole table on one odd row.
        return str(created_at)[:16] if created_at else ""


def _download_status(job):
    """Describe whether a job's transcript can be downloaded."""
    if job.status == 'completed' and job.transcript_text:
        return "πŸ“„ Available"
    in_flight = {
        'processing': "πŸ”„ Processing...",
        'pending': "⏳ Queued...",
        'failed': "❌ Failed",
    }
    # Includes "completed" without transcript text.
    return in_flight.get(job.status, "⚠️ Unknown")


def _history_row(job):
    """Build one history-table row for a job."""
    # Job ID display (shortened for table)
    job_id_display = job.job_id[:8] + "..." if len(job.job_id) > 8 else job.job_id
    return [
        _format_history_date(job.created_at),
        job.original_filename,
        ALLOWED_LANGS.get(job.language, job.language),
        format_status(job.status),
        format_processing_time(job.created_at, job.completed_at),
        job_id_display,
        _download_status(job),
    ]


def get_user_history_table(user, show_all_user_transcriptions=False):
    """Get user history as a formatted table - PDPA compliant.

    Args:
        user: Logged-in user object; falsy yields an empty table.
        show_all_user_transcriptions: When true, up to 100 of the user's
            jobs are requested instead of the 20 most recent.

    Returns:
        A list of rows: date, filename, language, status, processing time,
        shortened job id and download status. Empty on error or no data.
    """
    if not user:
        return []

    try:
        limit = 100 if show_all_user_transcriptions else 20
        jobs = transcription_manager.get_user_history(user.user_id, limit=limit)
        if not jobs:
            return []
        return [_history_row(job) for job in jobs]
    except Exception as e:
        # UI boundary: an empty table is preferable to a crashed tab.
        print(f"❌ Error loading user history: {str(e)}")
        return []
def refresh_history_and_downloads(user, show_all_user_transcriptions=False):
    """Refresh the history table and the ten download slots.

    Returns twelve outputs: the table rows, ten file-component updates (one
    per completed job with transcript text, the rest hidden) and the user
    statistics banner. Without a user, or on error, the table is empty and
    every download slot is hidden.
    """

    def nothing_to_show():
        # Empty table, ten hidden download slots, stats left unchanged.
        return ([], *(gr.update(visible=False) for _ in range(10)), gr.update())

    if not user:
        return nothing_to_show()

    try:
        limit = 100 if show_all_user_transcriptions else 20
        jobs = transcription_manager.get_user_history(user.user_id, limit=limit)

        table_data = get_user_history_table(user, show_all_user_transcriptions)

        downloadable = [job for job in jobs if job.status == 'completed' and job.transcript_text]

        # One visible slot per downloadable job, capped at the ten components.
        download_updates = []
        for job in downloadable[:10]:
            transcript_file = create_transcript_file(job.transcript_text, job.job_id)
            label = f"πŸ“„ {job.original_filename} ({job.created_at[:10]})"
            download_updates.append(
                gr.update(visible=True, value=transcript_file, label=label)
            )
        # Pad the remaining slots with hidden components.
        while len(download_updates) < 10:
            download_updates.append(gr.update(visible=False))

        stats_display = get_user_stats_display(user)

        return [table_data, *download_updates, stats_display]
    except Exception as e:
        print(f"❌ Error refreshing user history: {str(e)}")
        return nothing_to_show()
def on_history_refresh_click(user, show_all_user_transcriptions):
    """Handle the history refresh button: log the request, then reload."""
    if not user:
        return refresh_history_and_downloads(user, show_all_user_transcriptions)

    print(f"πŸ”„ [{user.username}] User refreshed history (show_all: {show_all_user_transcriptions})")
    return refresh_history_and_downloads(user, show_all_user_transcriptions)
def on_history_tab_select(user):
    """Reload the history (recent jobs only) when its tab is opened."""
    if not user:
        return refresh_history_and_downloads(user, show_all_user_transcriptions=False)

    print(f"πŸ“š [{user.username}] History tab opened, refreshing data...")
    return refresh_history_and_downloads(user, show_all_user_transcriptions=False)
# PDPA Compliance Functions
def export_user_data(user):
    """Write the user's exported data to a JSON file and offer it for download.

    Returns a pair: the status message and the update for the download
    component (visible with the file on success, hidden otherwise).
    """
    if not user:
        return "❌ Please log in to export your data", gr.update(visible=False)

    try:
        payload = transcription_manager.export_user_data(user.user_id)

        # The export lands in the local "temp" directory, named by user id
        # and the current timestamp.
        os.makedirs("temp", exist_ok=True)
        filename = f"temp/user_data_export_{user.user_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(filename, "w", encoding="utf-8") as out:
            json.dump(payload, out, indent=2, ensure_ascii=False)

        print(f"πŸ“¦ [{user.username}] Data export created")
        download = gr.update(visible=True, value=filename, label="Download Your Data Export")
        return "βœ… Your data has been exported successfully", download
    except Exception as e:
        print(f"❌ Error exporting user data: {str(e)}")
        return f"❌ Export failed: {str(e)}", gr.update(visible=False)
def update_marketing_consent(user, marketing_consent):
    """Persist a new marketing-consent choice and mirror it on the user object.

    Returns the status message to display.
    """
    if not user:
        return "❌ Please log in to update consent"

    try:
        updated = transcription_manager.update_user_consent(user.user_id, marketing_consent)
        if not updated:
            return "❌ Failed to update consent"

        # Keep the in-session user object in step with the stored value.
        user.marketing_consent = marketing_consent
        print(f"πŸ“§ [{user.username}] Marketing consent updated: {marketing_consent}")
        return "βœ… Marketing consent updated successfully"
    except Exception as e:
        return f"❌ Error: {str(e)}"
def delete_user_account(user, confirmation_text):
    """Delete the account after the exact confirmation phrase was typed.

    Returns four values: status message, the user to keep in session (None
    once logged out), visibility update for the auth section and visibility
    update for the main app.
    """

    def signed_out(text):
        # No session (any more): show the auth section, hide the app.
        return text, None, gr.update(visible=True), gr.update(visible=False)

    def unchanged_session(text):
        # Deletion did not happen: keep the user inside the app.
        return text, user, gr.update(visible=False), gr.update(visible=True)

    if not user:
        return signed_out("❌ Please log in to delete account")
    if confirmation_text != "DELETE MY ACCOUNT":
        return unchanged_session("❌ Please type 'DELETE MY ACCOUNT' to confirm")

    try:
        success = transcription_manager.delete_user_account(user.user_id)
        if not success:
            return unchanged_session("❌ Failed to delete account")

        print(f"πŸ—‘οΈ [{user.username}] Account deleted successfully")
        return signed_out("βœ… Your account and all data have been permanently deleted")
    except Exception as e:
        return unchanged_session(f"❌ Error: {str(e)}")
def on_user_login(user):
    """Sync a consent control with the session: the user's stored marketing
    consent when logged in, otherwise False."""
    consent = user.marketing_consent if user else False
    return gr.update(value=consent)
def check_system_status():
    """Probe the transcription manager's database and return a banner text.

    Returns the generic login prompt when the probe succeeds, otherwise a
    message describing the initialization failure.
    """
    try:
        manager_ready = transcription_manager and transcription_manager.db
        if not manager_ready:
            print("❌ System initialization failed - transcription manager not available")
            return "❌ System initialization failed - please check configuration"

        # A throwaway stats query serves as the connectivity check; only
        # whether it raises matters, the result is discarded.
        transcription_manager.db.get_user_stats("test_user_id")
        print("βœ… System initialization successful")
        return "πŸ‘€ Please log in to view your statistics..."
    except Exception as e:
        print(f"❌ System initialization error: {str(e)}")
        return f"❌ System error: {str(e)}"
def create_transcript_file(transcript_text, job_id):
    """Write the transcript to ``temp/transcript_<job_id>.txt`` (UTF-8).

    Creates the ``temp`` directory when needed, overwrites any earlier file
    for the same job id, and returns the path that was written.
    """
    os.makedirs("temp", exist_ok=True)

    target_path = "temp/transcript_{}.txt".format(job_id)
    with open(target_path, "w", encoding="utf-8") as handle:
        handle.write(transcript_text)

    return target_path
# Enhanced CSS with authentication styling.
# Custom stylesheet string handed to gr.Blocks(css=enhanced_css). Besides the
# generic Gradio container/button/input rules it defines the classes that the
# UI attaches through elem_classes (e.g. auth-form, auth-button, user-stats,
# auto-refresh-indicator, privacy-notice).
enhanced_css = """
/* Main container styling */
.gradio-container {
background: linear-gradient(135deg, #f8f9fa 0%, #e9ecef 100%);
font-family: 'Segoe UI', system-ui, sans-serif;
color: #212529;
}
/* Card styling */
.gr-box {
background: white;
border: 1px solid #dee2e6;
border-radius: 12px;
box-shadow: 0 2px 8px rgba(0,0,0,0.08);
padding: 20px;
margin: 10px 0;
}
/* Button styling */
.gr-button {
background: linear-gradient(135deg, #007bff, #0056b3);
border: none;
border-radius: 8px;
color: white;
font-weight: 500;
padding: 12px 24px;
transition: all 0.2s ease;
box-shadow: 0 2px 4px rgba(0,123,255,0.2);
}
.gr-button:hover {
background: linear-gradient(135deg, #0056b3, #004085);
transform: translateY(-1px);
box-shadow: 0 4px 8px rgba(0,123,255,0.3);
}
.gr-button[variant="secondary"] {
background: linear-gradient(135deg, #6c757d, #495057);
}
.gr-button[variant="secondary"]:hover {
background: linear-gradient(135deg, #495057, #343a40);
}
/* Login/Register button styling */
.auth-button {
background: linear-gradient(135deg, #28a745, #1e7e34);
min-width: 120px;
}
.auth-button:hover {
background: linear-gradient(135deg, #1e7e34, #155724);
}
.danger-button {
background: linear-gradient(135deg, #dc3545, #c82333);
}
.danger-button:hover {
background: linear-gradient(135deg, #c82333, #a71e2a);
}
/* Input styling */
.gr-textbox, .gr-dropdown, .gr-file {
border: 2px solid #e9ecef;
border-radius: 8px;
background: white;
color: #212529;
transition: border-color 0.2s ease;
}
.gr-textbox:focus, .gr-dropdown:focus {
border-color: #007bff;
box-shadow: 0 0 0 3px rgba(0,123,255,0.1);
}
/* Status styling */
.status-display {
background: linear-gradient(135deg, #e3f2fd, #bbdefb);
border-left: 4px solid #2196f3;
padding: 15px;
border-radius: 0 8px 8px 0;
margin: 10px 0;
}
.success-status {
background: linear-gradient(135deg, #e8f5e8, #c8e6c9);
border-left-color: #4caf50;
}
.error-status {
background: linear-gradient(135deg, #ffebee, #ffcdd2);
border-left-color: #f44336;
}
/* Auto-refresh indicator styling */
.auto-refresh-indicator {
background: linear-gradient(135deg, #fff3cd, #ffeaa7);
border: 1px solid #ffeaa7;
border-radius: 6px;
padding: 8px 12px;
font-size: 12px;
color: #856404;
text-align: center;
animation: pulse 2s infinite;
}
@keyframes pulse {
0% { opacity: 1; }
50% { opacity: 0.7; }
100% { opacity: 1; }
}
/* User stats styling */
.user-stats {
background: linear-gradient(135deg, #e8f5e8, #c8e6c9);
border: 1px solid #c8e6c9;
border-radius: 6px;
padding: 8px 12px;
font-size: 12px;
color: #2e7d32;
text-align: center;
font-weight: 500;
}
/* Authentication form styling */
.auth-form {
background: white;
border: 2px solid #007bff;
border-radius: 12px;
padding: 25px;
box-shadow: 0 4px 12px rgba(0,123,255,0.15);
}
/* Privacy notice styling */
.privacy-notice {
background: linear-gradient(135deg, #e3f2fd, #bbdefb);
border: 1px solid #2196f3;
border-radius: 8px;
padding: 15px;
font-size: 14px;
color: #1976d2;
margin: 10px 0;
}
/* PDPA section styling */
.pdpa-section {
background: linear-gradient(135deg, #fff3cd, #ffeaa7);
border: 1px solid #ffc107;
border-radius: 8px;
padding: 15px;
margin: 10px 0;
}
/* History table styling */
.history-table {
background: white;
border: 1px solid #dee2e6;
border-radius: 8px;
font-size: 14px;
}
.history-table thead th {
background: linear-gradient(135deg, #f8f9fa, #e9ecef);
color: #495057;
font-weight: 600;
padding: 12px;
border-bottom: 2px solid #dee2e6;
}
.history-table tbody tr {
cursor: pointer;
transition: background-color 0.2s ease;
}
.history-table tbody tr:hover {
background: linear-gradient(135deg, #f8f9fa, #e9ecef);
}
.history-table tbody tr:nth-child(even) {
background: #f8f9fa;
}
.history-table tbody tr:nth-child(even):hover {
background: linear-gradient(135deg, #e9ecef, #dee2e6);
}
.history-table tbody td {
padding: 10px;
border-bottom: 1px solid #dee2e6;
vertical-align: middle;
}
/* Tab styling */
.tab-nav {
background: white;
border-bottom: 2px solid #dee2e6;
border-radius: 8px 8px 0 0;
}
/* Header styling */
.main-header {
background: white;
border: 1px solid #dee2e6;
border-radius: 12px;
padding: 25px;
text-align: center;
margin-bottom: 20px;
box-shadow: 0 2px 8px rgba(0,0,0,0.05);
}
.main-header h1 {
color: #007bff;
margin-bottom: 10px;
font-size: 2.2em;
font-weight: 600;
}
.main-header p {
color: #6c757d;
font-size: 1.1em;
margin: 0;
}
"""
# Create the main interface
with gr.Blocks(
theme=gr.themes.Soft(
primary_hue="blue",
secondary_hue="gray",
neutral_hue="gray",
font=["system-ui", "sans-serif"]
),
css=enhanced_css,
title="πŸŽ™οΈ Azure Transcipt Service - Secure & PDPA Compliant"
) as demo:
# Global state
current_user = gr.State(None)
job_state = gr.State({})
# Header
with gr.Row():
gr.HTML("""
<div class="main-header">
<h1>πŸŽ™οΈ Azure Transcipt Service</h1>
<p>Secure, PDPA-compliant transcription service with user authentication and privacy protection</p>
</div>
""")
# User stats display
user_stats_display = gr.Textbox(
label="",
lines=1,
interactive=False,
show_label=False,
placeholder="πŸ‘€ Please log in to view your statistics...",
elem_classes=["user-stats"]
)
# Authentication Section
with gr.Column(visible=True, elem_classes=["auth-form"]) as auth_section:
gr.Markdown("## πŸ” Authentication Required")
gr.Markdown("Please log in or create an account to use the transcription service.")
with gr.Tabs() as auth_tabs:
# Login Tab
with gr.Tab("πŸ”‘ Login") as login_tab:
with gr.Column():
login_email = gr.Textbox(
label="Email or Username",
placeholder="Enter your email or username"
)
login_password = gr.Textbox(
label="Password",
type="password",
placeholder="Enter your password"
)
with gr.Row():
login_btn = gr.Button("πŸ”‘ Login", variant="primary", elem_classes=["auth-button"])
login_status = gr.Textbox(
label="",
show_label=False,
interactive=False,
placeholder="Enter your credentials and click Login"
)
# Register Tab
with gr.Tab("πŸ“ Register") as register_tab:
with gr.Column():
reg_email = gr.Textbox(
label="Email",
placeholder="Enter your email address"
)
reg_username = gr.Textbox(
label="Username",
placeholder="Choose a username (3-30 characters, alphanumeric and underscore)"
)
reg_password = gr.Textbox(
label="Password",
type="password",
placeholder="Create a strong password (min 8 chars, mixed case, numbers)"
)
reg_confirm_password = gr.Textbox(
label="Confirm Password",
type="password",
placeholder="Confirm your password"
)
gr.Markdown("### πŸ“‹ Privacy & Data Consent")
with gr.Column(elem_classes=["privacy-notice"]):
gr.Markdown("""
**Privacy Notice**: By creating an account, you acknowledge that:
- Your data will be stored securely in user-separated Azure Blob Storage
- Transcriptions are processed using Azure Speech Services
- You can export or delete your data at any time
- We comply with GDPR and data protection regulations
""")
gdpr_consent = gr.Checkbox(
label="I consent to the processing of my personal data as described in the Privacy Notice (Required)",
value=False
)
data_retention_consent = gr.Checkbox(
label="I agree to data retention for transcription service functionality (Required)",
value=False
)
marketing_consent = gr.Checkbox(
label="I consent to receiving marketing communications (Optional)",
value=False
)
with gr.Row():
register_btn = gr.Button("πŸ“ Create Account", variant="primary", elem_classes=["auth-button"])
register_status = gr.Textbox(
label="",
show_label=False,
interactive=False,
placeholder="Fill out the form and agree to the required consents to create your account"
)
login_after_register = gr.Button(
"πŸ”‘ Go to Login",
visible=False,
variant="secondary"
)
# Main Application (visible only when logged in)
with gr.Column(visible=False) as main_app:
# Logout button
with gr.Row():
with gr.Column(scale=3):
pass
with gr.Column(scale=1):
logout_btn = gr.Button("πŸ‘‹ Logout", variant="secondary")
# Main transcription interface: input settings on the left, status and
# results on the right
with gr.Tab("🎙️ Transcribe"):
    with gr.Row():
        # Left column - Input settings
        with gr.Column(scale=1):
            gr.Markdown("### 📁 Upload File")
            file_upload = gr.File(
                label="Audio or Video File",
                type="filepath",
                # Audio extensions first, then video containers
                file_types=[
                    ".wav", ".mp3", ".ogg", ".opus", ".flac", ".wma", ".aac",
                    ".m4a", ".amr", ".webm", ".speex",
                    ".mp4", ".mov", ".avi", ".mkv", ".wmv", ".flv", ".3gp"
                ]
            )
            with gr.Row():
                language = gr.Dropdown(
                    # (label, value) pairs built from ALLOWED_LANGS;
                    # presumably it maps language code -> display name,
                    # given the "en-US" default - confirm in app_core
                    choices=[(v, k) for k, v in ALLOWED_LANGS.items()],
                    label="Language",
                    value="en-US"
                )
                audio_format = gr.Dropdown(
                    choices=AUDIO_FORMATS,
                    value="wav",
                    label="Output Format"
                )
            gr.Markdown("### ⚙️ Settings")
            with gr.Row():
                diarization_enabled = gr.Checkbox(
                    label="Speaker Identification",
                    value=True
                )
                # Visibility follows the checkbox above through the
                # diarization_enabled.change handler
                speakers = gr.Slider(
                    minimum=1,
                    maximum=10,
                    step=1,
                    value=2,
                    label="Max Speakers"
                )
            with gr.Row():
                timestamps = gr.Checkbox(
                    label="Timestamps",
                    value=True
                )
                profanity = gr.Dropdown(
                    choices=["masked", "removed", "raw"],
                    value="masked",
                    label="Profanity Filter"
                )
            with gr.Row():
                punctuation = gr.Dropdown(
                    choices=["automatic", "dictated", "none"],
                    value="automatic",
                    label="Punctuation"
                )
                lexical = gr.Checkbox(
                    label="Lexical Form",
                    value=False
                )
            submit_btn = gr.Button(
                "🚀 Start Transcription",
                variant="primary",
                size="lg"
            )
        # Right column - Results
        with gr.Column(scale=1):
            gr.Markdown("### 📊 Status")
            # Auto-refresh indicator (starts hidden)
            auto_refresh_status_display = gr.Textbox(
                label="",
                lines=1,
                interactive=False,
                show_label=False,
                visible=False,
                elem_classes=["auto-refresh-indicator"]
            )
            status_display = gr.Textbox(
                label="",
                lines=3,
                interactive=False,
                show_label=False,
                placeholder="Upload a file and click 'Start Transcription' to begin...\nStatus will auto-refresh every 10 seconds during processing.\nYour data is stored in your private user folder for PDPA compliance."
            )
            job_info = gr.Textbox(
                label="",
                lines=1,
                interactive=False,
                show_label=False,
                placeholder=""
            )
            with gr.Row():
                refresh_btn = gr.Button(
                    "🔄 Check Status",
                    variant="secondary"
                )
                stop_refresh_btn = gr.Button(
                    "⏹️ Stop Auto-Refresh",
                    variant="secondary"
                )
            gr.Markdown("### 📄 Results")
            transcript_output = gr.Textbox(
                label="Transcript",
                lines=12,
                interactive=False,
                placeholder="Your transcript with speaker identification and precise timestamps (HH:MM:SS) will appear here..."
            )
            # Download slot (starts hidden)
            download_file = gr.File(
                label="Download",
                interactive=False,
                visible=False
            )
# History tab: per-user job table plus a fixed pool of download slots
with gr.Tab("📚 My History"):
    gr.Markdown("### 📋 Your Transcription History & Downloads")
    gr.Markdown("*View your personal transcription history and download completed transcripts (PDPA compliant - only your data)*")
    with gr.Row():
        refresh_history_btn = gr.Button(
            "🔄 Refresh My History & Downloads",
            variant="primary"
        )
        show_all_user_checkbox = gr.Checkbox(
            label="Show All My Transcriptions (not just recent 20)",
            value=False
        )
    history_table = gr.Dataframe(
        headers=["Date", "Filename", "Language", "Status", "Duration", "Job ID", "Download"],
        datatype=["str", "str", "str", "str", "str", "str", "str"],
        col_count=(7, "fixed"),
        row_count=(15, "dynamic"),
        wrap=True,
        interactive=False,
        elem_classes=["history-table"]
    )
    # Download Files Section
    gr.Markdown("### 📥 Download Your Completed Transcripts")
    gr.Markdown("*Your available transcript downloads will appear below after refreshing*")
    # Container for dynamic download files: ten identical hidden slots.
    # The individual names are kept because the history event handlers
    # list them one by one as outputs.
    with gr.Column():
        (
            download_file_1, download_file_2, download_file_3, download_file_4,
            download_file_5, download_file_6, download_file_7, download_file_8,
            download_file_9, download_file_10,
        ) = [
            gr.File(label="", visible=False, interactive=False)
            for _ in range(10)
        ]
# PDPA Compliance Tab: data export, marketing consent and account deletion,
# each in its own styled section
with gr.Tab("🔒 Privacy & Data"):
    gr.Markdown("### 🔒 GDPR & Data Protection")
    gr.Markdown("Manage your personal data and privacy settings in compliance with data protection regulations.")
    with gr.Column(elem_classes=["pdpa-section"]):
        gr.Markdown("#### 📊 Data Export")
        gr.Markdown("Download all your personal data including transcriptions, account info, and usage statistics.")
        export_btn = gr.Button("📦 Export My Data", variant="primary")
        export_status = gr.Textbox(
            label="",
            show_label=False,
            interactive=False,
            placeholder="Click 'Export My Data' to download your complete data archive"
        )
        # Export download slot (starts hidden)
        export_file = gr.File(
            label="Your Data Export",
            visible=False,
            interactive=False
        )
    with gr.Column(elem_classes=["pdpa-section"]):
        gr.Markdown("#### 📧 Marketing Consent")
        gr.Markdown("Update your preferences for receiving marketing communications.")
        # Also written by the login chain (on_user_login outputs to it)
        marketing_consent_checkbox = gr.Checkbox(
            label="I consent to receiving marketing communications",
            value=False
        )
        update_consent_btn = gr.Button("✅ Update Consent", variant="secondary")
        consent_status = gr.Textbox(
            label="",
            show_label=False,
            interactive=False,
            placeholder="Update your marketing consent preferences"
        )
    with gr.Column(elem_classes=["pdpa-section"]):
        gr.Markdown("#### ⚠️ Account Deletion")
        gr.Markdown("""
        **Warning**: This action is irreversible and will permanently delete:
        - Your user account and profile
        - All transcription history and files
        - All data stored in Azure Blob Storage
        - Usage statistics and preferences
        """)
        # Free-text confirmation passed to delete_user_account with the user
        deletion_confirmation = gr.Textbox(
            label="Type 'DELETE MY ACCOUNT' to confirm",
            placeholder="Type the exact phrase to confirm account deletion"
        )
        delete_account_btn = gr.Button(
            "🗑️ Delete My Account",
            variant="stop",
            elem_classes=["danger-button"]
        )
        deletion_status = gr.Textbox(
            label="",
            show_label=False,
            interactive=False,
            placeholder="Account deletion requires confirmation text"
        )
# Auto-refresh timer: created with a 10.0 interval (matching the
# "auto-refresh every 10 seconds" wording shown to the user); its tick
# event is wired to auto_refresh_status in the event handlers
timer = gr.Timer(10.0)
# Event handlers
# Authentication events
def _clear_login_fields(user):
    """Blank the login inputs when a user is set; otherwise leave them untouched."""
    if user:
        return "", ""
    return gr.update(), gr.update()


def _reset_registration_form(status):
    """Reset the seven registration inputs when the status text carries the success marker."""
    # NOTE(review): the marker below is matched against the text written to
    # register_status by register_user - keep the two in sync.
    if "βœ…" in status:
        return "", "", "", "", False, False, False
    return tuple(gr.update() for _ in range(7))


def _show_login_tab():
    """Select tab 0 of auth_tabs and clear the registration status text."""
    return gr.update(selected=0), ""


# Login: authenticate, then load the consent checkbox, then clear the inputs
login_chain = login_btn.click(
    fn=login_user,
    inputs=[login_email, login_password],
    outputs=[login_status, current_user, auth_section, main_app, user_stats_display],
)
login_chain = login_chain.then(
    fn=on_user_login,
    inputs=[current_user],
    outputs=[marketing_consent_checkbox],
)
login_chain.then(
    fn=_clear_login_fields,
    inputs=[current_user],
    outputs=[login_email, login_password],
)

# Registration: create the account, then reset the form on success
registration_fields = [
    reg_email, reg_username, reg_password, reg_confirm_password,
    gdpr_consent, data_retention_consent, marketing_consent,
]
register_btn.click(
    fn=register_user,
    inputs=registration_fields,
    outputs=[register_status, login_after_register],
).then(
    fn=_reset_registration_form,
    inputs=[register_status],
    outputs=registration_fields,
)

login_after_register.click(
    fn=_show_login_tab,
    outputs=[auth_tabs, register_status],
)

logout_btn.click(
    fn=logout_user,
    outputs=[current_user, login_status, auth_section, main_app, user_stats_display],
)
# Transcription events
def _manual_status_check(job_state, user):
    """Log the manual refresh (only when a user is set) and return check_current_job_status()."""
    if user:
        print("🔄 User manually checked status")
    return check_current_job_status(job_state, user)


# Components refreshed by both the manual status check and the timer tick
job_status_outputs = [
    status_display, transcript_output, download_file, job_info,
    auto_refresh_status_display, user_stats_display,
]

submit_btn.click(
    submit_transcription,
    inputs=[
        file_upload, language, audio_format, diarization_enabled,
        speakers, profanity, punctuation, timestamps, lexical, current_user
    ],
    # Unlike the refresh handlers, submission also writes job_state
    outputs=[status_display, transcript_output, download_file, job_info, job_state, auto_refresh_status_display, user_stats_display]
)
refresh_btn.click(
    _manual_status_check,
    inputs=[job_state, current_user],
    outputs=job_status_outputs
)
stop_refresh_btn.click(
    stop_auto_refresh,
    inputs=[job_state, current_user],
    outputs=[auto_refresh_status_display]
)
# Auto-refresh timer event
timer.tick(
    auto_refresh_status,
    inputs=[job_state, current_user],
    outputs=job_status_outputs
)
# History events
def _on_show_all_toggled(user, show_all):
    """Log the checkbox change (only when a user is set) and return refresh_history_and_downloads()."""
    if user:
        print(f"👁️ User toggled show all personal transcriptions: {show_all}")
    return refresh_history_and_downloads(user, show_all)


# Shared by both handlers: the table, the ten download slots, then the stats line
history_outputs = [
    history_table,
    download_file_1, download_file_2, download_file_3, download_file_4, download_file_5,
    download_file_6, download_file_7, download_file_8, download_file_9, download_file_10,
    user_stats_display
]

refresh_history_btn.click(
    on_history_refresh_click,
    inputs=[current_user, show_all_user_checkbox],
    outputs=history_outputs
)
show_all_user_checkbox.change(
    _on_show_all_toggled,
    inputs=[current_user, show_all_user_checkbox],
    outputs=history_outputs
)
# PDPA compliance events: data export, consent update and account deletion

# Export writes a status message and the export file slot
export_btn.click(
    fn=export_user_data,
    inputs=[current_user],
    outputs=[export_status, export_file],
)

# Consent update reads the checkbox and reports into consent_status
update_consent_btn.click(
    fn=update_marketing_consent,
    inputs=[current_user, marketing_consent_checkbox],
    outputs=[consent_status],
)

# Deletion also writes current_user and both top-level sections
delete_account_btn.click(
    fn=delete_user_account,
    inputs=[current_user, deletion_confirmation],
    outputs=[deletion_status, current_user, auth_section, main_app],
)
# Auto-hide/show speakers slider
def _toggle_speakers_slider(enabled):
    """Return an update making the speakers slider visible exactly when the checkbox is ticked."""
    return gr.update(visible=enabled)


diarization_enabled.change(
    fn=_toggle_speakers_slider,
    inputs=[diarization_enabled],
    outputs=[speakers],
)
# Load user stats on app start and verify system is ready
def _announce_and_check_system():
    """Print the startup banner, then return check_system_status() for the stats display."""
    print("🚀 PDPA-Compliant Azure Transcript Service Started...")
    return check_system_status()


demo.load(
    _announce_and_check_system,
    outputs=[user_stats_display]
)
# Info section: static help panel added to the existing `demo` Blocks
with demo:
    gr.HTML("""
    <div style="background: white; border: 1px solid #dee2e6; border-radius: 12px; padding: 20px; margin-top: 20px; color: #212529;">
    <h3 style="color: #007bff; margin-top: 0;">📋 How to Use</h3>
    <ol style="line-height: 1.6;">
    <li><strong>Register/Login:</strong> Create an account or log in with existing credentials</li>
    <li><strong>Upload:</strong> Select your audio or video file</li>
    <li><strong>Configure:</strong> Choose language and enable speaker identification</li>
    <li><strong>Start:</strong> Click "Start Transcription" - status will auto-update every 10 seconds</li>
    <li><strong>Download:</strong> Get your transcript with speaker identification and timestamps</li>
    <li><strong>Manage:</strong> Use Privacy & Data tab to export or delete your data</li>
    </ol>
    <h3 style="color: #007bff;">🎵 Supported Formats</h3>
    <p><strong>Audio:</strong> WAV, MP3, OGG, OPUS, FLAC, WMA, AAC, M4A, AMR, WebM, Speex</p>
    <p><strong>Video:</strong> MP4, MOV, AVI, MKV, WMV, FLV, 3GP</p>
    <h3 style="color: #007bff;">🔒 Security & Privacy Features</h3>
    <ul style="line-height: 1.6;">
    <li><strong>✅ User Authentication:</strong> Secure registration and login system</li>
    <li><strong>✅ Password Security:</strong> Strong password requirements and secure hashing</li>
    <li><strong>✅ User-Separated Storage:</strong> Each user has isolated folders in Azure Blob Storage</li>
    <li><strong>✅ GDPR Compliance:</strong> Full data export and account deletion capabilities</li>
    <li><strong>✅ Consent Management:</strong> Granular consent controls for data processing</li>
    <li><strong>✅ Privacy by Design:</strong> Users can only access their own data</li>
    <li><strong>✅ Audit Trail:</strong> Comprehensive logging for compliance and security</li>
    <li><strong>✅ Data Retention:</strong> Clear data retention policies and user control</li>
    </ul>
    <h3 style="color: #007bff;">🎯 Enhanced Features</h3>
    <ul style="line-height: 1.6;">
    <li><strong>Enhanced Timestamps:</strong> Precise timing for each speaker segment (HH:MM:SS format)</li>
    <li><strong>Better Speaker Diarization:</strong> Improved speaker identification with timestamps</li>
    <li><strong>Personal Statistics:</strong> Real-time usage tracking and analytics</li>
    <li><strong>Complete History:</strong> View all your transcriptions or recent ones</li>
    <li><strong>Direct Downloads:</strong> Easy access to completed transcriptions</li>
    <li><strong>Data Export:</strong> Download all your data in JSON format</li>
    <li><strong>Account Management:</strong> Full control over your account and data</li>
    </ul>
    <h3 style="color: #007bff;">💡 Tips</h3>
    <ul style="line-height: 1.6;">
    <li>Use a strong password with mixed case letters, numbers, and symbols</li>
    <li>WAV files process fastest for transcription</li>
    <li>Enable speaker identification for meetings and interviews</li>
    <li>Auto-refresh continues until transcript is fully retrieved</li>
    <li>Visit Privacy & Data tab to manage your data and consent preferences</li>
    <li>You can export all your data or delete your account at any time</li>
    <li>All your files are stored in your private, secure user folder</li>
    <li>Your data is protected according to GDPR and privacy regulations</li>
    </ul>
    </div>
    """)
# Script entry point: announce startup, then serve the Blocks app
if __name__ == "__main__":
    print("🚀 Starting Secure PDPA-Compliant Azure Transcript Service...")
    demo.launch(
        server_name="0.0.0.0",  # listen on all interfaces
        server_port=7860,
        share=False,
        show_error=True
    )