Spaces:
Running
Running
| import os | |
| import glob | |
| import json | |
| import traceback | |
| import logging | |
| import gradio as gr | |
| import numpy as np | |
| import librosa | |
| import torch | |
| import asyncio | |
| import edge_tts | |
| import sys | |
| import io | |
| import wave | |
| from datetime import datetime | |
| from fairseq import checkpoint_utils | |
| from fairseq.data.dictionary import Dictionary | |
| from lib.infer_pack.models import ( | |
| SynthesizerTrnMs256NSFsid, | |
| SynthesizerTrnMs256NSFsid_nono, | |
| SynthesizerTrnMs768NSFsid, | |
| SynthesizerTrnMs768NSFsid_nono, | |
| ) | |
| from vc_infer_pipeline import VC | |
| from config import Config | |
| config = Config() | |
| logging.getLogger("numba").setLevel(logging.WARNING) | |
# Hosted-Space mode flag: when true, the "Input path" mode is not offered in
# the UI and the upload-duration / TTS-length limits are enforced.
spaces = True

# Input modes offered in the UI, chosen by environment.
if spaces:
    audio_mode = ["Upload audio", "TTS Audio"]
else:
    audio_mode = ["Input path", "Upload audio", "TTS Audio"]

# Pitch (F0) extraction methods. RMVPE is only offered when its weight file
# is present in the working directory.
f0method_mode = ["pm", "harvest"]
f0method_info = "PM is fast, Harvest is good but can be slow on CPU."
if os.path.isfile("rmvpe.pt"):
    f0method_mode.append("rmvpe")
    f0method_info = "PM is fast, Harvest is good but slow on CPU, RMVPE is a good alternative."
| # Fungsi helper yang telah diperbaiki untuk memuat audio | |
| def _load_audio_input(vc_audio_mode, vc_input, vc_upload, tts_text, tts_voice, spaces_limit=20): | |
| temp_file = None | |
| if vc_audio_mode == "Input path" and vc_input: | |
| print(f"Loading audio from path: {vc_input}") | |
| audio, sr = librosa.load(vc_input, sr=16000, mono=True) | |
| return audio, sr, None | |
| if vc_audio_mode == "Upload audio": | |
| if vc_upload is None: | |
| raise ValueError("You need to upload an audio file.") | |
| sampling_rate, audio = vc_upload | |
| duration = len(audio) / sampling_rate | |
| print(f"Loading uploaded audio. Original SR: {sampling_rate}, Duration: {duration:.2f}s") | |
| if duration > spaces_limit and spaces: | |
| raise ValueError(f"Audio is too long (> {spaces_limit}s). Please upload a shorter file.") | |
| audio = (audio / np.iinfo(audio.dtype).max).astype(np.float32) | |
| if len(audio.shape) > 1: | |
| audio = librosa.to_mono(audio.transpose(1, 0)) | |
| if sampling_rate != 16000: | |
| audio = librosa.resample(audio, orig_sr=sampling_rate, target_sr=16000) | |
| return audio, 16000, None | |
| if vc_audio_mode == "TTS Audio": | |
| if not tts_text or not tts_voice: | |
| raise ValueError("You need to enter text and select a voice.") | |
| if len(tts_text) > 100 and spaces: | |
| raise ValueError("Text is too long (> 100 characters).") | |
| temp_file = "tts.mp3" | |
| print(f"Generating TTS audio for text: '{tts_text[:50]}...'") | |
| asyncio.run(edge_tts.Communicate(tts_text, "-".join(tts_voice.split('-')[:-1])).save(temp_file)) | |
| audio, sr = librosa.load(temp_file, sr=16000, mono=True) | |
| return audio, sr, temp_file | |
| raise ValueError("Invalid audio mode or missing input.") | |
# Factory for the main conversion callback.
def create_vc_fn(model_name, tgt_sr, net_g, vc, if_f0, version, file_index):
    """Build the conversion callback bound to one loaded model.

    Args:
        model_name: Name used in log messages.
        tgt_sr: Sample rate paired with the converted audio in the output.
        net_g: Synthesizer network passed to ``vc.pipeline``.
        vc: Object exposing ``pipeline(...)`` that performs the conversion.
        if_f0: F0 flag forwarded to ``vc.pipeline``.
        version: Model version string forwarded to ``vc.pipeline``.
        file_index: Feature-index path forwarded to ``vc.pipeline``.

    Returns:
        A generator function ``vc_fn`` suitable as a streaming UI callback.
    """

    def vc_fn(
        vc_audio_mode, vc_input, vc_upload, tts_text, tts_voice,
        f0_up_key, f0_method, index_rate, filter_radius,
        resample_sr, rms_mix_rate, protect,
    ):
        """Convert the selected input, yielding ``(log_text, audio)`` updates.

        ``audio`` is ``None`` for progress and error updates and
        ``(tgt_sr, samples)`` for the final successful update. Reads the
        module-level ``hubert_model`` at call time.
        """
        logs = []
        temp_audio_file = None
        try:
            logs.append(f"Converting using {model_name}...")
            yield "\n".join(logs), None

            audio, sr, temp_audio_file = _load_audio_input(
                vc_audio_mode, vc_input, vc_upload, tts_text, tts_voice
            )
            logs.append("Audio successfully loaded.")
            logs.append(f"Starting RVC pipeline with F0 method: {f0_method}...")
            yield "\n".join(logs), None

            # Filled in place by the pipeline with per-stage timings.
            times = [0, 0, 0]
            f0_up_key = int(f0_up_key)
            audio_opt = vc.pipeline(
                hubert_model, net_g, 0, audio, vc_input if vc_input else temp_audio_file,
                times, f0_up_key, f0_method, file_index, index_rate,
                if_f0, filter_radius, tgt_sr, resample_sr,
                rms_mix_rate, version, protect, f0_file=None,
            )
            info = f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}]: npy: {times[0]}, f0: {times[1]}s, infer: {times[2]}s"
            print(f"{model_name} | {info}")
            logs.append(f"Successfully Converted!\n{info}")
            yield "\n".join(logs), (tgt_sr, audio_opt)
        except Exception as e:
            error_info = traceback.format_exc()
            print(f"An error occurred: {error_info}")
            # This is a generator: a `return value` here would be discarded,
            # so the error must be yielded to reach the UI.
            logs.append(f"Error: {e}")
            yield "\n".join(logs), None
        finally:
            # Remove the temporary TTS file, if one was created.
            if temp_audio_file and os.path.exists(temp_audio_file):
                os.remove(temp_audio_file)

    return vc_fn
