import logging
import time
import warnings

import cv2
import numpy as np
import torch
from numpy import ndarray
from sklearn.cluster import KMeans
from torchvision.ops import batched_nms

# Suppress ALL runtime and sklearn warnings
warnings.filterwarnings('ignore', category=RuntimeWarning)
warnings.filterwarnings('ignore', category=FutureWarning)
warnings.filterwarnings('ignore', category=UserWarning)

# Suppress sklearn warnings specifically
logging.getLogger('sklearn').setLevel(logging.ERROR)

# Output class ids written by process_team_identification_batch.
_PLAYER_LEFT_LABEL = 6   # Player-L (left team); also the default player label
_PLAYER_RIGHT_LABEL = 7  # Player-R (right team)
# Detector class id -> output class id for everything that is not a player:
# 1 (goalkeeper) -> 1, 2 (ball) -> 0, 3/4 (referee or other) -> 3.
_NON_PLAYER_LABELS = {1: 1, 2: 0, 3: 3, 4: 3}


def get_grass_color(img):
    """Return the mean BGR colour of the green-ish pixels of ``img``.

    Args:
        img: BGR image as accepted by ``cv2.cvtColor(..., COLOR_BGR2HSV)``.

    Returns:
        The first three components of ``cv2.mean(img, mask=...)``, where the
        mask selects pixels whose HSV value lies in
        ``[30, 40, 40] .. [80, 255, 255]``.
    """
    hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)

    # Fixed HSV window used as the definition of "green".
    lower_green = np.array([30, 40, 40])
    upper_green = np.array([80, 255, 255])
    mask = cv2.inRange(hsv, lower_green, upper_green)

    grass_color = cv2.mean(img, mask=mask)
    return grass_color[:3]


def get_players_boxes(frame, result):
    """Collect the crops and detection records of all player detections.

    Args:
        frame: image the detections refer to (indexed as ``frame[y, x]``).
        result: iterable of ``(box, score, cls)`` where ``box`` is a numpy
            array ``[x1, y1, x2, y2]``; class id 0 is treated as "player".

    Returns:
        ``(players_imgs, players_boxes)``: two lists of equal length.
        ``players_imgs[i]`` is the crop for ``players_boxes[i]``, which is
        ``[box, score, cls]`` with the original (unclamped) box.
    """
    frame_h, frame_w = frame.shape[:2]
    players_imgs = []
    players_boxes = []
    for box, score, cls in result:
        if int(cls) != 0:
            continue
        x1, y1, x2, y2 = box.astype(int)
        # Clamp to the frame: a negative start index would wrap around in
        # numpy slicing and silently produce an empty or wrong crop.
        x1, y1 = max(x1, 0), max(y1, 0)
        x2, y2 = min(x2, frame_w), min(y2, frame_h)
        if x1 >= x2 or y1 >= y2:
            # Degenerate box: drop it from BOTH lists so they stay aligned.
            continue
        players_imgs.append(frame[y1:y2, x1:x2])
        players_boxes.append([box, score, cls])
    return players_imgs, players_boxes


