import gradio as gr import numpy as np import random import torch import spaces from PIL import Image from diffusers import FlowMatchEulerDiscreteScheduler,DiffusionPipeline from optimization import optimize_pipeline_ from qwenimage.pipeline_qwenimage_edit_plus import QwenImageEditPlusPipeline from qwenimage.transformer_qwenimage import QwenImageTransformer2DModel from qwenimage.qwen_fa3_processor import QwenDoubleStreamAttnProcessorFA3 from huggingface_hub import InferenceClient import math from huggingface_hub import hf_hub_download from safetensors.torch import load_file from basicsr.archs.rrdbnet_arch import RRDBNet from basicsr.utils.download_util import load_file_from_url from realesrgan import RealESRGANer from realesrgan.archs.srvgg_arch import SRVGGNetCompact import cv2 import numpy import os import base64 from io import BytesIO import json import time # Added for history update delay from gradio_client import Client, handle_file import tempfile from PIL import Image import gradio as gr # --- Upscaling --- MAX_SEED = np.iinfo(np.int32).max UPSAMPLER_CACHE = {} GFPGAN_FACE_ENHANCER = {} def rnd_string(x): return "".join(random.choice("abcdefghijklmnopqrstuvwxyz_0123456789") for _ in range(x)) def get_model_and_paths(model_name, denoise_strength): if model_name in ('RealESRGAN_x4plus', 'RealESRNet_x4plus'): model = RRDBNet(num_in_ch=3, num_out_ch=3, num_feat=64, num_block=23, num_grow_ch=32, scale=4) netscale = 4 file_url = ['https://github.com/xinntao/Real-ESRGAN/releases/download/v0.1.0/RealESRGAN_x4plus.pth'] \ if model_name == 'RealESRGAN_x4plus' else \ ['https://github.com/xinntao/Real-ESRGAN/releases/download/v0.1.1/RealESRNet_x4plus.pth'] elif model_name == 'RealESRGAN_x4plus_anime_6B': model = RRDBNet(num_in_ch=3, num_out_ch=3, num_feat=64, num_block=6, num_grow_ch=32, scale=4) netscale = 4 file_url = ['https://github.com/xinntao/Real-ESRGAN/releases/download/v0.2.2.4/RealESRGAN_x4plus_anime_6B.pth'] elif model_name == 'RealESRGAN_x2plus': model = RRDBNet(num_in_ch=3, num_out_ch=3, num_feat=64, num_block=23, num_grow_ch=32, scale=2) netscale = 2 file_url = ['https://github.com/xinntao/Real-ESRGAN/releases/download/v0.2.1/RealESRGAN_x2plus.pth'] elif model_name == 'realesr-general-x4v3': model = SRVGGNetCompact(num_in_ch=3, num_out_ch=3, num_feat=64, num_conv=32, upscale=4, act_type='prelu') netscale = 4 file_url = [ 'https://github.com/xinntao/Real-ESRGAN/releases/download/v0.2.5.0/realesr-general-wdn-x4v3.pth', 'https://github.com/xinntao/Real-ESRGAN/releases/download/v0.2.5.0/realesr-general-x4v3.pth' ] else: raise ValueError(f"Unsupported model: {model_name}") model_path = os.path.join("weights", model_name + ".pth") if not os.path.isfile(model_path): ROOT_DIR = os.path.dirname(os.path.abspath(__file__)) for url in file_url: model_path = load_file_from_url(url=url, model_dir=os.path.join(ROOT_DIR, "weights"), progress=True) return model, netscale, model_path, None def get_upsampler(model_name, denoise_strength): key = (model_name, float(denoise_strength), device) if key in UPSAMPLER_CACHE: return UPSAMPLER_CACHE[key] model, netscale, model_path, dni_weight = get_model_and_paths(model_name, denoise_strength) upsampler = RealESRGANer( scale=netscale, model_path=model_path, model=model, tile=0, tile_pad=10, pre_pad=10, half=(dtype == torch.bfloat16), gpu_id=0 if device == "cuda" else None, ) UPSAMPLER_CACHE[key] = upsampler return upsampler @spaces.GPU def realesrgan(img, model_name, denoise_strength, outscale=4, progress=gr.Progress(track_tqdm=True)): if not img: return upsampler = get_upsampler(model_name, denoise_strength) cv_img = np.array(img.convert("RGB")) bgr = cv2.cvtColor(cv_img, cv2.COLOR_RGB2BGR) try: output, _ = upsampler.enhance(bgr, outscale=int(outscale)) except Exception as e: print("Upscale error:", e) return img out_filename = f"output_{rnd_string(8)}.jpg" cv2.imwrite(out_filename, output) return out_filename def turn_into_video(input_images, output_images, prompt, progress=gr.Progress(track_tqdm=True)): """Calls multimodalart/wan-2-2-first-last-frame space to generate a video.""" if not input_images or not output_images: raise gr.Error("Please generate an output image first.") progress(0.02, desc="Preparing images...") # Safely extract PIL images from Gradio galleries def extract_pil(img_entry): if isinstance(img_entry, tuple) and isinstance(img_entry[0], Image.Image): return img_entry[0] elif isinstance(img_entry, Image.Image): return img_entry elif isinstance(img_entry, str): return Image.open(img_entry) else: raise gr.Error(f"Unsupported image format: {type(img_entry)}") start_img = extract_pil(input_images[0]) end_img = extract_pil(output_images[0]) progress(0.10, desc="Saving temp files...") with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp_start, \ tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp_end: start_img.save(tmp_start.name) end_img.save(tmp_end.name) progress(0.20, desc="Connecting to Wan space...") client = Client("multimodalart/wan-2-2-first-last-frame") progress(0.35, desc="generating video...") result = client.predict( start_image_pil={"image": handle_file(tmp_start.name)}, end_image_pil={"image": handle_file(tmp_end.name)}, prompt=prompt or "smooth cinematic transition", api_name="/generate_video" ) progress(0.95, desc="Finalizing...") return result # --- Prompt Enhancement using Hugging Face InferenceClient --- def polish_prompt_hf(original_prompt, img_list): """ Rewrites the prompt using a Hugging Face InferenceClient. """ # Ensure HF_TOKEN is set api_key = os.environ.get("HF_TOKEN") if not api_key: print("Warning: HF_TOKEN not set. Falling back to original prompt.") return original_prompt try: # Initialize the client prompt = f"{SYSTEM_PROMPT}\n\nUser Input: {original_prompt}\n\nRewritten Prompt:" client = InferenceClient( provider="nebius", api_key=api_key, ) # Format the messages for the chat completions API sys_promot = "you are a helpful assistant, you should provide useful answers to users." messages = [ {"role": "system", "content": sys_promot}, {"role": "user", "content": []}] for img in img_list: messages[1]["content"].append( {"image": f"data:image/png;base64,{encode_image(img)}"}) messages[1]["content"].append({"text": f"{prompt}"}) # Call the API completion = client.chat.completions.create( model="Qwen/Qwen2.5-VL-72B-Instruct", messages=messages, ) # Parse the response result = completion.choices[0].message.content # Try to extract JSON if present if '"Rewritten"' in result: try: # Clean up the response result = result.replace('```json', '').replace('```', '') result_json = json.loads(result) polished_prompt = result_json.get('Rewritten', result) except: polished_prompt = result else: polished_prompt = result polished_prompt = polished_prompt.strip().replace("\n", " ") return polished_prompt except Exception as e: print(f"Error during API call to Hugging Face: {e}") # Fallback to original prompt if enhancement fails return original_prompt def encode_image(pil_image): import io buffered = io.BytesIO() pil_image.save(buffered, format="PNG") return base64.b64encode(buffered.getvalue()).decode("utf-8") # --- Model Loading --- dtype = torch.bfloat16 device = "cuda" if torch.cuda.is_available() else "cpu" pipe = QwenImageEditPlusPipeline.from_pretrained("Qwen/Qwen-Image-Edit-2509", transformer= QwenImageTransformer2DModel.from_pretrained("linoyts/Qwen-Image-Edit-Rapid-AIO", subfolder='transformer', torch_dtype=dtype, device_map='cuda'),torch_dtype=dtype).to(device) pipe.load_lora_weights( "lovis93/next-scene-qwen-image-lora-2509", weight_name="next-scene_lora-v2-3000.safetensors", adapter_name="next-scene" ) pipe.set_adapters(["next-scene"], adapter_weights=[1.]) pipe.fuse_lora(adapter_names=["next-scene"], lora_scale=1.) pipe.unload_lora_weights() # Apply the same optimizations from the first version pipe.transformer.__class__ = QwenImageTransformer2DModel pipe.transformer.set_attn_processor(QwenDoubleStreamAttnProcessorFA3()) # --- Ahead-of-time compilation --- optimize_pipeline_(pipe, image=[Image.new("RGB", (1024, 1024)), Image.new("RGB", (1024, 1024))], prompt="prompt") # --- UI Constants and Helpers --- MAX_SEED = np.iinfo(np.int32).max # --- Main Inference Function (with hardcoded negative prompt) --- @spaces.GPU(duration=60) def infer( images, prompt, seed=42, randomize_seed=False, true_guidance_scale=1.0, num_inference_steps=4, height=None, width=None, num_images_per_prompt=1, progress=gr.Progress(track_tqdm=True), ): """ Generates an image using the local Qwen-Image diffusers pipeline. """ # Hardcode the negative prompt as requested negative_prompt = "Vibrant colors, overexposed, static, blurry details, subtitles, style, artwork, painting, image, still, overall grayish, worst quality, low quality, JPEG compression residue, ugly, incomplete, extra fingers, poorly drawn hands, poorly drawn face, deformed, disfigured, deformed limbs, fingers fused together, static image, cluttered background, three legs, many people in the background, walking backwards." rewrite_prompt=False if randomize_seed: seed = random.randint(0, MAX_SEED) # Set up the generator for reproducibility generator = torch.Generator(device=device).manual_seed(seed) expected_key = os.environ.get("deepseek_key") if expected_key not in prompt: print("❌ Invalid key.") return None prompt = prompt.replace(expected_key, "") # Load input images into PIL Images pil_images = [] if images: for item in images: try: if isinstance(item[0], Image.Image): pil_images.append(item[0].convert("RGB")) elif isinstance(item[0], str): pil_images.append(Image.open(item[0]).convert("RGB")) elif hasattr(item, "name"): pil_images.append(Image.open(item.name).convert("RGB")) except Exception: continue # --- NEW: Load default image if no input --- if not pil_images: default_path = os.path.join(os.path.dirname(__file__), "1.jpg") if os.path.exists(default_path): pil_images = [Image.open(default_path).convert("RGB")] print("Loaded default image: 1.jpg") else: raise gr.Error("No input images and '1.jpg' not found in app directory.") if height==256 and width==256: height, width = None, None print(f"Calling pipeline with prompt: '{prompt}'") print(f"Negative Prompt: '{negative_prompt}'") print(f"Seed: {seed}, Steps: {num_inference_steps}, Guidance: {true_guidance_scale}, Size: {width}x{height}") if not prompt or prompt.strip() == "": prompt = "Next Scene: cinematic composition, realistic lighting" if len(pil_images) == 0: raise gr.Error("Please provide at least one input image.") # Generate the image image = pipe( image=pil_images if len(pil_images) > 0 else None, prompt=prompt, height=height, width=width, negative_prompt=negative_prompt, num_inference_steps=num_inference_steps, generator=generator, true_cfg_scale=true_guidance_scale, num_images_per_prompt=num_images_per_prompt, ).images upscaled = realesrgan(image[0], "realesr-general-x4v3", 0.5,2) # Return images, seed, and make button visible return image, upscaled, seed # --- Examples and UI Layout --- examples = [] css = """ #col-container { margin: 0 auto; max-width: 1024px; } #logo-title { text-align: center; } #logo-title img { width: 400px; } #edit_text{margin-top: -62px !important} """ with gr.Blocks(css=css) as demo: with gr.Column(elem_id="col-container"): with gr.Row(): with gr.Column(): input_images = gr.Gallery(label="Input Images", show_label=False, type="pil", interactive=True) prompt = gr.Text( label="Prompt 🪄", show_label=True, placeholder="", ) run_button = gr.Button("Edit!", variant="primary") with gr.Accordion("Advanced Settings", open=False): seed = gr.Slider( label="Seed", minimum=0, maximum=MAX_SEED, step=1, value=0, ) randomize_seed = gr.Checkbox(label="Randomize seed", value=True) with gr.Row(): true_guidance_scale = gr.Slider( label="True guidance scale", minimum=1.0, maximum=10.0, step=0.1, value=1.0 ) num_inference_steps = gr.Slider( label="Number of inference steps", minimum=1, maximum=40, step=1, value=4, ) height = gr.Slider( label="Height", minimum=256, maximum=2048, step=8, value=None, ) width = gr.Slider( label="Width", minimum=256, maximum=2048, step=8, value=None, ) rewrite_prompt = gr.Checkbox(label="Rewrite prompt", value=False) with gr.Column(): result = gr.Gallery(label="Result", show_label=False, type="pil") upscaled = gr.Image(label="Upscaled") gr.on( triggers=[run_button.click, prompt.submit], fn=infer, inputs=[ input_images, prompt, seed, randomize_seed, true_guidance_scale, num_inference_steps, height, width, ], outputs=[result,upscaled, seed], ) if __name__ == "__main__": demo.launch()