|
|
import cv2 |
|
|
import numpy as np |
|
|
from sklearn.cluster import KMeans |
|
|
import warnings |
|
|
import time |
|
|
|
|
|
import torch |
|
|
from torchvision.ops import batched_nms |
|
|
from numpy import ndarray |
|
|
|
|
|
# Silence these warning categories for the whole process. Note that this also
# hides NumPy overflow/invalid-value RuntimeWarnings raised by the code below.
for _category in (RuntimeWarning, FutureWarning, UserWarning):
    warnings.filterwarnings('ignore', category=_category)

import logging

# Only let errors through from the 'sklearn' logger.
logging.getLogger('sklearn').setLevel(logging.ERROR)
|
|
|
|
|
def get_grass_color(img):
    """Estimate the pitch colour as the mean of the green-looking pixels.

    Args:
        img: image array; it is converted with ``cv2.COLOR_BGR2HSV``, i.e. it
            is interpreted as BGR.

    Returns:
        The first three components of ``cv2.mean`` computed over the pixels
        whose HSV value lies inside the green window below, in the channel
        order of ``img``.
    """
    hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)

    # HSV window that is treated as "grass".
    lower_green = np.array([30, 40, 40])
    upper_green = np.array([80, 255, 255])
    mask = cv2.inRange(hsv, lower_green, upper_green)

    # cv2.mean takes the mask directly, so no masked copy of the image is
    # needed (the previous bitwise_and result was never used).
    grass_color = cv2.mean(img, mask=mask)
    return grass_color[:3]
|
|
|
|
|
def get_players_boxes(frame, result):
    """Crop every player detection (class id 0) out of ``frame``.

    Args:
        frame: image array indexed as ``frame[y, x]``.
        result: iterable of ``(box, score, cls)`` where ``box`` is an array
            holding ``[x1, y1, x2, y2]``.

    Returns:
        ``(players_imgs, players_boxes)``: the crops and the matching
        ``[box, score, cls]`` entries. Both lists always have the same length
        and order; detections whose box does not intersect the frame are
        dropped from both.
    """
    players_imgs = []
    players_boxes = []
    frame_h, frame_w = frame.shape[:2]

    for box, score, cls in result:
        if int(cls) != 0:
            continue

        x1, y1, x2, y2 = box.astype(int)
        # Clamp to the frame: a negative start index would otherwise be read
        # as "count from the end" and produce an empty or unrelated crop.
        x1, y1 = max(x1, 0), max(y1, 0)
        x2, y2 = min(x2, frame_w), min(y2, frame_h)
        if x2 <= x1 or y2 <= y1:
            # Nothing of this box is inside the frame; skip image and box
            # together so the two output lists stay index-aligned.
            continue

        players_imgs.append(frame[y1:y2, x1:x2])
        players_boxes.append([box, score, cls])

    return players_imgs, players_boxes
