Spaces:
Sleeping
Sleeping
| """ | |
| Zen VL Training Space - HuggingFace Pro GPU Training | |
| Trains zen-vl-4b with combined ADP+xLAM datasets | |
| """ | |
| import os | |
| import sys | |
| import time | |
| import json | |
| import random | |
| import logging | |
| from pathlib import Path | |
| from typing import List, Dict, Any | |
| import torch | |
| from transformers import ( | |
| Qwen3VLForConditionalGeneration, | |
| Qwen3VLProcessor, | |
| TrainingArguments, | |
| Trainer, | |
| ) | |
| from datasets import load_dataset, Dataset | |
| import gradio as gr | |
# Configure process-wide logging once at import time and create the
# module-level logger used by log_message().
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
# Shared, mutable snapshot of the current training run. It is written by the
# training code and read by the Gradio status callback.
training_state = dict(
    status="idle",      # lifecycle stage name, e.g. "idle", "training", "error"
    progress=0,         # percentage shown on the progress slider
    current_step=0,
    total_steps=0,
    loss=0.0,
    logs=[],            # timestamped entries appended by log_message()
)
def log_message(message: str):
    """Record ``message`` in the shared training log and the module logger.

    The entry stored in ``training_state["logs"]`` is prefixed with the local
    wall-clock time; the logger receives the bare message.

    Returns:
        The timestamped entry that was appended to the shared log.
    """
    entry = "[{}] {}".format(time.strftime("%Y-%m-%d %H:%M:%S"), message)
    training_state["logs"].append(entry)
    logger.info(message)
    return entry
