|
|
""" |
|
|
PULSE-7B Enhanced Handler |
|
|
Ubden® Team - Edited by https://github.com/ck-cankurt |
|
|
Supports text prompts, image URLs, and base64-encoded images
|
|
""" |
|
|
|
|
|
import torch |
|
|
from typing import Dict, List, Any |
|
|
import base64 |
|
|
from io import BytesIO |
|
|
from PIL import Image |
|
|
import requests |
|
|
import time |
|
|
|
|
|
|
|
|
try: |
|
|
from utils import ( |
|
|
performance_monitor, |
|
|
validate_image_input, |
|
|
sanitize_parameters, |
|
|
get_system_info, |
|
|
create_health_check, |
|
|
deepseek_client |
|
|
) |
|
|
UTILS_AVAILABLE = True |
|
|
except ImportError: |
|
|
UTILS_AVAILABLE = False |
|
|
deepseek_client = None |
|
|
print("⚠️ Utils module not found - performance monitoring and DeepSeek integration disabled") |
|
|
|
|
|
|
|
|
class EndpointHandler: |
|
|
def __init__(self, path=""): |
|
|
""" |
|
|
Hey there! Let's get this PULSE-7B model up and running. |
|
|
We'll load it from the HuggingFace hub directly, so no worries about local files. |
|
|
|
|
|
Args: |
|
|
path: Model directory path (we actually ignore this and load from HF hub) |
|
|
""" |
|
|
print("🚀 Starting up PULSE-7B handler...") |
|
|
print("📝 Enhanced by Ubden® Team - github.com/ck-cankurt") |
|
|
import sys |
|
|
print(f"🔧 Python version: {sys.version}") |
|
|
print(f"🔧 PyTorch version: {torch.__version__}") |
|
|
|
|
|
|
|
|
try: |
|
|
import transformers |
|
|
print(f"🔧 Transformers version: {transformers.__version__}") |
|
|
|
|
|
|
|
|
if transformers.__version__ == "4.37.2": |
|
|
print("✅ Using PULSE LLaVA compatible version (4.37.2)") |
|
|
elif "dev" in transformers.__version__ or "git" in str(transformers.__version__): |
|
|
print("⚠️ Using development version - may conflict with PULSE LLaVA") |
|
|
else: |
|
|
print("⚠️ Using different version - PULSE LLaVA prefers 4.37.2") |
|
|
except Exception as e: |
|
|
print(f"❌ Error checking transformers version: {e}") |
|
|
|
|
|
print(f"🔧 CUDA available: {torch.cuda.is_available()}") |
|
|
if torch.cuda.is_available(): |
|
|
print(f"🔧 CUDA device: {torch.cuda.get_device_name(0)}") |
|
|
|
|
|
|
|
|
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") |
|
|
print(f"🖥️ Running on: {self.device}") |
|
|
|
|
|
        # Initialize all loading-related attributes up front so the status report
        # below can inspect them safely no matter which loading path succeeds
        self.pipe = None
        self.model = None
        self.tokenizer = None
        self.processor = None
        self.use_pipeline = None

        try:
|
|
|
|
|
from transformers import pipeline |
|
|
|
|
|
print("📦 Fetching model from HuggingFace Hub...") |
|
|
self.pipe = pipeline( |
|
|
"text-generation", |
|
|
model="PULSE-ECG/PULSE-7B", |
|
|
torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32, |
|
|
device=0 if torch.cuda.is_available() else -1, |
|
|
trust_remote_code=True, |
|
|
model_kwargs={ |
|
|
"low_cpu_mem_usage": True, |
|
|
"use_safetensors": True |
|
|
} |
|
|
) |
|
|
print("✅ Model loaded successfully via pipeline!") |
|
|
|
|
|
except Exception as e: |
|
|
print(f"⚠️ Pipeline didn't work out: {e}") |
|
|
print("🔄 Let me try a different approach...") |
|
|
|
|
|
try: |
|
|
|
|
|
from transformers import AutoTokenizer, LlamaForCausalLM |
|
|
|
|
|
|
|
|
print("📖 Setting up tokenizer...") |
|
|
self.tokenizer = AutoTokenizer.from_pretrained( |
|
|
"PULSE-ECG/PULSE-7B", |
|
|
trust_remote_code=True |
|
|
) |
|
|
|
|
|
|
|
|
print("🧠 Loading the model as Llama...") |
|
|
self.model = LlamaForCausalLM.from_pretrained( |
|
|
"PULSE-ECG/PULSE-7B", |
|
|
torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32, |
|
|
device_map="auto", |
|
|
low_cpu_mem_usage=True, |
|
|
trust_remote_code=True |
|
|
) |
|
|
|
|
|
|
|
|
if self.tokenizer.pad_token is None: |
|
|
self.tokenizer.pad_token = self.tokenizer.eos_token |
|
|
self.tokenizer.pad_token_id = self.tokenizer.eos_token_id |
|
|
|
|
|
self.model.eval() |
|
|
self.use_pipeline = False |
|
|
print("✅ Model loaded successfully via direct loading!") |
|
|
|
|
|
except Exception as e2: |
|
|
print(f"😓 That didn't work either: {e2}") |
|
|
|
|
|
self.pipe = None |
|
|
self.model = None |
|
|
self.tokenizer = None |
|
|
self.use_pipeline = None |
|
|
else: |
|
|
self.use_pipeline = True |
|
|
|
|
|
|
|
|
print("\n🔍 Model Loading Status Report:") |
|
|
print(f" - use_pipeline: {self.use_pipeline}") |
|
|
print(f" - model: {'✅ Loaded' if self.model is not None else '❌ None'}") |
|
|
print(f" - processor: {'✅ Loaded' if self.processor is not None else '❌ None'}") |
|
|
print(f" - tokenizer: {'✅ Loaded' if self.tokenizer is not None else '❌ None'}") |
|
|
print(f" - pipe: {'✅ Loaded' if self.pipe is not None else '❌ None'}") |
|
|
|
|
|
if all(x is None for x in [self.model, self.processor, self.tokenizer, self.pipe]): |
|
|
print("💥 CRITICAL: No model components loaded successfully!") |
|
|
else: |
|
|
print("✅ At least one model component loaded successfully") |
|
|
|
|
|
def process_image_input(self, image_input): |
|
|
""" |
|
|
Handle both URL and base64 image inputs like a champ! |
|
|
|
|
|
Args: |
|
|
image_input: Can be a URL string or base64 encoded image |
|
|
|
|
|
Returns: |
|
|
PIL Image object or None if something goes wrong |
|
|
""" |
|
|
try: |
|
|
|
|
|
if isinstance(image_input, str) and (image_input.startswith('http://') or image_input.startswith('https://')): |
|
|
print(f"🌐 Fetching image from URL: {image_input[:50]}...") |
|
|
response = requests.get(image_input, timeout=10) |
|
|
response.raise_for_status() |
|
|
image = Image.open(BytesIO(response.content)).convert('RGB') |
|
|
print("✅ Image downloaded successfully!") |
|
|
return image |
|
|
|
|
|
|
|
|
elif isinstance(image_input, str): |
|
|
print("🔍 Decoding base64 image...") |
|
|
|
|
|
if "base64," in image_input: |
|
|
image_input = image_input.split("base64,")[1] |
|
|
|
|
|
image_data = base64.b64decode(image_input) |
|
|
image = Image.open(BytesIO(image_data)).convert('RGB') |
|
|
print("✅ Image decoded successfully!") |
|
|
return image |
|
|
|
|
|
except Exception as e: |
|
|
print(f"❌ Couldn't process the image: {e}") |
|
|
return None |
|
|
|
|
|
        # Reached only when the input is neither a URL nor a base64 string
        print("⚠️ Unsupported image input type - expected a URL or a base64 string")
        return None
|
|
|
|
|
def add_turkish_commentary(self, response: Dict[str, Any], enable_commentary: bool, timeout: int = 30) -> Dict[str, Any]: |
|
|
"""Add Turkish commentary to the response using DeepSeek API""" |
|
|
if not enable_commentary: |
|
|
return response |
|
|
|
|
|
if not UTILS_AVAILABLE or not deepseek_client: |
|
|
print("⚠️ DeepSeek client not available - skipping Turkish commentary") |
|
|
response["commentary_status"] = "unavailable" |
|
|
return response |
|
|
|
|
|
if not deepseek_client.is_available(): |
|
|
print("⚠️ DeepSeek API key not configured - skipping Turkish commentary") |
|
|
response["commentary_status"] = "api_key_missing" |
|
|
return response |
|
|
|
|
|
generated_text = response.get("generated_text", "") |
|
|
if not generated_text: |
|
|
print("⚠️ No generated text to comment on") |
|
|
response["commentary_status"] = "no_text" |
|
|
return response |
|
|
|
|
|
print("🔄 DeepSeek ile Türkçe yorum ekleniyor...") |
|
|
commentary_result = deepseek_client.get_turkish_commentary(generated_text, timeout) |
|
|
|
|
|
if commentary_result["success"]: |
|
|
response["comment_text"] = commentary_result["comment_text"] |
|
|
response["commentary_model"] = commentary_result.get("model", "deepseek-chat") |
|
|
response["commentary_tokens"] = commentary_result.get("tokens_used", 0) |
|
|
response["commentary_status"] = "success" |
|
|
print("✅ Türkçe yorum başarıyla eklendi") |
|
|
else: |
|
|
response["comment_text"] = "" |
|
|
response["commentary_error"] = commentary_result["error"] |
|
|
response["commentary_status"] = "failed" |
|
|
print(f"❌ Türkçe yorum eklenemedi: {commentary_result['error']}") |
|
|
|
|
|
return response |
|
|
|
|
|
def health_check(self) -> Dict[str, Any]: |
|
|
"""Health check endpoint""" |
|
|
if UTILS_AVAILABLE: |
|
|
return create_health_check() |
|
|
else: |
|
|
return { |
|
|
'status': 'healthy', |
|
|
'model': 'PULSE-7B', |
|
|
'timestamp': time.time(), |
|
|
'handler_version': '2.0.0' |
|
|
} |
|
|
|
|
|
def __call__(self, data: Dict[str, Any]) -> List[Dict[str, Any]]: |
|
|
""" |
|
|
Main processing function - where the magic happens! |
|
|
|
|
|
Args: |
|
|
data: Input data with 'inputs' and optional 'parameters' |
|
|
|
|
|
Returns: |
|
|
List with the generated response |
|
|
""" |
|
|
|
|
|
if self.use_pipeline is None: |
|
|
return [{ |
|
|
"generated_text": "Oops! Model couldn't load properly. Please check the deployment settings.", |
|
|
"error": "Model initialization failed", |
|
|
"handler": "Ubden® Team Enhanced Handler" |
|
|
}] |
|
|
|
|
|
try: |
|
|
|
|
|
inputs = data.get("inputs", "") |
|
|
text = "" |
|
|
image = None |
|
|
|
|
|
if isinstance(inputs, dict): |
|
|
|
|
|
|
|
|
text = inputs.get("query", inputs.get("text", inputs.get("prompt", str(inputs)))) |
|
|
|
|
|
|
|
|
image_input = inputs.get("image", inputs.get("image_url", inputs.get("image_base64", None))) |
|
|
if image_input: |
|
|
image = self.process_image_input(image_input) |
|
|
if image: |
|
|
|
|
|
text = f"[Image provided - {image.size[0]}x{image.size[1]} pixels] {text}" |
|
|
else: |
|
|
|
|
|
text = str(inputs) |
|
|
|
|
|
if not text: |
|
|
return [{"generated_text": "Hey, I need some text to work with! Please provide an input."}] |
|
|
|
|
|
|
|
|
parameters = data.get("parameters", {}) |
|
|
max_new_tokens = min(parameters.get("max_new_tokens", 256), 1024) |
|
|
temperature = parameters.get("temperature", 0.7) |
|
|
top_p = parameters.get("top_p", 0.95) |
|
|
do_sample = parameters.get("do_sample", True) |
|
|
repetition_penalty = parameters.get("repetition_penalty", 1.0) |
|
|
|
|
|
|
|
|
enable_turkish_commentary = parameters.get("enable_turkish_commentary", False) |
|
|
|
|
|
|
|
|
if self.use_pipeline: |
|
|
result = self.pipe( |
|
|
text, |
|
|
max_new_tokens=max_new_tokens, |
|
|
temperature=temperature, |
|
|
top_p=top_p, |
|
|
do_sample=do_sample, |
|
|
repetition_penalty=repetition_penalty, |
|
|
return_full_text=False |
|
|
) |
|
|
|
|
|
|
|
|
                # Normalize the pipeline output into a single response dict
                if isinstance(result, list) and len(result) > 0:
                    generated_text = result[0].get("generated_text", "")
                else:
                    generated_text = str(result)

                response = {"generated_text": generated_text}

                if enable_turkish_commentary:
                    response = self.add_turkish_commentary(response, True)

                return [response]
|
|
|
|
|
|
|
|
else: |
|
|
|
|
|
encoded = self.tokenizer( |
|
|
text, |
|
|
return_tensors="pt", |
|
|
truncation=True, |
|
|
max_length=2048 |
|
|
) |
|
|
|
|
|
input_ids = encoded["input_ids"].to(self.device) |
|
|
attention_mask = encoded.get("attention_mask") |
|
|
if attention_mask is not None: |
|
|
attention_mask = attention_mask.to(self.device) |
|
|
|
|
|
|
|
|
with torch.no_grad(): |
|
|
outputs = self.model.generate( |
|
|
input_ids, |
|
|
attention_mask=attention_mask, |
|
|
max_new_tokens=max_new_tokens, |
|
|
temperature=temperature, |
|
|
top_p=top_p, |
|
|
do_sample=do_sample, |
|
|
repetition_penalty=repetition_penalty, |
|
|
pad_token_id=self.tokenizer.pad_token_id, |
|
|
eos_token_id=self.tokenizer.eos_token_id |
|
|
) |
|
|
|
|
|
|
|
|
generated_ids = outputs[0][input_ids.shape[-1]:] |
|
|
generated_text = self.tokenizer.decode( |
|
|
generated_ids, |
|
|
skip_special_tokens=True, |
|
|
clean_up_tokenization_spaces=True |
|
|
) |
|
|
|
|
|
|
|
|
response = {"generated_text": generated_text} |
|
|
|
|
|
|
|
|
if enable_turkish_commentary: |
|
|
response = self.add_turkish_commentary(response, True) |
|
|
|
|
|
return [response] |
|
|
|
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"Something went wrong during generation: {str(e)}" |
|
|
print(f"❌ {error_msg}") |
|
|
return [{ |
|
|
"generated_text": "", |
|
|
"error": error_msg, |
|
|
"handler": "Ubden® Team Enhanced Handler" |
|
|
}] |
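

if __name__ == "__main__":
    # Minimal local smoke test (illustrative sketch): instantiating the handler
    # downloads PULSE-ECG/PULSE-7B from the Hub, so only run this on a machine
    # with enough memory/GPU. The payload mirrors what __call__ expects.
    handler = EndpointHandler()
    demo_request = {
        "inputs": {"query": "Describe the key features of a normal sinus rhythm."},
        "parameters": {"max_new_tokens": 64, "temperature": 0.7},
    }
    print(handler(demo_request))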