def get_kits_colors(players, grass_hsv=None, frame=None):
    """Estimate one kit colour (mean BGR) per player crop.

    For each crop, pixels whose hue is within +/-10 of the grass hue (and
    whose saturation/value are at least 40) are masked out, only the upper
    half of the crop is kept, and the mean BGR of the remaining pixels is
    taken.

    Args:
        players: iterable of BGR crops.
        grass_hsv: grass colour as an HSV array indexed ``[0, 0, 0]`` for hue.
            When ``None`` it is derived from ``frame``.
        frame: full frame, only used when ``grass_hsv`` is ``None``.

    Returns:
        List of numpy arrays of length 3. Crops that are ``None``, empty or
        not 3-dimensional are skipped, so the list can be shorter than
        ``players``.
    """
    if grass_hsv is None:
        grass_color = get_grass_color(frame)
        grass_hsv = cv2.cvtColor(np.uint8([[list(grass_color)]]), cv2.COLOR_BGR2HSV)

    # Cast to a Python int first: the stored hue is uint8 and "hue - 10"
    # can wrap around in unsigned arithmetic. 8-bit OpenCV hue is 0..179.
    grass_hue = int(grass_hsv[0, 0, 0])
    lower_green = np.array([max(grass_hue - 10, 0), 40, 40])
    upper_green = np.array([min(grass_hue + 10, 179), 255, 255])

    kits_colors = []
    for player_img in players:
        # Skip empty or invalid images
        if player_img is None or player_img.size == 0 or len(player_img.shape) != 3:
            continue

        hsv = cv2.cvtColor(player_img, cv2.COLOR_BGR2HSV)
        grass_mask = cv2.inRange(hsv, lower_green, upper_green)

        # Keep non-grass pixels, and only those in the upper half of the crop.
        mask = cv2.bitwise_not(grass_mask)
        mask[player_img.shape[0] // 2:, :] = 0

        kit_color = np.array(cv2.mean(player_img, mask=mask)[:3])
        kits_colors.append(kit_color)
    return kits_colors


def get_kits_classifier(kits_colors):
    """Fit a 2-cluster KMeans on the kit colours.

    Args:
        kits_colors: sequence of colour vectors.

    Returns:
        The fitted ``KMeans`` instance, or ``None`` when fewer than two
        colours are available.
    """
    if len(kits_colors) < 2:
        return None
    kits_kmeans = KMeans(n_clusters=2)
    kits_kmeans.fit(kits_colors)
    return kits_kmeans


def classify_kits(kits_classifer, kits_colors):
    """Predict a team index for each kit colour.

    Args:
        kits_classifer: object with a ``predict`` method, or ``None``.
        kits_colors: sequence of colour vectors.

    Returns:
        ``kits_classifer.predict(kits_colors)``; ``np.array([0])`` when the
        classifier is ``None`` or ``kits_colors`` is empty.
    """
    if kits_classifer is None or len(kits_colors) == 0:
        return np.array([0])  # Default to team 0
    return kits_classifer.predict(kits_colors)


def get_left_team_label(players_boxes, kits_colors, kits_clf):
    """Decide which cluster label corresponds to the left-hand team.

    The mean ``x1`` of the boxes classified as team 0 is compared with the
    mean ``x1`` of the remaining boxes; the group with the smaller mean is
    the left team.

    Args:
        players_boxes: list of ``[box, score, cls]`` records.
        kits_colors: kit colours, ``kits_colors[i]`` belonging to
            ``players_boxes[i]``.
        kits_clf: classifier passed to ``classify_kits`` (may be ``None``).

    Returns:
        ``1`` when team 0 is on average further right than team 1, else ``0``.
        ``0`` is also returned when there is no classifier, no player, or one
        of the two teams has no member (no comparison is possible).
    """
    count = min(len(players_boxes), len(kits_colors))
    if kits_clf is None or count == 0:
        return 0

    teams = np.asarray(classify_kits(kits_clf, kits_colors[:count]))
    left_edges = np.array([int(players_boxes[i][0][0]) for i in range(count)])

    team_0_x = left_edges[teams == 0]
    team_1_x = left_edges[teams != 0]
    if team_0_x.size == 0 or team_1_x.size == 0:
        return 0
    return 1 if team_0_x.mean() > team_1_x.mean() else 0


def check_box_boundaries(boxes, img_height, img_width):
    """
    Keep only the bounding boxes that lie entirely inside the image.

    A box is kept when ``0 <= x1 < x2 < img_width`` and
    ``0 <= y1 < y2 < img_height``; all other boxes are dropped (not clipped).
    Kept boxes are then clamped to ``[0, img_width - 1]`` and
    ``[0, img_height - 1]``, which only affects fractional coordinates.

    Args:
        boxes: numpy array of shape (N, 4) with [x1, y1, x2, y2] format
        img_height: height of the image
        img_width: width of the image

    Returns:
        valid_boxes: numpy array of valid boxes within boundaries
        valid_indices: indices of valid boxes
        Both are empty arrays when no box is valid.
    """
    boxes = np.asarray(boxes)
    if boxes.size == 0:
        return np.array([]), np.array([])

    x1, y1, x2, y2 = boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3]
    valid_mask = (
        (x1 >= 0) & (y1 >= 0)
        & (x2 < img_width) & (y2 < img_height)
        & (x1 < x2) & (y1 < y2)
    )
    if not np.any(valid_mask):
        return np.array([]), np.array([])

    # Boolean indexing copies, so the caller's array is not modified below.
    valid_boxes = boxes[valid_mask]
    valid_indices = np.where(valid_mask)[0]

    valid_boxes[:, 0] = np.clip(valid_boxes[:, 0], 0, img_width - 1)   # x1
    valid_boxes[:, 1] = np.clip(valid_boxes[:, 1], 0, img_height - 1)  # y1
    valid_boxes[:, 2] = np.clip(valid_boxes[:, 2], 0, img_width - 1)   # x2
    valid_boxes[:, 3] = np.clip(valid_boxes[:, 3], 0, img_height - 1)  # y2
    return valid_boxes, valid_indices


