from pathlib import Path
from numpy import ndarray
import numpy as np
from pydantic import BaseModel
import sys
import os

sys.path.append(os.path.dirname(os.path.abspath(__file__)))

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
os.environ["OMP_NUM_THREADS"] = "16"
os.environ["TF_NUM_INTRAOP_THREADS"] = "16"
os.environ["TF_NUM_INTEROP_THREADS"] = "2"
os.environ['CUDA_LAUNCH_BLOCKING'] = '0'
# Suppress ONNX Runtime warnings
os.environ['ORT_LOGGING_LEVEL'] = '3'

import logging
logging.getLogger('tensorflow').setLevel(logging.ERROR)

import tensorflow as tf
tf.config.threading.set_intra_op_parallelism_threads(16)
tf.config.threading.set_inter_op_parallelism_threads(2)
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0'
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)
tf.get_logger().setLevel('ERROR')
tf.autograph.set_verbosity(0)

from tensorflow.keras import mixed_precision
mixed_precision.set_global_policy('mixed_float16')
tf.config.optimizer.set_jit(True)

import torch._dynamo
torch._dynamo.config.suppress_errors = True

import onnxruntime as ort
import gc
import torch
import torch_tensorrt
import torchvision.transforms as T
import yaml
import cv2
from player import player_detection_result
from pitch import process_batch_input, get_cls_net, get_cls_net_l


class BoundingBox(BaseModel):
    x1: int
    y1: int
    x2: int
    y2: int
    cls_id: int
    conf: float


class TVFrameResult(BaseModel):
    frame_id: int
    boxes: list[BoundingBox]
    keypoints: list[tuple[int, int]]
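
# --- Hedged example (illustrative values, kept commented so nothing runs at
# import time): how the result models above are populated. Pydantic validates
# and coerces field types, so malformed detections fail fast at construction.
# `model_dump()` is the pydantic v2 spelling (use `.dict()` on v1).
#
#   box = BoundingBox(x1=10, y1=20, x2=110, y2=220, cls_id=0, conf=0.92)
#   frame = TVFrameResult(frame_id=0, boxes=[box], keypoints=[(0, 0)] * 32)
#   frame.model_dump()  # plain dict, ready for JSON serialization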


class Miner:
    """
    This class is responsible for:
    - Loading ML models.
    - Running batched predictions on images.
    - Parsing ML model outputs into structured results (TVFrameResult).

    This class can be modified, but it must have the following to be compatible with the chute:
    - be named `Miner`
    - have a `predict_batch` function with the inputs and outputs specified
    - be stored in a file called `miner.py` which lives in the root of the HFHub repo
    """

    def __init__(self, path_hf_repo: Path) -> None:
        """
        Loads all ML models from the repository.

        -----(Adjust as needed)----

        Args:
            path_hf_repo (Path): Path to the downloaded HuggingFace Hub repository

        Returns:
            None
        """
        device = 'cuda' if torch.cuda.is_available() else 'cpu'
        providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
        # providers = ['CPUExecutionProvider']

        # Object-detection model: ONNX Runtime session, warmed up on a dummy input.
        model_path = path_hf_repo / "object-detection.onnx"
        session = ort.InferenceSession(str(model_path), providers=providers)
        input_name = session.get_inputs()[0].name
        height = width = 640
        dummy = np.zeros((1, 3, height, width), dtype=np.float32)
        session.run(None, {input_name: dummy})
        self.bbox_model = session
        print("✅ BBox Model Loaded")

        self.kp_threshold = 0.1
        # self.lp_threshold = 0.7

        # Keypoints model: pre-compiled TensorRT engine, warmed up on a random batch.
        model_kp_path = path_hf_repo / 'SV_kp.engine'
        model_kp = torch_tensorrt.load(str(model_kp_path))

        @torch.inference_mode()
        def run_inference(model, input_tensor: torch.Tensor):
            input_tensor = input_tensor.to(device).to(memory_format=torch.channels_last)
            return model.module().forward(input_tensor)

        run_inference(model_kp, torch.randn(8, 3, 540, 960, device=device, dtype=torch.float32))

        # Alternative (uncompiled PyTorch) loading path, kept for reference:
        # model_kp_path = path_hf_repo / 'SV_kp'
        # model_lp_path = path_hf_repo / 'SV_lines'
        # config_kp_path = path_hf_repo / 'hrnetv2_w48.yaml'
        # config_lp_path = path_hf_repo / 'hrnetv2_w48_l.yaml'
        # cfg_kp = yaml.safe_load(open(config_kp_path, 'r'))
        # cfg_lp = yaml.safe_load(open(config_lp_path, 'r'))
        # loaded_state_kp = torch.load(model_kp_path, map_location=device)
        # model_kp = get_cls_net(cfg_kp)
        # model_kp.load_state_dict(loaded_state_kp)
        # model_kp.to(device)
        # model_kp.eval()
        # loaded_state_lp = torch.load(model_lp_path, map_location=device)
        # model_lp = get_cls_net_l(cfg_lp)
        # model_lp.load_state_dict(loaded_state_lp)
        # model_lp.to(device)
        # model_lp.eval()
        # self.transform = T.Resize((540, 960))

        self.keypoints_model = model_kp
        # self.lines_model = model_lp
        # print("🔥 Warming up compiled models...")
        # self._warmup_models(device)

        # Starting batch sizes; predict_batch halves these automatically on CUDA OOM.
        self.player_batch_size = 16
        self.pitch_batch_size = 8
        print("✅ Keypoints Model Loaded")
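
    # --- Hedged sketch: `__init__` references a `self._warmup_models(device)`
    # call (commented out) with no definition in this file. A minimal
    # implementation, assuming the model interfaces used above (ONNX Runtime
    # session + torch_tensorrt engine) and the same warm-up shapes, might be:
    def _warmup_models(self, device: str) -> None:
        # One zero image through the ONNX detection session.
        input_name = self.bbox_model.get_inputs()[0].name
        self.bbox_model.run(None, {input_name: np.zeros((1, 3, 640, 640), dtype=np.float32)})
        # One random batch through the TensorRT keypoints engine.
        with torch.inference_mode():
            x = torch.randn(8, 3, 540, 960, device=device, dtype=torch.float32)
            self.keypoints_model.module().forward(x.to(memory_format=torch.channels_last))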

    def __repr__(self) -> str:
        return (
            f"BBox Model: {type(self.bbox_model).__name__}\n"
            f"Keypoints Model: {type(self.keypoints_model).__name__}"
        )

    def predict_batch(
        self,
        batch_images: list[ndarray],
        offset: int,
        n_keypoints: int,
    ) -> list[TVFrameResult]:
        # --- Player/ball bounding boxes, with retry-on-OOM batch halving ---
        player_batch_size = min(self.player_batch_size, len(batch_images))
        bboxes: dict[int, list[BoundingBox]] = {}
        while True:
            try:
                gc.collect()
                if torch.cuda.is_available():
                    tf.keras.backend.clear_session()
                    torch.cuda.empty_cache()
                    torch.cuda.synchronize()
                bbox_model_results, _, _, _ = player_detection_result(
                    batch_images, player_batch_size, self.bbox_model
                )
                if bbox_model_results is not None:
                    for frame_number_in_batch, detections in enumerate(bbox_model_results):
                        boxes = []
                        for detection in detections:
                            # Detection format from player.py:
                            # {"id": int, "bbox": [x1, y1, x2, y2], "class_id": int, "conf": float}
                            x1, y1, x2, y2 = detection["bbox"]
                            cls_id = detection["class_id"]
                            conf = detection["conf"]
                            boxes.append(
                                BoundingBox(
                                    x1=int(x1),
                                    y1=int(y1),
                                    x2=int(x2),
                                    y2=int(y2),
                                    cls_id=int(cls_id),
                                    conf=float(conf),
                                )
                            )
                        bboxes[offset + frame_number_in_batch] = boxes
                print("✅ BBoxes predicted")
                break
            except RuntimeError as e:
                print(f"⚠️ RuntimeError at player batch size {self.player_batch_size}")
                if 'out of memory' in str(e):
                    if self.player_batch_size == 1:
                        raise e
                    self.player_batch_size = max(1, self.player_batch_size // 2)
                    player_batch_size = min(self.player_batch_size, len(batch_images))
                else:
                    raise e
            except Exception as e:
                print(f"❌ Error during bbox prediction: {e}")
                raise e

        # --- Pitch keypoints, with the same retry-on-OOM batch halving ---
        pitch_batch_size = min(self.pitch_batch_size, len(batch_images))
        keypoints: dict[int, list[tuple[int, int]]] = {}
        while True:
            try:
                gc.collect()
                if torch.cuda.is_available():
                    tf.keras.backend.clear_session()
                    torch.cuda.empty_cache()
                    torch.cuda.synchronize()
                keypoints_result = process_batch_input(
                    batch_images,
                    self.keypoints_model,
                    self.kp_threshold,
                    'cuda' if torch.cuda.is_available() else 'cpu',
                    batch_size=pitch_batch_size,
                )
                if keypoints_result is not None:
                    for frame_number_in_batch, kp_dict in enumerate(keypoints_result):
                        # Skip any result that has no source frame in this batch.
                        if frame_number_in_batch >= len(batch_images):
                            continue
                        # Image dimensions for converting normalized keypoints to pixels.
                        height, width = batch_images[frame_number_in_batch].shape[:2]
                        frame_keypoints: list[tuple[int, int]] = []
                        # The model predicts up to 32 keypoints, keyed 1..32;
                        # missing keypoints are reported as (0, 0).
                        for idx in range(1, 33):
                            x, y = 0, 0
                            if idx in kp_dict:
                                kp_data = kp_dict[idx]
                                x = int(kp_data['x'] * width)
                                y = int(kp_data['y'] * height)
                            frame_keypoints.append((x, y))
                        # Pad or truncate to the expected number of keypoints.
                        if len(frame_keypoints) < n_keypoints:
                            frame_keypoints.extend([(0, 0)] * (n_keypoints - len(frame_keypoints)))
                        else:
                            frame_keypoints = frame_keypoints[:n_keypoints]
                        keypoints[offset + frame_number_in_batch] = frame_keypoints
                print("✅ Keypoints predicted")
                break
            except RuntimeError as e:
                print(f"⚠️ RuntimeError at pitch batch size {self.pitch_batch_size}")
                if 'out of memory' in str(e):
                    if self.pitch_batch_size == 1:
                        raise e
                    self.pitch_batch_size = max(1, self.pitch_batch_size // 2)
                    pitch_batch_size = min(self.pitch_batch_size, len(batch_images))
                else:
                    raise e
            except Exception as e:
                print(f"❌ Error during keypoints prediction: {e}")
                raise e

        # --- Combine per-frame boxes and keypoints into TVFrameResult objects ---
        results: list[TVFrameResult] = []
        for frame_number in range(offset, offset + len(batch_images)):
            frame_boxes = bboxes.get(frame_number, [])
            frame_keypoints = keypoints.get(frame_number, [(0, 0) for _ in range(n_keypoints)])
            results.append(
                TVFrameResult(
                    frame_id=frame_number,
                    boxes=frame_boxes,
                    keypoints=frame_keypoints,
                )
            )
        print("✅ Combined results as TVFrameResult")

        gc.collect()
        if torch.cuda.is_available():
            tf.keras.backend.clear_session()
            torch.cuda.empty_cache()
            torch.cuda.synchronize()
        return results
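

# --- Hedged usage sketch (assumptions: the HF repo has been downloaded to
# ./hf_repo, and player.py / pitch.py are importable). This mirrors how a
# chute might drive the Miner; the frame shape and n_keypoints=32 are
# illustrative, not part of the contract.
if __name__ == "__main__":
    miner = Miner(Path("./hf_repo"))  # hypothetical local path
    frames = [np.zeros((1080, 1920, 3), dtype=np.uint8) for _ in range(4)]
    results = miner.predict_batch(frames, offset=0, n_keypoints=32)
    for r in results:
        print(r.frame_id, len(r.boxes), len(r.keypoints))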