"""
PromptWizard Qwen Training — Configurable Dataset & Repo
Fine-tunes Qwen using a user-selected dataset and uploads the trained model
to a user-specified Hugging Face Hub repo asynchronously with detailed logs.
"""
import gradio as gr
import spaces
import torch
from transformers import (
AutoModelForCausalLM,
AutoTokenizer,
Trainer,
TrainingArguments,
)
from datasets import load_dataset
from peft import LoraConfig, get_peft_model, TaskType
from huggingface_hub import HfFolder, upload_folder
import asyncio
import threading
from datetime import datetime
# ==== Async upload wrapper ====
def start_async_upload(local_dir, hf_repo, output_log):
"""Starts async model upload in a background thread."""
def runner():
output_log.append(f"[INFO] 🚀 Async upload thread started for repo: {hf_repo}")
asyncio.run(async_upload_model(local_dir, hf_repo, output_log))
output_log.append(f"[INFO] 🛑 Async upload thread finished for repo: {hf_repo}")
threading.Thread(target=runner, daemon=True).start()
async def async_upload_model(local_dir, hf_repo, output_log, max_retries=3):
    """
    Uploads a local model directory to the HF Hub via the HTTP API,
    retrying up to `max_retries` times before giving up.
    """
try:
token = HfFolder.get_token()
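        # Reads the token cached by `huggingface-cli login` (and, on recent
        # huggingface_hub versions, the HF_TOKEN secret/environment variable);
        # without a token, upload_folder will fail.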
output_log.append(f"[INFO] ☁️ Preparing to upload to repo: {hf_repo}")
attempt = 0
while attempt < max_retries:
try:
output_log.append(f"[INFO] 🔄 Attempt {attempt+1} to upload folder via HTTP API...")
upload_folder(
folder_path=local_dir,
repo_id=hf_repo,
repo_type="model",
token=token,
ignore_patterns=["*.lock", "*.tmp"], # ignore temp files
create_pr=False,
)
output_log.append("[SUCCESS] ✅ Model successfully uploaded to HF Hub!")
break
except Exception as e:
attempt += 1
output_log.append(f"[ERROR] Upload attempt {attempt} failed: {e}")
if attempt >= max_retries:
output_log.append("[ERROR] ❌ Max retries reached. Upload failed.")
else:
output_log.append("[INFO] Retrying upload in 5 seconds...")
await asyncio.sleep(5)
except Exception as e:
output_log.append(f"[ERROR] ❌ Unexpected error during upload: {e}")
# ==== GPU check ====
def check_gpu_status():
return "🚀 Zero GPU Ready - GPU will be allocated when training starts"
# ==== Logging helper ====
def log_message(output_log, msg):
line = f"[{datetime.now().strftime('%H:%M:%S')}] {msg}"
print(line)
output_log.append(line)
# =====================================================
# 🧠 Train model to expand short prompts into long ones
# =====================================================
@spaces.GPU(duration=300)
def train_model(
base_model, dataset_name, num_epochs, batch_size, learning_rate, hf_repo
):
output_log = []
try:
log_message(output_log, "🚀 Starting FAST test training...")
# ===== Device =====
device = "cuda" if torch.cuda.is_available() else "cpu"
dtype = torch.bfloat16 if torch.cuda.is_available() and torch.cuda.is_bf16_supported() else torch.float16
log_message(output_log, f"🎮 Device: {device}, dtype: {dtype}")
if device == "cuda":
log_message(output_log, f"✅ GPU: {torch.cuda.get_device_name(0)}")
# ===== Load dataset =====
log_message(output_log, f"\n📚 Loading dataset: {dataset_name}")
dataset = load_dataset(dataset_name)
dataset = dataset["train"].train_test_split(test_size=0.2, seed=42)
train_dataset, test_dataset = dataset["train"], dataset["test"]
# ===== ⚡ FAST mode: use small subset =====
train_dataset = train_dataset.select(range(min(1000, len(train_dataset))))
test_dataset = test_dataset.select(range(min(200, len(test_dataset))))
log_message(output_log, f"⚡ Using {len(train_dataset)} train / {len(test_dataset)} test samples")
        # ===== Format samples =====
        # Prefer "short"/"long" prompt-expansion pairs; fall back to a generic text
        # column so datasets with other schemas still produce usable training text.
        def format_example(example):
            short_prompt = (example.get("short") or "").strip()
            long_response = (example.get("long") or "").strip()
            if short_prompt and long_response:
                return {
                    "text": (
                        f"<|system|>\nYou are an AI that expands short prompts into detailed, descriptive ones.\n"
                        f"<|user|>\nShort: {short_prompt}\n"
                        f"<|assistant|>\n{long_response}"
                    )
                }
            text_content = example.get("text") or example.get("content") or str(example.get("path", "")) or " ".join(str(v) for v in example.values())
            prompt = (
                f"<|system|>\nYou are an expert AI assistant.\n<|user|>\n{text_content}\n<|assistant|>\n"
            )
            return {"text": prompt}
        train_dataset = train_dataset.map(format_example)
        test_dataset = test_dataset.map(format_example)
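        # Example of a formatted record (hypothetical values, for illustration only):
        #   <|system|>
        #   You are an AI that expands short prompts into detailed, descriptive ones.
        #   <|user|>
        #   Short: a cat on a roof
        #   <|assistant|>
        #   A ginger cat perched on a sun-warmed terracotta roof at dusk...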
log_message(output_log, f"✅ Formatted {len(train_dataset)} train + {len(test_dataset)} test examples")
# ===== Load model & tokenizer =====
log_message(output_log, f"\n🤖 Loading model: {base_model}")
tokenizer = AutoTokenizer.from_pretrained(base_model, trust_remote_code=True)
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token
        model = AutoModelForCausalLM.from_pretrained(
            base_model,
            trust_remote_code=True,
            torch_dtype=dtype if device == "cuda" else torch.float32,
            low_cpu_mem_usage=True,
        )
if device == "cuda":
model = model.to(device)
log_message(output_log, "✅ Model and tokenizer loaded successfully")
log_message(output_log, f"Tokenizer vocab size: {tokenizer.vocab_size}")
# ===== LoRA configuration =====
log_message(output_log, "\n⚙️ Configuring LoRA for efficient fine-tuning...")
lora_config = LoraConfig(
task_type=TaskType.CAUSAL_LM,
r=8,
lora_alpha=16,
lora_dropout=0.1,
target_modules=["q_proj", "v_proj"],
bias="none",
)
model = get_peft_model(model, lora_config)
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
log_message(output_log, f"Trainable params after LoRA: {trainable_params:,}")
# ===== Tokenization + labels =====
def tokenize_fn(examples):
tokenized = tokenizer(
examples["text"],
padding="max_length",
truncation=True,
max_length=256,
)
tokenized["labels"] = tokenized["input_ids"].copy()
return tokenized
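        # NOTE: labels are a straight copy of input_ids, so the loss covers the whole
        # sequence, prompt and padding included; acceptable for a quick demo, but
        # masking prompt/pad positions with -100 would give a cleaner training signal.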
train_dataset = train_dataset.map(tokenize_fn, batched=True)
test_dataset = test_dataset.map(tokenize_fn, batched=True)
log_message(output_log, "✅ Tokenization + labels done")
# ===== Training arguments =====
output_dir = "./qwen-gita-lora"
training_args = TrainingArguments(
output_dir=output_dir,
num_train_epochs=num_epochs,
per_device_train_batch_size=batch_size,
gradient_accumulation_steps=2,
warmup_steps=10,
logging_steps=5,
save_strategy="epoch",
            fp16=(device == "cuda" and dtype == torch.float16),
            bf16=(device == "cuda" and dtype == torch.bfloat16),
            optim="adamw_torch",
            learning_rate=learning_rate,
            max_steps=500,  # hard cap on optimizer steps to keep the demo run short
)
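        # Effective batch size is per_device_train_batch_size * gradient_accumulation_steps
        # (2 * 2 = 4 with the UI defaults); note that a positive max_steps overrides
        # num_train_epochs in transformers' Trainer.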
trainer = Trainer(
model=model,
args=training_args,
train_dataset=train_dataset,
eval_dataset=test_dataset,
tokenizer=tokenizer,
)
# ===== Train =====
log_message(output_log, "\n🚀 Starting training...")
trainer.train()
log_message(output_log, "\n💾 Saving trained model locally...")
trainer.save_model(output_dir)
tokenizer.save_pretrained(output_dir)
# ===== Async upload =====
log_message(output_log, f"\n☁️ Initiating async upload to {hf_repo}")
start_async_upload(output_dir, hf_repo, output_log)
log_message(output_log, "✅ Training complete & async upload started!")
except Exception as e:
log_message(output_log, f"\n❌ Error during training: {e}")
return "\n".join(output_log)
# ==== Gradio Interface ====
def create_interface():
with gr.Blocks(title="PromptWizard — Qwen Trainer") as demo:
gr.Markdown("""
# 🧘 PromptWizard Qwen Fine-tuning
Fine-tune Qwen on any dataset and upload to any Hugging Face repo.
""")
with gr.Row():
with gr.Column():
gr.Textbox(label="GPU Status", value=check_gpu_status(), interactive=False)
base_model = gr.Textbox(label="Base Model", value="Qwen/Qwen2.5-0.5B")
dataset_name = gr.Textbox(label="Dataset Name", value="rahul7star/Gita")
hf_repo = gr.Textbox(label="HF Repo for Upload", value="rahul7star/Qwen0.5-3B-Gita")
num_epochs = gr.Slider(1, 3, value=1, step=1, label="Epochs")
batch_size = gr.Slider(1, 4, value=2, step=1, label="Batch Size")
learning_rate = gr.Number(value=5e-5, label="Learning Rate")
train_btn = gr.Button("🚀 Start Fine-tuning", variant="primary")
with gr.Column():
output = gr.Textbox(
label="Training Log",
lines=25,
max_lines=40,
value="Click 'Start Fine-tuning' to train and upload your model.",
)
train_btn.click(
fn=train_model,
inputs=[base_model, dataset_name, num_epochs, batch_size, learning_rate, hf_repo],
outputs=output,
)
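        # Note: train_model returns a single string, so the log textbox is filled only
        # after training finishes; messages appended by the background upload thread
        # after that point never reach the UI.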
return demo
if __name__ == "__main__":
demo = create_interface()
demo.launch(server_name="0.0.0.0", server_port=7860)