def _box_inside_frame(x1, y1, x2, y2, height, width):
    """Return True when the integer box is non-empty and fully inside the frame."""
    return 0 <= x1 < x2 < width and 0 <= y1 < y2 < height


def process_team_identification_batch(frames, results, kits_clf, left_team_label, grass_hsv):
    """
    Process team identification and label formatting for batch results.

    Detections whose integer box is not fully inside the frame are dropped.
    Players (class 0) are labelled 6 (left team) or 7 (right team) using the
    kit classifier; without a classifier every player gets 6. Other classes
    are remapped: 1 -> 1 (GK), 2 -> 0 (ball), 3/4 -> 3 (referee); any other
    class is dropped.

    Args:
        frames: list of frames
        results: list of detection results for each frame, each a list of
            ``(box, score, cls)``
        kits_clf: trained kit classifier (or None)
        left_team_label: label for left team
        grass_hsv: grass color in HSV format (derived per frame when None)

    Returns:
        processed_results: one list per frame of dicts with keys
        ``"id"`` (running index within the frame), ``"bbox"``, ``"class_id"``
        and ``"conf"``.
    """
    processed_results = []

    for frame_idx, frame in enumerate(frames):
        frame_detections = results[frame_idx]
        if len(frame_detections) == 0:
            processed_results.append([])
            continue

        height, width = frame.shape[:2]

        # Single bounds check per detection, shared by both steps below.
        valid = []  # (detection index, class id, (x1, y1, x2, y2), score)
        for idx, (box, score, cls) in enumerate(frame_detections):
            x1, y1, x2, y2 = (int(v) for v in box.astype(int))
            if _box_inside_frame(x1, y1, x2, y2, height, width):
                valid.append((idx, int(cls), (x1, y1, x2, y2), score))

        # Classify all player crops of the frame in one call.
        player_team_map = {}
        player_entries = [entry for entry in valid if entry[1] == 0]
        if player_entries and kits_clf is not None:
            crops = [frame[y1:y2, x1:x2] for _, _, (x1, y1, x2, y2), _ in player_entries]
            # Forward the frame so a missing grass_hsv can be derived from it.
            kits_colors = get_kits_colors(crops, grass_hsv, frame)
            # get_kits_colors may skip crops; only trust an aligned result.
            if len(kits_colors) == len(player_entries):
                teams = classify_kits(kits_clf, kits_colors)
                for (idx, _, _, _), team in zip(player_entries, teams):
                    player_team_map[idx] = int(team)

        frame_results = []
        det_id = 0
        for idx, label, (x1, y1, x2, y2), score in valid:
            if label == 0:
                team = player_team_map.get(idx)
                if team is None or team == left_team_label:
                    final_label = _PLAYER_LEFT_LABEL
                else:
                    final_label = _PLAYER_RIGHT_LABEL
            else:
                final_label = _NON_PLAYER_LABELS.get(label)
                if final_label is None:
                    continue

            frame_results.append({
                "id": int(det_id),
                "bbox": [int(x1), int(y1), int(x2), int(y2)],
                "class_id": int(final_label),
                "conf": float(score),
            })
            det_id += 1

        processed_results.append(frame_results)

    return processed_results