|
|
|
|
|
def get_kits_colors(players, grass_hsv=None, frame=None):
    """Compute one mean kit colour per player crop.

    For each crop, the mean is taken over the pixels in the upper half of the
    crop that do not fall inside the grass hue window.

    Args:
        players: iterable of player crops (arrays converted with
            ``cv2.COLOR_BGR2HSV``).
        grass_hsv: grass colour as an array indexed ``[0, 0, 0]`` for the hue.
            When None it is derived from ``frame`` via ``get_grass_color``.
        frame: full frame, only used when ``grass_hsv`` is None.

    Returns:
        List of length-3 ``np.ndarray`` colours. Crops that are None, empty or
        not 3-dimensional are skipped, so the list can be shorter than
        ``players``.
    """
    kits_colors = []
    if grass_hsv is None:
        grass_color = get_grass_color(frame)
        grass_hsv = cv2.cvtColor(np.uint8([[list(grass_color)]]), cv2.COLOR_BGR2HSV)

    # Convert to a Python int before the arithmetic: grass_hsv is typically a
    # uint8 array, and "uint8_scalar - 10" wraps around to ~250 under NumPy 2
    # promotion rules instead of going negative.
    grass_hue = int(grass_hsv[0, 0, 0])
    # The bounds do not depend on the crop, so build them once.
    lower_green = np.array([grass_hue - 10, 40, 40])
    upper_green = np.array([grass_hue + 10, 255, 255])

    for player_img in players:
        if player_img is None or player_img.size == 0 or len(player_img.shape) != 3:
            continue

        hsv = cv2.cvtColor(player_img, cv2.COLOR_BGR2HSV)

        # 255 where the pixel is NOT grass-coloured.
        not_grass = cv2.bitwise_not(cv2.inRange(hsv, lower_green, upper_green))

        # Keep only the top half of the crop.
        upper_mask = np.zeros(player_img.shape[:2], np.uint8)
        upper_mask[0:player_img.shape[0] // 2, 0:player_img.shape[1]] = 255
        mask = cv2.bitwise_and(not_grass, upper_mask)

        kit_color = np.array(cv2.mean(player_img, mask=mask)[:3])
        kits_colors.append(kit_color)

    return kits_colors
|
|
|
|
|
def get_kits_classifier(kits_colors):
    """Fit a two-cluster KMeans model on the kit colours.

    Args:
        kits_colors: sequence of colour vectors, one per player.

    Returns:
        The fitted ``KMeans`` instance, or None when fewer than two colours
        are available.
    """
    # Two clusters cannot be formed from zero or one sample.
    if len(kits_colors) < 2:
        return None
    return KMeans(n_clusters=2).fit(kits_colors)
|
|
|
|
|
def classify_kits(kits_classifer, kits_colors):
    """Assign a team label to every kit colour.

    Args:
        kits_classifer: object exposing ``predict(colors)``, or None.
        kits_colors: sequence of colour vectors.

    Returns:
        Array with one label per entry of ``kits_colors``. Without a
        classifier every colour gets label 0; an empty input yields an empty
        array.
    """
    n_colors = len(kits_colors)
    if kits_classifer is None or n_colors == 0:
        # Fall back to label 0, but keep the output aligned with the input
        # (one label per colour) rather than always returning one element.
        return np.zeros(n_colors, dtype=int)

    return kits_classifer.predict(kits_colors)
|
|
|
|
|
def get_left_team_label(players_boxes, kits_colors, kits_clf):
    """Decide which cluster label belongs to the team standing further left.

    Each player is classified from its kit colour, and the mean left-edge x
    coordinate of the two groups is compared.

    Args:
        players_boxes: list of ``[box, score, cls]`` entries; ``box`` holds
            ``[x1, y1, x2, y2]``.
        kits_colors: kit colours, index-aligned with ``players_boxes``.
        kits_clf: classifier handed to ``classify_kits``.

    Returns:
        1 when the label-0 group has the larger mean x1, otherwise 0. A group
        without members counts as mean 0.
    """
    xs_by_team = {0: [], 1: []}

    for i, detection in enumerate(players_boxes):
        x1, _y1, _x2, _y2 = detection[0].astype(int)
        team = classify_kits(kits_clf, [kits_colors[i]]).item()
        xs_by_team[0 if team == 0 else 1].append(x1)

    mean_x_team_0 = np.average(xs_by_team[0]) if xs_by_team[0] else 0
    mean_x_team_1 = np.average(xs_by_team[1]) if xs_by_team[1] else 0

    # Label 0 sits further right on average -> label 1 is the left team.
    return 1 if mean_x_team_0 > mean_x_team_1 else 0
|
|
|
|
|
def check_box_boundaries(boxes, img_height, img_width):
    """
    Keep only the boxes that lie inside the image and are non-degenerate.

    A box is kept when x1 >= 0, y1 >= 0, x2 < img_width, y2 < img_height,
    x1 < x2 and y1 < y2. Kept boxes are then clipped to
    [0, img_width - 1] x [0, img_height - 1]; boxes that fail the test are
    dropped, not clipped.

    Args:
        boxes: numpy array of shape (N, 4) with [x1, y1, x2, y2] format
        img_height: height of the image
        img_width: width of the image

    Returns:
        valid_boxes: copy of the kept boxes (empty 1-D array when none is kept)
        valid_indices: row indices of the kept boxes in ``boxes``
    """
    inside = np.logical_and.reduce([
        boxes[:, 0] >= 0,
        boxes[:, 1] >= 0,
        boxes[:, 2] < img_width,
        boxes[:, 3] < img_height,
        boxes[:, 0] < boxes[:, 2],
        boxes[:, 1] < boxes[:, 3],
    ])

    if not inside.any():
        return np.array([]), np.array([])

    # Boolean indexing copies, so the caller's array is left untouched.
    kept = boxes[inside]
    kept_indices = np.flatnonzero(inside)

    column_limits = (
        (0, img_width - 1),
        (1, img_height - 1),
        (2, img_width - 1),
        (3, img_height - 1),
    )
    for column, upper in column_limits:
        kept[:, column] = np.clip(kept[:, column], 0, upper)

    return kept, kept_indices
|
|
|
|
|
def process_team_identification_batch(frames, results, kits_clf, left_team_label, grass_hsv):
    """
    Process team identification and label formatting for batch results.

    Raw class 0 detections are emitted as class 6 when their predicted team
    equals ``left_team_label`` (or when no team could be predicted) and as
    class 7 otherwise. Raw classes 1, 2, 3 and 4 are emitted as 1, 0, 3 and 3.
    Any other raw class, and any box rejected by ``check_box_boundaries``,
    is dropped.

    Args:
        frames: list of frames
        results: list of detection results for each frame; each detection is
            a ``(box, score, cls)`` triple
        kits_clf: trained kit classifier, or None to skip team prediction
        left_team_label: label for left team
        grass_hsv: grass color in HSV format; when None it is derived from the
            frame being processed

    Returns:
        processed_results: one list per frame of dicts with the keys
            "id" (running index within the frame), "bbox", "class_id", "conf"
    """
    # Raw detector class -> emitted class id (class 0 is handled separately).
    class_map = {1: 1, 2: 0, 3: 3, 4: 3}
    processed_results = []

    for frame_idx, frame in enumerate(frames):
        frame_detections = results[frame_idx]
        if not frame_detections:
            processed_results.append([])
            continue

        frame_h, frame_w = frame.shape[:2]

        # Collect the crops of class-0 detections that lie fully inside the frame.
        players_imgs = []
        player_indices = []
        for idx, (box, _score, cls) in enumerate(frame_detections):
            if int(cls) != 0:
                continue
            x1, y1, x2, y2 = box.astype(int)
            if x1 >= 0 and y1 >= 0 and x2 < frame_w and y2 < frame_h and x1 < x2 and y1 < y2:
                player_img = frame[y1:y2, x1:x2]
                if player_img.size > 0:
                    players_imgs.append(player_img)
                    player_indices.append(idx)

        # detection index -> predicted team label
        player_team_map = {}
        if players_imgs and kits_clf is not None:
            # Pass the frame so the grass colour can be derived when grass_hsv is None.
            kits_colors = get_kits_colors(players_imgs, grass_hsv, frame)
            teams = classify_kits(kits_clf, kits_colors)
            for det_idx, team in zip(player_indices, teams):
                player_team_map[det_idx] = team.item()

        frame_results = []
        for idx, (box, score, cls) in enumerate(frame_detections):
            label = int(cls)
            if label == 0:
                if idx in player_team_map and player_team_map[idx] != left_team_label:
                    final_label = 7
                else:
                    final_label = 6
            elif label in class_map:
                final_label = class_map[label]
            else:
                # Unknown class: nothing to emit, so skip the bounds check too.
                continue

            x1, y1, x2, y2 = box.astype(int)
            valid_boxes, _ = check_box_boundaries(
                np.array([[x1, y1, x2, y2]]), frame_h, frame_w
            )
            if len(valid_boxes) == 0:
                continue
            x1, y1, x2, y2 = valid_boxes[0].astype(int)

            frame_results.append({
                # Ids count the emitted detections of this frame, starting at 0.
                "id": len(frame_results),
                "bbox": [int(x1), int(y1), int(x2), int(y2)],
                "class_id": int(final_label),
                "conf": float(score)
            })

        processed_results.append(frame_results)

    return processed_results
|
|
|
|
|
def convert_numpy_types(obj):
    """Convert numpy types to native Python types for JSON serialization.

    Handles numpy booleans, integers, floats and arrays, and recurses into
    dicts (values only), lists and tuples. Tuples stay tuples. Anything else
    is returned unchanged.
    """
    # np.bool_ is not a subclass of np.integer, so it needs its own branch.
    if isinstance(obj, np.bool_):
        return bool(obj)
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    if isinstance(obj, dict):
        return {key: convert_numpy_types(value) for key, value in obj.items()}
    if isinstance(obj, list):
        return [convert_numpy_types(item) for item in obj]
    if isinstance(obj, tuple):
        return tuple(convert_numpy_types(item) for item in obj)
    return obj
|
|
|
|
|
def pre_process_img(frames, scale):
    """Turn a list of frames into a normalised float32 batch.

    Args:
        frames: non-empty list of 3-dimensional image arrays.
        scale: side length of the square each frame is resized to.

    Returns:
        float32 array of shape (N, C, scale, scale) with values divided by 255.
    """
    side = int(scale)
    resized = [cv2.resize(frame, (side, side)) for frame in frames]
    batch = np.stack(resized)
    # (N, H, W, C) -> (N, C, H, W)
    batch = batch.transpose(0, 3, 1, 2)
    return batch.astype(np.float32) / 255.0
|
|
|
|
|
def post_process_output(outputs, x_scale, y_scale, conf_thresh=0.6, nms_thresh=0.75,
                        ball_class_id=2, ball_conf_thresh=0.55):
    """Decode raw detector output into per-image detections.

    Args:
        outputs: numpy array of shape (B, C, N). Along C the first four
            entries are read as box centre x, centre y, width, height and the
            remaining entries as per-class logits.
        x_scale: factor applied to x coordinates.
        y_scale: factor applied to y coordinates.
        conf_thresh: minimum class score for a prediction to be kept.
        nms_thresh: IoU threshold passed to ``batched_nms``.
        ball_class_id: class whose single best prediction per image is kept
            under the relaxed ``ball_conf_thresh``.
        ball_conf_thresh: minimum score for that best ball prediction.

    Returns:
        List of length B. Entry b is a list of ``(box, conf, class_id)``
        tuples for image b, with ``box`` as a numpy ``[x1, y1, x2, y2]`` array.
        NMS is applied per image across all classes.
    """
    batch_count = outputs.shape[0]
    preds = torch.from_numpy(outputs).permute(0, 2, 1)  # (B, N, C)

    boxes = preds[..., :4]
    class_scores = torch.sigmoid(preds[..., 4:])
    conf, class_id = class_scores.max(dim=2)

    mask = conf > conf_thresh

    # Additionally keep the best ball candidate of every image when it clears
    # the relaxed ball threshold. Pure torch ops: no tensor <-> numpy mixing.
    for i in range(batch_count):
        ball_idx = torch.nonzero(class_id[i] == ball_class_id, as_tuple=True)[0]
        if ball_idx.numel() == 0:
            continue
        top = ball_idx[torch.argmax(conf[i, ball_idx])]
        if conf[i, top] > ball_conf_thresh:
            mask[i, top] = True

    batch_idx, pred_idx = mask.nonzero(as_tuple=True)
    if batch_idx.numel() == 0:
        return [[] for _ in range(batch_count)]

    boxes = boxes[batch_idx, pred_idx]
    conf = conf[batch_idx, pred_idx]
    class_id = class_id[batch_idx, pred_idx]

    # Centre/size -> corner format, scaled by the caller's factors.
    cx, cy, w, h = boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3]
    x1 = (cx - w / 2) * x_scale
    y1 = (cy - h / 2) * y_scale
    x2 = (cx + w / 2) * x_scale
    y2 = (cy + h / 2) * y_scale
    boxes_xyxy = torch.stack([x1, y1, x2, y2], dim=1)

    # Shift each image's boxes by a large per-image offset so that boxes of
    # different images cannot overlap; the image index is also the NMS group.
    max_coord = 1e4
    offset = batch_idx.to(boxes_xyxy) * max_coord
    boxes_for_nms = boxes_xyxy + offset[:, None]

    keep = batched_nms(boxes_for_nms, conf, batch_idx, nms_thresh)

    boxes_final = boxes_xyxy[keep]
    conf_final = conf[keep]
    class_final = class_id[keep]
    batch_final = batch_idx[keep]

    results = [[] for _ in range(batch_count)]
    for b in range(batch_count):
        mask_b = batch_final == b
        if mask_b.sum() == 0:
            continue
        results[b] = list(zip(boxes_final[mask_b].numpy(),
                              conf_final[mask_b].numpy(),
                              class_final[mask_b].numpy()))
    return results