# Load every enabled voice model listed under the "weights" folder.
def load_model():
    """Load all enabled models described by the JSON manifests in ``weights/``.

    ``weights/folder_info.json`` maps category keys to
    ``{title, folder_path, description, enable?}``; each category folder holds
    a ``model_info.json`` mapping character folder names to
    ``{title, model_path, author?, cover?, feature_retrieval_library?, enable?}``.

    Returns:
        A list of ``[category_title, category_folder, description, models]``
        entries, where ``models`` is a list of
        ``(character_name, title, author, cover_path, version, vc_fn)``
        tuples. Empty when ``weights/folder_info.json`` does not exist.

    Raises:
        ValueError: If a checkpoint declares a version other than "v1"/"v2".
    """
    categories = []
    if not os.path.isfile("weights/folder_info.json"):
        return categories

    with open("weights/folder_info.json", "r", encoding="utf-8") as f:
        folder_info = json.load(f)

    for category_info in folder_info.values():
        if not category_info.get('enable', True):
            continue
        category_title = category_info['title']
        category_folder = category_info['folder_path']
        description = category_info['description']

        models = []
        with open(f"weights/{category_folder}/model_info.json", "r", encoding="utf-8") as f:
            models_info = json.load(f)

        for character_name, info in models_info.items():
            if not info.get('enable', True):
                continue
            model_dir = f"weights/{category_folder}/{character_name}"
            model_title, model_name, model_author = info['title'], info['model_path'], info.get("author")

            # Cover and index are optional. A missing cover becomes None so
            # callers testing `if cover` behave as intended.
            cover_file = info.get('cover')
            model_cover = f"{model_dir}/{cover_file}" if cover_file else None
            # NOTE(review): a missing index is passed as "" on the assumption
            # that the pipeline treats an empty path as "no index" - confirm.
            index_file = info.get('feature_retrieval_library')
            model_index = f"{model_dir}/{index_file}" if index_file else ""

            # NOTE(security): torch.load unpickles the checkpoint; only load
            # model files from trusted sources.
            cpt = torch.load(f"{model_dir}/{model_name}", map_location="cpu")
            tgt_sr = cpt["config"][-1]
            # Override this config slot with the first dimension of the
            # speaker-embedding weight actually stored in the checkpoint.
            cpt["config"][-3] = cpt["weight"]["emb_g.weight"].shape[0]
            if_f0 = cpt.get("f0", 1)
            version = cpt.get("version", "v1")

            if version == "v1":
                if if_f0 == 1:
                    net_g = SynthesizerTrnMs256NSFsid(*cpt["config"], is_half=config.is_half)
                else:
                    net_g = SynthesizerTrnMs256NSFsid_nono(*cpt["config"])
            elif version == "v2":
                if if_f0 == 1:
                    net_g = SynthesizerTrnMs768NSFsid(*cpt["config"], is_half=config.is_half)
                else:
                    net_g = SynthesizerTrnMs768NSFsid_nono(*cpt["config"])
            else:
                # Previously fell through with `net_g` unbound.
                raise ValueError(
                    f"Unsupported model version {version!r} for model '{character_name}'"
                )

            # enc_q is deleted before the weights are loaded, so the load is
            # non-strict.
            del net_g.enc_q
            print(net_g.load_state_dict(cpt["weight"], strict=False))
            net_g.eval().to(config.device)
            net_g = net_g.half() if config.is_half else net_g.float()
            vc = VC(tgt_sr, config)
            print(f"Model loaded: {character_name} ({version})")
            models.append((
                character_name, model_title, model_author, model_cover, version,
                create_vc_fn(model_name, tgt_sr, net_g, vc, if_f0, version, model_index),
            ))

        categories.append([category_title, category_folder, description, models])

    return categories
