A newer version of the Gradio SDK is available:
6.0.1
π οΈ Azure Speech Transcription - Developer Guide
π Table of Contents
- System Architecture
- Development Environment
- Deployment Guide
- API Documentation
- Database Schema
- Security Implementation
- Monitoring & Maintenance
- Contributing Guidelines
- Advanced Configuration
- Troubleshooting
ποΈ System Architecture
Overview
The Azure Speech Transcription service is built with a modern, secure architecture focusing on user privacy, PDPA compliance, and scalability.
βββββββββββββββββββ βββββββββββββββββββ βββββββββββββββββββ
β Frontend UI β β Backend API β β Azure Services β
β (Gradio) βββββΊβ (Python) βββββΊβ Speech & Blob β
βββββββββββββββββββ βββββββββββββββββββ βββββββββββββββββββ
β β β
β β β
βΌ βΌ βΌ
βββββββββββββββββββ βββββββββββββββββββ βββββββββββββββββββ
β User Session β β SQLite Database β β User Storage β
β Management β β (Metadata) β β (Isolated) β
βββββββββββββββββββ βββββββββββββββββββ βββββββββββββββββββ
Core Components
1. Frontend Layer (gradio_app.py)
- Technology: Gradio with custom CSS
- Purpose: User interface and session management
- Features: Authentication, file upload, real-time status, history management
2. Backend Layer (app_core.py)
- Technology: Python with threading and async processing
- Purpose: Business logic, authentication, and Azure integration
- Features: User management, transcription processing, PDPA compliance
3. Data Layer
- Database: SQLite with Azure Blob backup
- Storage: Azure Blob Storage with user separation
- Security: User-isolated folders and encrypted connections
4. External Services
- Azure Speech Services: Transcription processing
- Azure Blob Storage: File and database storage
- FFmpeg: Audio/video conversion
Data Flow
1. User uploads file β 2. Authentication check β 3. File validation
β β β
8. Download results β 7. Store transcript β 6. Process with Azure
β β β
9. Update UI status β 4. Save to user folder β 5. Background processing
π» Development Environment
Prerequisites
- Python: 3.8 or higher
- Azure Account: With Speech Services and Blob Storage
- FFmpeg: For audio/video processing
- Git: For version control
Environment Setup
1. Clone Repository
git clone <repository-url>
cd azure-speech-transcription
2. Virtual Environment
# Create virtual environment
python -m venv venv
# Activate (Windows)
venv\Scripts\activate
# Activate (macOS/Linux)
source venv/bin/activate
3. Install Dependencies
pip install -r requirements.txt
4. Environment Configuration
# Copy environment template
cp .env.example .env
# Edit with your Azure credentials
nano .env
5. Install FFmpeg
Windows (Chocolatey):
choco install ffmpeg
macOS (Homebrew):
brew install ffmpeg
Ubuntu/Debian:
sudo apt update
sudo apt install ffmpeg
6. Verify Installation
python -c "
import gradio as gr
from azure.storage.blob import BlobServiceClient
import subprocess
print('Gradio:', gr.__version__)
print('FFmpeg:', subprocess.run(['ffmpeg', '-version'], capture_output=True).returncode == 0)
print('Azure Blob:', 'OK')
"
Development Server
# Start development server
python gradio_app.py
# Server will be available at:
# http://localhost:7860
Development Tools
Recommended IDE Setup
- VS Code: With Python, Azure, and Git extensions
- PyCharm: Professional edition with Azure toolkit
- Vim/Emacs: With appropriate Python plugins
Useful Extensions
{
"recommendations": [
"ms-python.python",
"ms-vscode.azure-cli",
"ms-azuretools.azure-cli-tools",
"ms-python.black-formatter",
"ms-python.flake8"
]
}
Code Quality Tools
# Install development tools
pip install black flake8 pytest mypy
# Format code
black .
# Lint code
flake8 .
# Type checking
mypy app_core.py gradio_app.py
π Deployment Guide
Production Deployment Options
Option 1: Traditional Server Deployment
1. Server Preparation
# Update system
sudo apt update && sudo apt upgrade -y
# Install Python and dependencies
sudo apt install python3 python3-pip python3-venv nginx ffmpeg -y
# Create application user
sudo useradd -m -s /bin/bash transcription
sudo su - transcription
2. Application Setup
# Clone repository
git clone <repository-url> /home/transcription/app
cd /home/transcription/app
# Setup virtual environment
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
# Configure environment
cp .env.example .env
# Edit .env with production values
3. Systemd Service
# /etc/systemd/system/transcription.service
[Unit]
Description=Azure Speech Transcription Service
After=network.target
[Service]
Type=simple
User=transcription
Group=transcription
WorkingDirectory=/home/transcription/app
Environment=PATH=/home/transcription/app/venv/bin
ExecStart=/home/transcription/app/venv/bin/python gradio_app.py
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
4. Nginx Configuration
# /etc/nginx/sites-available/transcription
server {
listen 80;
server_name your-domain.com;
client_max_body_size 500M;
location / {
proxy_pass http://127.0.0.1:7860;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_read_timeout 300s;
proxy_connect_timeout 75s;
}
}
5. SSL Certificate
# Install Certbot
sudo apt install certbot python3-certbot-nginx -y
# Get SSL certificate
sudo certbot --nginx -d your-domain.com
# Verify auto-renewal
sudo certbot renew --dry-run
6. Start Services
# Enable and start application
sudo systemctl enable transcription
sudo systemctl start transcription
# Enable and restart nginx
sudo systemctl enable nginx
sudo systemctl restart nginx
# Check status
sudo systemctl status transcription
sudo systemctl status nginx
Option 2: Docker Deployment
1. Dockerfile
FROM python:3.9-slim
# Install system dependencies
RUN apt-get update && apt-get install -y \
ffmpeg \
&& rm -rf /var/lib/apt/lists/*
# Set working directory
WORKDIR /app
# Copy requirements and install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Create necessary directories
RUN mkdir -p uploads database temp
# Expose port
EXPOSE 7860
# Run application
CMD ["python", "gradio_app.py"]
2. Docker Compose
# docker-compose.yml
version: '3.8'
services:
transcription:
build: .
ports:
- "7860:7860"
environment:
- AZURE_SPEECH_KEY=${AZURE_SPEECH_KEY}
- AZURE_SPEECH_KEY_ENDPOINT=${AZURE_SPEECH_KEY_ENDPOINT}
- AZURE_REGION=${AZURE_REGION}
- AZURE_BLOB_CONNECTION=${AZURE_BLOB_CONNECTION}
- AZURE_CONTAINER=${AZURE_CONTAINER}
- AZURE_BLOB_SAS_TOKEN=${AZURE_BLOB_SAS_TOKEN}
- ALLOWED_LANGS=${ALLOWED_LANGS}
volumes:
- ./uploads:/app/uploads
- ./database:/app/database
- ./temp:/app/temp
restart: unless-stopped
nginx:
image: nginx:alpine
ports:
- "80:80"
- "443:443"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
- ./ssl:/etc/ssl/certs
depends_on:
- transcription
restart: unless-stopped
3. Deploy with Docker
# Build and start
docker-compose up -d
# View logs
docker-compose logs -f transcription
# Update application
git pull
docker-compose build transcription
docker-compose up -d transcription
Option 3: Cloud Deployment (Azure Container Instances)
1. Create Container Registry
# Create ACR
az acr create --resource-group myResourceGroup \
--name myregistry --sku Basic
# Login to ACR
az acr login --name myregistry
# Build and push image
docker build -t myregistry.azurecr.io/transcription:latest .
docker push myregistry.azurecr.io/transcription:latest
2. Deploy Container Instance
# Create container instance
az container create \
--resource-group myResourceGroup \
--name transcription-app \
--image myregistry.azurecr.io/transcription:latest \
--cpu 2 --memory 4 \
--port 7860 \
--environment-variables \
AZURE_SPEECH_KEY=$AZURE_SPEECH_KEY \
AZURE_SPEECH_KEY_ENDPOINT=$AZURE_SPEECH_KEY_ENDPOINT \
AZURE_REGION=$AZURE_REGION \
AZURE_BLOB_CONNECTION="$AZURE_BLOB_CONNECTION" \
AZURE_CONTAINER=$AZURE_CONTAINER \
AZURE_BLOB_SAS_TOKEN="$AZURE_BLOB_SAS_TOKEN"
π‘ API Documentation
Core Classes and Methods
TranscriptionManager Class
Purpose: Main service class handling all transcription operations
class TranscriptionManager:
def __init__(self)
# User Authentication
def register_user(email: str, username: str, password: str,
gdpr_consent: bool, data_retention_agreed: bool,
marketing_consent: bool) -> Tuple[bool, str, Optional[str]]
def login_user(login: str, password: str) -> Tuple[bool, str, Optional[User]]
# Transcription Operations
def submit_transcription(file_bytes: bytes, original_filename: str,
user_id: str, language: str,
settings: Dict) -> str
def get_job_status(job_id: str) -> Optional[TranscriptionJob]
# Data Management
def get_user_history(user_id: str, limit: int) -> List[TranscriptionJob]
def get_user_stats(user_id: str) -> Dict
def export_user_data(user_id: str) -> Dict
def delete_user_account(user_id: str) -> bool
DatabaseManager Class
Purpose: Handle database operations and Azure blob synchronization
class DatabaseManager:
def __init__(db_path: str = None)
# User Operations
def create_user(...) -> Tuple[bool, str, Optional[str]]
def authenticate_user(login: str, password: str) -> Tuple[bool, str, Optional[User]]
def get_user_by_id(user_id: str) -> Optional[User]
# Job Operations
def save_job(job: TranscriptionJob)
def get_job(job_id: str) -> Optional[TranscriptionJob]
def get_user_jobs(user_id: str, limit: int) -> List[TranscriptionJob]
def get_pending_jobs() -> List[TranscriptionJob]
AuthManager Class
Purpose: Authentication utilities and validation
class AuthManager:
@staticmethod
def hash_password(password: str) -> str
def verify_password(password: str, password_hash: str) -> bool
def validate_email(email: str) -> bool
def validate_username(username: str) -> bool
def validate_password(password: str) -> Tuple[bool, str]
Data Models
User Model
@dataclass
class User:
user_id: str
email: str
username: str
password_hash: str
created_at: str
last_login: Optional[str] = None
is_active: bool = True
gdpr_consent: bool = False
data_retention_agreed: bool = False
marketing_consent: bool = False
TranscriptionJob Model
@dataclass
class TranscriptionJob:
job_id: str
user_id: str
original_filename: str
audio_url: str
language: str
status: str # pending, processing, completed, failed
created_at: str
completed_at: Optional[str] = None
transcript_text: Optional[str] = None
transcript_url: Optional[str] = None
error_message: Optional[str] = None
azure_trans_id: Optional[str] = None
settings: Optional[Dict] = None
Configuration Parameters
Environment Variables
# Required
AZURE_SPEECH_KEY: str
AZURE_SPEECH_KEY_ENDPOINT: str
AZURE_REGION: str
AZURE_BLOB_CONNECTION: str
AZURE_CONTAINER: str
AZURE_BLOB_SAS_TOKEN: str
# Optional
ALLOWED_LANGS: str # JSON string
API_VERSION: str = "v3.2"
PASSWORD_SALT: str = "default_salt"
MAX_FILE_SIZE_MB: int = 500
Transcription Settings
settings = {
'audio_format': str, # wav, mp3, etc.
'diarization_enabled': bool, # Speaker identification
'speakers': int, # Max speakers (1-10)
'profanity': str, # masked, removed, raw
'punctuation': str, # automatic, dictated, none
'timestamps': bool, # Include timestamps
'lexical': bool, # Include lexical forms
'language_id_enabled': bool, # Auto language detection
'candidate_locales': List[str] # Language candidates
}
ποΈ Database Schema
SQLite Database Structure
Users Table
CREATE TABLE users (
user_id TEXT PRIMARY KEY,
email TEXT UNIQUE NOT NULL,
username TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
created_at TEXT NOT NULL,
last_login TEXT,
is_active BOOLEAN DEFAULT 1,
gdpr_consent BOOLEAN DEFAULT 0,
data_retention_agreed BOOLEAN DEFAULT 0,
marketing_consent BOOLEAN DEFAULT 0
);
-- Indexes
CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_users_username ON users(username);
Transcriptions Table
CREATE TABLE transcriptions (
job_id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
original_filename TEXT NOT NULL,
audio_url TEXT,
language TEXT NOT NULL,
status TEXT NOT NULL,
created_at TEXT NOT NULL,
completed_at TEXT,
transcript_text TEXT,
transcript_url TEXT,
error_message TEXT,
azure_trans_id TEXT,
settings TEXT,
FOREIGN KEY (user_id) REFERENCES users (user_id)
);
-- Indexes
CREATE INDEX idx_transcriptions_user_id ON transcriptions(user_id);
CREATE INDEX idx_transcriptions_status ON transcriptions(status);
CREATE INDEX idx_transcriptions_created_at ON transcriptions(created_at DESC);
CREATE INDEX idx_transcriptions_user_created ON transcriptions(user_id, created_at DESC);
Azure Blob Storage Structure
Container: {AZURE_CONTAINER}/
βββ shared/
β βββ database/
β βββ transcriptions.db # Shared database backup
βββ users/
β βββ {user-id-1}/
β β βββ audio/ # Processed audio files
β β β βββ {job-id-1}.wav
β β β βββ {job-id-2}.wav
β β βββ transcripts/ # Transcript files
β β β βββ {job-id-1}.txt
β β β βββ {job-id-2}.txt
β β βββ originals/ # Original uploaded files
β β βββ {job-id-1}_{filename}.mp4
β β βββ {job-id-2}_{filename}.wav
β βββ {user-id-2}/
β βββ audio/
β βββ transcripts/
β βββ originals/
Database Operations
User Management Queries
-- Create user
INSERT INTO users (user_id, email, username, password_hash, created_at,
gdpr_consent, data_retention_agreed, marketing_consent)
VALUES (?, ?, ?, ?, ?, ?, ?, ?);
-- Authenticate user
SELECT * FROM users
WHERE (email = ? OR username = ?) AND is_active = 1;
-- Update last login
UPDATE users SET last_login = ? WHERE user_id = ?;
-- Get user stats
SELECT status, COUNT(*) FROM transcriptions
WHERE user_id = ? GROUP BY status;
Job Management Queries
-- Create job
INSERT INTO transcriptions (job_id, user_id, original_filename, language,
status, created_at, settings)
VALUES (?, ?, ?, ?, 'pending', ?, ?);
-- Update job status
UPDATE transcriptions
SET status = ?, completed_at = ?, transcript_text = ?, transcript_url = ?
WHERE job_id = ?;
-- Get user jobs
SELECT * FROM transcriptions
WHERE user_id = ?
ORDER BY created_at DESC LIMIT ?;
-- Get pending jobs for background processor
SELECT * FROM transcriptions
WHERE status IN ('pending', 'processing');
π Security Implementation
Authentication Security
Password Security
# Password hashing with salt
def hash_password(password: str) -> str:
salt = os.environ.get("PASSWORD_SALT", "default_salt")
return hashlib.sha256((password + salt).encode()).hexdigest()
# Password validation
def validate_password(password: str) -> Tuple[bool, str]:
if len(password) < 8:
return False, "Password must be at least 8 characters"
if not re.search(r'[A-Z]', password):
return False, "Password must contain uppercase letter"
if not re.search(r'[a-z]', password):
return False, "Password must contain lowercase letter"
if not re.search(r'\d', password):
return False, "Password must contain number"
return True, "Valid"
Session Management
# User session state
session_state = {
'user_id': str,
'username': str,
'logged_in_at': datetime,
'last_activity': datetime
}
# Session validation
def validate_session(session_state: dict) -> bool:
if not session_state or 'user_id' not in session_state:
return False
# Check session timeout (if implemented)
last_activity = session_state.get('last_activity')
if last_activity:
timeout = timedelta(hours=24) # 24-hour sessions
if datetime.now() - last_activity > timeout:
return False
return True
Data Security
Access Control
# User data access verification
def verify_user_access(job_id: str, user_id: str) -> bool:
job = get_job(job_id)
return job and job.user_id == user_id
# File path security
def get_user_blob_path(user_id: str, blob_type: str, filename: str) -> str:
# Ensure user can only access their own folder
safe_filename = os.path.basename(filename) # Prevent path traversal
return f"users/{user_id}/{blob_type}/{safe_filename}"
Data Encryption
# Azure Blob Storage encryption (configured at Azure level)
# - Encryption at rest: Enabled by default
# - Encryption in transit: HTTPS enforced
# - Customer-managed keys: Optional enhancement
# Database encryption (for sensitive fields)
from cryptography.fernet import Fernet
def encrypt_sensitive_data(data: str, key: bytes) -> str:
f = Fernet(key)
return f.encrypt(data.encode()).decode()
def decrypt_sensitive_data(encrypted_data: str, key: bytes) -> str:
f = Fernet(key)
return f.decrypt(encrypted_data.encode()).decode()
Azure Security
Blob Storage Security
# SAS token configuration for least privilege
sas_permissions = BlobSasPermissions(
read=True,
write=True,
delete=True,
list=True
)
# IP restrictions (optional)
sas_ip_range = "192.168.1.0/24" # Restrict to specific IP range
# Time-limited tokens
sas_expiry = datetime.utcnow() + timedelta(hours=1)
Speech Service Security
# Secure API calls
headers = {
"Ocp-Apim-Subscription-Key": AZURE_SPEECH_KEY,
"Content-Type": "application/json"
}
# Request timeout and retry logic
response = requests.post(
url,
headers=headers,
json=body,
timeout=30,
verify=True # Verify SSL certificates
)
Input Validation
File Upload Security
def validate_uploaded_file(file_path: str, max_size: int = 500 * 1024 * 1024) -> Tuple[bool, str]:
try:
# Check file exists
if not os.path.exists(file_path):
return False, "File not found"
# Check file size
file_size = os.path.getsize(file_path)
if file_size > max_size:
return False, f"File too large: {file_size / 1024 / 1024:.1f}MB"
# Check file type by content (not just extension)
import magic
mime_type = magic.from_file(file_path, mime=True)
allowed_types = ['audio/', 'video/']
if not any(mime_type.startswith(t) for t in allowed_types):
return False, f"Invalid file type: {mime_type}"
return True, "Valid"
except Exception as e:
return False, f"Validation error: {str(e)}"
SQL Injection Prevention
# Use parameterized queries (already implemented)
cursor.execute(
"SELECT * FROM users WHERE email = ? AND password_hash = ?",
(email, password_hash)
)
# Input sanitization
def sanitize_input(user_input: str) -> str:
# Remove dangerous characters
import html
sanitized = html.escape(user_input)
# Limit length
return sanitized[:1000]
π Monitoring & Maintenance
Application Monitoring
Health Checks
def health_check() -> Dict[str, Any]:
"""System health check endpoint"""
try:
# Database check
db_status = check_database_connection()
# Azure services check
blob_status = check_blob_storage()
speech_status = check_speech_service()
# FFmpeg check
ffmpeg_status = check_ffmpeg_installation()
# Disk space check
disk_status = check_disk_space()
return {
'status': 'healthy' if all([db_status, blob_status, speech_status, ffmpeg_status]) else 'unhealthy',
'timestamp': datetime.now().isoformat(),
'services': {
'database': db_status,
'blob_storage': blob_status,
'speech_service': speech_status,
'ffmpeg': ffmpeg_status,
'disk_space': disk_status
}
}
except Exception as e:
return {
'status': 'error',
'timestamp': datetime.now().isoformat(),
'error': str(e)
}
def check_database_connection() -> bool:
try:
with transcription_manager.db.get_connection() as conn:
conn.execute("SELECT 1").fetchone()
return True
except:
return False
def check_blob_storage() -> bool:
try:
client = BlobServiceClient.from_connection_string(AZURE_BLOB_CONNECTION)
client.list_containers(max_results=1)
return True
except:
return False
Logging Configuration
import logging
from logging.handlers import RotatingFileHandler
def setup_logging():
"""Configure application logging"""
# Create formatter
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Console handler
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
console_handler.setLevel(logging.INFO)
# File handler with rotation
file_handler = RotatingFileHandler(
'logs/transcription.log',
maxBytes=10*1024*1024, # 10MB
backupCount=5
)
file_handler.setFormatter(formatter)
file_handler.setLevel(logging.DEBUG)
# Configure root logger
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logger.addHandler(console_handler)
logger.addHandler(file_handler)
# Separate logger for sensitive operations
auth_logger = logging.getLogger('auth')
auth_handler = RotatingFileHandler(
'logs/auth.log',
maxBytes=5*1024*1024, # 5MB
backupCount=10
)
auth_handler.setFormatter(formatter)
auth_logger.addHandler(auth_handler)
auth_logger.setLevel(logging.INFO)
Performance Monitoring
import time
from functools import wraps
def monitor_performance(func):
"""Decorator to monitor function performance"""
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
try:
result = func(*args, **kwargs)
duration = time.time() - start_time
logging.info(f"{func.__name__} completed in {duration:.2f}s")
return result
except Exception as e:
duration = time.time() - start_time
logging.error(f"{func.__name__} failed after {duration:.2f}s: {str(e)}")
raise
return wrapper
# Usage
@monitor_performance
def submit_transcription(self, file_bytes, filename, user_id, language, settings):
# Implementation here
pass
Database Maintenance
Backup Strategy
def backup_database():
"""Backup database to Azure Blob Storage"""
try:
# Create timestamped backup
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_name = f"shared/backups/transcriptions_backup_{timestamp}.db"
# Upload current database
blob_client = blob_service.get_blob_client(
container=AZURE_CONTAINER,
blob=backup_name
)
with open(db_path, "rb") as data:
blob_client.upload_blob(data)
logging.info(f"Database backup created: {backup_name}")
# Clean old backups (keep last 30 days)
cleanup_old_backups()
except Exception as e:
logging.error(f"Database backup failed: {str(e)}")
def cleanup_old_backups():
"""Remove backups older than 30 days"""
try:
cutoff_date = datetime.now() - timedelta(days=30)
container_client = blob_service.get_container_client(AZURE_CONTAINER)
for blob in container_client.list_blobs(name_starts_with="shared/backups/"):
if blob.last_modified < cutoff_date:
blob_service.delete_blob(AZURE_CONTAINER, blob.name)
logging.info(f"Deleted old backup: {blob.name}")
except Exception as e:
logging.error(f"Backup cleanup failed: {str(e)}")
Database Optimization
def optimize_database():
"""Optimize database performance"""
try:
with transcription_manager.db.get_connection() as conn:
# Analyze tables
conn.execute("ANALYZE")
# Vacuum database (compact)
conn.execute("VACUUM")
# Update statistics
conn.execute("PRAGMA optimize")
logging.info("Database optimization completed")
except Exception as e:
logging.error(f"Database optimization failed: {str(e)}")
# Schedule optimization (run weekly)
import schedule
schedule.every().week.do(optimize_database)
schedule.every().day.at("02:00").do(backup_database)
Resource Management
Cleanup Tasks
def cleanup_temporary_files():
"""Clean up temporary files older than 24 hours"""
try:
cutoff_time = time.time() - (24 * 60 * 60) # 24 hours ago
temp_dirs = ['uploads', 'temp']
for temp_dir in temp_dirs:
if os.path.exists(temp_dir):
for filename in os.listdir(temp_dir):
filepath = os.path.join(temp_dir, filename)
if os.path.isfile(filepath) and os.path.getmtime(filepath) < cutoff_time:
os.remove(filepath)
logging.info(f"Cleaned up temporary file: {filepath}")
except Exception as e:
logging.error(f"Temporary file cleanup failed: {str(e)}")
def monitor_disk_space():
"""Monitor and alert on disk space"""
try:
import shutil
total, used, free = shutil.disk_usage("/")
# Convert to GB
free_gb = free // (1024**3)
total_gb = total // (1024**3)
usage_percent = (used / total) * 100
if usage_percent > 85:
logging.warning(f"High disk usage: {usage_percent:.1f}% ({free_gb}GB free)")
if free_gb < 5:
logging.critical(f"Low disk space: {free_gb}GB remaining")
except Exception as e:
logging.error(f"Disk space monitoring failed: {str(e)}")
Monitoring Alerts
Email Alerts (Optional)
import smtplib
from email.mime.text import MIMEText
def send_alert(subject: str, message: str):
"""Send email alert for critical issues"""
try:
smtp_server = os.environ.get("SMTP_SERVER")
smtp_port = int(os.environ.get("SMTP_PORT", "587"))
smtp_user = os.environ.get("SMTP_USER")
smtp_pass = os.environ.get("SMTP_PASS")
alert_email = os.environ.get("ALERT_EMAIL")
if not all([smtp_server, smtp_user, smtp_pass, alert_email]):
return # Email not configured
msg = MIMEText(message)
msg['Subject'] = f"[Transcription Service] {subject}"
msg['From'] = smtp_user
msg['To'] = alert_email
with smtplib.SMTP(smtp_server, smtp_port) as server:
server.starttls()
server.login(smtp_user, smtp_pass)
server.send_message(msg)
except Exception as e:
logging.error(f"Failed to send alert: {str(e)}")
π€ Contributing Guidelines
Development Workflow
1. Setup Development Environment
# Fork repository
git clone https://github.com/your-username/azure-speech-transcription.git
cd azure-speech-transcription
# Create feature branch
git checkout -b feature/your-feature-name
# Setup environment
python -m venv venv
source venv/bin/activate # or venv\Scripts\activate on Windows
pip install -r requirements.txt
pip install -r requirements-dev.txt # Development dependencies
2. Code Quality Standards
Python Style Guide
- Follow PEP 8 style guidelines
- Use type hints for function parameters and return values
- Maximum line length: 88 characters (Black formatter)
- Use meaningful variable and function names
Code Formatting
# Install development tools
pip install black flake8 mypy pytest
# Format code
black .
# Check style
flake8 .
# Type checking
mypy app_core.py gradio_app.py
# Run tests
pytest tests/
Documentation Standards
- All functions must have docstrings
- Include type hints
- Document complex logic with inline comments
- Update README.md for new features
def submit_transcription(
self,
file_bytes: bytes,
original_filename: str,
user_id: str,
language: str,
settings: Dict[str, Any]
) -> str:
"""
Submit a new transcription job for processing.
Args:
file_bytes: Raw bytes of the audio/video file
original_filename: Original name of the uploaded file
user_id: ID of the authenticated user
language: Language code for transcription (e.g., 'en-US')
settings: Transcription configuration options
Returns:
str: Unique job ID for tracking transcription progress
Raises:
ValueError: If user_id is invalid or file is too large
ConnectionError: If Azure services are unavailable
"""
3. Testing Requirements
Unit Tests
import pytest
from unittest.mock import Mock, patch
from app_core import TranscriptionManager, AuthManager
class TestAuthManager:
def test_password_hashing(self):
password = "TestPassword123"
hashed = AuthManager.hash_password(password)
assert hashed != password
assert AuthManager.verify_password(password, hashed)
assert not AuthManager.verify_password("wrong", hashed)
def test_email_validation(self):
assert AuthManager.validate_email("[email protected]")
assert not AuthManager.validate_email("invalid-email")
assert not AuthManager.validate_email("")
class TestTranscriptionManager:
@patch('app_core.BlobServiceClient')
def test_submit_transcription(self, mock_blob):
manager = TranscriptionManager()
job_id = manager.submit_transcription(
b"fake audio data",
"test.wav",
"user123",
"en-US",
{"audio_format": "wav"}
)
assert isinstance(job_id, str)
assert len(job_id) == 36 # UUID length
Integration Tests
class TestIntegration:
def test_full_transcription_workflow(self):
# Test complete workflow from upload to download
pass
def test_user_registration_and_login(self):
# Test complete auth workflow
pass
4. Commit Guidelines
Commit Message Format
type(scope): brief description
Detailed explanation of changes if needed
- List specific changes
- Include any breaking changes
- Reference issue numbers
Closes #123
Commit Types
feat: New featurefix: Bug fixdocs: Documentation changesstyle: Code style changes (formatting, etc.)refactor: Code refactoringtest: Adding or updating testschore: Maintenance tasks
Example Commits
git commit -m "feat(auth): add password strength validation
- Implement password complexity requirements
- Add client-side validation feedback
- Update registration form UI
Closes #45"
git commit -m "fix(transcription): handle Azure service timeouts
- Add retry logic for failed API calls
- Improve error messages for users
- Log detailed error information
Fixes #67"
5. Pull Request Process
PR Checklist
- Code follows style guidelines
- All tests pass
- Documentation updated
- Security considerations reviewed
- Performance impact assessed
- Breaking changes documented
PR Template
## Description
Brief description of changes
## Type of Change
- [ ] Bug fix
- [ ] New feature
- [ ] Breaking change
- [ ] Documentation update
## Testing
- [ ] Unit tests added/updated
- [ ] Integration tests pass
- [ ] Manual testing completed
## Security
- [ ] No sensitive data exposed
- [ ] Input validation implemented
- [ ] Access controls maintained
## Performance
- [ ] No performance degradation
- [ ] Database queries optimized
- [ ] Resource usage considered
Feature Development
Adding New Languages
# 1. Update environment configuration
ALLOWED_LANGS = {
"en-US": "English (United States)",
"es-ES": "Spanish (Spain)",
"new-LANG": "New Language Name"
}
# 2. Test language support
def test_new_language():
# Verify Azure Speech Services supports the language
# Test transcription accuracy
# Update documentation
Adding New Audio Formats
# 1. Update supported formats list
AUDIO_FORMATS = [
"wav", "mp3", "ogg", "opus", "flac",
"new_format" # Add new format
]
# 2. Update FFmpeg conversion logic
def _convert_to_audio(self, input_path, output_path, audio_format="wav"):
if audio_format == "new_format":
# Add specific conversion parameters
cmd = ["ffmpeg", "-i", input_path, "-codec", "new_codec", output_path]
Adding New Features
# 1. Database schema updates
def upgrade_database_schema():
with self.get_connection() as conn:
conn.execute("""
ALTER TABLE transcriptions
ADD COLUMN new_feature_data TEXT
""")
# 2. API endpoint updates
def new_feature_endpoint(user_id: str, feature_data: Dict) -> Dict:
# Implement new feature logic
pass
# 3. UI updates
def add_new_feature_ui():
new_feature_input = gr.Textbox(label="New Feature")
new_feature_button = gr.Button("Use New Feature")
βοΈ Advanced Configuration
Performance Optimization
Concurrent Processing
# Adjust worker thread pool size based on server capacity
class TranscriptionManager:
def __init__(self, max_workers: int = None):
if max_workers is None:
# Auto-detect based on CPU cores
import multiprocessing
max_workers = min(multiprocessing.cpu_count(), 10)
self.executor = ThreadPoolExecutor(max_workers=max_workers)
# Configure based on server specs
# Small server: max_workers=2-4
# Medium server: max_workers=5-8
# Large server: max_workers=10+
Database Optimization
# SQLite performance tuning
def configure_database_performance(db_path: str):
with sqlite3.connect(db_path) as conn:
# Enable WAL mode for better concurrency
conn.execute("PRAGMA journal_mode=WAL")
# Increase cache size (in KB)
conn.execute("PRAGMA cache_size=10000")
# Optimize synchronization
conn.execute("PRAGMA synchronous=NORMAL")
# Enable foreign keys
conn.execute("PRAGMA foreign_keys=ON")
Memory Management
# Large file handling
def process_large_file(file_path: str):
"""Process large files in chunks to manage memory"""
chunk_size = 64 * 1024 * 1024 # 64MB chunks
with open(file_path, 'rb') as f:
while chunk := f.read(chunk_size):
# Process chunk
yield chunk
# Garbage collection for long-running processes
import gc
def cleanup_memory():
"""Force garbage collection"""
gc.collect()
# Schedule periodic cleanup
schedule.every(30).minutes.do(cleanup_memory)
Security Hardening
Rate Limiting
from collections import defaultdict
from time import time
class RateLimiter:
def __init__(self, max_requests: int = 100, window: int = 3600):
self.max_requests = max_requests
self.window = window
self.requests = defaultdict(list)
def is_allowed(self, user_id: str) -> bool:
now = time()
user_requests = self.requests[user_id]
# Clean old requests
user_requests[:] = [req_time for req_time in user_requests
if now - req_time < self.window]
# Check limit
if len(user_requests) >= self.max_requests:
return False
user_requests.append(now)
return True
# Usage in endpoints
rate_limiter = RateLimiter(max_requests=50, window=3600) # 50 per hour
def submit_transcription(self, user_id: str, ...):
if not rate_limiter.is_allowed(user_id):
raise Exception("Rate limit exceeded")
Input Sanitization
import bleach
import re
def sanitize_filename(filename: str) -> str:
"""Sanitize uploaded filename"""
# Remove path traversal attempts
filename = os.path.basename(filename)
# Remove dangerous characters
filename = re.sub(r'[<>:"/\\|?*]', '_', filename)
# Limit length
if len(filename) > 255:
name, ext = os.path.splitext(filename)
filename = name[:250] + ext
return filename
def sanitize_user_input(text: str) -> str:
"""Sanitize user text input"""
# Remove HTML tags
text = bleach.clean(text, tags=[], strip=True)
# Limit length
text = text[:1000]
return text.strip()
Audit Logging
class AuditLogger:
def __init__(self):
self.logger = logging.getLogger('audit')
def log_user_action(self, user_id: str, action: str, details: Dict = None):
"""Log user actions for security auditing"""
audit_entry = {
'timestamp': datetime.now().isoformat(),
'user_id': user_id,
'action': action,
'details': details or {},
'ip_address': self._get_client_ip(),
'user_agent': self._get_user_agent()
}
self.logger.info(json.dumps(audit_entry))
def _get_client_ip(self) -> str:
# Implementation depends on deployment setup
return "unknown"
def _get_user_agent(self) -> str:
# Implementation depends on deployment setup
return "unknown"
# Usage
audit = AuditLogger()
audit.log_user_action(user_id, "login", {"success": True})
audit.log_user_action(user_id, "transcription_submit", {"filename": filename})
Custom Extensions
Plugin Architecture
class TranscriptionPlugin:
"""Base class for transcription plugins"""
def pre_process(self, file_bytes: bytes, settings: Dict) -> bytes:
"""Pre-process audio before transcription"""
return file_bytes
def post_process(self, transcript: str, settings: Dict) -> str:
"""Post-process transcript text"""
return transcript
def get_name(self) -> str:
"""Return plugin name"""
raise NotImplementedError
class NoiseReductionPlugin(TranscriptionPlugin):
def get_name(self) -> str:
return "noise_reduction"
def pre_process(self, file_bytes: bytes, settings: Dict) -> bytes:
# Implement noise reduction using audio processing library
# This is a placeholder - actual implementation would use
# libraries like librosa, scipy, or pydub
return file_bytes
class LanguageDetectionPlugin(TranscriptionPlugin):
def get_name(self) -> str:
return "language_detection"
def pre_process(self, file_bytes: bytes, settings: Dict) -> bytes:
# Detect language and update settings
detected_language = self._detect_language(file_bytes)
settings['detected_language'] = detected_language
return file_bytes
# Plugin manager
class PluginManager:
def __init__(self):
self.plugins: List[TranscriptionPlugin] = []
def register_plugin(self, plugin: TranscriptionPlugin):
self.plugins.append(plugin)
def apply_pre_processing(self, file_bytes: bytes, settings: Dict) -> bytes:
for plugin in self.plugins:
file_bytes = plugin.pre_process(file_bytes, settings)
return file_bytes
def apply_post_processing(self, transcript: str, settings: Dict) -> str:
for plugin in self.plugins:
transcript = plugin.post_process(transcript, settings)
return transcript
π§ Troubleshooting
Common Development Issues
Environment Setup Problems
Issue: Azure connection fails
# Check environment variables
python -c "
import os
print('AZURE_SPEECH_KEY:', bool(os.getenv('AZURE_SPEECH_KEY')))
print('AZURE_BLOB_CONNECTION:', bool(os.getenv('AZURE_BLOB_CONNECTION')))
"
# Test Azure connection
python -c "
from azure.storage.blob import BlobServiceClient
client = BlobServiceClient.from_connection_string('$AZURE_BLOB_CONNECTION')
print('Containers:', list(client.list_containers()))
"
Issue: FFmpeg not found
# Check FFmpeg installation
ffmpeg -version
# Install FFmpeg (Ubuntu/Debian)
sudo apt update && sudo apt install ffmpeg
# Install FFmpeg (Windows with Chocolatey)
choco install ffmpeg
# Install FFmpeg (macOS with Homebrew)
brew install ffmpeg
Issue: Database initialization fails
# Check database permissions
import os
db_dir = "database"
if not os.path.exists(db_dir):
os.makedirs(db_dir)
print(f"Created directory: {db_dir}")
# Test database creation
import sqlite3
conn = sqlite3.connect("database/test.db")
conn.execute("CREATE TABLE test (id INTEGER)")
conn.close()
print("Database test successful")
Runtime Issues
Issue: Memory errors with large files
# Monitor memory usage
import psutil
def check_memory():
memory = psutil.virtual_memory()
print(f"Memory usage: {memory.percent}%")
print(f"Available: {memory.available / 1024**3:.1f}GB")
# Implement file chunking for large uploads
def process_large_file_in_chunks(file_path: str, chunk_size: int = 64*1024*1024):
with open(file_path, 'rb') as f:
while chunk := f.read(chunk_size):
yield chunk
Issue: Transcription jobs stuck
# Check pending jobs
def diagnose_stuck_jobs():
pending_jobs = transcription_manager.db.get_pending_jobs()
print(f"Pending jobs: {len(pending_jobs)}")
for job in pending_jobs:
duration = datetime.now() - datetime.fromisoformat(job.created_at)
print(f"Job {job.job_id}: {job.status} for {duration}")
if duration.total_seconds() > 3600: # 1 hour
print(f"β οΈ Job {job.job_id} may be stuck")
# Reset stuck jobs
def reset_stuck_jobs():
with transcription_manager.db.get_connection() as conn:
conn.execute("""
UPDATE transcriptions
SET status = 'pending', azure_trans_id = NULL
WHERE status = 'processing'
AND created_at < datetime('now', '-1 hour')
""")
Issue: Azure API errors
# Test Azure Speech Service
def test_azure_speech():
try:
url = f"{AZURE_SPEECH_KEY_ENDPOINT}/speechtotext/v3.2/transcriptions"
headers = {"Ocp-Apim-Subscription-Key": AZURE_SPEECH_KEY}
response = requests.get(url, headers=headers)
print(f"Status: {response.status_code}")
print(f"Response: {response.text[:200]}")
except Exception as e:
print(f"Azure Speech test failed: {e}")
# Check Azure service status
def check_azure_status():
# Check Azure status page
status_url = "https://status.azure.com/en-us/status"
print(f"Check Azure status: {status_url}")
Debugging Tools
Debug Mode Configuration
# Enable debug mode
DEBUG = os.environ.get("DEBUG", "false").lower() == "true"
if DEBUG:
logging.basicConfig(level=logging.DEBUG)
# Enable Gradio debug mode
demo.launch(debug=True, show_error=True)
Performance Profiling
import cProfile
import pstats
def profile_function(func):
"""Profile function performance"""
profiler = cProfile.Profile()
def wrapper(*args, **kwargs):
profiler.enable()
result = func(*args, **kwargs)
profiler.disable()
# Print stats
stats = pstats.Stats(profiler)
stats.sort_stats('cumulative')
stats.print_stats(10) # Top 10 functions
return result
return wrapper
# Usage
@profile_function
def submit_transcription(self, ...):
# Function implementation
pass
Log Analysis
def analyze_logs(log_file: str = "logs/transcription.log"):
"""Analyze application logs for issues"""
errors = []
warnings = []
performance_issues = []
with open(log_file, 'r') as f:
for line in f:
if 'ERROR' in line:
errors.append(line.strip())
elif 'WARNING' in line:
warnings.append(line.strip())
elif 'completed in' in line:
# Extract timing information
import re
match = re.search(r'completed in (\d+\.\d+)s', line)
if match and float(match.group(1)) > 30: # > 30 seconds
performance_issues.append(line.strip())
print(f"Errors: {len(errors)}")
print(f"Warnings: {len(warnings)}")
print(f"Performance issues: {len(performance_issues)}")
return {
'errors': errors[-10:], # Last 10 errors
'warnings': warnings[-10:], # Last 10 warnings
'performance_issues': performance_issues[-10:]
}
Production Troubleshooting
Service Health Check
#!/bin/bash
# health_check.sh
echo "=== System Health Check ==="
# Check service status
systemctl is-active transcription
systemctl is-active nginx
# Check disk space
df -h
# Check memory usage
free -h
# Check CPU usage
top -b -n1 | grep "Cpu(s)"
# Check logs for errors
tail -n 50 /home/transcription/app/logs/transcription.log | grep ERROR
# Check Azure connectivity
curl -s -o /dev/null -w "%{http_code}" https://azure.microsoft.com/
echo "=== Health Check Complete ==="
Database Recovery
def recover_database():
"""Recover database from Azure backup"""
try:
# List available backups
container_client = blob_service.get_container_client(AZURE_CONTAINER)
backups = []
for blob in container_client.list_blobs(name_starts_with="shared/backups/"):
backups.append({
'name': blob.name,
'modified': blob.last_modified
})
# Sort by date (newest first)
backups.sort(key=lambda x: x['modified'], reverse=True)
if not backups:
print("No backups found")
return
# Download latest backup
latest_backup = backups[0]['name']
print(f"Restoring from: {latest_backup}")
blob_client = blob_service.get_blob_client(
container=AZURE_CONTAINER,
blob=latest_backup
)
# Download backup
with open("database/transcriptions_restored.db", "wb") as f:
f.write(blob_client.download_blob().readall())
print("Database restored successfully")
print("Restart the application to use restored database")
except Exception as e:
print(f"Database recovery failed: {str(e)}")
π Additional Resources
Documentation Links
- Azure Speech Services Documentation
- Azure Blob Storage Documentation
- Gradio Documentation
- SQLite Documentation
- FFmpeg Documentation
Useful Tools
- Azure Storage Explorer: GUI for managing blob storage
- DB Browser for SQLite: Visual database management
- Postman: API testing and development
- Azure CLI: Command-line Azure management
- Visual Studio Code: Recommended IDE with Azure extensions
Community Resources
This developer guide provides comprehensive information for setting up, developing, deploying, and maintaining the Azure Speech Transcription service. For additional help, refer to the linked documentation and community resources. π