class ZenVLTrainer:
    """Fine-tune a zen-vl instruct checkpoint on a mixture of ADP and xLAM data.

    The source and target Hub repositories are derived from ``model_size``;
    batch size, gradient accumulation, sample budget and learning rate come
    from a per-GPU preset table. Progress is reported through the module-level
    ``training_state`` dict and ``log_message``.
    """

    def __init__(self, model_size="4b", gpu_type="a10g"):
        """Resolve model names and the hyper-parameter preset.

        Args:
            model_size: Size tag interpolated into the repository names
                (``zenlm/zen-vl-{model_size}-instruct`` / ``-agent``).
            gpu_type: Key into the preset table; unknown keys fall back to
                the ``"a10g"`` preset (a warning is logged).
        """
        self.model_size = model_size
        self.gpu_type = gpu_type
        self.model_name = f"zenlm/zen-vl-{model_size}-instruct"
        self.output_name = f"zenlm/zen-vl-{model_size}-agent"

        # GPU-specific configs
        self.configs = {
            "a10g": {
                "batch_size": 1,
                "gradient_accumulation": 8,
                "max_samples": 30000,
                "learning_rate": 2e-5,
            },
            "a100-large": {
                "batch_size": 2,
                "gradient_accumulation": 8,
                "max_samples": 50000,
                "learning_rate": 2e-5,
            },
            "a100": {
                "batch_size": 4,
                "gradient_accumulation": 8,
                "max_samples": 100000,
                "learning_rate": 2e-5,
            },
        }
        if gpu_type not in self.configs:
            # Make the silent fallback visible in the UI log.
            log_message(f"Warning: unknown GPU type {gpu_type!r}, using a10g config")
        self.config = self.configs.get(gpu_type, self.configs["a10g"])

        log_message(f"Initialized Zen VL Trainer for {model_size} on {gpu_type}")
        log_message(f"Config: {self.config}")

    @staticmethod
    def _limit_reached(records, max_samples) -> bool:
        """Return True once ``records`` holds at least ``max_samples`` items.

        A falsy ``max_samples`` (``None`` or ``0``) means "no limit".
        """
        return bool(max_samples) and len(records) >= max_samples

    @classmethod
    def _extend_limited(cls, records, examples, max_samples) -> None:
        """Append items from ``examples`` to ``records`` without exceeding the limit."""
        for example in examples:
            if cls._limit_reached(records, max_samples):
                break
            records.append(example)

    @classmethod
    def _read_jsonl(cls, path, records, max_samples) -> None:
        """Append the non-blank JSON lines of ``path`` to ``records`` up to the limit."""
        with open(path, "r", encoding="utf-8") as f:
            parsed = (json.loads(line) for line in f if line.strip())
            cls._extend_limited(records, parsed, max_samples)

    def load_adp_data(self, max_samples: int = None) -> List[Dict[str, Any]]:
        """Load Agent Data Protocol records.

        Reads every ``*.jsonl`` file under ``data/adp`` when that directory
        exists; otherwise streams a fixed list of configs from the
        ``neulab/agent-data-collection`` Hub dataset. A config that fails to
        load is logged and skipped.

        Args:
            max_samples: Upper bound on the number of records returned;
                ``None`` (or ``0``) means unbounded.

        Returns:
            A list of at most ``max_samples`` records.
        """
        log_message("Loading ADP dataset...")

        data_dir = Path("data/adp")
        all_data = []

        if data_dir.exists():
            # Load from local cache. Sorted so the selected subset does not
            # depend on filesystem enumeration order.
            for json_file in sorted(data_dir.glob("*.jsonl")):
                # Stop opening files once the budget is filled; previously
                # every remaining file was opened and contributed one record.
                if self._limit_reached(all_data, max_samples):
                    break
                log_message(f"Loading {json_file.name}...")
                self._read_jsonl(json_file, all_data, max_samples)
        else:
            # Download from HuggingFace
            log_message("Downloading ADP dataset from HuggingFace...")
            configs = [
                'agenttuning_os', 'agenttuning_kg', 'agenttuning_db',
                'synatra', 'code_feedback', 'go-browse-wa'
            ]

            for config in configs:
                if self._limit_reached(all_data, max_samples):
                    break
                before = len(all_data)
                try:
                    dataset = load_dataset(
                        "neulab/agent-data-collection",
                        config,
                        split="train",
                        streaming=True
                    )
                    self._extend_limited(all_data, dataset, max_samples)
                except Exception as e:
                    # Best effort: records streamed before the failure are kept.
                    log_message(f"Warning: Could not load {config}: {e}")
                    continue
                log_message(
                    f"Loaded {len(all_data) - before} samples from {config} "
                    f"({len(all_data)} total)"
                )

        log_message(f"Loaded {len(all_data)} ADP samples")
        return all_data

    def load_xlam_data(self, max_samples: int = None) -> List[Dict[str, Any]]:
        """Load xLAM function-calling records.

        Uses ``data/xlam/xlam_converted.jsonl`` when present; otherwise
        downloads ``Salesforce/xlam-function-calling-60k`` from the Hub. A
        download failure is logged and yields an empty list.

        Args:
            max_samples: Upper bound on the number of records returned;
                ``None`` (or ``0``) means unbounded.

        Returns:
            A list of at most ``max_samples`` records (possibly empty).
        """
        log_message("Loading xLAM dataset...")

        json_file = Path("data/xlam") / "xlam_converted.jsonl"
        all_data = []

        if json_file.exists():
            # Load from local cache
            self._read_jsonl(json_file, all_data, max_samples)
        else:
            # Download from HuggingFace. This branch is also taken when the
            # cache directory exists but the converted file is missing, which
            # previously produced an empty result without any message.
            log_message("Downloading xLAM dataset from HuggingFace...")
            try:
                dataset = load_dataset(
                    "Salesforce/xlam-function-calling-60k",
                    split="train"
                )
                self._extend_limited(all_data, dataset, max_samples)
            except Exception as e:
                log_message(f"Error loading xLAM: {e}")

        log_message(f"Loaded {len(all_data)} xLAM samples")
        return all_data

    def create_balanced_mixture(
        self,
        adp_data: List[Dict],
        xlam_data: List[Dict],
        adp_weight: float = 0.80,
        xlam_weight: float = 0.20
    ) -> List[Dict]:
        """Sample and shuffle ADP and xLAM records in the requested proportion.

        The mixture is as large as the scarcer source allows: each source
        with a positive weight caps the total at ``len(source) / weight``.
        A source with weight ``0`` contributes nothing and imposes no cap.

        Args:
            adp_data: Candidate ADP records.
            xlam_data: Candidate xLAM records.
            adp_weight: Target fraction of ADP records in the mixture.
            xlam_weight: Target fraction of xLAM records in the mixture.

        Returns:
            A new shuffled list; the input lists are not modified.

        Raises:
            ValueError: If either weight is negative.
        """
        if adp_weight < 0 or xlam_weight < 0:
            raise ValueError("Mixture weights must be non-negative")

        log_message(f"Creating balanced mixture: {adp_weight:.0%} ADP, {xlam_weight:.0%} xLAM")

        # Small epsilon so float noise (e.g. 6000 / 0.2 evaluating just below
        # 30000) does not drop a sample when truncating to int.
        eps = 1e-9
        caps = []
        if adp_weight > 0:
            caps.append(len(adp_data) / adp_weight)
        if xlam_weight > 0:
            caps.append(len(xlam_data) / xlam_weight)
        total_size = int(min(caps) + eps) if caps else 0

        adp_target = min(int(total_size * adp_weight + eps), len(adp_data))
        xlam_target = min(int(total_size * xlam_weight + eps), len(xlam_data))

        adp_sample = random.sample(adp_data, adp_target)
        xlam_sample = random.sample(xlam_data, xlam_target)

        combined = adp_sample + xlam_sample
        random.shuffle(combined)

        log_message(f"Created mixture: {len(adp_sample)} ADP + {len(xlam_sample)} xLAM = {len(combined)} total")
        return combined

    def train(self):
        """Run the full pipeline: load model and data, train, push to the Hub.

        Each stage is published through ``training_state["status"]``; step,
        progress and loss are updated from a Trainer callback while training
        runs. Any exception is caught, logged and reported via the return
        value rather than propagated.

        Returns:
            A human-readable success or failure message.
        """
        try:
            # Local import: TrainerCallback is not among the file-level imports.
            from transformers import TrainerCallback

            class _ProgressCallback(TrainerCallback):
                """Mirror Trainer step/loss information into ``training_state``."""

                def on_log(self, args, state, control, logs=None, **kwargs):
                    training_state["current_step"] = state.global_step
                    total = state.max_steps or training_state["total_steps"]
                    if total and total > 0:
                        training_state["total_steps"] = total
                        training_state["progress"] = min(
                            100, int(100 * state.global_step / total)
                        )
                    if logs and "loss" in logs:
                        training_state["loss"] = logs["loss"]

            # Reset counters so a re-run does not show the previous run's values.
            training_state["progress"] = 0
            training_state["current_step"] = 0
            training_state["loss"] = 0.0

            training_state["status"] = "preparing"
            log_message("=" * 80)
            log_message("Starting Zen VL Training on HuggingFace Space")
            log_message("=" * 80)

            # Load model and processor
            training_state["status"] = "loading_model"
            log_message(f"Loading model: {self.model_name}")

            model = Qwen3VLForConditionalGeneration.from_pretrained(
                self.model_name,
                torch_dtype=torch.bfloat16,
                device_map="auto"
            )
            # NOTE(review): the processor is loaded but never applied to the
            # data or handed to the Trainer — confirm whether a tokenizing
            # map/collator is intended here.
            processor = Qwen3VLProcessor.from_pretrained(self.model_name)

            log_message("Model and processor loaded successfully")

            # Load datasets
            training_state["status"] = "loading_data"
            max_samples = self.config["max_samples"]

            adp_data = self.load_adp_data(max_samples=int(max_samples * 0.8))
            xlam_data = self.load_xlam_data(max_samples=int(max_samples * 0.2))

            # Create mixture
            combined_data = self.create_balanced_mixture(adp_data, xlam_data)
            if not combined_data:
                # Fail with a clear message instead of an obscure Trainer error.
                raise RuntimeError(
                    "No training data available "
                    f"({len(adp_data)} ADP, {len(xlam_data)} xLAM samples loaded)"
                )

            # Convert to HuggingFace Dataset
            # NOTE(review): records are passed through as loaded; presumably
            # ADP and xLAM share a schema the model can consume — verify.
            dataset = Dataset.from_list(combined_data)
            log_message(f"Created dataset with {len(dataset)} examples")

            # Training arguments
            training_state["status"] = "configuring"
            output_dir = f"./output/{self.model_size}"

            training_args = TrainingArguments(
                output_dir=output_dir,
                num_train_epochs=3,
                per_device_train_batch_size=self.config["batch_size"],
                gradient_accumulation_steps=self.config["gradient_accumulation"],
                learning_rate=self.config["learning_rate"],
                warmup_steps=500,
                logging_steps=10,
                save_steps=500,
                save_total_limit=3,
                fp16=False,
                bf16=True,
                push_to_hub=True,
                hub_model_id=self.output_name,
                hub_strategy="every_save",
                report_to="tensorboard",
            )

            log_message("Training configuration:")
            log_message(f"  Epochs: {training_args.num_train_epochs}")
            log_message(f"  Batch size: {training_args.per_device_train_batch_size}")
            log_message(f"  Gradient accumulation: {training_args.gradient_accumulation_steps}")
            log_message(f"  Learning rate: {training_args.learning_rate}")
            log_message(f"  Total samples: {len(dataset)}")

            # Estimate total optimizer steps up front; the callback replaces
            # this with the Trainer's own max_steps once logging starts.
            samples_per_step = (
                training_args.per_device_train_batch_size
                * training_args.gradient_accumulation_steps
            )
            total_steps = int(
                len(dataset) // samples_per_step * training_args.num_train_epochs
            )
            training_state["total_steps"] = total_steps
            log_message(f"  Total training steps: {total_steps}")

            # Initialize trainer
            training_state["status"] = "training"
            log_message("Initializing trainer...")

            trainer = Trainer(
                model=model,
                args=training_args,
                train_dataset=dataset,
                callbacks=[_ProgressCallback()],
            )

            # Start training
            log_message("=" * 80)
            log_message("TRAINING STARTED")
            log_message("=" * 80)

            result = trainer.train()

            # Training completed
            training_state["status"] = "uploading"
            log_message("=" * 80)
            log_message("TRAINING COMPLETED")
            log_message("=" * 80)
            log_message(f"Final loss: {result.training_loss:.4f}")

            # Push to hub
            log_message(f"Uploading model to {self.output_name}...")
            trainer.push_to_hub()

            training_state["status"] = "completed"
            training_state["progress"] = 100
            log_message("=" * 80)
            log_message("SUCCESS! Model uploaded to HuggingFace")
            log_message("=" * 80)

            return "Training completed successfully!"

        except Exception as e:
            training_state["status"] = "error"
            # Keep the traceback in the process log; the UI log gets the summary.
            logger.exception("Training failed")
            error_msg = f"Training failed: {str(e)}"
            log_message(error_msg)
            return error_msg