# Load the HuBERT model (logic unchanged).
def load_hubert():
    """Load ``hubert_base.pt`` through fairseq and store it in the global ``hubert_model``.

    The model is moved to ``config.device``, cast to half or full precision
    according to ``config.is_half``, and switched to eval mode.
    """
    global hubert_model
    # Register fairseq's Dictionary with torch's safe-globals list before
    # the checkpoint is loaded.
    torch.serialization.add_safe_globals([Dictionary])
    ensemble, _saved_cfg, _task = checkpoint_utils.load_model_ensemble_and_task(
        ["hubert_base.pt"],
        suffix="",
    )
    hubert_model = ensemble[0].to(config.device)
    if config.is_half:
        hubert_model = hubert_model.half()
    else:
        hubert_model = hubert_model.float()
    hubert_model.eval()
# Toggle which input widgets are visible for the chosen audio mode.
def change_audio_mode(vc_audio_mode):
    """Return visibility updates for the five input widgets.

    The updates are ordered as: path textbox, microphone checkbox, upload
    audio, TTS textbox, TTS voice dropdown. Each widget is visible only when
    ``vc_audio_mode`` matches the mode it belongs to.
    """
    path_mode = vc_audio_mode == "Input path"
    upload_mode = vc_audio_mode == "Upload audio"
    tts_mode = vc_audio_mode == "TTS Audio"
    widget_visibility = (
        (gr.Textbox, path_mode),
        (gr.Checkbox, upload_mode),
        (gr.Audio, upload_mode),
        (gr.Textbox, tts_mode),
        (gr.Dropdown, tts_mode),
    )
    return tuple(widget.update(visible=shown) for widget, shown in widget_visibility)
def use_microphone(microphone):
    """Return an audio-widget update selecting microphone or file upload as the source."""
    if microphone:
        source = "microphone"
    else:
        source = "upload"
    return gr.Audio.update(source=source)
