Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, BackgroundTasks | |
| import edge_tts | |
| import asyncio | |
| import os | |
| import time | |
| from fastapi.responses import FileResponse, JSONResponse | |
| from typing import List | |
| import pydub | |
# The ASGI application object that the server entry point runs.
app = FastAPI()

# Registry of in-flight requests, keyed by request id; the TTS handler stores
# a status string under each id while it is working.
active_requests = dict()
def split_text(text, max_chunk_size=500):
    """Split text into chunks at sentence boundaries.

    The Devanagari danda ('।') is normalised to '.' and the Arabic question
    mark ('؟') to '?'. The text is then cut after runs of '.', '!' or '?',
    and consecutive sentences are packed into chunks whose joined length
    (including the separating spaces) stays within ``max_chunk_size``.

    Args:
        text: The text to split.
        max_chunk_size: Upper bound, in characters, for each chunk. A single
            sentence longer than this is kept whole as its own chunk.

    Returns:
        A list of chunk strings. Empty or punctuation-only input yields an
        empty list.
    """
    # Function-scope import keeps this block self-contained.
    import re

    normalized = text.replace('।', '.').replace('؟', '?')

    chunks = []
    current_chunk = []
    current_length = 0  # length of ' '.join(current_chunk)

    # group(1) is the sentence body, group(2) its trailing punctuation (if any).
    for match in re.finditer(r'([^.!?]+)([.!?]*)', normalized):
        body = match.group(1).strip()
        if not body:
            # Whitespace between terminators would otherwise become a
            # stray "." sentence.
            continue
        # Keep the original terminator; only add '.' when the sentence has none.
        sentence = body + (match.group(2) or '.')

        if current_chunk:
            # +1 accounts for the space that ' '.join() inserts.
            projected_length = current_length + 1 + len(sentence)
            if projected_length > max_chunk_size:
                chunks.append(' '.join(current_chunk))
                current_chunk = [sentence]
                current_length = len(sentence)
            else:
                current_chunk.append(sentence)
                current_length = projected_length
        else:
            current_chunk.append(sentence)
            current_length = len(sentence)

    if current_chunk:
        chunks.append(' '.join(current_chunk))
    return chunks
async def process_chunk(text, voice, temp_dir, chunk_index):
    """Synthesize one text chunk to an MP3 file and return the file's path.

    The file is written inside ``temp_dir``; its name combines the chunk
    index with the current Unix time in whole seconds.
    """
    timestamp = int(time.time())
    file_name = f"chunk_{chunk_index}_{timestamp}.mp3"
    destination = os.path.join(temp_dir, file_name)

    # Build the edge-tts request and write its audio to the destination path.
    await edge_tts.Communicate(text, voice).save(destination)
    return destination
def _merge_mp3_files(chunk_files, output_path):
    """Blocking helper: concatenate the MP3 files and export them to ``output_path``.

    The chunk files are removed afterwards whether or not the merge succeeded,
    so a failed request does not leave its partial audio behind.
    """
    try:
        combined = pydub.AudioSegment.empty()
        for chunk_file in chunk_files:
            combined += pydub.AudioSegment.from_mp3(chunk_file)
        # export() hands back the file object it wrote to; close it so the
        # descriptor is not held until garbage collection.
        combined.export(output_path, format="mp3").close()
    finally:
        for chunk_file in chunk_files:
            try:
                os.remove(chunk_file)
            except OSError:
                # Best-effort cleanup: the file may already be gone.
                pass


async def combine_audio_files(chunk_files, output_path):
    """Combine multiple MP3 files into one.

    Decoding and re-encoding are blocking calls, so the work is pushed to the
    event loop's default executor instead of running inside the coroutine.

    Args:
        chunk_files: Paths of the MP3 chunks, in playback order. They are
            deleted once the merge has finished or failed.
        output_path: Path the combined MP3 is written to.

    Raises:
        Whatever pydub raises while decoding or exporting is propagated.
    """
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, _merge_mp3_files, chunk_files, output_path)
def home():
    """Return a fixed greeting payload confirming the service is up."""
    # NOTE(review): no route decorator is visible on this function in this
    # view of the file; confirm it is registered on `app`.
    greeting = "EdgeTTS FastAPI is running!"
    return {"message": greeting}
