# ScoreVision / miner.py
from pathlib import Path
from numpy import ndarray
import numpy as np
from pydantic import BaseModel
import sys, os
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
# Environment configuration: these must be set before TensorFlow is imported,
# since TF reads them at import time.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0'
os.environ["OMP_NUM_THREADS"] = "16"
os.environ["TF_NUM_INTRAOP_THREADS"] = "16"
os.environ["TF_NUM_INTEROP_THREADS"] = "2"
os.environ['CUDA_LAUNCH_BLOCKING'] = '0'
# Suppress ONNX Runtime warnings
os.environ['ORT_LOGGING_LEVEL'] = '3'
import logging
logging.getLogger('tensorflow').setLevel(logging.ERROR)
import tensorflow as tf
tf.config.threading.set_intra_op_parallelism_threads(16)
tf.config.threading.set_inter_op_parallelism_threads(2)
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)
tf.get_logger().setLevel('ERROR')
tf.autograph.set_verbosity(0)
from tensorflow.keras import mixed_precision
# mixed_float16 runs compute in float16 while keeping variables in float32;
# XLA JIT compilation is enabled for the TF graph.
mixed_precision.set_global_policy('mixed_float16')
tf.config.optimizer.set_jit(True)
import torch
import torch._dynamo
torch._dynamo.config.suppress_errors = True
import onnxruntime as ort
import gc
import torch_tensorrt
import torchvision.transforms as T
import yaml
import cv2
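# `player` and `pitch` are sibling modules in this repo, made importable by
# the sys.path append above.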
from player import player_detection_result
from pitch import process_batch_input, get_cls_net, get_cls_net_l
class BoundingBox(BaseModel):
x1: int
y1: int
x2: int
y2: int
cls_id: int
conf: float
class TVFrameResult(BaseModel):
frame_id: int
boxes: list[BoundingBox]
keypoints: list[tuple[int, int]]
class Miner:
"""
This class is responsible for:
- Loading ML models.
- Running batched predictions on images.
- Parsing ML model outputs into structured results (TVFrameResult).
    This class can be modified, but to stay compatible with the chute it must:
    - be named `Miner`
    - expose a `predict_batch` method with the inputs and outputs specified below
    - live in a file called `miner.py` at the root of the HFHub repo
"""
def __init__(self, path_hf_repo: Path) -> None:
"""
Loads all ML models from the repository.
-----(Adjust as needed)----
Args:
path_hf_repo (Path):
Path to the downloaded HuggingFace Hub repository
Returns:
None
"""
        device = 'cuda' if torch.cuda.is_available() else 'cpu'
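        # ONNX Runtime tries providers in order, falling back to CPU if the
        # CUDA provider is unavailable.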
providers = [
'CUDAExecutionProvider',
'CPUExecutionProvider'
]
# providers = [ 'CPUExecutionProvider']
        model_path = path_hf_repo / "object-detection.onnx"
        session = ort.InferenceSession(str(model_path), providers=providers)
        input_name = session.get_inputs()[0].name
        # Warm up the session with a dummy frame so CUDA kernels and memory
        # pools are initialized before the first real batch.
        height = width = 640
        dummy = np.zeros((1, 3, height, width), dtype=np.float32)
        session.run(None, {input_name: dummy})
        self.bbox_model = session
        print("✅ BBox Model Loaded")
self.kp_threshold = 0.1
# self.lp_threshold = 0.7
        model_kp_path = path_hf_repo / 'SV_kp.engine'
        model_kp = torch_tensorrt.load(str(model_kp_path))
        @torch.inference_mode()
        def run_inference(model, input_tensor: torch.Tensor):
            input_tensor = input_tensor.to(device).to(memory_format=torch.channels_last)
            output = model.module().forward(input_tensor)
            return output
        # Warm up the TensorRT engine with the shape used at inference time
        # (batch of 8 frames at 540x960) so the first real batch does not pay
        # the engine initialization cost.
        run_inference(model_kp, torch.randn(8, 3, 540, 960, device=device, dtype=torch.float32))
# model_kp_path = path_hf_repo / 'SV_kp'
# model_lp_path = path_hf_repo / 'SV_lines'
# config_kp_path = path_hf_repo / 'hrnetv2_w48.yaml'
# config_lp_path = path_hf_repo / 'hrnetv2_w48_l.yaml'
# cfg_kp = yaml.safe_load(open(config_kp_path, 'r'))
# cfg_lp = yaml.safe_load(open(config_lp_path, 'r'))
# loaded_state_kp = torch.load(model_kp_path, map_location=device)
# model_kp = get_cls_net(cfg_kp)
# model_kp.load_state_dict(loaded_state_kp)
# model_kp.to(device)
# model_kp.eval()
# loaded_state_lp = torch.load(model_lp_path, map_location=device)
# model_lp = get_cls_net_l(cfg_lp)
# model_lp.load_state_dict(loaded_state_lp)
# model_lp.to(device)
# model_lp.eval()
# self.transform = T.Resize((540, 960))
self.keypoints_model = model_kp
# self.lines_model = model_lp
# print("πŸ”₯ Warming up compiled models...")
# self._warmup_models(device)
        # Starting batch sizes; these shrink automatically on CUDA OOM
        # (see the retry loops in predict_batch).
        self.player_batch_size = 16
        self.pitch_batch_size = 8
        print("✅ Keypoints Model Loaded")
def __repr__(self) -> str:
return f"BBox Model: {type(self.bbox_model).__name__}\nKeypoints Model: {type(self.keypoints_model).__name__}"
def predict_batch(
self,
batch_images: list[ndarray],
offset: int,
n_keypoints: int,
) -> list[TVFrameResult]:
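        """
        Runs player detection and pitch keypoint prediction on a batch of frames.

        Args:
            batch_images: Frames as HxWxC numpy arrays.
            offset: Frame id of the first image; results are keyed from
                `offset` onward.
            n_keypoints: Expected keypoints per frame; outputs are padded
                with (0, 0) or truncated to this length.

        Returns:
            One TVFrameResult per input frame, in frame order.
        """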
player_batch_size = min(self.player_batch_size, len(batch_images))
bboxes: dict[int, list[BoundingBox]] = {}
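        # Retry loop: on CUDA OOM the player batch size is halved and the
        # batch is re-attempted; any other error is re-raised immediately.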
while True:
try:
gc.collect()
if torch.cuda.is_available():
tf.keras.backend.clear_session()
torch.cuda.empty_cache()
torch.cuda.synchronize()
bbox_model_results, _, _, _ = player_detection_result(batch_images, player_batch_size, self.bbox_model)
if bbox_model_results is not None:
for frame_number_in_batch, detections in enumerate(bbox_model_results):
boxes = []
for detection in detections:
                            # Detection format from player.py:
                            # {"id": int, "bbox": [x1, y1, x2, y2], "class_id": int, "conf": float}
                            x1, y1, x2, y2 = detection["bbox"]
                            cls_id = detection["class_id"]
                            conf = detection["conf"]
boxes.append(
BoundingBox(
x1=int(x1),
y1=int(y1),
x2=int(x2),
y2=int(y2),
cls_id=int(cls_id),
conf=float(conf),
)
)
bboxes[offset + frame_number_in_batch] = boxes
print("βœ… BBoxes predicted")
break
            except RuntimeError as e:
                if 'out of memory' in str(e):
                    if self.player_batch_size == 1:
                        raise e
                    self.player_batch_size //= 2
                    print(f"⚠️ CUDA OOM, retrying with player batch size {self.player_batch_size}")
                    player_batch_size = min(self.player_batch_size, len(batch_images))
                else:
                    raise e
except Exception as e:
print(f"❌ Error during bbox prediction: {e}")
raise e
pitch_batch_size = min(self.pitch_batch_size, len(batch_images))
keypoints: dict[int, list[tuple[int, int]]] = {}
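        # Same OOM-halving retry strategy as the bbox pass above.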
while True:
try:
                gc.collect()
                if torch.cuda.is_available():
                    tf.keras.backend.clear_session()
                    torch.cuda.empty_cache()
                    torch.cuda.synchronize()
keypoints_result = process_batch_input(
batch_images,
self.keypoints_model,
self.kp_threshold,
'cuda' if torch.cuda.is_available() else 'cpu',
batch_size=pitch_batch_size
)
if keypoints_result is not None:
                    for frame_number_in_batch, kp_dict in enumerate(keypoints_result):
                        frame_keypoints: list[tuple[int, int]] = []
                        # Image dimensions are needed to convert normalized
                        # keypoint coordinates to pixel coordinates.
                        if frame_number_in_batch < len(batch_images):
                            height, width = batch_images[frame_number_in_batch].shape[:2]
                            # Keypoint ids are 1-based; ids missing from the
                            # model output map to (0, 0).
                            for idx in range(1, 33):
                                x, y = 0, 0
                                if idx in kp_dict:
                                    kp_data = kp_dict[idx]
                                    x = int(kp_data['x'] * width)
                                    y = int(kp_data['y'] * height)
                                frame_keypoints.append((x, y))
# Pad or truncate to match expected number of keypoints
if len(frame_keypoints) < n_keypoints:
frame_keypoints.extend([(0, 0)] * (n_keypoints - len(frame_keypoints)))
else:
frame_keypoints = frame_keypoints[:n_keypoints]
keypoints[offset + frame_number_in_batch] = frame_keypoints
print("βœ… Keypoints predicted")
break
            except RuntimeError as e:
                if 'out of memory' in str(e):
                    if self.pitch_batch_size == 1:
                        raise e
                    self.pitch_batch_size //= 2
                    print(f"⚠️ CUDA OOM, retrying with pitch batch size {self.pitch_batch_size}")
                    pitch_batch_size = min(self.pitch_batch_size, len(batch_images))
                else:
                    raise e
except Exception as e:
print(f"❌ Error during keypoints prediction: {e}")
raise e
# Combine results
results: list[TVFrameResult] = []
        for frame_number in range(offset, offset + len(batch_images)):
            # Fall back to empty boxes / all-zero keypoints for frames that
            # produced no predictions.
            frame_boxes = bboxes.get(frame_number, [])
frame_keypoints = keypoints.get(frame_number, [(0, 0) for _ in range(n_keypoints)])
# Create result object
result = TVFrameResult(
frame_id=frame_number,
boxes=frame_boxes,
keypoints=frame_keypoints,
)
results.append(result)
print("βœ… Combined results as TVFrameResult")
gc.collect()
if torch.cuda.is_available():
tf.keras.backend.clear_session()
torch.cuda.empty_cache()
torch.cuda.synchronize()
return results
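
# A minimal smoke test, assuming the HF repo contents sit next to this file
# and that a `sample_frames/` directory with a few .jpg frames exists; both
# paths are assumptions for illustration, not part of the chute contract.
if __name__ == "__main__":
    repo_dir = Path(__file__).resolve().parent  # miner.py lives in the repo root
    miner = Miner(repo_dir)
    frames = [cv2.imread(str(p)) for p in sorted(Path("sample_frames").glob("*.jpg"))]
    if frames:
        results = miner.predict_batch(frames, offset=0, n_keypoints=32)
        for result in results:
            print(result.frame_id, len(result.boxes), len(result.keypoints))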