"""Gradio demo that transcribes audio with an NVIDIA NeMo ASR model.

The app loads ``MODEL_NAME`` once at import time, lets the user upload or
record audio, shows segment- and word-level timestamps, plays back a clicked
segment, and offers the transcript as CSV / SRT / VTT / JSON / LRC downloads.
Per-session temporary files live under ``/tmp/<session_hash>``.
"""

import csv
import gc
import json
import os
import shutil
from pathlib import Path

from nemo.collections.asr.models import ASRModel
import torch
import gradio as gr
# import spaces
from pydub import AudioSegment
import numpy as np
import gradio.themes as gr_themes

device = "cuda" if torch.cuda.is_available() else "cpu"
MODEL_NAME = "nvidia/parakeet-tdt-0.6b-v2"

# Number of download buttons wired as outputs (CSV, SRT, VTT, JSON, LRC).
_NUM_DOWNLOAD_BUTTONS = 5

model = ASRModel.from_pretrained(model_name=MODEL_NAME)
model.eval()


def _session_path(request: gr.Request) -> Path:
    """Return the temporary directory that belongs to the request's session."""
    return Path(f'/tmp/{request.session_hash}')


def start_session(request: gr.Request):
    """Create the per-session temp directory and return its POSIX path."""
    session_dir = _session_path(request)
    session_dir.mkdir(parents=True, exist_ok=True)
    print(f"Session with hash {request.session_hash} started.")
    return session_dir.as_posix()


def end_session(request: gr.Request):
    """Delete the per-session temp directory, if it exists."""
    session_dir = _session_path(request)
    if session_dir.exists():
        shutil.rmtree(session_dir)
    print(f"Session with hash {request.session_hash} ended.")


def get_audio_segment(audio_path, start_second, end_second):
    """Cut ``[start_second, end_second]`` out of an audio file.

    Args:
        audio_path: Path of the audio file to clip.
        start_second: Clip start in seconds (negative values are clamped to 0).
        end_second: Clip end in seconds. If it is not after the start, a
            100 ms clip is returned instead.

    Returns:
        ``(frame_rate, samples)`` where ``samples`` is a 1-D NumPy array
        (multi-channel audio is averaged down to one channel), or ``None``
        when the file is missing, the clip is empty, or decoding fails.
    """
    if not audio_path or not Path(audio_path).exists():
        print(f"Warning: Audio path '{audio_path}' not found or invalid for clipping.")
        return None
    try:
        start_ms = max(0, int(start_second * 1000))
        end_ms = int(end_second * 1000)
        if end_ms <= start_ms:
            print(f"Warning: End time ({end_second}s) is not after start time ({start_second}s). Adjusting end time.")
            end_ms = start_ms + 100

        audio = AudioSegment.from_file(audio_path)
        clipped_audio = audio[start_ms:end_ms]

        samples = np.array(clipped_audio.get_array_of_samples())
        channels = clipped_audio.channels
        if channels > 1:
            # Samples are interleaved per frame; average the channels of each
            # frame so the player receives a mono signal.
            samples = samples.reshape((-1, channels)).mean(axis=1).astype(samples.dtype)

        frame_rate = clipped_audio.frame_rate
        if frame_rate <= 0:
            print(f"Warning: Invalid frame rate ({frame_rate}) detected for clipped audio.")
            frame_rate = audio.frame_rate  # Fallback to original audio frame rate

        if samples.size == 0:
            print(f"Warning: Clipped audio resulted in empty samples array ({start_second}s to {end_second}s).")
            return None

        return (frame_rate, samples)
    except FileNotFoundError:
        print(f"Error: Audio file not found at path: {audio_path}")
        return None
    except Exception as e:
        print(f"Error clipping audio {audio_path} from {start_second}s to {end_second}s: {e}")
        return None


def _hidden_download_buttons():
    """Return one hidden ``gr.DownloadButton`` update per download output."""
    return tuple(gr.DownloadButton(visible=False) for _ in range(_NUM_DOWNLOAD_BUTTONS))


def _download_button(path):
    """Return a visible button update for ``path``, or a hidden one if it is ``None``."""
    if path is None:
        return gr.DownloadButton(visible=False)
    return gr.DownloadButton(value=Path(path).as_posix(), visible=True)