def health_check():
    """Report that the API is running, along with the count of active requests."""
    in_flight = len(active_requests)
    return {"status": "running", "active_requests": in_flight}
def status():
    """List the ids of the requests currently registered as active."""
    request_ids = [request_id for request_id in active_requests]
    return {"active_requests": request_ids}
def _remove_file_quietly(path):
    """Delete ``path`` on a best-effort basis; a missing or locked file is ignored."""
    try:
        os.remove(path)
    except OSError:
        pass


async def tts(text: str, voice: str = "en-US-JennyNeural", background_tasks: BackgroundTasks = None):
    """Generate speech from text using EdgeTTS with parallel processing.

    The text is split into chunks, every chunk is synthesized concurrently,
    and the resulting MP3 files are merged into one file that is returned to
    the caller. Temporary files, including the merged output, are removed by
    background tasks attached to the response.

    Args:
        text: The text to synthesize.
        voice: EdgeTTS voice name.
        background_tasks: Task collection used for post-response cleanup. A
            fresh one is created when the caller does not supply it.

    Returns:
        A ``FileResponse`` with the MP3 on success; a ``JSONResponse`` with
        status 400 for empty text, or status 500 carrying the error message
        when synthesis fails.
    """
    # NOTE(review): no route decorator is visible on this function in this
    # view of the file; confirm it is registered on `app`.
    if not text or not text.strip():
        return JSONResponse(content={"error": "text must not be empty"}, status_code=400)

    # The parameter defaults to None, so guard against a direct call that
    # omits it instead of failing on `None.add_task` after doing all the work.
    if background_tasks is None:
        background_tasks = BackgroundTasks()

    request_id = f"{int(time.time())}_{os.urandom(4).hex()}"
    output_file = f"output_{request_id}.mp3"
    temp_dir = f"temp_{request_id}"
    active_requests[request_id] = "processing"
    try:
        os.makedirs(temp_dir, exist_ok=True)
        chunks = split_text(text)
        tasks = [process_chunk(chunk, voice, temp_dir, i) for i, chunk in enumerate(chunks)]

        # Wait for every chunk to settle before reacting to a failure, so no
        # sibling task is still writing into temp_dir while it is cleaned up.
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for result in results:
            if isinstance(result, BaseException):
                raise result

        await combine_audio_files(list(results), output_file)

        # Both tasks run after the response body has been sent.
        background_tasks.add_task(cleanup_request, request_id)
        background_tasks.add_task(_remove_file_quietly, output_file)
        return FileResponse(
            output_file,
            media_type="audio/mpeg",
            filename="speech.mp3",
            background=background_tasks,
        )
    except Exception as e:
        try:
            # Drops the active_requests entry and removes the temp directory.
            cleanup_request(request_id)
        except OSError:
            pass
        _remove_file_quietly(output_file)
        return JSONResponse(content={"error": str(e)}, status_code=500)
def cleanup_request(request_id):
    """Cleanup function to remove temporary files.

    Unregisters ``request_id`` from ``active_requests`` and deletes the
    request's ``temp_<request_id>`` directory together with the files directly
    inside it. Safe to call more than once for the same id.

    Args:
        request_id: Identifier of the request whose resources are released.
    """
    # Function-scope import keeps this block self-contained.
    import logging

    log = logging.getLogger(__name__)

    # pop() instead of del: a repeated or late call must not raise KeyError.
    active_requests.pop(request_id, None)

    temp_dir = f"temp_{request_id}"
    if not os.path.isdir(temp_dir):
        return

    # Remove each file independently so one failure does not strand the rest.
    for name in os.listdir(temp_dir):
        file_path = os.path.join(temp_dir, name)
        try:
            os.remove(file_path)
        except OSError as exc:
            log.warning("Could not remove %s: %s", file_path, exc)

    try:
        os.rmdir(temp_dir)
    except OSError as exc:
        log.warning("Could not remove directory %s: %s", temp_dir, exc)
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when the file is run as a script.
    import uvicorn

    # Listen on all interfaces, port 7860.
    server_options = {"host": "0.0.0.0", "port": 7860}
    uvicorn.run(app, **server_options)