sungo-ganpare commited on
Commit
c9be4ad
·
1 Parent(s): f67cd0b

音声処理の設定を強化し、セグメント分割機能を改善。自然な区切り点を探す関数を追加し、VTTファイルのサイズ制限を設定。バッチサイズを2に変更し、エラーハンドリングを強化。

Browse files
Files changed (1) hide show
  1. app.py +138 -22
app.py CHANGED
@@ -19,7 +19,23 @@ except ImportError:
19
 
20
  # グローバル設定
21
  MODEL_NAME = "nvidia/parakeet-tdt-0.6b-v2"
22
- LONG_AUDIO_THRESHOLD_SECONDS = 480 # 8分 (この秒数を超えると長尺用設定を試みる)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
23
  device = "cuda" if torch.cuda.is_available() else "cpu" # スクリプト起動時のデバイス検出
24
 
25
  # モデルの初期化 (グローバルに一度だけ行う)
@@ -31,6 +47,67 @@ model.eval()
31
  model.cpu()
32
  print("ASR model initialized and moved to CPU.")
33
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
34
 
35
  def transcribe_audio_core(
36
  audio_path: str,
@@ -62,7 +139,7 @@ def transcribe_audio_core(
62
  gr.Info(f"Audio duration ({duration_sec:.2f}s) exceeds threshold. Applying long audio settings.", duration=3)
63
  try:
64
  print("Applying long audio settings: Local Attention and Chunking.")
65
- model.change_attention_model("rel_pos_local_attn", [256,256])
66
  model.change_subsampling_conv_chunking_factor(1)
67
  long_audio_settings_applied = True
68
  print("Successfully applied long audio settings.")
@@ -79,7 +156,7 @@ def transcribe_audio_core(
79
 
80
  # 文字起こし実行
81
  print(f"Transcribing {audio_path}...")
82
- output = model.transcribe([audio_path], timestamps=True)
83
  print("Transcription API call finished.")
84
 
85
  if not output or not isinstance(output, list) or not output[0] or \
@@ -91,14 +168,65 @@ def transcribe_audio_core(
91
  return None, None, None
92
 
93
  segment_timestamps = output[0].timestamp['segment']
94
- vis_data = [[f"{ts['start']:.2f}", f"{ts['end']:.2f}", ts['segment']] for ts in segment_timestamps]
95
- raw_times_data = [[ts['start'], ts['end']] for ts in segment_timestamps]
96
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
97
  word_timestamps_raw = output[0].timestamp.get("word", [])
98
- word_vis_data = [
99
- [f"{w['start']:.2f}", f"{w['end']:.2f}", w["word"]]
100
- for w in word_timestamps_raw if isinstance(w, dict) and 'start' in w and 'end' in w and 'word' in w
101
- ]
 
 
 
 
 
 
 
 
 
 
 
 
102
  gr.Info("Transcription successful!", duration=3)
103
  return vis_data, raw_times_data, word_vis_data
104
 
@@ -162,7 +290,6 @@ def process_audio_file(audio_filepath: str) -> dict: # Gradioから渡される
162
  # pydubが失敗しても、NeMoは処理を試みることができるので、duration_sec = 0 で続行
163
  duration_sec = 0
164
 
165
-
166
  # 文字起こしコア処理を呼び出し
167
  vis_data, raw_times_data, word_vis_data = transcribe_audio_core(audio_filepath, duration_sec, current_processing_device)
168
 
@@ -200,12 +327,6 @@ with gr.Blocks() as demo:
200
  file_input = gr.File(label="Upload Audio File", type="filepath") # type="filepath" を明示
201
  output_json = gr.JSON(label="Transcription Result")
202
 
203
- # .change() は非推奨なので .upload() を使うか、ボタンを使うのが一般的
204
- # ここでは元の構造に合わせて .change() を使うが、ファイル選択後すぐに処理が走る
205
- # 大容量ファイルの場合、アップロード完了を待つボタンの方がUXが良い
206
- # transcribe_button = gr.Button("Transcribe File")
207
-
208
- # transcribe_button.click(
209
  file_input.change( # ファイルがアップロード/変更されたら実行
210
  fn=process_audio_file,
211
  inputs=[file_input],
@@ -214,15 +335,11 @@ with gr.Blocks() as demo:
214
  gr.Examples(
215
  examples=[
216
  [os.path.join(os.path.dirname(__file__), "audio_example.wav") if os.path.exists(os.path.join(os.path.dirname(__file__), "audio_example.wav")) else "https://www.kozco.com/tech/piano2-CoolEdit.mp3"]
217
- # ダミーの音声ファイルパスまたはURL。実際に存在するファイルパスに置き換えてください。
218
- # Hugging Face Spacesで使う場合、���ポジトリにサンプル音声を含めてそのパスを指定するのが良いでしょう。
219
- # 例: ["sample_audio.wav"] (リポジトリのルートにsample_audio.wavを置く場合)
220
  ],
221
  inputs=[file_input],
222
  label="Example Audio (Click to load)"
223
  )
224
 
225
-
226
  if __name__ == "__main__":
227
  # ダミーの音声ファイルを作成 (Examples用、もし存在しなければ)
228
  example_dir = os.path.dirname(__file__)
@@ -242,7 +359,6 @@ if __name__ == "__main__":
242
  elif not PYDUB_AVAILABLE:
243
  print("Skipping dummy audio file creation as pydub is not available.")
244
 
245
-
246
  print("Launching Gradio demo...")
247
  demo.queue() # リクエストキューを有効化
248
  demo.launch()
 
19
 
20
  # グローバル設定
21
  MODEL_NAME = "nvidia/parakeet-tdt-0.6b-v2"
22
+ TARGET_SAMPLE_RATE = 16000
23
+ # 音声の長さに関する閾値 (秒)
24
+ LONG_AUDIO_THRESHOLD_SECONDS = 480 # 8分
25
+ VERY_LONG_AUDIO_THRESHOLD_SECONDS = 10800 # 3時間
26
+ # チャンク分割時の設定
27
+ CHUNK_LENGTH_SECONDS = 1800 # 30分
28
+ CHUNK_OVERLAP_SECONDS = 60 # 1分
29
+ # セグメント処理の設定
30
+ MAX_SEGMENT_LENGTH_SECONDS = 15 # 最大セグメント長(秒)を15秒に短縮
31
+ MAX_SEGMENT_CHARS = 100 # 最大セグメント文字数を100文字に短縮
32
+ MIN_SEGMENT_GAP_SECONDS = 0.3 # 最小セグメント間隔(秒)
33
+ # VTTファイルの最大サイズ(バイト)
34
+ MAX_VTT_SIZE_BYTES = 10 * 1024 * 1024 # 10MB
35
+ # 文の区切り文字
36
+ SENTENCE_ENDINGS = ['.', '!', '?', '。', '!', '?']
37
+ SENTENCE_PAUSES = [',', '、', ';', ';', ':', ':']
38
+
39
  device = "cuda" if torch.cuda.is_available() else "cpu" # スクリプト起動時のデバイス検出
40
 
41
  # モデルの初期化 (グローバルに一度だけ行う)
 
47
  model.cpu()
48
  print("ASR model initialized and moved to CPU.")
49
 
50
+ def find_natural_break_point(text: str, max_length: int) -> int:
51
+ """テキスト内で自然な区切り点を探す"""
52
+ if len(text) <= max_length:
53
+ return len(text)
54
+
55
+ # 文末で区切る
56
+ for i in range(max_length, 0, -1):
57
+ if i < len(text) and text[i] in SENTENCE_ENDINGS:
58
+ return i + 1
59
+
60
+ # 文の区切りで区切る
61
+ for i in range(max_length, 0, -1):
62
+ if i < len(text) and text[i] in SENTENCE_PAUSES:
63
+ return i + 1
64
+
65
+ # スペースで区切る
66
+ for i in range(max_length, 0, -1):
67
+ if i < len(text) and text[i].isspace():
68
+ return i + 1
69
+
70
+ # それでも見つからない場合は最大長で区切る
71
+ return max_length
72
+
73
+ def split_segment(segment: dict, max_length_seconds: float, max_chars: int) -> List[dict]:
74
+ """セグメントを自然な区切りで分割する"""
75
+ if (segment['end'] - segment['start']) <= max_length_seconds and len(segment['segment']) <= max_chars:
76
+ return [segment]
77
+
78
+ result = []
79
+ current_text = segment['segment']
80
+ current_start = segment['start']
81
+ total_duration = segment['end'] - segment['start']
82
+
83
+ while current_text:
84
+ # 文字数に基づく分割点を探す
85
+ break_point = find_natural_break_point(current_text, max_chars)
86
+
87
+ # 時間に基づく分割点を計算
88
+ text_ratio = break_point / len(segment['segment'])
89
+ segment_duration = total_duration * text_ratio
90
+
91
+ # 分割点が最大長を超えないように調整
92
+ if segment_duration > max_length_seconds:
93
+ time_ratio = max_length_seconds / total_duration
94
+ break_point = int(len(segment['segment']) * time_ratio)
95
+ break_point = find_natural_break_point(current_text, break_point)
96
+ segment_duration = max_length_seconds
97
+
98
+ # 新しいセグメントを作成
99
+ new_segment = {
100
+ 'start': current_start,
101
+ 'end': current_start + segment_duration,
102
+ 'segment': current_text[:break_point].strip()
103
+ }
104
+ result.append(new_segment)
105
+
106
+ # 残りのテキストと開始時間を更新
107
+ current_text = current_text[break_point:].strip()
108
+ current_start = new_segment['end']
109
+
110
+ return result
111
 
112
  def transcribe_audio_core(
113
  audio_path: str,
 
139
  gr.Info(f"Audio duration ({duration_sec:.2f}s) exceeds threshold. Applying long audio settings.", duration=3)
140
  try:
141
  print("Applying long audio settings: Local Attention and Chunking.")
142
+ model.change_attention_model("rel_pos_local_attn", [128, 128]) # 256,256から128,128に変更
143
  model.change_subsampling_conv_chunking_factor(1)
144
  long_audio_settings_applied = True
145
  print("Successfully applied long audio settings.")
 
156
 
157
  # 文字起こし実行
158
  print(f"Transcribing {audio_path}...")
159
+ output = model.transcribe([audio_path], timestamps=True, batch_size=2) # バッチサイズを2に設定
160
  print("Transcription API call finished.")
161
 
162
  if not output or not isinstance(output, list) or not output[0] or \
 
168
  return None, None, None
169
 
170
  segment_timestamps = output[0].timestamp['segment']
171
+
172
+ # セグメントの前処理:より適切なセグメント分割
173
+ processed_segments = []
174
+ current_segment = None
175
+
176
+ for ts in segment_timestamps:
177
+ if current_segment is None:
178
+ current_segment = ts
179
+ else:
180
+ # セグメント結合の条件を厳格化
181
+ time_gap = ts['start'] - current_segment['end']
182
+ current_text = current_segment['segment']
183
+ next_text = ts['segment']
184
+
185
+ # 結合条件のチェック
186
+ should_merge = (
187
+ time_gap < MIN_SEGMENT_GAP_SECONDS and # 時間間隔が短い
188
+ len(current_text) + len(next_text) < MAX_SEGMENT_CHARS and # 文字数制限
189
+ (current_segment['end'] - current_segment['start']) < MAX_SEGMENT_LENGTH_SECONDS and # 現在のセグメントが短い
190
+ (ts['end'] - ts['start']) < MAX_SEGMENT_LENGTH_SECONDS and # 次のセグメントが短い
191
+ not any(current_text.strip().endswith(p) for p in SENTENCE_ENDINGS) # 文の区切りでない
192
+ )
193
+
194
+ if should_merge:
195
+ current_segment['end'] = ts['end']
196
+ current_segment['segment'] += ' ' + ts['segment']
197
+ else:
198
+ # 現在のセグメントを分割
199
+ split_segments = split_segment(current_segment, MAX_SEGMENT_LENGTH_SECONDS, MAX_SEGMENT_CHARS)
200
+ processed_segments.extend(split_segments)
201
+ current_segment = ts
202
+
203
+ if current_segment is not None:
204
+ # 最後のセグメントも分割
205
+ split_segments = split_segment(current_segment, MAX_SEGMENT_LENGTH_SECONDS, MAX_SEGMENT_CHARS)
206
+ processed_segments.extend(split_segments)
207
+
208
+ # 処理済みセグメントからデータを生成
209
+ vis_data = [[f"{ts['start']:.2f}", f"{ts['end']:.2f}", ts['segment']] for ts in processed_segments]
210
+ raw_times_data = [[ts['start'], ts['end']] for ts in processed_segments]
211
+
212
+ # 単語タイムスタンプの処理を改善
213
  word_timestamps_raw = output[0].timestamp.get("word", [])
214
+ word_vis_data = []
215
+
216
+ for w in word_timestamps_raw:
217
+ if not isinstance(w, dict) or not all(k in w for k in ['start', 'end', 'word']):
218
+ continue
219
+
220
+ # 単語のタイムスタンプを最も近いセグメントに割り当て
221
+ word_start = float(w['start'])
222
+ word_end = float(w['end'])
223
+
224
+ # 単語が完全に含まれるセグメントを探す
225
+ for seg in processed_segments:
226
+ if word_start >= seg['start'] - 0.05 and word_end <= seg['end'] + 0.05:
227
+ word_vis_data.append([f"{word_start:.2f}", f"{word_end:.2f}", w["word"]])
228
+ break
229
+
230
  gr.Info("Transcription successful!", duration=3)
231
  return vis_data, raw_times_data, word_vis_data
232
 
 
290
  # pydubが失敗しても、NeMoは処理を試みることができるので、duration_sec = 0 で続行
291
  duration_sec = 0
292
 
 
293
  # 文字起こしコア処理を呼び出し
294
  vis_data, raw_times_data, word_vis_data = transcribe_audio_core(audio_filepath, duration_sec, current_processing_device)
295
 
 
327
  file_input = gr.File(label="Upload Audio File", type="filepath") # type="filepath" を明示
328
  output_json = gr.JSON(label="Transcription Result")
329
 
 
 
 
 
 
 
330
  file_input.change( # ファイルがアップロード/変更されたら実行
331
  fn=process_audio_file,
332
  inputs=[file_input],
 
335
  gr.Examples(
336
  examples=[
337
  [os.path.join(os.path.dirname(__file__), "audio_example.wav") if os.path.exists(os.path.join(os.path.dirname(__file__), "audio_example.wav")) else "https://www.kozco.com/tech/piano2-CoolEdit.mp3"]
 
 
 
338
  ],
339
  inputs=[file_input],
340
  label="Example Audio (Click to load)"
341
  )
342
 
 
343
  if __name__ == "__main__":
344
  # ダミーの音声ファイルを作成 (Examples用、もし存在しなければ)
345
  example_dir = os.path.dirname(__file__)
 
359
  elif not PYDUB_AVAILABLE:
360
  print("Skipping dummy audio file creation as pydub is not available.")
361
 
 
362
  print("Launching Gradio demo...")
363
  demo.queue() # リクエストキューを有効化
364
  demo.launch()