def convert_numpy_types(obj):
    """Convert numpy types to native Python types for JSON serialization.

    Recurses into dicts, lists and tuples; numpy integers, floats and bools
    become ``int``, ``float`` and ``bool``; arrays become nested lists.
    Anything else is returned unchanged.
    """
    if isinstance(obj, np.bool_):
        return bool(obj)
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    if isinstance(obj, dict):
        return {key: convert_numpy_types(value) for key, value in obj.items()}
    if isinstance(obj, list):
        return [convert_numpy_types(item) for item in obj]
    if isinstance(obj, tuple):
        return tuple(convert_numpy_types(item) for item in obj)
    return obj


def pre_process_img(frames, scale):
    """Resize frames to ``scale x scale`` and pack them as a float32 batch.

    Args:
        frames: sequence of HxWxC images.
        scale: target side length (converted with ``int``).

    Returns:
        float32 array of shape ``(len(frames), C, scale, scale)`` with values
        divided by 255.
    """
    side = int(scale)
    imgs = np.stack([cv2.resize(frame, (side, side)) for frame in frames])
    imgs = imgs.transpose(0, 3, 1, 2)  # NHWC -> NCHW
    return imgs.astype(np.float32) / 255.0  # Normalize


def post_process_output(outputs, x_scale, y_scale, conf_thresh=0.6, nms_thresh=0.75,
                        ball_class_id=2, ball_conf_thresh=0.55):
    """Turn raw detector output into per-frame ``(box, score, cls)`` lists.

    Args:
        outputs: numpy array of shape ``(B, C, N)``; along ``C`` the first
            four entries are read as ``cx, cy, w, h`` and the rest as class
            logits (a sigmoid is applied to them).
        x_scale, y_scale: factors that map model coordinates to frame pixels.
        conf_thresh: minimum class score for a prediction to be kept.
        nms_thresh: IoU threshold for non-maximum suppression.
        ball_class_id: class id that gets the relaxed threshold below.
        ball_conf_thresh: per frame, the single best-scoring prediction of
            ``ball_class_id`` is kept when its score exceeds this value, even
            if it is below ``conf_thresh``.

    Returns:
        List of length ``B``. Each entry is a list of
        ``(box_xyxy ndarray, score, class_id)`` tuples (empty when the frame
        has no surviving detection). NMS is applied per frame, across classes.
    """
    batch_count = outputs.shape[0]
    preds = torch.from_numpy(outputs).permute(0, 2, 1)  # (B, N, C)

    boxes = preds[..., :4]
    class_scores = torch.sigmoid(preds[..., 4:])
    conf, class_id = class_scores.max(dim=2)
    mask = conf > conf_thresh

    # Rescue the best ball candidate of each frame with a lower threshold.
    for i in range(batch_count):
        ball_idx = torch.nonzero(class_id[i] == ball_class_id, as_tuple=True)[0]
        if ball_idx.numel() == 0:
            continue
        top = ball_idx[torch.argmax(conf[i, ball_idx])]
        if conf[i, top] > ball_conf_thresh:
            mask[i, top] = True

    batch_idx, pred_idx = mask.nonzero(as_tuple=True)
    if batch_idx.numel() == 0:
        return [[] for _ in range(batch_count)]

    boxes = boxes[batch_idx, pred_idx]
    conf = conf[batch_idx, pred_idx]
    class_id = class_id[batch_idx, pred_idx]

    # Centre/size -> corner format, scaled to frame pixels.
    cx, cy, w, h = boxes.unbind(dim=1)
    boxes_xyxy = torch.stack(
        [
            (cx - w / 2) * x_scale,
            (cy - h / 2) * y_scale,
            (cx + w / 2) * x_scale,
            (cy + h / 2) * y_scale,
        ],
        dim=1,
    )

    # batched_nms never suppresses across different idxs values, so using the
    # frame index as group keeps frames independent without a manual offset.
    keep = batched_nms(boxes_xyxy, conf, batch_idx, nms_thresh)

    kept_boxes = boxes_xyxy[keep].numpy()
    kept_conf = conf[keep].numpy()
    kept_cls = class_id[keep].numpy()
    kept_batch = batch_idx[keep].tolist()

    results = [[] for _ in range(batch_count)]
    for box, score, cls, b in zip(kept_boxes, kept_conf, kept_cls, kept_batch):
        results[b].append((box, score, cls))
    return results