if __name__ == '__main__':
    # Load the shared feature extractor first: the per-model callbacks read
    # the global `hubert_model` when they run.
    load_hubert()
    categories = load_model()

    # asyncio.run creates the event loop and closes it afterwards; the
    # previous new_event_loop().run_until_complete(...) never closed its loop.
    tts_voice_list = asyncio.run(edge_tts.list_voices())
    # Labels are "<ShortName>-<Gender>"; the gender suffix is stripped again
    # before the voice name is passed to edge-tts.
    voices = [f"{v['ShortName']}-{v['Gender']}" for v in tts_voice_list]

    with gr.Blocks(theme=gr.themes.Base()) as app:
        gr.Markdown("# RVC Blue Archive\n### Voice Conversion App")
        if not categories:
            gr.Markdown("## No model found. Please add models to the 'weights' folder.")

        # One tab per category, with one nested tab per model.
        for (folder_title, folder, description, models) in categories:
            with gr.TabItem(folder_title):
                if description:
                    gr.Markdown(f"### <center>{description}</center>")
                with gr.Tabs():
                    if not models:
                        gr.Markdown("## <center>No models loaded in this category.</center>")
                        continue
                    for (name, title, author, cover, model_version, vc_fn) in models:
                        with gr.TabItem(name):
                            # Model header: title, version, optional author and cover.
                            with gr.Row():
                                gr.Markdown(
                                    f'<div align="center">'
                                    f'<div>{title}</div>\n'
                                    f'<div>RVC {model_version} Model</div>\n'
                                    + (f'<div>Author: {author}</div>' if author else "")
                                    + (f'<img style="width:auto;height:300px;" src="file/{cover}">' if cover else "")
                                    + '</div>'
                                )
                            with gr.Row():
                                # Input selection widgets.
                                with gr.Column():
                                    vc_audio_mode = gr.Dropdown(label="Input Mode", choices=audio_mode, value="Upload audio")
                                    vc_input = gr.Textbox(label="Input Audio Path", visible=False)
                                    vc_microphone_mode = gr.Checkbox(label="Use Microphone", value=False, visible=True)
                                    vc_upload = gr.Audio(label="Upload Audio File", source="upload", visible=True)
                                    tts_text = gr.Textbox(label="TTS Text", info="Text to speech input", visible=False)
                                    tts_voice = gr.Dropdown(label="Edge-TTS Speaker", choices=voices, visible=False, value="en-US-AnaNeural-Female")
                                # Conversion parameters.
                                with gr.Column():
                                    vc_transform0 = gr.Number(label="Transpose", value=0, info='e.g., 12 for male to female')
                                    f0method0 = gr.Radio(label="Pitch Extraction Algorithm", info=f0method_info, choices=f0method_mode, value="pm")
                                    index_rate1 = gr.Slider(minimum=0, maximum=1, label="Retrieval Feature Ratio", value=0.7)
                                    filter_radius0 = gr.Slider(minimum=0, maximum=7, label="Median Filtering", value=3, step=1, info="Reduces breathiness")
                                    resample_sr0 = gr.Slider(minimum=0, maximum=48000, label="Output Resample Rate", value=0, step=1, info="0 for no resampling")
                                    rms_mix_rate0 = gr.Slider(minimum=0, maximum=1, label="Volume Envelope Ratio", value=1)
                                    protect0 = gr.Slider(minimum=0, maximum=0.5, label="Voice Protection", value=0.5, step=0.01, info="Protects voiceless consonants")
                                # Output widgets and the convert button.
                                with gr.Column():
                                    vc_log = gr.Textbox(label="Output Information", interactive=False)
                                    vc_output = gr.Audio(label="Output Audio", interactive=False)
                                    vc_convert = gr.Button("Convert", variant="primary")

                            # The input order must match vc_fn's parameter order.
                            vc_convert.click(
                                fn=vc_fn,
                                inputs=[
                                    vc_audio_mode, vc_input, vc_upload, tts_text, tts_voice,
                                    vc_transform0, f0method0, index_rate1, filter_radius0,
                                    resample_sr0, rms_mix_rate0, protect0,
                                ],
                                outputs=[vc_log, vc_output]
                            )
                            # The output order must match change_audio_mode's return order.
                            vc_audio_mode.change(
                                fn=change_audio_mode,
                                inputs=[vc_audio_mode],
                                outputs=[vc_input, vc_microphone_mode, vc_upload, tts_text, tts_voice]
                            )
                            vc_microphone_mode.change(
                                fn=use_microphone,
                                inputs=vc_microphone_mode,
                                outputs=vc_upload
                            )

    app.queue(max_size=20).launch(share=False, server_name="0.0.0.0", server_port=7860)