import argparse import json from io import BytesIO import os import requests import torch from PIL import Image from tqdm import tqdm from llava.constants import ( IMAGE_TOKEN_INDEX, DEFAULT_IMAGE_TOKEN, DEFAULT_IM_START_TOKEN, DEFAULT_IM_END_TOKEN, ) from llava.conversation import conv_templates, SeparatorStyle from llava.model.builder import load_pretrained_model from llava.utils import disable_torch_init from llava.mm_utils import ( tokenizer_image_token, process_images, load_image_from_base64, get_model_name_from_path, KeywordsStoppingCriteria ) def load_image(image_file): if image_file.startswith('http') or image_file.startswith('https'): response = requests.get(image_file) image = Image.open(BytesIO(response.content)).convert('RGB') else: image = Image.open(image_file).convert('RGB') return image def eval_model(args): # Model disable_torch_init() model_name = get_model_name_from_path(args.model_path) tokenizer, model, image_processor, context_len = load_pretrained_model( args.model_path, args.model_base, model_name)#, True) with open(args.questions_file) as f: llvqa_data = json.load(f) for i, llddata in enumerate(tqdm(llvqa_data)): image_file = llddata["img_path"] if args.lang == "en": message = llddata["question"] + \ "\nChoose between one of the options as follows:\n" elif args.lang == "zh": message = llddata["question"] + "\在下列选项中选择一个:\n" else: raise NotImplementedError( "Q-Bench does not support languages other than English (en) and Chinese (zh) yet. Contact us (https://github.com/VQAssessment/Q-Bench/) to convert Q-Bench into more languages.") for choice, ans in zip(["A.", "B.", "C.", "D."], llddata["candidates"]): message += f"{choice} {ans}\n" qs = message if model.config.mm_use_im_start_end: qs = DEFAULT_IM_START_TOKEN + DEFAULT_IMAGE_TOKEN + \ DEFAULT_IM_END_TOKEN + '\n' + qs else: qs = DEFAULT_IMAGE_TOKEN + '\n' + qs # if 'llama-2' in model_name.lower(): # conv_mode = "share4v_llama_2" # elif "v1" in model_name.lower(): # conv_mode = "share4v_v1" # elif "mpt" in model_name.lower(): # conv_mode = "mpt" # else: # conv_mode = "share4v_v0" # if args.conv_mode is not None and conv_mode != args.conv_mode: # print('[WARNING] the auto inferred conversation mode is {}, while `--conv-mode` is {}, using {}'.format( # conv_mode, args.conv_mode, args.conv_mode)) # else: # args.conv_mode = conv_mode conv = conv_templates[args.conv_mode].copy() conv.append_message(conv.roles[0], qs) conv.append_message(conv.roles[1], None) prompt = conv.get_prompt() image = Image.open(os.path.join(args.image_folder, image_file)) if isinstance(image_processor, list): image_tensor_0 = image_processor[0].preprocess( image, return_tensors="pt" )["pixel_values"][0] image_tensor_1 = image_processor[1].preprocess( image, return_tensors="pt" )["pixel_values"][0] image_tensor = torch.cat((image_tensor_0, image_tensor_1), dim=0) if len(image_processor)>2: image_tensor_2 = image_processor[2].preprocess( image, return_tensors="pt" )["pixel_values"][0] high_image_tensor = image_tensor_2 else: high_image_tensor = image_tensor_0 if len(image_processor)>3: image_tensor_3 = image_processor[3].preprocess( image, return_tensors="pt" )["pixel_values"][0] flattened_image_tensor = image_tensor_3 else: flattened_image_tensor = image_tensor_0 else: image_tensor = image_processor.preprocess(image, return_tensors="pt")[ "pixel_values" ][0] high_image_tensor = image_tensor flattened_image_tensor = image_tensor images = image_tensor.unsqueeze(0).bfloat16().cuda() high_images = high_image_tensor.unsqueeze(0).bfloat16().cuda() flattened_patches = flattened_image_tensor.unsqueeze(0).bfloat16().cuda() input_ids = ( tokenizer_image_token( prompt, tokenizer, IMAGE_TOKEN_INDEX, return_tensors="pt" ) .unsqueeze(0) .cuda() ) stop_str = conv.sep if conv.sep_style != SeparatorStyle.TWO else conv.sep2 keywords = [stop_str] stopping_criteria = ( [KeywordsStoppingCriteria(keywords, tokenizer, input_ids)] if conv.version == "v0" else None ) if "qformer" in model_name or 'gate' in model_name: prompts = [["question 0: \n\n".replace("", qs.replace("\n", "").lower())]] else: prompts = None if "gate-mask" in model_name: routing_weights = torch.Tensor([1, 0, 1, 0, 0, 0, 0]) routing_weights = routing_weights.unsqueeze(0).bfloat16().cuda() else: routing_weights = None with torch.inference_mode(): output_ids = model.generate( input_ids, images=images, high_images=high_images if isinstance(image_processor, list) and len(image_processor)>2 else None, flattened_patches=flattened_patches if isinstance(image_processor, list) and len(image_processor)>3 else None, routing_weights=routing_weights, prompts=prompts, do_sample=True if args.temperature > 0 else False, temperature=args.temperature, top_p=args.top_p, num_beams=args.num_beams, # no_repeat_ngram_size=3, max_new_tokens=1024, use_cache=True, ) input_token_len = input_ids.shape[1] n_diff_input_output = ( input_ids != output_ids[:, :input_token_len]).sum().item() if n_diff_input_output > 0: print( f'[Warning] {n_diff_input_output} output_ids are not the same as the input_ids') outputs = tokenizer.batch_decode( output_ids[:, input_token_len:], skip_special_tokens=True)[0] outputs = outputs.strip() if outputs.endswith(stop_str): outputs = outputs[:-len(stop_str)] outputs = outputs.strip() llddata["response"] = outputs with open(args.answers_file, "a") as wf: json.dump(llddata, wf) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--model-path", type=str, default="share4v-7b") parser.add_argument("--model-base", type=str, default=None) parser.add_argument("--image-folder", type=str, default="./playground/data/qbench/images_llvisionqa") parser.add_argument("--questions-file", type=str, default="./playground/data/qbench/llvisionqa_dev.json") parser.add_argument("--answers-file", type=str, default="answer.jsonl") parser.add_argument("--conv-mode", type=str, default="llava_v1") parser.add_argument("--lang", type=str, default="en") parser.add_argument("--temperature", type=float, default=0.0) parser.add_argument("--top_p", type=float, default=None) parser.add_argument("--num_beams", type=int, default=1) parser.add_argument("--regen", action="store_true", default=False) args = parser.parse_args() if os.path.exists(args.answers_file) and not args.regen: print("{} already exists, won't regen again.".format(args.answers_file)) else: eval_model(args)