def _error_outputs(notice, row_text, audio_path, tag="Error"):
    """Show ``notice`` to the user and build the full failure return value.

    ``gr.Error`` is an exception class and only surfaces when raised; raising
    would prevent the placeholder row from reaching the table, so the message
    is shown with ``gr.Warning`` (kept open until dismissed) instead.

    The returned tuple has one entry per wired output component: segment rows,
    raw timestamps, word rows, audio path, then the five download buttons.
    """
    gr.Warning(notice, duration=None)
    return (
        [[tag, tag, row_text]],
        [[0.0, 0.0]],
        [],
        audio_path,
        *_hidden_download_buttons(),
    )


def _write_csv(rows, path):
    """Write the segment rows to ``path`` as CSV with a header line."""
    with open(path, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(["Start (s)", "End (s)", "Segment"])
        writer.writerows(rows)


def _write_transcript_files(segments, words, session_dir, audio_name):
    """Write the CSV, SRT, VTT, JSON and LRC transcripts into ``session_dir``.

    Each format is written independently so one failure does not hide the
    downloads that did succeed.

    Returns:
        A list of five entries in button order (CSV, SRT, VTT, JSON, LRC);
        each entry is the written ``Path`` or ``None`` if that file failed.
    """
    writers = (
        ("csv", lambda p: _write_csv(segments, p)),
        ("srt", lambda p: write_srt(segments, p)),
        ("vtt", lambda p: write_vtt(segments, words, p)),
        ("json", lambda p: write_json(segments, words, p)),
        ("lrc", lambda p: write_lrc(segments, p)),
    )
    written = []
    failures = []
    for ext, write in writers:
        try:
            target = Path(session_dir, f"transcription_{audio_name}.{ext}")
            write(target)
        except Exception as write_e:
            print(f"Error writing transcript files: {write_e}")
            failures.append(f"{ext.upper()}: {write_e}")
            written.append(None)
        else:
            print(f"{ext.upper()} transcript saved to temporary file: {target}")
            written.append(target)
    if failures:
        gr.Warning(f"Failed to create transcript files: {'; '.join(failures)}", duration=None)
    return written


def _release_model(long_audio_settings_applied):
    """Undo per-request model changes and free accelerator memory.

    Never raises: problems are printed and reported with ``gr.Warning``.
    """
    try:
        if long_audio_settings_applied:
            try:
                print("Reverting long audio settings.")
                model.change_attention_model("rel_pos")
                model.change_subsampling_conv_chunking_factor(-1)
            except Exception as revert_e:
                print(f"Warning: Failed to revert long audio settings: {revert_e}")
                gr.Warning(f"Issue reverting model settings after long transcription: {revert_e}", duration=5)

        # `model` is a module-level global, so it is referenced directly; the
        # previous `'model' in locals()` guard was always false inside the
        # function and the offload never ran.
        if device == 'cuda':
            model.cpu()
        gc.collect()
        if device == 'cuda':
            torch.cuda.empty_cache()
    except Exception as cleanup_e:
        print(f"Error during model cleanup: {cleanup_e}")
        gr.Warning(f"Issue during model cleanup: {cleanup_e}", duration=5)


# @spaces.GPU
def get_transcripts_and_raw_times(audio_path, session_dir):
    """Transcribe an audio file and prepare everything the UI displays.

    The audio is converted to 16 kHz mono when necessary (written to a
    temporary WAV in ``session_dir`` that is removed afterwards), transcribed
    with timestamps, and the transcript is exported to several file formats.

    Args:
        audio_path: Path of the uploaded or recorded audio file.
        session_dir: Directory for this session's temporary files.

    Returns:
        A 9-tuple matching the wired outputs: segment rows
        ``[start, end, text]`` (times formatted to 2 decimals), raw
        ``[start, end]`` floats per segment, word rows ``[start, end, word]``,
        the original audio path, and five ``gr.DownloadButton`` updates
        (CSV, SRT, VTT, JSON, LRC). Failures return a placeholder row and
        hidden buttons instead of raising.
    """
    if not audio_path:
        gr.Warning("No audio file path provided for transcription.", duration=None)
        return ([], [], [], None, *_hidden_download_buttons())

    processed_audio_path = None
    original_path_name = Path(audio_path).name
    audio_name = Path(audio_path).stem

    try:
        try:
            gr.Info(f"Loading audio: {original_path_name}", duration=2)
            audio = AudioSegment.from_file(audio_path)
            duration_sec = audio.duration_seconds
        except Exception as load_e:
            return _error_outputs(
                f"Failed to load audio file {original_path_name}: {load_e}",
                "Load failed",
                audio_path,
            )

        resampled = False
        mono = False

        target_sr = 16000
        if audio.frame_rate != target_sr:
            try:
                audio = audio.set_frame_rate(target_sr)
                resampled = True
            except Exception as resample_e:
                return _error_outputs(
                    f"Failed to resample audio: {resample_e}",
                    "Resample failed",
                    audio_path,
                )

        if audio.channels == 2:
            try:
                audio = audio.set_channels(1)
                mono = True
            except Exception as mono_e:
                return _error_outputs(
                    f"Failed to convert audio to mono: {mono_e}",
                    "Mono conversion failed",
                    audio_path,
                )
        elif audio.channels > 2:
            return _error_outputs(
                f"Audio has {audio.channels} channels. Only mono (1) or stereo (2) supported.",
                f"{audio.channels}-channel audio not supported",
                audio_path,
            )

        if resampled or mono:
            try:
                processed_audio_path = Path(session_dir, f"{audio_name}_resampled.wav")
                audio.export(processed_audio_path, format="wav")
                transcribe_path = processed_audio_path.as_posix()
                info_path_name = f"{original_path_name} (processed)"
            except Exception as export_e:
                # A partially written file is removed by the outer `finally`.
                return _error_outputs(
                    f"Failed to export processed audio: {export_e}",
                    "Export failed",
                    audio_path,
                )
        else:
            transcribe_path = audio_path
            info_path_name = original_path_name

        long_audio_settings_applied = False
        try:
            model.to(device)
            model.to(torch.float32)
            gr.Info(f"Transcribing {info_path_name} on {device}...", duration=2)

            if duration_sec > 480:  # 8 minutes
                try:
                    gr.Info("Audio longer than 8 minutes. Applying optimized settings for long transcription.", duration=3)
                    print("Applying long audio settings: Local Attention and Chunking.")
                    model.change_attention_model("rel_pos_local_attn", [256, 256])
                    # Mark as applied as soon as the attention model changed so
                    # it is reverted even if the next call fails.
                    long_audio_settings_applied = True
                    model.change_subsampling_conv_chunking_factor(1)
                except Exception as setting_e:
                    gr.Warning(f"Could not apply long audio settings: {setting_e}", duration=5)
                    print(f"Warning: Failed to apply long audio settings: {setting_e}")

            model.to(torch.bfloat16)
            output = model.transcribe([transcribe_path], timestamps=True)

            if (
                not output
                or not isinstance(output, list)
                or not output[0]
                or not hasattr(output[0], 'timestamp')
                or not output[0].timestamp
                or 'segment' not in output[0].timestamp
            ):
                return _error_outputs(
                    "Transcription failed or produced unexpected output format.",
                    "Transcription Format Issue",
                    audio_path,
                )

            timestamps = output[0].timestamp
            segment_timestamps = timestamps['segment']
            vis_data = [
                [f"{ts['start']:.2f}", f"{ts['end']:.2f}", ts['segment']]
                for ts in segment_timestamps
            ]
            raw_times_data = [[ts['start'], ts['end']] for ts in segment_timestamps]

            # Word-level timestamps; malformed entries are skipped.
            word_timestamps_raw = timestamps.get("word", []) or []
            word_vis_data = [
                [f"{w['start']:.2f}", f"{w['end']:.2f}", w["word"]]
                for w in word_timestamps_raw
                if isinstance(w, dict) and 'start' in w and 'end' in w and 'word' in w
            ]

            export_paths = _write_transcript_files(vis_data, word_vis_data, session_dir, audio_name)

            gr.Info("Transcription complete.", duration=2)
            return (
                vis_data,
                raw_times_data,
                word_vis_data,
                audio_path,
                *[_download_button(p) for p in export_paths],
            )

        except torch.cuda.OutOfMemoryError as e:
            error_msg = 'CUDA out of memory. Please try a shorter audio or reduce GPU load.'
            print(f"CUDA OutOfMemoryError: {e}")
            return _error_outputs(error_msg, error_msg, audio_path, tag="OOM")

        except FileNotFoundError:
            error_msg = f"Audio file for transcription not found: {Path(transcribe_path).name}."
            print(f"Error: Transcribe audio file not found at path: {transcribe_path}")
            return _error_outputs(error_msg, "File not found for transcription", audio_path)

        except Exception as e:
            error_msg = f"Transcription failed: {e}"
            print(f"Error during transcription processing: {e}")
            return _error_outputs(error_msg, error_msg, audio_path)

        finally:
            _release_model(long_audio_settings_applied)

    finally:
        # Remove the temporary resampled/mono WAV on every exit path.
        if processed_audio_path and os.path.exists(processed_audio_path):
            try:
                os.remove(processed_audio_path)
                print(f"Temporary audio file {processed_audio_path} removed.")
            except Exception as e:
                print(f"Error removing temporary audio file {processed_audio_path}: {e}")


def play_segment(evt: gr.SelectData, raw_ts_list, current_audio_path):
    """Return an audio player update for the table row the user clicked.

    Args:
        evt: Selection event; ``evt.index[0]`` is used as the row index.
        raw_ts_list: List of ``[start, end]`` pairs, one per table row.
        current_audio_path: Audio file the segments were transcribed from.

    Returns:
        A ``gr.Audio`` update that autoplays the clipped segment, or an empty
        player when the selection or the audio is not usable.
    """
    def empty_player():
        return gr.Audio(value=None, label="Selected Segment")

    if not isinstance(raw_ts_list, list):
        print(f"Warning: raw_ts_list is not a list ({type(raw_ts_list)}). Cannot play segment.")
        return empty_player()

    if not current_audio_path:
        print("No audio path available to play segment from.")
        return empty_player()

    selected_index = evt.index[0]

    if selected_index < 0 or selected_index >= len(raw_ts_list):
        print(f"Invalid index {selected_index} selected for list of length {len(raw_ts_list)}.")
        return empty_player()

    selected = raw_ts_list[selected_index]
    if not isinstance(selected, (list, tuple)) or len(selected) != 2:
        print(f"Warning: Data at index {selected_index} is not in the expected format [start, end].")
        return empty_player()

    start_time_s, end_time_s = selected
    print(f"Attempting to play segment: {current_audio_path} from {start_time_s:.2f}s to {end_time_s:.2f}s")

    segment_data = get_audio_segment(current_audio_path, start_time_s, end_time_s)
    if not segment_data:
        print("Failed to get audio segment data.")
        return empty_player()

    print("Segment data retrieved successfully.")
    return gr.Audio(
        value=segment_data,
        autoplay=True,
        label=f"Segment: {start_time_s:.2f}s - {end_time_s:.2f}s",
        interactive=False,
    )


def _format_timestamp(seconds, separator):
    """Format ``seconds`` as ``HH:MM:SS<separator>mmm``.

    The value is rounded to whole milliseconds first; truncating the
    fractional part separately turns e.g. ``2.30`` into ``...299``.
    """
    total_ms = int(round(float(seconds) * 1000))
    hours, rem = divmod(total_ms, 3_600_000)
    minutes, rem = divmod(rem, 60_000)
    secs, millis = divmod(rem, 1000)
    return f"{hours:02}:{minutes:02}:{secs:02}{separator}{millis:03}"


def _group_words_by_segment(segments, words):
    """Assign each word to the segment whose time range contains it.

    Both inputs are rows of ``[start, end, text]`` and are walked once with a
    shared cursor, so they are expected to be in ascending time order. A word
    is kept when it starts at or after the segment start and ends at or
    before the segment end. A word that starts before the current segment can
    no longer match this or any later segment and is skipped, so it cannot
    stall the cursor.

    Returns:
        A list parallel to ``segments``; each item is that segment's word rows.
    """
    grouped = []
    word_idx = 0
    for seg in segments:
        s_start = float(seg[0])
        s_end = float(seg[1])
        bucket = []
        while word_idx < len(words):
            w = words[word_idx]
            w_start = float(w[0])
            w_end = float(w[1])
            if w_start < s_start:
                word_idx += 1
            elif w_end <= s_end:
                bucket.append(w)
                word_idx += 1
            else:
                break  # Word runs past this segment; try the next segment.
        grouped.append(bucket)
    return grouped


def write_srt(segments, path):
    """Write ``segments`` (rows of ``[start, end, text]``) to ``path`` as SRT."""
    with open(path, "w", encoding="utf-8") as f:
        for i, seg in enumerate(segments, 1):
            start = _format_timestamp(seg[0], ",")
            end = _format_timestamp(seg[1], ",")
            f.write(f"{i}\n{start} --> {end}\n{seg[2]}\n\n")


def write_vtt(segments, words, path):
    """Write a word-by-word WebVTT file to ``path``.

    For every word of a segment one cue is written that spans the word's
    time range and contains the segment's words, with the current word
    wrapped in a WebVTT bold span (``<b>...</b>``). Segments without any
    matched word get a single cue with the segment text.

    Args:
        segments: Rows of ``[start, end, text]``.
        words: Rows of ``[start, end, word]``.
        path: Output file path.
    """
    with open(path, "w", encoding="utf-8") as f:
        f.write("WEBVTT\n\n")
        grouped = _group_words_by_segment(segments, words)
        for seg, segment_words in zip(segments, grouped):
            if not segment_words:
                start = _format_timestamp(seg[0], ".")
                end = _format_timestamp(seg[1], ".")
                f.write(f"{start} --> {end}\n{seg[2]}\n\n")
                continue

            texts = [w[2] for w in segment_words]
            for i, w in enumerate(segment_words):
                # Highlight the current word; the others stay plain.
                cue_text = " ".join(
                    f"<b>{text}</b>" if j == i else text
                    for j, text in enumerate(texts)
                )
                start = _format_timestamp(w[0], ".")
                end = _format_timestamp(w[1], ".")
                f.write(f"{start} --> {end}\n{cue_text.strip()}\n\n")


def write_json(segments, words, path):
    """Write segments with their nested words to ``path`` as JSON.

    Args:
        segments: Rows of ``[start, end, text]``.
        words: Rows of ``[start, end, word]``.
        path: Output file path.

    The file has the shape
    ``{"segments": [{"start", "end", "text", "words": [{"start", "end", "word"}]}]}``.
    """
    grouped = _group_words_by_segment(segments, words)
    result = {
        "segments": [
            {
                "start": float(seg[0]),
                "end": float(seg[1]),
                "text": seg[2],
                "words": [
                    {"start": float(w[0]), "end": float(w[1]), "word": w[2]}
                    for w in segment_words
                ],
            }
            for seg, segment_words in zip(segments, grouped)
        ]
    }
    with open(path, "w", encoding="utf-8") as f:
        json.dump(result, f, ensure_ascii=False, indent=2)


def write_lrc(segments, path):
    """Write ``segments`` (rows of ``[start, end, text]``) to ``path`` as LRC.

    Each line is ``[MM:SS.xx]text`` using the segment start time.
    """
    def sec2lrc(t):
        # Round to whole centiseconds first so the seconds field can never
        # be printed as 60.00.
        total_cs = int(round(float(t) * 100))
        minutes, cs = divmod(total_cs, 6000)
        return f"[{minutes:02}:{cs // 100:02}.{cs % 100:02}]"

    with open(path, "w", encoding="utf-8") as f:
        for seg in segments:
            f.write(f"{sec2lrc(seg[0])}{seg[2]}\n")


# NOTE(review): this text is passed to gr.HTML but contains no tags, no link
# targets, and nothing after "Key Features:" -- the markup looks like it was
# stripped at some point. The visible text is kept as-is; restore the original
# HTML from upstream if it is available.
article = (
    "\n\n"
    "This demo showcases parakeet-tdt-0.6b-v2, a 600-million-parameter model designed for high-quality English speech recognition."
    "\n\n"
    "\n\nKey Features:\n\n"
    "\n\n"
    "This model is available for commercial and non-commercial use."
    "\n\n"
    "\n\n"
    "🎙️ Learn more about the Model | "
    "📄 Fast Conformer paper | "
    "📚 TDT paper | "
    "🧑‍💻 NeMo Repository"
    "\n\n"
)

examples = [
    ["data/example-yt_saTD1u8PorI.mp3"],
]

nvidia_theme = gr_themes.Default(
    primary_hue=gr_themes.Color(
        c50="#E6F1D9",
        c100="#CEE3B3",
        c200="#B5D58C",
        c300="#9CC766",
        c400="#84B940",
        c500="#76B900",
        c600="#68A600",
        c700="#5A9200",
        c800="#4C7E00",
        c900="#3E6A00",
        c950="#2F5600",
    ),
    neutral_hue="gray",
    font=[gr_themes.GoogleFont("Inter"), "ui-sans-serif", "system-ui", "sans-serif"],
).set()

with gr.Blocks(theme=nvidia_theme) as demo:
    model_display_name = MODEL_NAME.split('/')[-1] if '/' in MODEL_NAME else MODEL_NAME
    # NOTE(review): the titles below carry no heading markup; it may have been
    # stripped together with the HTML in `article`.
    gr.Markdown(f"\n\nSpeech Transcription with {model_display_name}\n\n")
    gr.HTML(article)

    current_audio_path_state = gr.State(None)
    raw_timestamps_list_state = gr.State([])  # For segment timestamps

    session_dir_state = gr.State()  # Named to avoid clashing with `session_dir` in functions
    demo.load(start_session, outputs=[session_dir_state])

    with gr.Tabs():
        with gr.TabItem("Audio File"):
            file_input = gr.Audio(sources=["upload"], type="filepath", label="Upload Audio File")
            gr.Examples(examples=examples, inputs=[file_input], label="Example Audio Files (Click to Load)")
            file_transcribe_btn = gr.Button("Transcribe Uploaded File", variant="primary")

        with gr.TabItem("Microphone"):
            mic_input = gr.Audio(sources=["microphone"], type="filepath", label="Record Audio")
            mic_transcribe_btn = gr.Button("Transcribe Microphone Input", variant="primary")

    gr.Markdown("---")
    gr.Markdown("\n\nTranscription Results\n\n")

    download_btn = gr.DownloadButton(label="Download Segment Transcript (CSV)", visible=False)
    srt_btn = gr.DownloadButton(label="Download SRT", visible=False)
    vtt_btn = gr.DownloadButton(label="Download VTT", visible=False)
    json_btn = gr.DownloadButton(label="Download JSON", visible=False)
    lrc_btn = gr.DownloadButton(label="Download LRC", visible=False)

    with gr.Tabs():  # Tabs for result views
        with gr.TabItem("Segment View (Click row to play segment)"):
            vis_timestamps_df = gr.DataFrame(
                headers=["Start (s)", "End (s)", "Segment"],
                datatype=["number", "number", "str"],
                wrap=True,
            )
            selected_segment_player = gr.Audio(label="Selected Segment", interactive=False)

        with gr.TabItem("Word View"):
            word_vis_df = gr.DataFrame(
                headers=["Start (s)", "End (s)", "Word"],
                datatype=["number", "number", "str"],
                wrap=False,
            )

    # Must match, in order, the 9-tuple returned by
    # get_transcripts_and_raw_times: segment rows, raw timestamps, word rows,
    # audio path, then the CSV / SRT / VTT / JSON / LRC download buttons.
    transcription_outputs = [
        vis_timestamps_df,
        raw_timestamps_list_state,
        word_vis_df,
        current_audio_path_state,
        download_btn,
        srt_btn,
        vtt_btn,
        json_btn,
        lrc_btn,
    ]

    mic_transcribe_btn.click(
        fn=get_transcripts_and_raw_times,
        inputs=[mic_input, session_dir_state],
        outputs=transcription_outputs,
        api_name="transcribe_mic"
    )

    file_transcribe_btn.click(
        fn=get_transcripts_and_raw_times,
        inputs=[file_input, session_dir_state],
        outputs=transcription_outputs,
        api_name="transcribe_file"
    )

    vis_timestamps_df.select(
        fn=play_segment,
        inputs=[raw_timestamps_list_state, current_audio_path_state],
        outputs=[selected_segment_player],
    )

    demo.unload(end_session)

if __name__ == "__main__":
    print("Launching Gradio Demo...")
    demo.queue()
    demo.launch()