def get_training_status():
    """Get current training status for Gradio UI.

    Reads the shared ``training_state`` dict without modifying it.

    Returns:
        A 3-tuple ``(status_message, progress, logs)`` where ``status_message``
        is the display text for the current status (or the raw status string
        if it is not a known one), ``progress`` is a percentage clamped to
        0-100, and ``logs`` is the last 50 log entries joined by newlines.
    """
    status = training_state["status"]
    current_step = training_state["current_step"]
    total_steps = training_state["total_steps"]

    # Prefer the stored percentage, but derive one from the step counters
    # while training so the slider moves even if only the counters are updated.
    progress = training_state["progress"]
    if status == "training" and total_steps > 0:
        progress = max(progress, int(100 * current_step / total_steps))
    progress = max(0, min(100, progress))

    # Emoji prefixes were mis-encoded (mojibake) in the source text and have
    # been restored by matching each glyph to its message.
    # NOTE(review): confirm these against the originally intended emoji.
    status_text = {
        "idle": "⏸️ Ready to start training",
        "preparing": "🔧 Preparing training environment...",
        "loading_model": "📦 Loading model and processor...",
        "loading_data": "📊 Loading training datasets...",
        "configuring": "⚙️ Configuring training parameters...",
        "training": f"🚀 Training in progress: {current_step}/{total_steps} steps",
        "uploading": "☁️ Uploading model to HuggingFace...",
        "completed": "✅ Training completed successfully!",
        "error": "❌ Training failed"
    }

    recent_logs = "\n".join(training_state["logs"][-50:])
    return status_text.get(status, status), progress, recent_logs