|
|
|
|
|
def player_detection_result(frames: list[ndarray], batch_size, model, kits_clf=None, left_team_label=None, grass_hsv=None):
    """Run detection and team labelling over ``frames`` in batches.

    While any of ``kits_clf``, ``left_team_label`` or ``grass_hsv`` is still
    None, the first frame of each batch is used to try to (re)build them from
    its player detections.

    Args:
        frames: list of image arrays. Box scaling uses the size of the first
            frame for every frame (assumes all frames share that size).
        batch_size: number of frames per model call.
        model: object exposing ``get_inputs()`` and ``run(None, feed)``
            (looks like an ONNX Runtime session; confirm against the caller).
        kits_clf: previously fitted kit classifier, or None.
        left_team_label: previously determined left-team label, or None.
        grass_hsv: previously determined grass colour in HSV, or None.

    Returns:
        ``(results, kits_clf, left_team_label, grass_hsv)``. ``results`` holds
        one list of detection dicts per frame; the other three values are the
        possibly updated team-model state, to be passed into the next call.
    """
    results = []
    if len(frames) == 0:
        # Nothing to detect; hand the team state back unchanged.
        return results, kits_clf, left_team_label, grass_hsv

    # The model works on scale x scale inputs; these factors map boxes back
    # to the original frame size.
    height, width = frames[0].shape[:2]
    scale = 640.0
    x_scale = width / scale
    y_scale = height / scale

    # Looked up once instead of per batch.
    input_name = model.get_inputs()[0].name

    for start in range(0, len(frames), batch_size):
        # Slicing already shortens the final batch when fewer frames remain.
        batch_frames = frames[start:start + batch_size]
        imgs = pre_process_img(batch_frames, scale)

        outputs = model.run(None, {input_name: imgs})[0]
        raw_results = post_process_output(np.array(outputs), x_scale, y_scale)

        if kits_clf is None or left_team_label is None or grass_hsv is None:
            first_frame = batch_frames[0]
            first_frame_results = raw_results[0] if raw_results else []

            if first_frame_results:
                players_imgs, players_boxes = get_players_boxes(first_frame, first_frame_results)
                if players_imgs:
                    grass_color = get_grass_color(first_frame)
                    grass_hsv = cv2.cvtColor(np.uint8([[list(grass_color)]]), cv2.COLOR_BGR2HSV)
                    kits_colors = get_kits_colors(players_imgs, grass_hsv)
                    if kits_colors:
                        kits_clf = get_kits_classifier(kits_colors)
                        if kits_clf is not None:
                            left_team_label = int(get_left_team_label(players_boxes, kits_colors, kits_clf))

        processed_results = process_team_identification_batch(
            batch_frames, raw_results, kits_clf, left_team_label, grass_hsv
        )
        results.extend(convert_numpy_types(processed_results))

    return results, kits_clf, left_team_label, grass_hsv