Spaces:
Runtime error
Runtime error
| import requests | |
| import time | |
| from scipy.io.wavfile import write | |
| import io | |
| # AssemblyAI v2 REST endpoints: `upload_file()` POSTs audio to the first; `request_transcript()` and `make_paragraphs_string()` build on the second | |
| upload_endpoint = "https://api.assemblyai.com/v2/upload" | |
| transcript_endpoint = "https://api.assemblyai.com/v2/transcript" | |
def make_header(api_key):
    """Build the HTTP header dict used for AssemblyAI requests.

    The given `api_key` is placed in the ``authorization`` field and the
    content type is declared as JSON.
    """
    header = dict()
    header["authorization"] = api_key
    header["content-type"] = "application/json"
    return header
def _read_file(filename, chunk_size=5242880):
    """Yield the contents of `filename` as successive byte chunks.

    Generator helper for `upload_file()`. The file is opened in binary mode
    and read `chunk_size` bytes at a time (5242880 bytes by default) until
    a read returns no data.
    """
    with open(filename, "rb") as stream:
        # iter() with a sentinel keeps calling read() until it returns b""
        for piece in iter(lambda: stream.read(chunk_size), b""):
            yield piece
def _read_array(audio, chunk_size=5242880):
    """Like `_read_file`, but for in-memory audio instead of a file on disk.

    `audio` is unpacked as a ``(sample rate, audio np.array)`` pair. The pair
    is written as WAV data into a temporary, unsaved in-memory "file", whose
    bytes are then yielded in chunks of at most `chunk_size` bytes.
    """
    sample_rate, samples = audio

    # Temporary in-memory "file" that receives the WAV data
    wav_buffer = io.BytesIO()
    write(wav_buffer, sample_rate, samples)

    # Keep reading until read() returns b"" (nothing left in the buffer)
    yield from iter(lambda: wav_buffer.read(chunk_size), b"")
def upload_file(audio_file, header, is_file=True):
    """Upload audio to AssemblyAI and return the decoded JSON response.

    When `is_file` is true, `audio_file` is handed to `_read_file()`;
    otherwise it is handed to `_read_array()`. Either way the request body
    is streamed from the resulting generator.

    Raises whatever `raise_for_status()` raises for an HTTP error response.
    """
    if is_file:
        body_chunks = _read_file(audio_file)
    else:
        body_chunks = _read_array(audio_file)

    response = requests.post(upload_endpoint, headers=header, data=body_chunks)

    # raise_for_status() only raises for error status codes, so a 200 passes through
    response.raise_for_status()

    # Returns {'upload_url': <URL>}
    return response.json()
def request_transcript(upload_url, header):
    """Request a transcript from AssemblyAI.

    Args:
        upload_url: Either the audio URL as a string, or the dict returned
            by `upload_file()` (any mapping derived from ``dict`` that has
            an ``"upload_url"`` key).
        header: HTTP headers for the request, e.g. from `make_header()`.

    Returns:
        The decoded JSON body of the response to the POST request.
    """
    # Accept the dict returned from `upload_file` as well as a raw URL string.
    # isinstance() also covers dict subclasses, which an exact type check
    # would have passed through unchanged as the "audio_url" value.
    if isinstance(upload_url, dict):
        upload_url = upload_url["upload_url"]

    # Create request
    transcript_request = {
        "audio_url": upload_url,
    }

    # POST request
    transcript_response = requests.post(
        transcript_endpoint, json=transcript_request, headers=header
    )
    return transcript_response.json()
def wait_for_completion(transcript_id, header, poll_interval=5):
    """Poll a transcript until the transcription/audio analysis finishes.

    Args:
        transcript_id: Id of the transcript, appended to the transcript URL
            to form the polling endpoint.
        header: HTTP headers for the request, e.g. from `make_header()`.
        poll_interval: Seconds to sleep between polls (default 5).

    Returns:
        ``(response_dict, None)`` once the status is ``"completed"``, or
        ``(None, "Error: <message>")`` when the status is ``"error"`` or the
        response carries no status at all.
    """
    polling_endpoint = "https://api.assemblyai.com/v2/transcript/" + transcript_id
    while True:
        result = requests.get(polling_endpoint, headers=header).json()
        status = result.get("status")

        if status == "completed":
            return result, None

        # A body without "status" cannot be a normal progress report, so it
        # is reported through the same error channel instead of raising
        # KeyError or polling forever.
        if status == "error" or status is None:
            return None, f"Error: {result.get('error', 'unknown error')}"

        time.sleep(poll_interval)
def make_paragraphs_string(transc_id, header):
    """Fetch a transcript's paragraphs and return them as one string.

    Requests ``<transcript_endpoint>/<transc_id>/paragraphs`` and joins the
    ``"text"`` of each entry under ``"paragraphs"`` with a blank line
    between consecutive paragraphs.
    """
    url = "/".join([transcript_endpoint, transc_id, "paragraphs"])
    response = requests.get(url, headers=header)
    paragraphs = response.json()["paragraphs"]
    texts = [paragraph["text"] for paragraph in paragraphs]
    return "\n\n".join(texts)