def start_training(model_size, gpu_type):
    """Start a training job unless one is already running.

    The call is synchronous: it returns only after ``ZenVLTrainer.train()``
    finishes. A request made while another run is active (any status other
    than idle/completed/error) is rejected so two runs never share the GPU.

    Args:
        model_size: Model size tag passed to ``ZenVLTrainer``.
        gpu_type: GPU preset name passed to ``ZenVLTrainer``.

    Returns:
        The message returned by ``ZenVLTrainer.train()``, or a rejection
        message when a run is already in progress.
    """
    current_status = training_state["status"]
    if current_status not in ("idle", "completed", "error"):
        message = (
            f"Training already in progress (status: {current_status}); "
            "ignoring new request"
        )
        log_message(message)
        return message

    # Claim the run immediately to narrow the window for a second request.
    training_state["status"] = "preparing"
    log_message(f"Starting training for {model_size} on {gpu_type}")
    trainer = ZenVLTrainer(model_size=model_size, gpu_type=gpu_type)
    return trainer.train()
# Gradio Interface
# Builds the UI: model/GPU selectors, a start button, and status, progress
# and log widgets that are refreshed periodically from get_training_status().
# NOTE(review): the source's indentation was lost; this assumes only the two
# dropdowns sit inside the Row and everything else is directly under Blocks.
# Emoji in the heading and button label were mis-encoded and are restored by
# best guess — confirm against the intended originals.
with gr.Blocks(title="Zen VL Training") as demo:
    # Blank lines separate the sections so the bold headers are not absorbed
    # into the preceding paragraph or list item when rendered.
    gr.Markdown("""
    # 🧘 Zen VL Training Space

    Train zen-vl models with combined ADP+xLAM datasets on HuggingFace Pro GPUs.

    **Datasets:**
    - Agent Data Protocol (ADP): ~220k agent trajectories
    - xLAM Function Calling: 60k function calling examples

    **Training Time Estimates:**
    - 4B model on A10G: ~6-8 hours
    - 8B model on A100: ~10-12 hours
    - 30B model on A100-80GB: ~20-24 hours
    """)

    with gr.Row():
        model_size = gr.Dropdown(
            choices=["4b", "8b", "30b"],
            value="4b",
            label="Model Size"
        )
        gpu_type = gr.Dropdown(
            choices=["a10g", "a100-large", "a100"],
            value="a10g",
            label="GPU Type"
        )

    start_btn = gr.Button("🚀 Start Training", variant="primary")

    status_text = gr.Textbox(label="Status", value="Ready to start training")
    progress_bar = gr.Slider(minimum=0, maximum=100, value=0, label="Progress")
    logs_text = gr.Textbox(label="Training Logs", lines=20, max_lines=50)

    # Auto-refresh status every 10 seconds
    demo.load(
        get_training_status,
        None,
        [status_text, progress_bar, logs_text],
        every=10
    )

    # The handler's return message is shown in the status box.
    start_btn.click(
        start_training,
        inputs=[model_size, gpu_type],
        outputs=[status_text]
    )
if __name__ == "__main__":
    import threading

    # Check if running in HF Space
    if os.environ.get("SPACE_ID"):
        log_message(f"Running in HuggingFace Space: {os.environ['SPACE_ID']}")

    # AUTO-START TRAINING
    def auto_start_training():
        """Auto-start training when Space launches"""
        time.sleep(5)  # Wait for Space to fully initialize
        # Routed through log_message so the notice also shows in the UI log.
        log_message("🚀 AUTO-STARTING TRAINING (4B model on A10G)")
        trainer = ZenVLTrainer(model_size="4b", gpu_type="a10g")
        trainer.train()

    # Launch training in background thread. This must happen BEFORE
    # demo.launch(): launch() blocks the main thread while the server runs,
    # so anything placed after it is never reached.
    training_thread = threading.Thread(target=auto_start_training, daemon=True)
    training_thread.start()

    # Launch Gradio interface (blocks until the server shuts down)
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False
    )