def _init_team_model(frame, detections, kits_clf, left_team_label, grass_hsv):
    """Try to derive grass colour, kit classifier and left-team label from one frame.

    Returns the (possibly updated) ``(kits_clf, left_team_label, grass_hsv)``;
    values are left as passed in when the frame has no usable player crop.
    """
    if not detections:
        return kits_clf, left_team_label, grass_hsv

    players_imgs, players_boxes = get_players_boxes(frame, detections)
    if not players_imgs:
        return kits_clf, left_team_label, grass_hsv

    grass_color = get_grass_color(frame)
    grass_hsv = cv2.cvtColor(np.uint8([[list(grass_color)]]), cv2.COLOR_BGR2HSV)

    kits_colors = get_kits_colors(players_imgs, grass_hsv)
    if kits_colors:  # Only proceed if we have valid kit colors
        kits_clf = get_kits_classifier(kits_colors)
        if kits_clf is not None:
            left_team_label = int(get_left_team_label(players_boxes, kits_colors, kits_clf))
    return kits_clf, left_team_label, grass_hsv


def player_detection_result(frames: list[ndarray], batch_size, model, kits_clf=None,
                            left_team_label=None, grass_hsv=None):
    """Run detection on ``frames`` in batches and attach team labels.

    Args:
        frames: list of images. The coordinate scale factors are computed
            from ``frames[0]`` only, so all frames are assumed to share its
            size.
        batch_size: number of frames per ``model.run`` call.
        model: object exposing ``get_inputs()[0].name`` and
            ``run(None, {name: batch})`` (looks like an ONNX Runtime
            session; confirm against the caller).
        kits_clf, left_team_label, grass_hsv: team-model state from a
            previous call. While any of them is ``None``, the first frame of
            each batch is used to try to (re)build all three.

    Returns:
        ``(results, kits_clf, left_team_label, grass_hsv)`` where ``results``
        has one list of detection dicts per input frame (native Python
        types) and the other three values are the state to pass to the next
        call. Empty ``frames`` yields ``([], kits_clf, left_team_label,
        grass_hsv)`` unchanged.
    """
    if len(frames) == 0:
        return [], kits_clf, left_team_label, grass_hsv

    height, width = frames[0].shape[:2]
    scale = 640.0  # side length of the square model input
    x_scale = width / scale
    y_scale = height / scale

    input_name = model.get_inputs()[0].name

    results = []
    for start in range(0, len(frames), batch_size):
        # Slicing already shortens the final batch when needed.
        batch_frames = frames[start:start + batch_size]

        imgs = pre_process_img(batch_frames, scale)
        outputs = model.run(None, {input_name: imgs})[0]
        raw_results = post_process_output(np.array(outputs), x_scale, y_scale)

        if kits_clf is None or left_team_label is None or grass_hsv is None:
            # Use first frame to initialize team classification
            first_frame_results = raw_results[0] if raw_results else []
            kits_clf, left_team_label, grass_hsv = _init_team_model(
                batch_frames[0], first_frame_results, kits_clf, left_team_label, grass_hsv
            )

        # Process team identification and boundary checking
        processed_results = process_team_identification_batch(
            batch_frames, raw_results, kits_clf, left_team_label, grass_hsv
        )
        results.extend(convert_numpy_types(processed_results))

    # Return the same format as before for compatibility
    return results, kits_clf, left_team_label, grass_hsv