import os
import json
import traceback
import logging
import gradio as gr
import numpy as np
import librosa
import torch
import asyncio
import edge_tts
from datetime import datetime
from fairseq import checkpoint_utils
from fairseq.data.dictionary import Dictionary
from lib.infer_pack.models import (
    SynthesizerTrnMs256NSFsid,
    SynthesizerTrnMs256NSFsid_nono,
    SynthesizerTrnMs768NSFsid,
    SynthesizerTrnMs768NSFsid_nono,
)
from vc_infer_pipeline import VC
from config import Config

config = Config()
logging.getLogger("numba").setLevel(logging.WARNING)
spaces = True

# Set up the available audio input modes based on the environment.
if spaces:
    audio_mode = ["Upload audio", "TTS Audio"]
else:
    audio_mode = ["Input path", "Upload audio", "TTS Audio"]

# Set up the available F0 extraction methods.
f0method_mode = ["pm", "harvest"]
f0method_info = "PM is fast, Harvest is good but can be slow on CPU."
if os.path.isfile("rmvpe.pt"):
    f0method_mode.insert(2, "rmvpe")
    f0method_info = "PM is fast, Harvest is good but slow on CPU, RMVPE is a good alternative."

# Helper function (fixed) for loading audio from any of the three input modes.
# Returns (audio, sample_rate, temp_file); temp_file is a path to clean up
# afterwards, or None if no temporary file was created.
def _load_audio_input(vc_audio_mode, vc_input, vc_upload, tts_text, tts_voice, spaces_limit=20):
    if vc_audio_mode == "Input path" and vc_input:
        print(f"Loading audio from path: {vc_input}")
        audio, sr = librosa.load(vc_input, sr=16000, mono=True)
        return audio, sr, None
    if vc_audio_mode == "Upload audio":
        if vc_upload is None:
            raise ValueError("You need to upload an audio file.")
        sampling_rate, audio = vc_upload
        duration = len(audio) / sampling_rate
        print(f"Loading uploaded audio. Original SR: {sampling_rate}, Duration: {duration:.2f}s")
        if duration > spaces_limit and spaces:
            raise ValueError(f"Audio is too long (> {spaces_limit}s). Please upload a shorter file.")
        # Normalize integer PCM to float32; float input is already in [-1, 1].
        if np.issubdtype(audio.dtype, np.integer):
            audio = (audio / np.iinfo(audio.dtype).max).astype(np.float32)
        else:
            audio = audio.astype(np.float32)
        if len(audio.shape) > 1:
            audio = librosa.to_mono(audio.transpose(1, 0))
        if sampling_rate != 16000:
            audio = librosa.resample(audio, orig_sr=sampling_rate, target_sr=16000)
        return audio, 16000, None
    if vc_audio_mode == "TTS Audio":
        if not tts_text or not tts_voice:
            raise ValueError("You need to enter text and select a voice.")
        if len(tts_text) > 100 and spaces:
            raise ValueError("Text is too long (> 100 characters).")
        temp_file = "tts.mp3"
        print(f"Generating TTS audio for text: '{tts_text[:50]}...'")
        # The dropdown value is "<ShortName>-<Gender>"; strip the gender suffix
        # to recover the edge-tts voice ShortName.
        asyncio.run(edge_tts.Communicate(tts_text, "-".join(tts_voice.split('-')[:-1])).save(temp_file))
        audio, sr = librosa.load(temp_file, sr=16000, mono=True)
        return audio, sr, temp_file
    raise ValueError("Invalid audio mode or missing input.")

# Main conversion function (fixed): binds one loaded model to a Gradio callback.
def create_vc_fn(model_name, tgt_sr, net_g, vc, if_f0, version, file_index):
    def vc_fn(
        vc_audio_mode,
        vc_input,
        vc_upload,
        tts_text,
        tts_voice,
        f0_up_key,
        f0_method,
        index_rate,
        filter_radius,
        resample_sr,
        rms_mix_rate,
        protect,
    ):
        logs = []
        temp_audio_file = None
        try:
            logs.append(f"Converting using {model_name}...")
            yield "\n".join(logs), None
            audio, sr, temp_audio_file = _load_audio_input(
                vc_audio_mode, vc_input, vc_upload, tts_text, tts_voice
            )
            logs.append("Audio successfully loaded.")
            logs.append(f"Starting RVC pipeline with F0 method: {f0_method}...")
            yield "\n".join(logs), None
            times = [0, 0, 0]
            f0_up_key = int(f0_up_key)
            audio_opt = vc.pipeline(
                hubert_model,
                net_g,
                0,
                audio,
                vc_input if vc_input else temp_audio_file,
                times,
                f0_up_key,
                f0_method,
                file_index,
                index_rate,
                if_f0,
                filter_radius,
                tgt_sr,
                resample_sr,
                rms_mix_rate,
                version,
                protect,
                f0_file=None,
            )
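            # vc.pipeline fills `times` in place with wall-clock seconds for
            # feature extraction (npy), F0 estimation, and inference; the log
            # line below reports those three values.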
f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}]: npy: {times[0]}, f0: {times[1]}s, infer: {times[2]}s" print(f"{model_name} | {info}") logs.append(f"Successfully Converted!\n{info}") yield "\n".join(logs), (tgt_sr, audio_opt) except Exception as e: error_info = traceback.format_exc() print(f"An error occurred: {error_info}") return str(e), None finally: if temp_audio_file and os.path.exists(temp_audio_file): os.remove(temp_audio_file) return vc_fn # Fungsi load model (tidak berubah) def load_model(): categories = [] if os.path.isfile("weights/folder_info.json"): with open("weights/folder_info.json", "r", encoding="utf-8") as f: folder_info = json.load(f) for category_name, category_info in folder_info.items(): if not category_info.get('enable', True): continue category_title, category_folder, description = category_info['title'], category_info['folder_path'], category_info['description'] models = [] with open(f"weights/{category_folder}/model_info.json", "r", encoding="utf-8") as f: models_info = json.load(f) for character_name, info in models_info.items(): if not info.get('enable', True): continue model_title, model_name, model_author = info['title'], info['model_path'], info.get("author") model_cover = f"weights/{category_folder}/{character_name}/{info['cover']}" model_index = f"weights/{category_folder}/{character_name}/{info['feature_retrieval_library']}" cpt = torch.load(f"weights/{category_folder}/{character_name}/{model_name}", map_location="cpu") tgt_sr = cpt["config"][-1] cpt["config"][-3] = cpt["weight"]["emb_g.weight"].shape[0] if_f0 = cpt.get("f0", 1) version = cpt.get("version", "v1") if version == "v1": net_g = SynthesizerTrnMs256NSFsid(*cpt["config"], is_half=config.is_half) if if_f0 == 1 else SynthesizerTrnMs256NSFsid_nono(*cpt["config"]) elif version == "v2": net_g = SynthesizerTrnMs768NSFsid(*cpt["config"], is_half=config.is_half) if if_f0 == 1 else SynthesizerTrnMs768NSFsid_nono(*cpt["config"]) del net_g.enc_q print(net_g.load_state_dict(cpt["weight"], strict=False)) net_g.eval().to(config.device) net_g = net_g.half() if config.is_half else net_g.float() vc = VC(tgt_sr, config) print(f"Model loaded: {character_name} ({version})") models.append((character_name, model_title, model_author, model_cover, version, create_vc_fn(model_name, tgt_sr, net_g, vc, if_f0, version, model_index))) categories.append([category_title, category_folder, description, models]) return categories # Fungsi load Hubert (tidak berubah) def load_hubert(): global hubert_model torch.serialization.add_safe_globals([Dictionary]) models, _, _ = checkpoint_utils.load_model_ensemble_and_task(["hubert_base.pt"], suffix="",) hubert_model = models[0].to(config.device) hubert_model = hubert_model.half() if config.is_half else hubert_model.float() hubert_model.eval() # Fungsi untuk mengubah UI berdasarkan mode audio (disederhanakan) def change_audio_mode(vc_audio_mode): is_input_path = vc_audio_mode == "Input path" is_upload = vc_audio_mode == "Upload audio" is_tts = vc_audio_mode == "TTS Audio" return ( gr.Textbox.update(visible=is_input_path), gr.Checkbox.update(visible=is_upload), gr.Audio.update(visible=is_upload), gr.Textbox.update(visible=is_tts), gr.Dropdown.update(visible=is_tts) ) def use_microphone(microphone): return gr.Audio.update(source="microphone" if microphone else "upload") if __name__ == '__main__': load_hubert() categories = load_model() tts_voice_list = asyncio.new_event_loop().run_until_complete(edge_tts.list_voices()) voices = [f"{v['ShortName']}-{v['Gender']}" for v in tts_voice_list] 
    with gr.Blocks(theme=gr.themes.Base()) as app:
        gr.Markdown("# RVC Blue Archive\n### Voice Conversion App")
        if not categories:
            gr.Markdown("## No model found. Please add models to the 'weights' folder.")
        for (folder_title, folder, description, models) in categories:
            with gr.TabItem(folder_title):
                if description:
                    gr.Markdown(f"### {description}")
                with gr.Tabs():
                    if not models:
                        gr.Markdown("## No models loaded in this category.")
                        continue
                    for (name, title, author, cover, model_version, vc_fn) in models:
                        with gr.TabItem(name):
                            with gr.Row():
                                gr.Markdown(
                                    '<div align="center">'
                                    f'<div>{title}</div>\n'
                                    f'<div>RVC {model_version} Model</div>\n'
                                    + (f'<div>Author: {author}</div>' if author else "")
                                    + (f'<img src="file/{cover}">' if cover else "")
                                    + '</div>'
                                )
                            with gr.Row():
                                with gr.Column():
                                    vc_audio_mode = gr.Dropdown(label="Input Mode", choices=audio_mode, value="Upload audio")
                                    vc_input = gr.Textbox(label="Input Audio Path", visible=False)
                                    vc_microphone_mode = gr.Checkbox(label="Use Microphone", value=False, visible=True)
                                    vc_upload = gr.Audio(label="Upload Audio File", source="upload", visible=True)
                                    tts_text = gr.Textbox(label="TTS Text", info="Text to speech input", visible=False)
                                    tts_voice = gr.Dropdown(label="Edge-TTS Speaker", choices=voices, visible=False, value="en-US-AnaNeural-Female")
                                with gr.Column():
                                    vc_transform0 = gr.Number(label="Transpose", value=0, info='e.g., 12 for male to female')
                                    f0method0 = gr.Radio(label="Pitch Extraction Algorithm", info=f0method_info, choices=f0method_mode, value="pm")
                                    index_rate1 = gr.Slider(minimum=0, maximum=1, label="Retrieval Feature Ratio", value=0.7)
                                    filter_radius0 = gr.Slider(minimum=0, maximum=7, label="Median Filtering", value=3, step=1, info="Reduces breathiness")
                                    resample_sr0 = gr.Slider(minimum=0, maximum=48000, label="Output Resample Rate", value=0, step=1, info="0 for no resampling")
                                    rms_mix_rate0 = gr.Slider(minimum=0, maximum=1, label="Volume Envelope Ratio", value=1)
                                    protect0 = gr.Slider(minimum=0, maximum=0.5, label="Voice Protection", value=0.5, step=0.01, info="Protects voiceless consonants")
                                with gr.Column():
                                    vc_log = gr.Textbox(label="Output Information", interactive=False)
                                    vc_output = gr.Audio(label="Output Audio", interactive=False)
                                    vc_convert = gr.Button("Convert", variant="primary")
                            vc_convert.click(
                                fn=vc_fn,
                                inputs=[
                                    vc_audio_mode,
                                    vc_input,
                                    vc_upload,
                                    tts_text,
                                    tts_voice,
                                    vc_transform0,
                                    f0method0,
                                    index_rate1,
                                    filter_radius0,
                                    resample_sr0,
                                    rms_mix_rate0,
                                    protect0,
                                ],
                                outputs=[vc_log, vc_output],
                            )
                            vc_audio_mode.change(
                                fn=change_audio_mode,
                                inputs=[vc_audio_mode],
                                outputs=[vc_input, vc_microphone_mode, vc_upload, tts_text, tts_voice],
                            )
                            vc_microphone_mode.change(
                                fn=use_microphone,
                                inputs=vc_microphone_mode,
                                outputs=vc_upload,
                            )
    app.queue(max_size=20).launch(share=False, server_name="0.0.0.0", server_port=7860)
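# Expected on-disk layout, derived from load_model above (file names inside
# model_info.json are illustrative, not fixed):
#
#   weights/
#     folder_info.json        # {category: {title, folder_path, description, enable}}
#     <category_folder>/
#       model_info.json       # {character: {title, model_path, author, cover,
#                             #   feature_retrieval_library, enable}}
#       <character_name>/
#         <model>.pth         # RVC checkpoint (model_path)
#         <cover image>       # cover
#         <feature>.index     # feature_retrieval_library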