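"""Miner: object detection and pitch-keypoint extraction for football TV frames.

Loads a YOLO detector (objdetect.pt) and an HRNetV2-based keypoint model from a
local model directory, and exposes predict_batch(), which returns per-frame
bounding boxes (with team assignment by kit colour) and pitch keypoints.
"""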
from pathlib import Path
from typing import List, Tuple, Dict
import sys
import os

from numpy import ndarray
import numpy as np
from pydantic import BaseModel
import cv2

sys.path.append(os.path.dirname(os.path.abspath(__file__)))

# Logging and threading knobs; these must be set before TensorFlow / ONNX Runtime
# are imported to take effect.
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
os.environ["OMP_NUM_THREADS"] = "16"
os.environ["TF_NUM_INTRAOP_THREADS"] = "16"
os.environ["TF_NUM_INTEROP_THREADS"] = "2"
os.environ["CUDA_LAUNCH_BLOCKING"] = "0"
os.environ["ORT_LOGGING_LEVEL"] = "3"
os.environ["TF_ENABLE_ONEDNN_OPTS"] = "0"

import logging

import tensorflow as tf
from tensorflow.keras import mixed_precision
import torch
import torch._dynamo

import gc
from ultralytics import YOLO
from pitch import process_batch_input, get_cls_net
import yaml

# Runtime configuration: silence TF logging, pin thread counts, enable mixed
# precision and XLA JIT, and keep torch.compile failures non-fatal.
logging.getLogger("tensorflow").setLevel(logging.ERROR)
tf.config.threading.set_intra_op_parallelism_threads(16)
tf.config.threading.set_inter_op_parallelism_threads(2)
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)
tf.get_logger().setLevel("ERROR")
tf.autograph.set_verbosity(0)
mixed_precision.set_global_policy("mixed_float16")
tf.config.optimizer.set_jit(True)
torch._dynamo.config.suppress_errors = True


class BoundingBox(BaseModel):
    """One detection in pixel coordinates with its class id and confidence."""

    x1: int
    y1: int
    x2: int
    y2: int
    cls_id: int
    conf: float


class TVFrameResult(BaseModel):
    """Per-frame result: detections plus pitch keypoints as (x, y) pixel pairs."""

    frame_id: int
    boxes: List[BoundingBox]
    keypoints: List[Tuple[int, int]]


class Miner:
    # Containment-suppression and team-assignment thresholds.
    QUASI_TOTAL_IOA: float = 0.90  # drop a box when >= 90% of it lies inside another box
    SMALL_CONTAINED_IOA: float = 0.85  # drop the smaller box at >= 85% containment...
    SMALL_RATIO_MAX: float = 0.50  # ...but only if its area is at most half the larger box's
    SINGLE_PLAYER_HUE_PIVOT: float = 90.0  # hue threshold for team assignment with a single player
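
    # Worked example of the thresholds above (illustrative numbers): a 40x80 box
    # (area 3200) overlapping a 100x200 box (area 20000) by 2800 px has
    # IoA 2800 / 3200 = 0.875 >= SMALL_CONTAINED_IOA and area ratio
    # 3200 / 20000 = 0.16 <= SMALL_RATIO_MAX, so suppress_small_contained drops
    # the smaller box; a box fully inside another (IoA 1.0 >= QUASI_TOTAL_IOA)
    # is dropped by suppress_quasi_total_containment regardless of size.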

    def __init__(self, path_hf_repo: Path) -> None:
        print(f"Loading BBox model from {path_hf_repo / 'objdetect.pt'}")
        self.bbox_model = YOLO(path_hf_repo / "objdetect.pt")
        print("✅ BBox Model (objdetect.pt) Loaded")
        device = "cuda" if torch.cuda.is_available() else "cpu"

        # Build the keypoint network from its HRNetV2 config and load the checkpoint.
        model_kp_path = path_hf_repo / "keypoint"
        config_kp_path = path_hf_repo / "hrnetv2_w48.yaml"
        with open(config_kp_path, "r") as f:
            cfg_kp = yaml.safe_load(f)

        loaded_state_kp = torch.load(model_kp_path, map_location=device)
        model = get_cls_net(cfg_kp)
        model.load_state_dict(loaded_state_kp)
        model.to(device)
        model.eval()

        self.keypoints_model = model
        self.kp_threshold = 0.1
        self.pitch_batch_size = 8
        print("✅ Keypoints Model Loaded")

    def __repr__(self) -> str:
        return (
            f"BBox Model: {type(self.bbox_model).__name__}\n"
            f"Keypoints Model: {type(self.keypoints_model).__name__}"
        )

    @staticmethod
    def _clip_box_to_image(x1: int, y1: int, x2: int, y2: int, w: int, h: int) -> Tuple[int, int, int, int]:
        """Clamp box corners to the image and guarantee a non-degenerate box."""
        x1 = max(0, min(int(x1), w - 1))
        y1 = max(0, min(int(y1), h - 1))
        x2 = max(0, min(int(x2), w - 1))
        y2 = max(0, min(int(y2), h - 1))
        if x2 <= x1:
            x2 = min(w - 1, x1 + 1)
        if y2 <= y1:
            y2 = min(h - 1, y1 + 1)
        return x1, y1, x2, y2

    @staticmethod
    def _area(bb: BoundingBox) -> int:
        return max(0, bb.x2 - bb.x1) * max(0, bb.y2 - bb.y1)

    @staticmethod
    def _intersect_area(a: BoundingBox, b: BoundingBox) -> int:
        ix1 = max(a.x1, b.x1)
        iy1 = max(a.y1, b.y1)
        ix2 = min(a.x2, b.x2)
        iy2 = min(a.y2, b.y2)
        if ix2 <= ix1 or iy2 <= iy1:
            return 0
        return (ix2 - ix1) * (iy2 - iy1)

    @staticmethod
    def _center(bb: BoundingBox) -> Tuple[float, float]:
        return (0.5 * (bb.x1 + bb.x2), 0.5 * (bb.y1 + bb.y2))

    @staticmethod
    def _mean_hs(img_bgr: np.ndarray) -> Tuple[float, float]:
        hsv = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2HSV)
        return float(np.mean(hsv[:, :, 0])), float(np.mean(hsv[:, :, 1]))

    def _hs_feature_from_roi(self, img_bgr: np.ndarray, box: BoundingBox) -> np.ndarray:
        """Mean (hue, saturation) of a box's non-pitch pixels, for kit-colour clustering."""
        H, W = img_bgr.shape[:2]
        x1, y1, x2, y2 = self._clip_box_to_image(box.x1, box.y1, box.x2, box.y2, W, H)
        roi = img_bgr[y1:y2, x1:x2]
        if roi.size == 0:
            return np.array([0.0, 0.0], dtype=np.float32)
        hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV)
        # Mask out pitch-green pixels so the feature reflects the kit, not the grass.
        lower_green = np.array([35, 60, 60], dtype=np.uint8)
        upper_green = np.array([85, 255, 255], dtype=np.uint8)
        green_mask = cv2.inRange(hsv, lower_green, upper_green)
        non_green_mask = cv2.bitwise_not(green_mask)
        num_non_green = int(np.count_nonzero(non_green_mask))
        total = hsv.shape[0] * hsv.shape[1]
        # Only trust the masked statistics when enough non-green pixels remain.
        if num_non_green > max(50, total // 20):
            h_vals = hsv[:, :, 0][non_green_mask > 0]
            s_vals = hsv[:, :, 1][non_green_mask > 0]
            h_mean = float(np.mean(h_vals)) if h_vals.size else 0.0
            s_mean = float(np.mean(s_vals)) if s_vals.size else 0.0
        else:
            h_mean, s_mean = self._mean_hs(roi)
        return np.array([h_mean, s_mean], dtype=np.float32)

    def _ioa(self, a: BoundingBox, b: BoundingBox) -> float:
        """Intersection over area of `a`: the fraction of box `a` lying inside box `b`."""
        inter = self._intersect_area(a, b)
        aa = self._area(a)
        if aa <= 0:
            return 0.0
        return inter / aa

    def suppress_quasi_total_containment(self, boxes: List[BoundingBox]) -> List[BoundingBox]:
        """Drop any box that is almost entirely contained in another kept box."""
        if len(boxes) <= 1:
            return boxes
        keep = [True] * len(boxes)
        for i in range(len(boxes)):
            if not keep[i]:
                continue
            for j in range(len(boxes)):
                if i == j or not keep[j]:
                    continue
                if self._ioa(boxes[i], boxes[j]) >= self.QUASI_TOTAL_IOA:
                    keep[i] = False
                    break
        return [bb for bb, k in zip(boxes, keep) if k]

    def suppress_small_contained(self, boxes: List[BoundingBox]) -> List[BoundingBox]:
        """Drop the clearly smaller of two boxes when it sits mostly inside the larger one."""
        if len(boxes) <= 1:
            return boxes
        keep = [True] * len(boxes)
        areas = [self._area(bb) for bb in boxes]
        for i in range(len(boxes)):
            if not keep[i]:
                continue
            for j in range(len(boxes)):
                if i == j or not keep[j]:
                    continue
                ai, aj = areas[i], areas[j]
                if ai == 0 or aj == 0:
                    continue
                if ai <= aj:
                    # Box i is the smaller one; drop it if it is mostly inside j.
                    if (ai / aj <= self.SMALL_RATIO_MAX
                            and self._ioa(boxes[i], boxes[j]) >= self.SMALL_CONTAINED_IOA):
                        keep[i] = False
                        break
                else:
                    # Box j is the smaller one; drop it if it is mostly inside i.
                    if (aj / ai <= self.SMALL_RATIO_MAX
                            and self._ioa(boxes[j], boxes[i]) >= self.SMALL_CONTAINED_IOA):
                        keep[j] = False
        return [bb for bb, k in zip(boxes, keep) if k]

    def _assign_players_two_clusters(self, features: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """K-means (K=2) over (hue, saturation) features to split players into two teams."""
        criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 20, 1.0)
        _, labels, centers = cv2.kmeans(
            np.float32(features),
            K=2,
            bestLabels=None,
            criteria=criteria,
            attempts=5,
            flags=cv2.KMEANS_PP_CENTERS,
        )
        return labels.reshape(-1), centers
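
    # Example: kit features [[10, 200], [110, 180], [12, 190]] (hue, saturation)
    # yield one centre near hue ~11 and one near hue ~110; predict_batch then
    # sorts the centres by hue so the lower-hue team always maps to cls_id 6.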

    def _reclass_extra_goalkeepers(self, img_bgr: np.ndarray, boxes: List[BoundingBox], cluster_centers: np.ndarray | None) -> None:
        """Keep only the highest-confidence goalkeeper; reassign the rest to a team class."""
        gk_idxs = [i for i, bb in enumerate(boxes) if int(bb.cls_id) == 1]
        if len(gk_idxs) <= 1:
            return
        gk_idxs_sorted = sorted(gk_idxs, key=lambda i: boxes[i].conf, reverse=True)
        # The top-confidence detection keeps cls_id 1; the others become team players.
        to_reclass = gk_idxs_sorted[1:]
        for gki in to_reclass:
            hs_gk = self._hs_feature_from_roi(img_bgr, boxes[gki])
            if cluster_centers is not None:
                # Assign to the nearer team cluster in (hue, saturation) space.
                d0 = float(np.linalg.norm(hs_gk - cluster_centers[0]))
                d1 = float(np.linalg.norm(hs_gk - cluster_centers[1]))
                assign_cls = 6 if d0 <= d1 else 7
            else:
                assign_cls = 6 if float(hs_gk[0]) < self.SINGLE_PLAYER_HUE_PIVOT else 7
            boxes[gki].cls_id = int(assign_cls)

    def predict_batch(self, batch_images: List[ndarray], offset: int, n_keypoints: int) -> List[TVFrameResult]:
        """Detect objects and pitch keypoints for a batch of BGR frames.

        `offset` is the absolute frame id of batch_images[0]; results are keyed
        by offset plus the index within the batch.
        """
        bboxes: Dict[int, List[BoundingBox]] = {}
        bbox_model_results = self.bbox_model.predict(batch_images)
        if bbox_model_results is not None:
            for frame_idx_in_batch, detection in enumerate(bbox_model_results):
                if not hasattr(detection, "boxes") or detection.boxes is None:
                    continue
                boxes: List[BoundingBox] = []
                for box in detection.boxes.data:
                    x1, y1, x2, y2, conf, cls_id = box.tolist()
                    boxes.append(
                        BoundingBox(
                            x1=int(x1),
                            y1=int(y1),
                            x2=int(x2),
                            y2=int(y2),
                            cls_id=int(cls_id),
                            conf=float(conf),
                        )
                    )

                # Keep only the single most confident football (cls_id 0).
                footballs = [bb for bb in boxes if int(bb.cls_id) == 0]
                if len(footballs) > 1:
                    best_ball = max(footballs, key=lambda b: b.conf)
                    boxes = [bb for bb in boxes if int(bb.cls_id) != 0]
                    boxes.append(best_ball)
                boxes = self.suppress_quasi_total_containment(boxes)
                boxes = self.suppress_small_contained(boxes)

                # Split outfield players (cls_id 2) into two teams by kit colour.
                img_bgr = batch_images[frame_idx_in_batch]
                player_indices: List[int] = []
                player_feats: List[np.ndarray] = []
                for i, bb in enumerate(boxes):
                    if int(bb.cls_id) == 2:
                        player_indices.append(i)
                        player_feats.append(self._hs_feature_from_roi(img_bgr, bb))
                cluster_centers = None
                n_players = len(player_feats)
                if n_players >= 2:
                    feats = np.vstack(player_feats)
                    labels, centers = self._assign_players_two_clusters(feats)
                    # Order clusters by hue so team ids 6/7 are stable across frames.
                    order = np.argsort(centers[:, 0])
                    centers = centers[order]
                    remap = {old_idx: new_idx for new_idx, old_idx in enumerate(order)}
                    labels = np.vectorize(remap.get)(labels)
                    cluster_centers = centers
                    for idx_in_list, lbl in zip(player_indices, labels):
                        boxes[idx_in_list].cls_id = 6 if int(lbl) == 0 else 7
                elif n_players == 1:
                    hue, _ = player_feats[0]
                    boxes[player_indices[0]].cls_id = 6 if float(hue) < self.SINGLE_PLAYER_HUE_PIVOT else 7
                self._reclass_extra_goalkeepers(img_bgr, boxes, cluster_centers)
                bboxes[offset + frame_idx_in_batch] = boxes

        # Run the pitch keypoint model; GPU memory is cleared first because the
        # detector may still hold allocations.
        pitch_batch_size = min(self.pitch_batch_size, len(batch_images))
        keypoints: Dict[int, List[Tuple[int, int]]] = {}
        gc.collect()
        if torch.cuda.is_available():
            tf.keras.backend.clear_session()
            torch.cuda.empty_cache()
            torch.cuda.synchronize()
        device_str = "cuda" if torch.cuda.is_available() else "cpu"
        keypoints_result = process_batch_input(
            batch_images,
            self.keypoints_model,
            self.kp_threshold,
            device_str,
            batch_size=pitch_batch_size,
        )
        if keypoints_result is not None and len(keypoints_result) > 0:
            for frame_number_in_batch, kp_dict in enumerate(keypoints_result):
                if frame_number_in_batch >= len(batch_images):
                    break
                frame_keypoints: List[Tuple[int, int]] = []
                try:
                    height, width = batch_images[frame_number_in_batch].shape[:2]
                    if kp_dict is not None and isinstance(kp_dict, dict):
                        # The pitch model emits up to 32 keypoints, 1-indexed, with
                        # coordinates normalised to [0, 1]; absent keypoints stay (0, 0).
                        for idx in range(32):
                            x, y = 0, 0
                            kp_idx = idx + 1
                            if kp_idx in kp_dict:
                                try:
                                    kp_data = kp_dict[kp_idx]
                                    if isinstance(kp_data, dict) and "x" in kp_data and "y" in kp_data:
                                        x = int(kp_data["x"] * width)
                                        y = int(kp_data["y"] * height)
                                except (KeyError, TypeError, ValueError):
                                    pass
                            frame_keypoints.append((x, y))
                except (IndexError, ValueError, AttributeError):
                    frame_keypoints = [(0, 0)] * 32
                # Pad or truncate so every frame carries exactly n_keypoints entries.
                if len(frame_keypoints) < n_keypoints:
                    frame_keypoints.extend([(0, 0)] * (n_keypoints - len(frame_keypoints)))
                else:
                    frame_keypoints = frame_keypoints[:n_keypoints]
                keypoints[offset + frame_number_in_batch] = frame_keypoints
            print("✅ Keypoints predicted")

        # Assemble per-frame results, filling any gaps with empty boxes and
        # zeroed keypoints.
        results: List[TVFrameResult] = []
        for frame_number in range(offset, offset + len(batch_images)):
            frame_boxes = bboxes.get(frame_number, [])
            frame_keypoints = keypoints.get(frame_number, [(0, 0) for _ in range(n_keypoints)])
            results.append(
                TVFrameResult(
                    frame_id=frame_number,
                    boxes=frame_boxes,
                    keypoints=frame_keypoints,
                )
            )

        # Release GPU memory before handing results back.
        gc.collect()
        if torch.cuda.is_available():
            tf.keras.backend.clear_session()
            torch.cuda.empty_cache()
            torch.cuda.synchronize()

        return results
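

# --- Usage sketch (illustrative only) ---------------------------------------
# A minimal example of driving Miner over a few frames. The repo path and the
# video filename below are hypothetical placeholders; frames are BGR ndarrays
# as produced by OpenCV, which is what predict_batch expects.
if __name__ == "__main__":
    repo = Path("/path/to/model_repo")  # assumed to contain objdetect.pt, keypoint, hrnetv2_w48.yaml
    miner = Miner(repo)

    cap = cv2.VideoCapture("match.mp4")  # hypothetical input clip
    frames: List[ndarray] = []
    while len(frames) < 8:
        ok, frame = cap.read()
        if not ok:
            break
        frames.append(frame)
    cap.release()

    if frames:
        results = miner.predict_batch(frames, offset=0, n_keypoints=32)
        for r in results:
            print(f"frame {r.frame_id}: {len(r.boxes)} boxes, {len(r.keypoints)} keypoints")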