🛠️ Azure Speech Transcription - Developer Guide


🏗️ System Architecture

Overview

The Azure Speech Transcription service pairs a Gradio frontend with a Python backend that calls Azure Speech Services and Azure Blob Storage. The design prioritizes user privacy, PDPA compliance, and scalability.

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Frontend UI   │    │   Backend API   │    │ Azure Services  │
│   (Gradio)      │◄──►│   (Python)      │◄──►│ Speech & Blob   │
└─────────────────┘    └─────────────────┘    └─────────────────┘
         │                       │                       │
         │                       │                       │
         ▼                       ▼                       ▼
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   User Session  │    │ SQLite Database │    │  User Storage   │
│   Management    │    │   (Metadata)    │    │   (Isolated)    │
└─────────────────┘    └─────────────────┘    └─────────────────┘

Core Components

1. Frontend Layer (gradio_app.py)

  • Technology: Gradio with custom CSS
  • Purpose: User interface and session management
  • Features: Authentication, file upload, real-time status, history management

2. Backend Layer (app_core.py)

  • Technology: Python with threading and async processing
  • Purpose: Business logic, authentication, and Azure integration
  • Features: User management, transcription processing, PDPA compliance

3. Data Layer

  • Database: SQLite with Azure Blob backup
  • Storage: Azure Blob Storage with user separation
  • Security: User-isolated folders and encrypted connections

4. External Services

  • Azure Speech Services: Transcription processing
  • Azure Blob Storage: File and database storage
  • FFmpeg: Audio/video conversion

Data Flow

1. User uploads file → 2. Authentication check → 3. File validation
        ↓                       ↓                       ↓
8. Download results ← 7. Store transcript ← 6. Process with Azure
        ↑                       ↑                       ↑
9. Update UI status ← 4. Save to user folder ← 5. Background processing

💻 Development Environment

Prerequisites

  • Python: 3.8 or higher
  • Azure Account: With Speech Services and Blob Storage
  • FFmpeg: For audio/video processing
  • Git: For version control

Environment Setup

1. Clone Repository

git clone <repository-url>
cd azure-speech-transcription

2. Virtual Environment

# Create virtual environment
python -m venv venv

# Activate (Windows)
venv\Scripts\activate

# Activate (macOS/Linux)
source venv/bin/activate

3. Install Dependencies

pip install -r requirements.txt

4. Environment Configuration

# Copy environment template
cp .env.example .env

# Edit with your Azure credentials
nano .env

5. Install FFmpeg

Windows (Chocolatey):

choco install ffmpeg

macOS (Homebrew):

brew install ffmpeg

Ubuntu/Debian:

sudo apt update
sudo apt install ffmpeg

6. Verify Installation

python -c "
import gradio as gr
from azure.storage.blob import BlobServiceClient
import subprocess
print('Gradio:', gr.__version__)
print('FFmpeg:', subprocess.run(['ffmpeg', '-version'], capture_output=True).returncode == 0)
print('Azure Blob:', 'OK')
"

Development Server

# Start development server
python gradio_app.py

# Server will be available at:
# http://localhost:7860

Development Tools

Recommended IDE Setup

  • VS Code: With Python, Azure, and Git extensions
  • PyCharm: Professional edition with Azure toolkit
  • Vim/Emacs: With appropriate Python plugins

Useful Extensions

{
  "recommendations": [
    "ms-python.python",
    "ms-vscode.azure-cli",
    "ms-azuretools.azure-cli-tools",
    "ms-python.black-formatter",
    "ms-python.flake8"
  ]
}

Code Quality Tools

# Install development tools
pip install black flake8 pytest mypy

# Format code
black .

# Lint code
flake8 .

# Type checking
mypy app_core.py gradio_app.py

🚀 Deployment Guide

Production Deployment Options

Option 1: Traditional Server Deployment

1. Server Preparation

# Update system
sudo apt update && sudo apt upgrade -y

# Install Python and dependencies
sudo apt install python3 python3-pip python3-venv nginx ffmpeg -y

# Create application user
sudo useradd -m -s /bin/bash transcription
sudo su - transcription

2. Application Setup

# Clone repository
git clone <repository-url> /home/transcription/app
cd /home/transcription/app

# Setup virtual environment
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt

# Configure environment
cp .env.example .env
# Edit .env with production values

3. Systemd Service

# /etc/systemd/system/transcription.service
[Unit]
Description=Azure Speech Transcription Service
After=network.target

[Service]
Type=simple
User=transcription
Group=transcription
WorkingDirectory=/home/transcription/app
Environment=PATH=/home/transcription/app/venv/bin:/usr/local/bin:/usr/bin:/bin
ExecStart=/home/transcription/app/venv/bin/python gradio_app.py
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target

4. Nginx Configuration

# /etc/nginx/sites-available/transcription
server {
    listen 80;
    server_name your-domain.com;
    client_max_body_size 500M;

    location / {
        proxy_pass http://127.0.0.1:7860;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_read_timeout 300s;
        proxy_connect_timeout 75s;
    }
}

5. SSL Certificate

# Install Certbot
sudo apt install certbot python3-certbot-nginx -y

# Get SSL certificate
sudo certbot --nginx -d your-domain.com

# Verify auto-renewal
sudo certbot renew --dry-run

6. Start Services

# Enable and start application
sudo systemctl enable transcription
sudo systemctl start transcription

# Enable and restart nginx
sudo systemctl enable nginx
sudo systemctl restart nginx

# Check status
sudo systemctl status transcription
sudo systemctl status nginx

Option 2: Docker Deployment

1. Dockerfile

FROM python:3.9-slim

# Install system dependencies
RUN apt-get update && apt-get install -y \
    ffmpeg \
    && rm -rf /var/lib/apt/lists/*

# Set working directory
WORKDIR /app

# Copy requirements and install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Create necessary directories
RUN mkdir -p uploads database temp

# Expose port
EXPOSE 7860

# Run application
CMD ["python", "gradio_app.py"]

2. Docker Compose

# docker-compose.yml
version: '3.8'

services:
  transcription:
    build: .
    ports:
      - "7860:7860"
    environment:
      - AZURE_SPEECH_KEY=${AZURE_SPEECH_KEY}
      - AZURE_SPEECH_KEY_ENDPOINT=${AZURE_SPEECH_KEY_ENDPOINT}
      - AZURE_REGION=${AZURE_REGION}
      - AZURE_BLOB_CONNECTION=${AZURE_BLOB_CONNECTION}
      - AZURE_CONTAINER=${AZURE_CONTAINER}
      - AZURE_BLOB_SAS_TOKEN=${AZURE_BLOB_SAS_TOKEN}
      - ALLOWED_LANGS=${ALLOWED_LANGS}
    volumes:
      - ./uploads:/app/uploads
      - ./database:/app/database
      - ./temp:/app/temp
    restart: unless-stopped

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./ssl:/etc/ssl/certs
    depends_on:
      - transcription
    restart: unless-stopped

3. Deploy with Docker

# Build and start
docker-compose up -d

# View logs
docker-compose logs -f transcription

# Update application
git pull
docker-compose build transcription
docker-compose up -d transcription

Option 3: Cloud Deployment (Azure Container Instances)

1. Create Container Registry

# Create ACR
az acr create --resource-group myResourceGroup \
  --name myregistry --sku Basic

# Login to ACR
az acr login --name myregistry

# Build and push image
docker build -t myregistry.azurecr.io/transcription:latest .
docker push myregistry.azurecr.io/transcription:latest

2. Deploy Container Instance

# Create container instance
az container create \
  --resource-group myResourceGroup \
  --name transcription-app \
  --image myregistry.azurecr.io/transcription:latest \
  --cpu 2 --memory 4 \
  --ports 7860 \
  --environment-variables \
    AZURE_SPEECH_KEY=$AZURE_SPEECH_KEY \
    AZURE_SPEECH_KEY_ENDPOINT=$AZURE_SPEECH_KEY_ENDPOINT \
    AZURE_REGION=$AZURE_REGION \
    AZURE_BLOB_CONNECTION="$AZURE_BLOB_CONNECTION" \
    AZURE_CONTAINER=$AZURE_CONTAINER \
    AZURE_BLOB_SAS_TOKEN="$AZURE_BLOB_SAS_TOKEN"

📑 API Documentation

Core Classes and Methods

TranscriptionManager Class

Purpose: Main service class handling all transcription operations

class TranscriptionManager:
    def __init__(self)
    
    # User Authentication
    def register_user(email: str, username: str, password: str, 
                     gdpr_consent: bool, data_retention_agreed: bool, 
                     marketing_consent: bool) -> Tuple[bool, str, Optional[str]]
    
    def login_user(login: str, password: str) -> Tuple[bool, str, Optional[User]]
    
    # Transcription Operations  
    def submit_transcription(file_bytes: bytes, original_filename: str,
                           user_id: str, language: str, 
                           settings: Dict) -> str
    
    def get_job_status(job_id: str) -> Optional[TranscriptionJob]
    
    # Data Management
    def get_user_history(user_id: str, limit: int) -> List[TranscriptionJob]
    def get_user_stats(user_id: str) -> Dict
    def export_user_data(user_id: str) -> Dict
    def delete_user_account(user_id: str) -> bool

DatabaseManager Class

Purpose: Handle database operations and Azure blob synchronization

class DatabaseManager:
    def __init__(db_path: str = None)
    
    # User Operations
    def create_user(...) -> Tuple[bool, str, Optional[str]]
    def authenticate_user(login: str, password: str) -> Tuple[bool, str, Optional[User]]
    def get_user_by_id(user_id: str) -> Optional[User]
    
    # Job Operations
    def save_job(job: TranscriptionJob)
    def get_job(job_id: str) -> Optional[TranscriptionJob]
    def get_user_jobs(user_id: str, limit: int) -> List[TranscriptionJob]
    def get_pending_jobs() -> List[TranscriptionJob]

AuthManager Class

Purpose: Authentication utilities and validation

class AuthManager:
    @staticmethod
    def hash_password(password: str) -> str
    @staticmethod
    def verify_password(password: str, password_hash: str) -> bool
    @staticmethod
    def validate_email(email: str) -> bool
    @staticmethod
    def validate_username(username: str) -> bool
    @staticmethod
    def validate_password(password: str) -> Tuple[bool, str]

Data Models

User Model

@dataclass
class User:
    user_id: str
    email: str
    username: str
    password_hash: str
    created_at: str
    last_login: Optional[str] = None
    is_active: bool = True
    gdpr_consent: bool = False
    data_retention_agreed: bool = False
    marketing_consent: bool = False

TranscriptionJob Model

@dataclass
class TranscriptionJob:
    job_id: str
    user_id: str
    original_filename: str
    audio_url: str
    language: str
    status: str  # pending, processing, completed, failed
    created_at: str
    completed_at: Optional[str] = None
    transcript_text: Optional[str] = None
    transcript_url: Optional[str] = None
    error_message: Optional[str] = None
    azure_trans_id: Optional[str] = None
    settings: Optional[Dict] = None

Configuration Parameters

Environment Variables

# Required
AZURE_SPEECH_KEY: str
AZURE_SPEECH_KEY_ENDPOINT: str  
AZURE_REGION: str
AZURE_BLOB_CONNECTION: str
AZURE_CONTAINER: str
AZURE_BLOB_SAS_TOKEN: str

# Optional
ALLOWED_LANGS: str  # JSON string
API_VERSION: str = "v3.2"
PASSWORD_SALT: str = "default_salt"
MAX_FILE_SIZE_MB: int = 500
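
A minimal sketch of how these variables might be read and validated at startup. The `load_config` helper, its error message, and the empty-object fallback for `ALLOWED_LANGS` are illustrative assumptions, not code from `app_core.py`; only the variable names and the `API_VERSION` and `MAX_FILE_SIZE_MB` defaults come from the list above.

```python
import json
import os
from typing import Any, Dict, Mapping, Optional

REQUIRED_VARS = (
    "AZURE_SPEECH_KEY",
    "AZURE_SPEECH_KEY_ENDPOINT",
    "AZURE_REGION",
    "AZURE_BLOB_CONNECTION",
    "AZURE_CONTAINER",
    "AZURE_BLOB_SAS_TOKEN",
)


def load_config(env: Optional[Mapping[str, str]] = None) -> Dict[str, Any]:
    """Hypothetical helper: collect settings and fail fast on missing ones."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing required variables: " + ", ".join(missing))

    config: Dict[str, Any] = {name: env[name] for name in REQUIRED_VARS}
    # ALLOWED_LANGS is a JSON string; "{}" as the fallback is an assumption.
    config["ALLOWED_LANGS"] = json.loads(env.get("ALLOWED_LANGS", "{}"))
    # Optional values fall back to the documented defaults.
    config["API_VERSION"] = env.get("API_VERSION", "v3.2")
    config["MAX_FILE_SIZE_MB"] = int(env.get("MAX_FILE_SIZE_MB", "500"))
    return config
```

Passing a plain dictionary instead of `os.environ` keeps the helper easy to exercise in tests.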

Transcription Settings

settings = {
    'audio_format': str,           # wav, mp3, etc.
    'diarization_enabled': bool,   # Speaker identification
    'speakers': int,               # Max speakers (1-10)
    'profanity': str,              # masked, removed, raw
    'punctuation': str,            # automatic, dictated, none
    'timestamps': bool,            # Include timestamps
    'lexical': bool,               # Include lexical forms
    'language_id_enabled': bool,   # Auto language detection
    'candidate_locales': List[str] # Language candidates
}
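
The value ranges in the comments above can be checked before a job is submitted. The sketch below is one possible validator; `validate_settings` is a hypothetical name, and the two coupling rules (checking `speakers` only when diarization is on, requiring `candidate_locales` when language detection is on) are assumptions rather than documented behavior.

```python
from typing import Any, Dict, List

ALLOWED_PROFANITY = {"masked", "removed", "raw"}
ALLOWED_PUNCTUATION = {"automatic", "dictated", "none"}


def validate_settings(settings: Dict[str, Any]) -> List[str]:
    """Hypothetical check: return a list of problems (empty means valid)."""
    problems: List[str] = []

    if "profanity" in settings and settings["profanity"] not in ALLOWED_PROFANITY:
        problems.append("profanity must be one of: masked, removed, raw")

    if "punctuation" in settings and settings["punctuation"] not in ALLOWED_PUNCTUATION:
        problems.append("punctuation must be one of: automatic, dictated, none")

    # Assumption: the speaker count only matters when diarization is enabled.
    if settings.get("diarization_enabled"):
        speakers = settings.get("speakers")
        is_int = isinstance(speakers, int) and not isinstance(speakers, bool)
        if not is_int or not 1 <= speakers <= 10:
            problems.append("speakers must be an integer from 1 to 10")

    # Assumption: automatic language detection needs candidate locales.
    if settings.get("language_id_enabled") and not settings.get("candidate_locales"):
        problems.append("candidate_locales is required when language_id_enabled is set")

    return problems
```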

🗄️ Database Schema

SQLite Database Structure

Users Table

CREATE TABLE users (
    user_id TEXT PRIMARY KEY,
    email TEXT UNIQUE NOT NULL,
    username TEXT UNIQUE NOT NULL,
    password_hash TEXT NOT NULL,
    created_at TEXT NOT NULL,
    last_login TEXT,
    is_active BOOLEAN DEFAULT 1,
    gdpr_consent BOOLEAN DEFAULT 0,
    data_retention_agreed BOOLEAN DEFAULT 0,
    marketing_consent BOOLEAN DEFAULT 0
);

-- Indexes
CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_users_username ON users(username);

Transcriptions Table

CREATE TABLE transcriptions (
    job_id TEXT PRIMARY KEY,
    user_id TEXT NOT NULL,
    original_filename TEXT NOT NULL,
    audio_url TEXT,
    language TEXT NOT NULL,
    status TEXT NOT NULL,
    created_at TEXT NOT NULL,
    completed_at TEXT,
    transcript_text TEXT,
    transcript_url TEXT,
    error_message TEXT,
    azure_trans_id TEXT,
    settings TEXT,
    FOREIGN KEY (user_id) REFERENCES users (user_id)
);

-- Indexes
CREATE INDEX idx_transcriptions_user_id ON transcriptions(user_id);
CREATE INDEX idx_transcriptions_status ON transcriptions(status);
CREATE INDEX idx_transcriptions_created_at ON transcriptions(created_at DESC);
CREATE INDEX idx_transcriptions_user_created ON transcriptions(user_id, created_at DESC);

Azure Blob Storage Structure

Container: {AZURE_CONTAINER}/
├── shared/
│   └── database/
│       └── transcriptions.db           # Shared database backup
├── users/
│   ├── {user-id-1}/
│   │   ├── audio/                      # Processed audio files
│   │   │   ├── {job-id-1}.wav
│   │   │   └── {job-id-2}.wav
│   │   ├── transcripts/                # Transcript files
│   │   │   ├── {job-id-1}.txt
│   │   │   └── {job-id-2}.txt
│   │   └── originals/                  # Original uploaded files
│   │       ├── {job-id-1}_{filename}.mp4
│   │       └── {job-id-2}_{filename}.wav
│   └── {user-id-2}/
│       ├── audio/
│       ├── transcripts/
│       └── originals/

Database Operations

User Management Queries

-- Create user
INSERT INTO users (user_id, email, username, password_hash, created_at, 
                   gdpr_consent, data_retention_agreed, marketing_consent)
VALUES (?, ?, ?, ?, ?, ?, ?, ?);

-- Authenticate user
SELECT * FROM users 
WHERE (email = ? OR username = ?) AND is_active = 1;

-- Update last login
UPDATE users SET last_login = ? WHERE user_id = ?;

-- Get user stats
SELECT status, COUNT(*) FROM transcriptions 
WHERE user_id = ? GROUP BY status;

Job Management Queries

-- Create job
INSERT INTO transcriptions (job_id, user_id, original_filename, language, 
                           status, created_at, settings)
VALUES (?, ?, ?, ?, 'pending', ?, ?);

-- Update job status
UPDATE transcriptions 
SET status = ?, completed_at = ?, transcript_text = ?, transcript_url = ?
WHERE job_id = ?;

-- Get user jobs
SELECT * FROM transcriptions 
WHERE user_id = ? 
ORDER BY created_at DESC LIMIT ?;

-- Get pending jobs for background processor
SELECT * FROM transcriptions 
WHERE status IN ('pending', 'processing');
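
The create-job and get-user-jobs queries above can be driven from Python's built-in `sqlite3` module roughly as follows. This is a sketch under assumptions: it uses an in-memory database with a trimmed copy of the `transcriptions` table (no foreign key), stores `settings` as a JSON string, and the function names `create_job` and `get_user_jobs` are illustrative rather than the actual `DatabaseManager` methods.

```python
import json
import sqlite3
import uuid
from datetime import datetime, timezone

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # rows can then be read by column name
conn.execute(
    """
    CREATE TABLE transcriptions (
        job_id TEXT PRIMARY KEY,
        user_id TEXT NOT NULL,
        original_filename TEXT NOT NULL,
        language TEXT NOT NULL,
        status TEXT NOT NULL,
        created_at TEXT NOT NULL,
        settings TEXT
    )
    """
)


def create_job(user_id: str, filename: str, language: str, settings: dict) -> str:
    """Insert a pending job and return its generated ID."""
    job_id = str(uuid.uuid4())
    created_at = datetime.now(timezone.utc).isoformat()
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "INSERT INTO transcriptions (job_id, user_id, original_filename, "
            "language, status, created_at, settings) "
            "VALUES (?, ?, ?, ?, 'pending', ?, ?)",
            (job_id, user_id, filename, language, created_at, json.dumps(settings)),
        )
    return job_id


def get_user_jobs(user_id: str, limit: int) -> list:
    """Return the user's most recent jobs as dictionaries, newest first."""
    rows = conn.execute(
        "SELECT * FROM transcriptions WHERE user_id = ? "
        "ORDER BY created_at DESC LIMIT ?",
        (user_id, limit),
    )
    return [dict(row) for row in rows]
```

Every value reaches SQLite through a `?` placeholder, so user-supplied filenames and IDs are never spliced into the SQL text.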

🔒 Security Implementation

Authentication Security

Password Security

# Password hashing with salt
def hash_password(password: str) -> str:
    salt = os.environ.get("PASSWORD_SALT", "default_salt")
    return hashlib.sha256((password + salt).encode()).hexdigest()

# Password validation
def validate_password(password: str) -> Tuple[bool, str]:
    if len(password) < 8:
        return False, "Password must be at least 8 characters"
    if not re.search(r'[A-Z]', password):
        return False, "Password must contain uppercase letter"
    if not re.search(r'[a-z]', password):
        return False, "Password must contain lowercase letter"
    if not re.search(r'\d', password):
        return False, "Password must contain number"
    return True, "Valid"
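
The `hash_password` function above combines a fast hash (SHA-256) with one application-wide salt, so identical passwords produce identical hashes. If stronger storage is wanted, a per-user random salt with a slow key-derivation function is a common alternative. The sketch below uses only the standard library; the function names, the `iterations$salt$hash` storage format, and the iteration count are assumptions, not part of the current implementation.

```python
import hashlib
import hmac
import os

PBKDF2_ITERATIONS = 600_000  # assumed work factor; tune for your hardware


def hash_password_pbkdf2(password: str) -> str:
    """Return 'iterations$salt_hex$hash_hex' using a fresh random salt."""
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode(), salt, PBKDF2_ITERATIONS
    )
    return f"{PBKDF2_ITERATIONS}${salt.hex()}${digest.hex()}"


def verify_password_pbkdf2(password: str, stored: str) -> bool:
    """Recompute the hash with the stored salt and compare in constant time."""
    iterations, salt_hex, hash_hex = stored.split("$")
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode(), bytes.fromhex(salt_hex), int(iterations)
    )
    return hmac.compare_digest(digest.hex(), hash_hex)
```

Storing the iteration count next to the hash lets the work factor be raised later without invalidating existing records.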

Session Management

from datetime import datetime, timedelta

# User session state
session_state = {
    'user_id': str,
    'username': str,
    'logged_in_at': datetime,
    'last_activity': datetime
}

# Session validation
def validate_session(session_state: dict) -> bool:
    if not session_state or 'user_id' not in session_state:
        return False
    
    # Check session timeout (if implemented)
    last_activity = session_state.get('last_activity')
    if last_activity:
        timeout = timedelta(hours=24)  # 24-hour sessions
        if datetime.now() - last_activity > timeout:
            return False
    
    return True

Data Security

Access Control

# User data access verification
def verify_user_access(job_id: str, user_id: str) -> bool:
    job = get_job(job_id)
    # Return a real bool; `job and ...` would return None when the job is missing
    return job is not None and job.user_id == user_id

# File path security
def get_user_blob_path(user_id: str, blob_type: str, filename: str) -> str:
    # Ensure user can only access their own folder
    safe_filename = os.path.basename(filename)  # Prevent path traversal
    return f"users/{user_id}/{blob_type}/{safe_filename}"
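
`get_user_blob_path` sanitizes only the filename. A stricter variant could also validate `user_id` and `blob_type`, since both end up in the blob path. The sketch below is hypothetical: the function name, the assumption that user IDs are UUID-like (hex digits and hyphens), and the folder whitelist taken from the storage layout are not confirmed by the codebase.

```python
import os
import re

# Folder names taken from the documented storage layout.
ALLOWED_BLOB_TYPES = {"audio", "transcripts", "originals"}
# Assumption: user IDs are UUID-like, i.e. hex digits and hyphens only.
USER_ID_PATTERN = re.compile(r"[0-9a-fA-F-]{1,64}")


def build_user_blob_path(user_id: str, blob_type: str, filename: str) -> str:
    """Hypothetical stricter variant of get_user_blob_path."""
    if not USER_ID_PATTERN.fullmatch(user_id):
        raise ValueError("invalid user_id")
    if blob_type not in ALLOWED_BLOB_TYPES:
        raise ValueError("invalid blob_type")
    # Normalize Windows separators so basename strips them on POSIX too.
    safe_filename = os.path.basename(filename.replace("\\", "/"))
    if safe_filename in ("", ".", ".."):
        raise ValueError("invalid filename")
    return f"users/{user_id}/{blob_type}/{safe_filename}"
```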

Data Encryption

# Azure Blob Storage encryption (configured at Azure level)
# - Encryption at rest: Enabled by default
# - Encryption in transit: HTTPS enforced
# - Customer-managed keys: Optional enhancement

# Database encryption (for sensitive fields)
from cryptography.fernet import Fernet

def encrypt_sensitive_data(data: str, key: bytes) -> str:
    f = Fernet(key)
    return f.encrypt(data.encode()).decode()

def decrypt_sensitive_data(encrypted_data: str, key: bytes) -> str:
    f = Fernet(key)
    return f.decrypt(encrypted_data.encode()).decode()

Azure Security

Blob Storage Security

# SAS token permissions: for least privilege, grant only what each operation
# needs (for example, read-only tokens for downloads)
sas_permissions = BlobSasPermissions(
    read=True,
    write=True,
    delete=True,
    list=True
)

# IP restrictions (optional)
sas_ip_range = "192.168.1.0/24"  # Restrict to specific IP range

# Time-limited tokens
sas_expiry = datetime.utcnow() + timedelta(hours=1)

Speech Service Security

# Secure API calls
headers = {
    "Ocp-Apim-Subscription-Key": AZURE_SPEECH_KEY,
    "Content-Type": "application/json"
}

# Request with a timeout (retry logic is not shown here)
response = requests.post(
    url, 
    headers=headers, 
    json=body,
    timeout=30,
    verify=True  # Verify SSL certificates
)
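
The request above sets a timeout but does not retry. One way to add retries is a small wrapper with exponential backoff, sketched here with the standard library only. `call_with_retry` and its parameters are hypothetical names, and the default exception types are placeholders for whatever the HTTP client raises.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retry(
    operation: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 1.0,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run operation, retrying on the given exceptions with exponential backoff."""
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            # Delay doubles after each failure: base, 2x base, 4x base, ...
            sleep(base_delay * 2 ** (attempt - 1))
    raise ValueError("attempts must be at least 1")
```

With `requests`, the call could be wrapped as `call_with_retry(lambda: requests.post(url, headers=headers, json=body, timeout=30), retry_on=(requests.ConnectionError, requests.Timeout))`.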

Input Validation

File Upload Security

def validate_uploaded_file(file_path: str, max_size: int = 500 * 1024 * 1024) -> Tuple[bool, str]:
    try:
        # Check file exists
        if not os.path.exists(file_path):
            return False, "File not found"
        
        # Check file size
        file_size = os.path.getsize(file_path)
        if file_size > max_size:
            return False, f"File too large: {file_size / 1024 / 1024:.1f}MB"
        
        # Check file type by content, not just extension (needs the python-magic package)
        import magic
        mime_type = magic.from_file(file_path, mime=True)
        allowed_types = ['audio/', 'video/']
        if not any(mime_type.startswith(t) for t in allowed_types):
            return False, f"Invalid file type: {mime_type}"
        
        return True, "Valid"
    
    except Exception as e:
        return False, f"Validation error: {str(e)}"

SQL Injection Prevention

# Use parameterized queries (already implemented)
cursor.execute(
    "SELECT * FROM users WHERE email = ? AND password_hash = ?",
    (email, password_hash)
)

# Input sanitization
def sanitize_input(user_input: str) -> str:
    # Remove dangerous characters
    import html
    sanitized = html.escape(user_input)
    # Limit length
    return sanitized[:1000]

📊 Monitoring & Maintenance

Application Monitoring

Health Checks

def health_check() -> Dict[str, Any]:
    """System health check endpoint"""
    try:
        # Database check
        db_status = check_database_connection()
        
        # Azure services check
        blob_status = check_blob_storage()
        speech_status = check_speech_service()
        
        # FFmpeg check
        ffmpeg_status = check_ffmpeg_installation()
        
        # Disk space check
        disk_status = check_disk_space()
        
        return {
            'status': 'healthy' if all([db_status, blob_status, speech_status, ffmpeg_status]) else 'unhealthy',
            'timestamp': datetime.now().isoformat(),
            'services': {
                'database': db_status,
                'blob_storage': blob_status,
                'speech_service': speech_status,
                'ffmpeg': ffmpeg_status,
                'disk_space': disk_status
            }
        }
    
    except Exception as e:
        return {
            'status': 'error',
            'timestamp': datetime.now().isoformat(),
            'error': str(e)
        }

def check_database_connection() -> bool:
    try:
        with transcription_manager.db.get_connection() as conn:
            conn.execute("SELECT 1").fetchone()
        return True
    except Exception:
        return False

def check_blob_storage() -> bool:
    try:
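        client = BlobServiceClient.from_connection_string(AZURE_BLOB_CONNECTION)
        # list_containers() is lazy; fetch one item to force a real request
        next(iter(client.list_containers(results_per_page=1)), None)
        return True
    except Exception: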
        return False
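
`health_check` also calls `check_ffmpeg_installation` and `check_disk_space`, which are not shown. A possible standard-library sketch follows. The 5 GB threshold mirrors the critical level used in the disk monitoring code later in this guide; the parameter names and defaults are assumptions.

```python
import shutil
import subprocess


def check_ffmpeg_installation() -> bool:
    """True if an ffmpeg binary is on PATH and exits cleanly for -version."""
    ffmpeg_path = shutil.which("ffmpeg")
    if ffmpeg_path is None:
        return False
    try:
        result = subprocess.run(
            [ffmpeg_path, "-version"], capture_output=True, timeout=10
        )
    except (OSError, subprocess.TimeoutExpired):
        return False
    return result.returncode == 0


def check_disk_space(path: str = "/", min_free_gb: float = 5.0) -> bool:
    """True if at least min_free_gb gigabytes are free on the given path."""
    free_bytes = shutil.disk_usage(path).free
    return free_bytes / (1024 ** 3) >= min_free_gb
```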

Logging Configuration

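import logging
import os
from logging.handlers import RotatingFileHandler

def setup_logging():
    """Configure application logging"""
    # RotatingFileHandler does not create missing directories
    os.makedirs('logs', exist_ok=True)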
    
    # Create formatter
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    
    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    console_handler.setLevel(logging.INFO)
    
    # File handler with rotation
    file_handler = RotatingFileHandler(
        'logs/transcription.log',
        maxBytes=10*1024*1024,  # 10MB
        backupCount=5
    )
    file_handler.setFormatter(formatter)
    file_handler.setLevel(logging.DEBUG)
    
    # Configure root logger
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    logger.addHandler(console_handler)
    logger.addHandler(file_handler)
    
    # Separate logger for sensitive operations
    auth_logger = logging.getLogger('auth')
    auth_handler = RotatingFileHandler(
        'logs/auth.log',
        maxBytes=5*1024*1024,  # 5MB
        backupCount=10
    )
    auth_handler.setFormatter(formatter)
    auth_logger.addHandler(auth_handler)
    auth_logger.setLevel(logging.INFO)

Performance Monitoring

import logging
import time
from functools import wraps

def monitor_performance(func):
    """Decorator to monitor function performance"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        try:
            result = func(*args, **kwargs)
            duration = time.time() - start_time
            logging.info(f"{func.__name__} completed in {duration:.2f}s")
            return result
        except Exception as e:
            duration = time.time() - start_time
            logging.error(f"{func.__name__} failed after {duration:.2f}s: {str(e)}")
            raise
    return wrapper

# Usage
@monitor_performance
def submit_transcription(self, file_bytes, filename, user_id, language, settings):
    # Implementation here
    pass

Database Maintenance

Backup Strategy

def backup_database():
    """Backup database to Azure Blob Storage"""
    try:
        # Create timestamped backup
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_name = f"shared/backups/transcriptions_backup_{timestamp}.db"
        
        # Upload current database
        blob_client = blob_service.get_blob_client(
            container=AZURE_CONTAINER, 
            blob=backup_name
        )
        
        with open(db_path, "rb") as data:
            blob_client.upload_blob(data)
        
        logging.info(f"Database backup created: {backup_name}")
        
        # Clean old backups (keep last 30 days)
        cleanup_old_backups()
        
    except Exception as e:
        logging.error(f"Database backup failed: {str(e)}")

from datetime import datetime, timedelta, timezone

def cleanup_old_backups():
    """Remove backups older than 30 days"""
    try:
        # blob.last_modified is timezone-aware (UTC), so the cutoff must be too
        cutoff_date = datetime.now(timezone.utc) - timedelta(days=30)
        container_client = blob_service.get_container_client(AZURE_CONTAINER)
        
        for blob in container_client.list_blobs(name_starts_with="shared/backups/"):
            if blob.last_modified < cutoff_date:
                # Deletion goes through the container client
                container_client.delete_blob(blob.name)
                logging.info(f"Deleted old backup: {blob.name}")
                
    except Exception as e:
        logging.error(f"Backup cleanup failed: {str(e)}")

Database Optimization

def optimize_database():
    """Optimize database performance"""
    try:
        with transcription_manager.db.get_connection() as conn:
            # Analyze tables
            conn.execute("ANALYZE")
            
            # Vacuum database (compact)
            conn.execute("VACUUM")
            
            # Update statistics
            conn.execute("PRAGMA optimize")
            
        logging.info("Database optimization completed")
        
    except Exception as e:
        logging.error(f"Database optimization failed: {str(e)}")

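# Schedule maintenance: weekly optimization, daily backup at 02:00
import time
import schedule

schedule.every().week.do(optimize_database)
schedule.every().day.at("02:00").do(backup_database)

while True:  # jobs only fire while run_pending() is polled
    schedule.run_pending()
    time.sleep(60)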

Resource Management

Cleanup Tasks

def cleanup_temporary_files():
    """Clean up temporary files older than 24 hours"""
    try:
        cutoff_time = time.time() - (24 * 60 * 60)  # 24 hours ago
        temp_dirs = ['uploads', 'temp']
        
        for temp_dir in temp_dirs:
            if os.path.exists(temp_dir):
                for filename in os.listdir(temp_dir):
                    filepath = os.path.join(temp_dir, filename)
                    if os.path.isfile(filepath) and os.path.getmtime(filepath) < cutoff_time:
                        os.remove(filepath)
                        logging.info(f"Cleaned up temporary file: {filepath}")
                        
    except Exception as e:
        logging.error(f"Temporary file cleanup failed: {str(e)}")

def monitor_disk_space():
    """Monitor and alert on disk space"""
    try:
        import shutil
        total, used, free = shutil.disk_usage("/")
        
        # Convert to GB
        free_gb = free // (1024**3)
        total_gb = total // (1024**3)
        usage_percent = (used / total) * 100
        
        if usage_percent > 85:
            logging.warning(f"High disk usage: {usage_percent:.1f}% ({free_gb}GB free)")
            
        if free_gb < 5:
            logging.critical(f"Low disk space: {free_gb}GB remaining")
            
    except Exception as e:
        logging.error(f"Disk space monitoring failed: {str(e)}")

Monitoring Alerts

Email Alerts (Optional)

import smtplib
from email.mime.text import MIMEText

def send_alert(subject: str, message: str):
    """Send email alert for critical issues"""
    try:
        smtp_server = os.environ.get("SMTP_SERVER")
        smtp_port = int(os.environ.get("SMTP_PORT", "587"))
        smtp_user = os.environ.get("SMTP_USER")
        smtp_pass = os.environ.get("SMTP_PASS")
        alert_email = os.environ.get("ALERT_EMAIL")
        
        if not all([smtp_server, smtp_user, smtp_pass, alert_email]):
            return  # Email not configured
        
        msg = MIMEText(message)
        msg['Subject'] = f"[Transcription Service] {subject}"
        msg['From'] = smtp_user
        msg['To'] = alert_email
        
        with smtplib.SMTP(smtp_server, smtp_port) as server:
            server.starttls()
            server.login(smtp_user, smtp_pass)
            server.send_message(msg)
            
    except Exception as e:
        logging.error(f"Failed to send alert: {str(e)}")

🤝 Contributing Guidelines

Development Workflow

1. Setup Development Environment

# Fork repository
git clone https://github.com/your-username/azure-speech-transcription.git
cd azure-speech-transcription

# Create feature branch
git checkout -b feature/your-feature-name

# Setup environment
python -m venv venv
source venv/bin/activate  # or venv\Scripts\activate on Windows
pip install -r requirements.txt
pip install -r requirements-dev.txt  # Development dependencies

2. Code Quality Standards

Python Style Guide

  • Follow PEP 8 style guidelines
  • Use type hints for function parameters and return values
  • Maximum line length: 88 characters (Black formatter)
  • Use meaningful variable and function names

Code Formatting

# Install development tools
pip install black flake8 mypy pytest

# Format code
black .

# Check style
flake8 .

# Type checking
mypy app_core.py gradio_app.py

# Run tests
pytest tests/

Documentation Standards

  • All functions must have docstrings
  • Include type hints
  • Document complex logic with inline comments
  • Update README.md for new features

def submit_transcription(
    self, 
    file_bytes: bytes, 
    original_filename: str,
    user_id: str,
    language: str,
    settings: Dict[str, Any]
) -> str:
    """
    Submit a new transcription job for processing.
    
    Args:
        file_bytes: Raw bytes of the audio/video file
        original_filename: Original name of the uploaded file
        user_id: ID of the authenticated user
        language: Language code for transcription (e.g., 'en-US')
        settings: Transcription configuration options
        
    Returns:
        str: Unique job ID for tracking transcription progress
        
    Raises:
        ValueError: If user_id is invalid or file is too large
        ConnectionError: If Azure services are unavailable
    """

3. Testing Requirements

Unit Tests

import pytest
from unittest.mock import Mock, patch
from app_core import TranscriptionManager, AuthManager

class TestAuthManager:
    def test_password_hashing(self):
        password = "TestPassword123"
        hashed = AuthManager.hash_password(password)
        
        assert hashed != password
        assert AuthManager.verify_password(password, hashed)
        assert not AuthManager.verify_password("wrong", hashed)
    
    def test_email_validation(self):
        assert AuthManager.validate_email("user@example.com")
        assert not AuthManager.validate_email("invalid-email")
        assert not AuthManager.validate_email("")

class TestTranscriptionManager:
    @patch('app_core.BlobServiceClient')
    def test_submit_transcription(self, mock_blob):
        manager = TranscriptionManager()
        
        job_id = manager.submit_transcription(
            b"fake audio data",
            "test.wav",
            "user123",
            "en-US",
            {"audio_format": "wav"}
        )
        
        assert isinstance(job_id, str)
        assert len(job_id) == 36  # UUID length

Integration Tests

class TestIntegration:
    def test_full_transcription_workflow(self):
        # Test complete workflow from upload to download
        pass
    
    def test_user_registration_and_login(self):
        # Test complete auth workflow
        pass

4. Commit Guidelines

Commit Message Format

type(scope): brief description

Detailed explanation of changes if needed

- List specific changes
- Include any breaking changes
- Reference issue numbers

Closes #123

Commit Types

  • feat: New feature
  • fix: Bug fix
  • docs: Documentation changes
  • style: Code style changes (formatting, etc.)
  • refactor: Code refactoring
  • test: Adding or updating tests
  • chore: Maintenance tasks
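
The header format and type list above can be checked automatically, for example from a commit-msg hook. The following is a minimal sketch, not part of the project: the names `COMMIT_TYPES`, `HEADER_RE`, and `check_commit_header` are hypothetical, and it assumes the scope in parentheses is always required.

```python
import re

# Commit types listed in this guide
COMMIT_TYPES = ("feat", "fix", "docs", "style", "refactor", "test", "chore")

# Header shape: type(scope): brief description
# Assumption: the scope is mandatory and contains no spaces or parentheses.
HEADER_RE = re.compile(
    r"^(?P<type>[a-z]+)\((?P<scope>[^()\s]+)\): (?P<description>\S.*)$"
)


def check_commit_header(message: str) -> bool:
    """Return True if the first line follows type(scope): description."""
    lines = message.splitlines()
    if not lines:
        return False
    match = HEADER_RE.match(lines[0])
    return bool(match) and match.group("type") in COMMIT_TYPES
```

Only the first line is validated; the body and the `Closes #123` footer are left free-form.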

Example Commits

git commit -m "feat(auth): add password strength validation

- Implement password complexity requirements
- Add client-side validation feedback
- Update registration form UI

Closes #45"

git commit -m "fix(transcription): handle Azure service timeouts

- Add retry logic for failed API calls
- Improve error messages for users
- Log detailed error information

Fixes #67"

5. Pull Request Process

PR Checklist

  • Code follows style guidelines
  • All tests pass
  • Documentation updated
  • Security considerations reviewed
  • Performance impact assessed
  • Breaking changes documented

PR Template

## Description
Brief description of changes

## Type of Change
- [ ] Bug fix
- [ ] New feature
- [ ] Breaking change
- [ ] Documentation update

## Testing
- [ ] Unit tests added/updated
- [ ] Integration tests pass
- [ ] Manual testing completed

## Security
- [ ] No sensitive data exposed
- [ ] Input validation implemented
- [ ] Access controls maintained

## Performance
- [ ] No performance degradation
- [ ] Database queries optimized
- [ ] Resource usage considered

Feature Development

Adding New Languages

# 1. Update environment configuration
ALLOWED_LANGS = {
    "en-US": "English (United States)",
    "es-ES": "Spanish (Spain)",
    "new-LANG": "New Language Name"
}

# 2. Test language support
def test_new_language():
    # Verify Azure Speech Services supports the language
    # Test transcription accuracy
    # Update documentation
    pass  # Placeholder: replace with real checks
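
Once a language is added to `ALLOWED_LANGS`, incoming requests can be checked against it before a job is submitted. This is a minimal sketch under the assumption that validation happens in a standalone helper; `validate_language` is a hypothetical name and does not appear in the application code.

```python
# Mirrors the mapping shown above (without the placeholder entry)
ALLOWED_LANGS = {
    "en-US": "English (United States)",
    "es-ES": "Spanish (Spain)",
}


def validate_language(code: str) -> str:
    """Return the code unchanged if it is supported, else raise ValueError."""
    if code not in ALLOWED_LANGS:
        supported = ", ".join(sorted(ALLOWED_LANGS))
        raise ValueError(f"Unsupported language '{code}'. Supported: {supported}")
    return code
```

Raising `ValueError` matches the exception type the `submit_transcription` docstring already documents for invalid input.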

Adding New Audio Formats

# 1. Update supported formats list
AUDIO_FORMATS = [
    "wav", "mp3", "ogg", "opus", "flac", 
    "new_format"  # Add new format
]

# 2. Update FFmpeg conversion logic
def _convert_to_audio(self, input_path, output_path, audio_format="wav"):
    if audio_format == "new_format":
        # Add specific conversion parameters
        cmd = ["ffmpeg", "-i", input_path, "-codec", "new_codec", output_path]
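
One way to keep the conversion logic from growing an `if` branch per format is a lookup table from format to encoder arguments. The sketch below only builds the command list and does not run FFmpeg. The table `CODEC_ARGS` and the function `build_ffmpeg_command` are hypothetical, and the encoder choices are assumptions: whether `libmp3lame`, `libvorbis`, or `libopus` is available depends on how the local FFmpeg was built.

```python
from typing import Dict, List

# Assumed encoder per output format; adjust to the encoders in your FFmpeg build
CODEC_ARGS: Dict[str, List[str]] = {
    "wav": ["-c:a", "pcm_s16le"],
    "mp3": ["-c:a", "libmp3lame"],
    "ogg": ["-c:a", "libvorbis"],
    "opus": ["-c:a", "libopus"],
    "flac": ["-c:a", "flac"],
}


def build_ffmpeg_command(
    input_path: str, output_path: str, audio_format: str = "wav"
) -> List[str]:
    """Build an FFmpeg argument list for the requested audio format."""
    try:
        codec_args = CODEC_ARGS[audio_format]
    except KeyError:
        raise ValueError(f"Unsupported audio format: {audio_format}") from None
    # -y overwrites the output file, -vn drops any video stream
    return ["ffmpeg", "-y", "-i", input_path, "-vn", *codec_args, output_path]
```

The resulting list can be passed to `subprocess.run(cmd, check=True)`; adding a format then means adding one entry to the table.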

Adding New Features

# 1. Database schema updates
def upgrade_database_schema(self):  # Method of the database manager class
    with self.get_connection() as conn:
        conn.execute("""
            ALTER TABLE transcriptions 
            ADD COLUMN new_feature_data TEXT
        """)

# 2. API endpoint updates
def new_feature_endpoint(user_id: str, feature_data: Dict) -> Dict:
    # Implement new feature logic
    pass

# 3. UI updates
def add_new_feature_ui():
    new_feature_input = gr.Textbox(label="New Feature")
    new_feature_button = gr.Button("Use New Feature")
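
The schema update in step 1 raises a "duplicate column name" error if it runs a second time, because SQLite has no `ADD COLUMN IF NOT EXISTS`. A minimal sketch of an idempotent version follows; `add_column_if_missing` is a hypothetical helper, and since table and column names are interpolated into the SQL, it assumes they are trusted constants, never user input.

```python
import sqlite3


def add_column_if_missing(
    conn: sqlite3.Connection, table: str, column: str, column_type: str = "TEXT"
) -> bool:
    """Add a column unless it already exists. Returns True if it was added."""
    # PRAGMA table_info yields one row per column; index 1 is the column name
    existing = {row[1] for row in conn.execute(f"PRAGMA table_info({table})")}
    if column in existing:
        return False
    conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {column_type}")
    return True
```

Called at startup, this lets the same migration code run safely on both fresh and already-upgraded databases.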

⚙️ Advanced Configuration

Performance Optimization

Concurrent Processing

# Adjust worker thread pool size based on server capacity
import os
from concurrent.futures import ThreadPoolExecutor
from typing import Optional

class TranscriptionManager:
    def __init__(self, max_workers: Optional[int] = None):
        if max_workers is None:
            # Auto-detect based on CPU cores, capped at 10
            max_workers = min(os.cpu_count() or 1, 10)
        
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

# Configure based on server specs
# Small server: max_workers=2-4
# Medium server: max_workers=5-8  
# Large server: max_workers=10+

Database Optimization

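# SQLite performance tuning
import sqlite3

def configure_database_performance(db_path: str):
    with sqlite3.connect(db_path) as conn:
        # Enable WAL mode for better concurrency (persists in the database file)
        conn.execute("PRAGMA journal_mode=WAL")
        
        # The settings below apply only to the current connection,
        # so they must be repeated on every new connection.
        
        # Increase cache size: a positive value is a page count,
        # a negative value is a size in KiB (e.g. -10000 is about 10 MB)
        conn.execute("PRAGMA cache_size=10000")
        
        # Optimize synchronization
        conn.execute("PRAGMA synchronous=NORMAL")
        
        # Enable foreign keys
        conn.execute("PRAGMA foreign_keys=ON")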
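
Because most of these PRAGMAs are per-connection, it can help to apply them where the connection is opened and read them back to confirm they took effect. The following is a sketch under that assumption; `open_tuned_connection` and `read_pragmas` are hypothetical helpers, and the cache size uses the negative (KiB) form for illustration.

```python
import sqlite3


def open_tuned_connection(db_path: str) -> sqlite3.Connection:
    """Open a connection and apply the tuning PRAGMAs to it."""
    conn = sqlite3.connect(db_path)
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA cache_size=-10000")  # negative value: size in KiB
    conn.execute("PRAGMA synchronous=NORMAL")
    conn.execute("PRAGMA foreign_keys=ON")
    return conn


def read_pragmas(conn: sqlite3.Connection) -> dict:
    """Read back the current values so they can be logged or asserted."""
    names = ("journal_mode", "cache_size", "synchronous", "foreign_keys")
    return {name: conn.execute(f"PRAGMA {name}").fetchone()[0] for name in names}
```

For a file-backed database, `read_pragmas` reports `journal_mode` as `wal` and `synchronous` as `1` (NORMAL). In-memory databases cannot use WAL and report a different journal mode.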

Memory Management

# Large file handling
def process_large_file(file_path: str):
    """Process large files in chunks to manage memory"""
    chunk_size = 64 * 1024 * 1024  # 64MB chunks
    
    with open(file_path, 'rb') as f:
        while chunk := f.read(chunk_size):
            # Process chunk
            yield chunk

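# Garbage collection for long-running processes
import gc
import schedule  # Third-party package: pip install schedule

def cleanup_memory():
    """Force garbage collection"""
    gc.collect()
    
# Schedule periodic cleanup
# (jobs only run while schedule.run_pending() is called in a loop)
schedule.every(30).minutes.do(cleanup_memory)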
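
As an illustration of consuming the chunk generator without loading the whole file, the sketch below computes a SHA-256 checksum chunk by chunk. The helper names `read_in_chunks` and `sha256_of_file` are hypothetical; the chunking loop is the same pattern as `process_large_file` above.

```python
import hashlib
from typing import Iterator


def read_in_chunks(file_path: str, chunk_size: int = 64 * 1024 * 1024) -> Iterator[bytes]:
    """Yield the file's content in chunks of at most chunk_size bytes."""
    with open(file_path, "rb") as f:
        while chunk := f.read(chunk_size):
            yield chunk


def sha256_of_file(file_path: str, chunk_size: int = 64 * 1024 * 1024) -> str:
    """Hash a file incrementally; memory use stays near one chunk."""
    digest = hashlib.sha256()
    for chunk in read_in_chunks(file_path, chunk_size):
        digest.update(chunk)
    return digest.hexdigest()
```

Peak memory is bounded by `chunk_size` rather than by the file size, so the default of 64 MB can be lowered on small servers.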

Security Hardening

Rate Limiting

from collections import defaultdict
from time import time

class RateLimiter:
    def __init__(self, max_requests: int = 100, window: int = 3600):
        self.max_requests = max_requests
        self.window = window
        self.requests = defaultdict(list)
    
    def is_allowed(self, user_id: str) -> bool:
        now = time()
        user_requests = self.requests[user_id]
        
        # Clean old requests
        user_requests[:] = [req_time for req_time in user_requests 
                           if now - req_time < self.window]
        
        # Check limit
        if len(user_requests) >= self.max_requests:
            return False
        
        user_requests.append(now)
        return True

# Usage in endpoints
rate_limiter = RateLimiter(max_requests=50, window=3600)  # 50 per hour

def submit_transcription(self, user_id: str, *args, **kwargs):
    if not rate_limiter.is_allowed(user_id):
        raise Exception("Rate limit exceeded")
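
The limiter above is not synchronized, and the backend processes jobs on worker threads, so two concurrent calls for the same user could both pass the check. A minimal thread-safe sketch follows. `ThreadSafeRateLimiter` is a hypothetical variant, not the project's class; it uses a lock, a per-user `deque`, and an injectable clock (defaulting to `time.monotonic`) so the window logic can be tested without sleeping.

```python
import threading
from collections import defaultdict, deque
from time import monotonic
from typing import Callable, Deque, Dict


class ThreadSafeRateLimiter:
    def __init__(
        self,
        max_requests: int = 100,
        window: float = 3600.0,
        clock: Callable[[], float] = monotonic,
    ):
        self.max_requests = max_requests
        self.window = window
        self._clock = clock
        self._lock = threading.Lock()
        self._requests: Dict[str, Deque[float]] = defaultdict(deque)

    def is_allowed(self, user_id: str) -> bool:
        now = self._clock()
        with self._lock:
            timestamps = self._requests[user_id]
            # Drop timestamps that have left the sliding window
            while timestamps and now - timestamps[0] >= self.window:
                timestamps.popleft()
            if len(timestamps) >= self.max_requests:
                return False
            timestamps.append(now)
            return True
```

Like the original, this keeps state in process memory, so limits reset on restart and are not shared between multiple server processes.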

Input Sanitization

import os
import re

import bleach  # Third-party package: pip install bleach

def sanitize_filename(filename: str) -> str:
    """Sanitize uploaded filename"""
    # Remove path traversal attempts
    filename = os.path.basename(filename)
    
    # Remove dangerous characters
    filename = re.sub(r'[<>:"/\\|?*]', '_', filename)
    
    # Limit length to 255 characters, preserving the extension
    if len(filename) > 255:
        name, ext = os.path.splitext(filename)
        filename = name[:255 - len(ext)] + ext
    
    return filename

def sanitize_user_input(text: str) -> str:
    """Sanitize user text input"""
    # Remove HTML tags
    text = bleach.clean(text, tags=[], strip=True)
    
    # Limit length
    text = text[:1000]
    
    return text.strip()

Audit Logging

import json
import logging
from datetime import datetime
from typing import Dict

class AuditLogger:
    def __init__(self):
        self.logger = logging.getLogger('audit')
    
    def log_user_action(self, user_id: str, action: str, details: Dict = None):
        """Log user actions for security auditing"""
        audit_entry = {
            'timestamp': datetime.now().isoformat(),
            'user_id': user_id,
            'action': action,
            'details': details or {},
            'ip_address': self._get_client_ip(),
            'user_agent': self._get_user_agent()
        }
        
        self.logger.info(json.dumps(audit_entry))
    
    def _get_client_ip(self) -> str:
        # Implementation depends on deployment setup
        return "unknown"
    
    def _get_user_agent(self) -> str:
        # Implementation depends on deployment setup
        return "unknown"

# Usage
audit = AuditLogger()
audit.log_user_action(user_id, "login", {"success": True})
audit.log_user_action(user_id, "transcription_submit", {"filename": filename})
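
`logging.getLogger('audit')` has no handler of its own, so audit entries end up wherever the root logger sends them, or nowhere if INFO is filtered out. The sketch below routes them to a dedicated rotating file. `configure_audit_logger` is a hypothetical helper, and the path `logs/audit.log` and the size limits are assumptions to adapt to the deployment.

```python
import logging
import os
from logging.handlers import RotatingFileHandler


def configure_audit_logger(
    path: str = "logs/audit.log",  # hypothetical location
    max_bytes: int = 10 * 1024 * 1024,
    backup_count: int = 5,
) -> logging.Logger:
    """Attach a rotating file handler to the 'audit' logger (only once)."""
    logger = logging.getLogger("audit")
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep audit entries out of the application log

    already_configured = any(
        isinstance(h, RotatingFileHandler) for h in logger.handlers
    )
    if not already_configured:
        directory = os.path.dirname(path)
        if directory:
            os.makedirs(directory, exist_ok=True)
        handler = RotatingFileHandler(
            path, maxBytes=max_bytes, backupCount=backup_count, encoding="utf-8"
        )
        # Entries are already JSON strings, so write the message as-is
        handler.setFormatter(logging.Formatter("%(message)s"))
        logger.addHandler(handler)
    return logger
```

Calling this once at startup, before creating `AuditLogger()`, yields one JSON object per line, which is straightforward to parse later.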

Custom Extensions

Plugin Architecture

from typing import Dict, List

class TranscriptionPlugin:
    """Base class for transcription plugins"""
    
    def pre_process(self, file_bytes: bytes, settings: Dict) -> bytes:
        """Pre-process audio before transcription"""
        return file_bytes
    
    def post_process(self, transcript: str, settings: Dict) -> str:
        """Post-process transcript text"""
        return transcript
    
    def get_name(self) -> str:
        """Return plugin name"""
        raise NotImplementedError

class NoiseReductionPlugin(TranscriptionPlugin):
    def get_name(self) -> str:
        return "noise_reduction"
    
    def pre_process(self, file_bytes: bytes, settings: Dict) -> bytes:
        # Implement noise reduction using audio processing library
        # This is a placeholder - actual implementation would use
        # libraries like librosa, scipy, or pydub
        return file_bytes

class LanguageDetectionPlugin(TranscriptionPlugin):
    def get_name(self) -> str:
        return "language_detection"
    
    def pre_process(self, file_bytes: bytes, settings: Dict) -> bytes:
        # Detect language and update settings
        # (_detect_language is not shown here and must be implemented by this plugin)
        detected_language = self._detect_language(file_bytes)
        settings['detected_language'] = detected_language
        return file_bytes

# Plugin manager
class PluginManager:
    def __init__(self):
        self.plugins: List[TranscriptionPlugin] = []
    
    def register_plugin(self, plugin: TranscriptionPlugin):
        self.plugins.append(plugin)
    
    def apply_pre_processing(self, file_bytes: bytes, settings: Dict) -> bytes:
        for plugin in self.plugins:
            file_bytes = plugin.pre_process(file_bytes, settings)
        return file_bytes
    
    def apply_post_processing(self, transcript: str, settings: Dict) -> str:
        for plugin in self.plugins:
            transcript = plugin.post_process(transcript, settings)
        return transcript
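
To show how the pieces fit together, here is a self-contained sketch with compact copies of the base class and manager plus one post-processing plugin. `WhitespaceCleanupPlugin` is a hypothetical example and not part of the project.

```python
import re
from typing import Dict, List


class TranscriptionPlugin:
    """Compact copy of the base class shown above."""

    def pre_process(self, file_bytes: bytes, settings: Dict) -> bytes:
        return file_bytes

    def post_process(self, transcript: str, settings: Dict) -> str:
        return transcript

    def get_name(self) -> str:
        raise NotImplementedError


class WhitespaceCleanupPlugin(TranscriptionPlugin):
    """Hypothetical plugin: collapse runs of whitespace in the transcript."""

    def get_name(self) -> str:
        return "whitespace_cleanup"

    def post_process(self, transcript: str, settings: Dict) -> str:
        return re.sub(r"\s+", " ", transcript).strip()


class PluginManager:
    """Compact copy of the manager shown above."""

    def __init__(self):
        self.plugins: List[TranscriptionPlugin] = []

    def register_plugin(self, plugin: TranscriptionPlugin):
        self.plugins.append(plugin)

    def apply_pre_processing(self, file_bytes: bytes, settings: Dict) -> bytes:
        for plugin in self.plugins:
            file_bytes = plugin.pre_process(file_bytes, settings)
        return file_bytes

    def apply_post_processing(self, transcript: str, settings: Dict) -> str:
        for plugin in self.plugins:
            transcript = plugin.post_process(transcript, settings)
        return transcript


manager = PluginManager()
manager.register_plugin(WhitespaceCleanupPlugin())
cleaned = manager.apply_post_processing("  hello \n  world ", {})
```

Plugins run in registration order, so a plugin that depends on another's output should be registered after it.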

🔧 Troubleshooting

Common Development Issues

Environment Setup Problems

Issue: Azure connection fails

# Check environment variables
python -c "
import os
print('AZURE_SPEECH_KEY:', bool(os.getenv('AZURE_SPEECH_KEY')))
print('AZURE_BLOB_CONNECTION:', bool(os.getenv('AZURE_BLOB_CONNECTION')))
"

# Test Azure connection
python -c "
import os
from azure.storage.blob import BlobServiceClient
client = BlobServiceClient.from_connection_string(os.environ['AZURE_BLOB_CONNECTION'])
print('Containers:', [c.name for c in client.list_containers()])
"

Issue: FFmpeg not found

# Check FFmpeg installation
ffmpeg -version

# Install FFmpeg (Ubuntu/Debian)
sudo apt update && sudo apt install ffmpeg

# Install FFmpeg (Windows with Chocolatey)
choco install ffmpeg

# Install FFmpeg (macOS with Homebrew)
brew install ffmpeg

Issue: Database initialization fails

# Check database permissions
import os
db_dir = "database"
if not os.path.exists(db_dir):
    os.makedirs(db_dir)
    print(f"Created directory: {db_dir}")

# Test database creation
import sqlite3
conn = sqlite3.connect("database/test.db")
conn.execute("CREATE TABLE IF NOT EXISTS test (id INTEGER)")
conn.close()
print("Database test successful")

Runtime Issues

Issue: Memory errors with large files

# Monitor memory usage
import psutil

def check_memory():
    memory = psutil.virtual_memory()
    print(f"Memory usage: {memory.percent}%")
    print(f"Available: {memory.available / 1024**3:.1f}GB")

# Implement file chunking for large uploads
def process_large_file_in_chunks(file_path: str, chunk_size: int = 64*1024*1024):
    with open(file_path, 'rb') as f:
        while chunk := f.read(chunk_size):
            yield chunk

Issue: Transcription jobs stuck

# Check pending jobs
def diagnose_stuck_jobs():
    pending_jobs = transcription_manager.db.get_pending_jobs()
    print(f"Pending jobs: {len(pending_jobs)}")
    
    for job in pending_jobs:
        duration = datetime.now() - datetime.fromisoformat(job.created_at)
        print(f"Job {job.job_id}: {job.status} for {duration}")
        
        if duration.total_seconds() > 3600:  # 1 hour
            print(f"⚠️ Job {job.job_id} may be stuck")

# Reset stuck jobs
def reset_stuck_jobs():
    with transcription_manager.db.get_connection() as conn:
        conn.execute("""
            UPDATE transcriptions 
            SET status = 'pending', azure_trans_id = NULL
            WHERE status = 'processing' 
            AND created_at < datetime('now', '-1 hour')
        """)

Issue: Azure API errors

# Test Azure Speech Service
def test_azure_speech():
    try:
        url = f"{AZURE_SPEECH_KEY_ENDPOINT}/speechtotext/v3.2/transcriptions"
        headers = {"Ocp-Apim-Subscription-Key": AZURE_SPEECH_KEY}
        
        response = requests.get(url, headers=headers, timeout=10)
        print(f"Status: {response.status_code}")
        print(f"Response: {response.text[:200]}")
        
    except Exception as e:
        print(f"Azure Speech test failed: {e}")

# Check Azure service status
def check_azure_status():
    # Check Azure status page
    status_url = "https://status.azure.com/en-us/status"
    print(f"Check Azure status: {status_url}")

Debugging Tools

Debug Mode Configuration

# Enable debug mode
DEBUG = os.environ.get("DEBUG", "false").lower() == "true"

if DEBUG:
    logging.basicConfig(level=logging.DEBUG)
    
    # Enable Gradio debug mode
    demo.launch(debug=True, show_error=True)

Performance Profiling

import cProfile
import pstats

def profile_function(func):
    """Profile function performance"""
    
    def wrapper(*args, **kwargs):
        # Use a fresh profiler per call so stats do not accumulate across calls
        profiler = cProfile.Profile()
        profiler.enable()
        try:
            result = func(*args, **kwargs)
        finally:
            # Stop profiling even if the function raises
            profiler.disable()
        
        # Print stats
        stats = pstats.Stats(profiler)
        stats.sort_stats('cumulative')
        stats.print_stats(10)  # Top 10 functions
        
        return result
    
    return wrapper

# Usage
@profile_function
def submit_transcription(self, *args, **kwargs):
    # Function implementation
    pass

Log Analysis

import re

def analyze_logs(log_file: str = "logs/transcription.log"):
    """Analyze application logs for issues"""
    
    errors = []
    warnings = []
    performance_issues = []
    
    with open(log_file, 'r') as f:
        for line in f:
            if 'ERROR' in line:
                errors.append(line.strip())
            elif 'WARNING' in line:
                warnings.append(line.strip())
            elif 'completed in' in line:
                # Extract timing information (accepts "12s" as well as "12.5s")
                match = re.search(r'completed in (\d+(?:\.\d+)?)s', line)
                if match and float(match.group(1)) > 30:  # > 30 seconds
                    performance_issues.append(line.strip())
    
    print(f"Errors: {len(errors)}")
    print(f"Warnings: {len(warnings)}")
    print(f"Performance issues: {len(performance_issues)}")
    
    return {
        'errors': errors[-10:],  # Last 10 errors
        'warnings': warnings[-10:],  # Last 10 warnings
        'performance_issues': performance_issues[-10:]
    }

Production Troubleshooting

Service Health Check

#!/bin/bash
# health_check.sh

echo "=== System Health Check ==="

# Check service status
systemctl is-active transcription
systemctl is-active nginx

# Check disk space
df -h

# Check memory usage
free -h

# Check CPU usage
top -b -n1 | grep "Cpu(s)"

# Check logs for errors
tail -n 50 /home/transcription/app/logs/transcription.log | grep ERROR

# Check Azure connectivity
curl -s -o /dev/null -w "%{http_code}" https://azure.microsoft.com/

echo "=== Health Check Complete ==="

Database Recovery

def recover_database():
    """Recover database from Azure backup"""
    try:
        # List available backups
        container_client = blob_service.get_container_client(AZURE_CONTAINER)
        backups = []
        
        for blob in container_client.list_blobs(name_starts_with="shared/backups/"):
            backups.append({
                'name': blob.name,
                'modified': blob.last_modified
            })
        
        # Sort by date (newest first)
        backups.sort(key=lambda x: x['modified'], reverse=True)
        
        if not backups:
            print("No backups found")
            return
        
        # Download latest backup
        latest_backup = backups[0]['name']
        print(f"Restoring from: {latest_backup}")
        
        blob_client = blob_service.get_blob_client(
            container=AZURE_CONTAINER,
            blob=latest_backup
        )
        
        # Download backup
        with open("database/transcriptions_restored.db", "wb") as f:
            f.write(blob_client.download_blob().readall())
        
        print("Database restored successfully")
        print("Restart the application to use restored database")
        
    except Exception as e:
        print(f"Database recovery failed: {str(e)}")
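
Before swapping the restored file into place, it may be worth confirming that the download is a readable SQLite database. The sketch below runs SQLite's built-in `PRAGMA integrity_check` on a read-only connection. `is_database_intact` is a hypothetical helper, not part of the application.

```python
import sqlite3
from pathlib import Path


def is_database_intact(db_path: str) -> bool:
    """Return True if the file opens as SQLite and passes integrity_check."""
    # mode=ro opens read-only and avoids creating the file if it is missing
    uri = Path(db_path).resolve().as_uri() + "?mode=ro"
    try:
        conn = sqlite3.connect(uri, uri=True)
    except sqlite3.Error:
        return False
    try:
        row = conn.execute("PRAGMA integrity_check").fetchone()
        return row is not None and row[0] == "ok"
    except sqlite3.DatabaseError:
        # Raised, for example, when the file is not a SQLite database
        return False
    finally:
        conn.close()
```

A typical use would be to call `is_database_intact("database/transcriptions_restored.db")` after the download and only replace the live database when it returns `True`.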

📚 Additional Resources

Documentation Links

Useful Tools

  • Azure Storage Explorer: GUI for managing blob storage
  • DB Browser for SQLite: Visual database management
  • Postman: API testing and development
  • Azure CLI: Command-line Azure management
  • Visual Studio Code: Recommended IDE with Azure extensions

Community Resources


This developer guide provides comprehensive information for setting up, developing, deploying, and maintaining the Azure Speech Transcription service. For additional help, refer to the linked documentation and community resources. 🚀