"""
HuggingFace Segment-Based Video Highlights Generator

Based on HuggingFace's SmolVLM2-HighlightGenerator approach.
Optimized for HuggingFace Spaces with the 256M model for resource efficiency.
"""

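# Example invocation (script filename and media paths are placeholders; the flags match
# the argparse options defined in main() below):
#
#   python hf_video_highlights.py input.mp4 --output highlights.mp4 \
#       --segment-length 5.0 --save-analysis
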
import os
import sys
import argparse
import json
import subprocess
import tempfile
from pathlib import Path
from PIL import Image
from typing import List, Dict, Tuple, Optional
import logging

sys.path.append(str(Path(__file__).parent / "src"))

try:
    from src.smolvlm2_handler import SmolVLM2Handler
except ImportError:
    print("❌ SmolVLM2Handler not found. Make sure to install dependencies first.")
    sys.exit(1)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

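# NOTE (assumption): this script only relies on SmolVLM2Handler exposing a constructor
# that accepts `model_name` and a `generate_response(image_path, prompt) -> str` method.
# The Protocol below simply documents that expected interface for readers and type
# checkers; it does not change runtime behavior.
from typing import Protocol


class VLMHandlerLike(Protocol):
    def generate_response(self, image_path: str, prompt: str) -> str:
        """Return the model's text response for a single image and prompt."""
        ...

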
class HuggingFaceVideoHighlightDetector:
    """
    HuggingFace Segment-Based Video Highlight Detection

    Uses fixed-length segments for consistent AI classification.
    """

    def __init__(self, model_name: str = "HuggingFaceTB/SmolVLM2-256M-Video-Instruct"):
        """Initialize the SmolVLM2 model (defaults to the resource-efficient 256M variant; larger variants such as 2.2B provide better reasoning if resources allow)."""
        print(f"🔥 Loading {model_name} for HuggingFace Segment-Based Analysis...")
        self.vlm_handler = SmolVLM2Handler(model_name=model_name)
        print("✅ SmolVLM2 loaded successfully!")

    def get_video_duration_seconds(self, video_path: str) -> float:
        """Get video duration using ffprobe."""
        cmd = [
            "ffprobe", "-v", "quiet", "-show_entries",
            "format=duration", "-of", "csv=p=0", video_path
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
            return float(result.stdout.strip())
        except subprocess.CalledProcessError as e:
            logger.error(f"Failed to get video duration: {e}")
            return 0.0

    def analyze_video_content(self, video_path: str) -> str:
        """Get an overall video description by analyzing multiple frames."""
        duration = self.get_video_duration_seconds(video_path)

        # Sample five frames spread across the video (10%, 30%, 50%, 70%, 90% of duration).
        frame_times = [duration * 0.1, duration * 0.3, duration * 0.5, duration * 0.7, duration * 0.9]
        descriptions = []

        for i, time_point in enumerate(frame_times):
            with tempfile.NamedTemporaryFile(suffix=f'_frame_{i}.jpg', delete=False) as temp_frame:
                cmd = [
                    "ffmpeg", "-v", "quiet", "-i", video_path,
                    "-ss", str(time_point), "-vframes", "1", "-y", temp_frame.name
                ]

                try:
                    subprocess.run(cmd, check=True, capture_output=True)

                    prompt = f"Describe what is happening in this video frame at {time_point:.1f}s. Focus on activities, actions, and interesting visual elements."
                    description = self.vlm_handler.generate_response(temp_frame.name, prompt)
                    descriptions.append(f"At {time_point:.1f}s: {description}")

                except subprocess.CalledProcessError as e:
                    logger.error(f"Failed to extract frame at {time_point}s: {e}")
                    continue
                finally:
                    # Clean up the extracted frame.
                    if os.path.exists(temp_frame.name):
                        os.unlink(temp_frame.name)

        if descriptions:
            return "Video content analysis:\n" + "\n".join(descriptions)
        else:
            return "Unable to analyze video content"

    def determine_highlights(self, video_description: str) -> Tuple[str, str]:
        """Return two simple, focused criteria sets for highlight selection.

        Note: the criteria are generic and do not currently depend on the video
        description; process_video() uses direct 1-10 scoring instead of these.
        """

        criteria_set_1 = """Look for segments with:
- Significant movement or action
- Clear visual activity or events happening
- People interacting or doing activities
- Changes in scene or camera angle
- Dynamic or interesting visual content"""

        criteria_set_2 = """Look for segments with:
- Interesting facial expressions or gestures
- Multiple people or subjects in frame
- Good lighting and clear visibility
- Engaging activities or behaviors
- Visually appealing or well-composed shots"""

        return criteria_set_1, criteria_set_2

    def process_segment(self, video_path: str, start_time: float, end_time: float,
                        highlight_criteria: str, segment_num: int, total_segments: int) -> str:
        """Score a single segment from 1 to 10 for highlight potential, returned as a string.

        Note: highlight_criteria is currently unused; segments are scored directly.
        """

        # Sample three frames within the segment (at 20%, 50%, and 80% of its duration).
        segment_duration = end_time - start_time
        frame_times = [
            start_time + segment_duration * 0.2,
            start_time + segment_duration * 0.5,
            start_time + segment_duration * 0.8
        ]

        temp_frames = []
        try:
            # Extract the sampled frames to temporary image files.
            for i, frame_time in enumerate(frame_times):
                temp_frame = tempfile.NamedTemporaryFile(suffix=f'_frame_{i}.jpg', delete=False)
                temp_frames.append(temp_frame.name)
                temp_frame.close()

                cmd = [
                    "ffmpeg", "-v", "quiet", "-i", video_path,
                    "-ss", str(frame_time), "-vframes", "1", "-y", temp_frame.name
                ]
                subprocess.run(cmd, check=True, capture_output=True)

            prompt = f"""Look at this frame from a {segment_duration:.1f}-second video segment.

Rate this video segment for highlight potential on a scale of 1-10, where:
- 1-3: Boring, static, nothing interesting happening
- 4-6: Moderately interesting, some activity or visual interest
- 7-10: Very interesting, dynamic action, engaging content worth highlighting

Consider:
- Amount of movement and activity
- Visual interest and composition
- People interactions or engaging behavior
- Overall entertainment value

Give ONLY a number from 1-10, nothing else."""

            # Only the first extracted frame is scored, to keep inference cheap.
            response = self.vlm_handler.generate_response(temp_frames[0], prompt)

            # Pull the first integer out of the model's response and validate it.
            try:
                import re
                numbers = re.findall(r'\b(\d+)\b', response)
                if numbers:
                    score = int(numbers[0])
                    if 1 <= score <= 10:
                        print(f" 🤖 Score: {score}/10")
                        return str(score)

                print(f" 🤖 Response: {response} (couldn't extract valid score)")
                return "1"
            except Exception:
                print(f" 🤖 Response: {response} (error parsing)")
                return "1"

        except subprocess.CalledProcessError as e:
            logger.error(f"Failed to process segment {segment_num}: {e}")
            # Treat segments that fail frame extraction as the lowest score.
            return "1"
        finally:
            # Clean up all extracted frames.
            for temp_frame in temp_frames:
                if os.path.exists(temp_frame):
                    os.unlink(temp_frame)

    def create_video_segment(self, video_path: str, start_sec: float, end_sec: float, output_path: str) -> bool:
        """Create a video segment using ffmpeg."""
        cmd = [
            "ffmpeg",
            "-v", "quiet",
            "-y",
            "-i", video_path,
            "-ss", str(start_sec),
            "-to", str(end_sec),
            "-c", "copy",
            output_path
        ]

        try:
            subprocess.run(cmd, check=True, capture_output=True)
            return True
        except subprocess.CalledProcessError as e:
            logger.error(f"Failed to create segment: {e}")
            return False

    def concatenate_scenes(self, video_path: str, scene_times: List[Tuple[float, float]],
                           output_path: str, with_effects: bool = True) -> bool:
        """Concatenate selected scenes with optional effects."""
        if with_effects:
            return self._concatenate_with_effects(video_path, scene_times, output_path)
        else:
            return self._concatenate_basic(video_path, scene_times, output_path)

    def _concatenate_basic(self, video_path: str, scene_times: List[Tuple[float, float]], output_path: str) -> bool:
        """Basic concatenation without effects (ffmpeg concat demuxer, stream copy)."""
        if not scene_times:
            logger.error("No scenes to concatenate")
            return False

        # Cut each scene to its own temp file, then concatenate them via a list file.
        temp_files = []
        temp_list_file = tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False)

        try:
            for i, (start_sec, end_sec) in enumerate(scene_times):
                temp_file = tempfile.NamedTemporaryFile(suffix=f'_segment_{i}.mp4', delete=False)
                temp_files.append(temp_file.name)
                temp_file.close()

                if not self.create_video_segment(video_path, start_sec, end_sec, temp_file.name):
                    return False

                temp_list_file.write(f"file '{temp_file.name}'\n")

            temp_list_file.close()

            cmd = [
                "ffmpeg", "-v", "quiet", "-y",
                "-f", "concat", "-safe", "0",
                "-i", temp_list_file.name,
                "-c", "copy",
                output_path
            ]

            subprocess.run(cmd, check=True, capture_output=True)
            return True

        except subprocess.CalledProcessError as e:
            logger.error(f"Failed to concatenate scenes: {e}")
            return False
        finally:
            # Clean up temporary segment files and the concat list file.
            temp_list_file.close()
            for temp_file in temp_files:
                if os.path.exists(temp_file):
                    os.unlink(temp_file)
            if os.path.exists(temp_list_file.name):
                os.unlink(temp_list_file.name)

    def _concatenate_with_effects(self, video_path: str, scene_times: List[Tuple[float, float]], output_path: str) -> bool:
        """Simple concatenation with basic fade transitions (re-encodes the output)."""
        filter_complex_parts = []
        concat_inputs = []

        fade_duration = 0.5

        for i, (start_sec, end_sec) in enumerate(scene_times):
            print(f" ✨ Segment {i+1}: {start_sec:.1f}s - {end_sec:.1f}s ({end_sec-start_sec:.1f}s) with FADE effect")

            # Trim the video stream, reset timestamps, and apply fade in/out.
            video_effects = (
                f"trim=start={start_sec}:end={end_sec},"
                f"setpts=PTS-STARTPTS,"
                f"fade=t=in:st=0:d={fade_duration},"
                f"fade=t=out:st={max(0, end_sec-start_sec-fade_duration)}:d={fade_duration}"
            )
            filter_complex_parts.append(f"[0:v]{video_effects}[v{i}];")

            # Same treatment for the audio stream.
            audio_effects = (
                f"atrim=start={start_sec}:end={end_sec},"
                f"asetpts=PTS-STARTPTS,"
                f"afade=t=in:st=0:d={fade_duration},"
                f"afade=t=out:st={max(0, end_sec-start_sec-fade_duration)}:d={fade_duration}"
            )
            filter_complex_parts.append(f"[0:a]{audio_effects}[a{i}];")
            concat_inputs.append(f"[v{i}][a{i}]")

        # Join all trimmed streams. No trailing ';' after the output labels, since ffmpeg
        # rejects an empty final filter chain.
        concat_filter = f"{''.join(concat_inputs)}concat=n={len(scene_times)}:v=1:a=1[outv][outa]"

        filter_complex = "".join(filter_complex_parts) + concat_filter

        cmd = [
            "ffmpeg",
            "-v", "quiet",
            "-y",
            "-i", video_path,
            "-filter_complex", filter_complex,
            "-map", "[outv]",
            "-map", "[outa]",
            "-c:v", "libx264",
            "-preset", "medium",
            "-crf", "23",
            "-c:a", "aac",
            "-b:a", "128k",
            "-pix_fmt", "yuv420p",
            output_path
        ]

        try:
            subprocess.run(cmd, check=True, capture_output=True)
            return True
        except subprocess.CalledProcessError as e:
            logger.error(f"Failed to concatenate scenes with effects: {e}")
            return False

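    # For reference, with two selected segments the -filter_complex built above expands to
    # roughly the following (timestamps illustrative):
    #   [0:v]trim=start=10:end=15,setpts=PTS-STARTPTS,fade=t=in:st=0:d=0.5,fade=t=out:st=4.5:d=0.5[v0];
    #   [0:a]atrim=start=10:end=15,asetpts=PTS-STARTPTS,afade=t=in:st=0:d=0.5,afade=t=out:st=4.5:d=0.5[a0];
    #   ...the same video/audio pair for the second segment as [v1]/[a1]...
    #   [v0][a0][v1][a1]concat=n=2:v=1:a=1[outv][outa]
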
    def _single_segment_with_effects(self, video_path: str, scene_time: Tuple[float, float], output_path: str) -> bool:
        """Apply simple effects to a single segment."""
        start_sec, end_sec = scene_time
        print(f" ✨ Single segment: {start_sec:.1f}s - {end_sec:.1f}s ({end_sec-start_sec:.1f}s) with fade effect")

        video_effects = (
            f"trim=start={start_sec}:end={end_sec},"
            f"setpts=PTS-STARTPTS,"
            f"fade=t=in:st=0:d=0.5,"
            f"fade=t=out:st={max(0, end_sec-start_sec-0.5)}:d=0.5"
        )

        audio_effects = (
            f"atrim=start={start_sec}:end={end_sec},"
            f"asetpts=PTS-STARTPTS,"
            f"afade=t=in:st=0:d=0.5,"
            f"afade=t=out:st={max(0, end_sec-start_sec-0.5)}:d=0.5"
        )

        cmd = [
            "ffmpeg",
            "-v", "quiet",
            "-y",
            "-i", video_path,
            "-vf", video_effects,
            "-af", audio_effects,
            "-c:v", "libx264",
            "-preset", "medium",
            "-crf", "23",
            "-c:a", "aac",
            "-b:a", "128k",
            "-pix_fmt", "yuv420p",
            output_path
        ]

        try:
            subprocess.run(cmd, check=True, capture_output=True)
            return True
        except subprocess.CalledProcessError as e:
            logger.error(f"Failed to create single segment with effects: {e}")
            return False

    def process_video(self, video_path: str, output_path: str, segment_length: float = 5.0, with_effects: bool = True) -> Dict:
        """Process video using HuggingFace's segment-based approach."""
        print("🚀 Starting HuggingFace Segment-Based Video Highlight Detection")
        print(f"📁 Input: {video_path}")
        print(f"📁 Output: {output_path}")
        print(f"⏱️ Segment Length: {segment_length}s")
        print()

        duration = self.get_video_duration_seconds(video_path)
        if duration <= 0:
            return {"error": "Could not determine video duration"}

        print(f"📹 Video duration: {duration:.1f}s ({duration/60:.1f} minutes)")

        # Step 1: overall description from sampled frames.
        print("🎬 Step 1: Analyzing overall video content...")
        video_description = self.analyze_video_content(video_path)
        print("📝 Video Description:")
        print(f" {video_description}")
        print()

        # Step 2: direct scoring; no criteria generation is needed.
        print("🎯 Step 2: Using direct scoring approach - each segment rated 1-10 for highlight potential")
        print()

        # Step 3: split the video into fixed-length segments and score each one.
        num_segments = int(duration / segment_length) + (1 if duration % segment_length > 0 else 0)
        print(f"📊 Step 3: Processing {num_segments} segments of {segment_length}s each...")
        print(" Each segment will be scored 1-10 for highlight potential")
        print()

        segment_scores = []

        for i in range(num_segments):
            start_time = i * segment_length
            end_time = min(start_time + segment_length, duration)

            progress = int((i / num_segments) * 100) if num_segments > 0 else 0
            print(f"📊 Processing segment {i+1}/{num_segments} ({progress}%)")
            print(f" ⏰ Time: {start_time:.0f}s - {end_time:.1f}s")

            score_str = self.process_segment(video_path, start_time, end_time, "", i+1, num_segments)

            try:
                score = int(score_str)
                segment_scores.append({
                    'start': start_time,
                    'end': end_time,
                    'score': score
                })

                if score >= 7:
                    print(f" ✅ HIGH SCORE ({score}/10) - Excellent highlight material")
                elif score >= 5:
                    print(f" 🟡 MEDIUM SCORE ({score}/10) - Moderate interest")
                else:
                    print(f" ❌ LOW SCORE ({score}/10) - Not highlight worthy")

            except ValueError:
                print(f" ❌ Invalid score: {score_str}")
                segment_scores.append({
                    'start': start_time,
                    'end': end_time,
                    'score': 1
                })
            print()

        # Rank segments by score (highest first).
        segment_scores.sort(key=lambda x: x['score'], reverse=True)

        # Prefer segments scoring 6+, then relax the threshold if too few qualify.
        high_score_segments = [s for s in segment_scores if s['score'] >= 6]

        if len(high_score_segments) < 3:
            high_score_segments = [s for s in segment_scores if s['score'] >= 5]

        if len(high_score_segments) < 3:
            # Fall back to the top ~20% of segments (at least 3).
            top_count = max(3, len(segment_scores) // 5)
            high_score_segments = segment_scores[:top_count]

        selected_segments = [(s['start'], s['end']) for s in high_score_segments]
        # Keep the final highlight reel in chronological order.
        selected_segments.sort(key=lambda x: x[0])

        print("📊 Results Summary:")
        print(f" 📊 Average score: {sum(s['score'] for s in segment_scores) / len(segment_scores):.1f}/10")
        print(f" 📊 High-scoring segments (≥6): {len([s for s in segment_scores if s['score'] >= 6])}")
        print(f" ✅ Selected for highlights: {len(selected_segments)} segments ({len(selected_segments)/num_segments*100:.1f}% of video)")
        print()

        if not selected_segments:
            return {
                "error": "No segments had sufficient scores for highlights",
                "video_description": video_description,
                "segment_scores": segment_scores,
                "total_segments": num_segments
            }

        # Step 4: cut and join the selected segments.
        print(f"🎬 Step 4: Concatenating {len(selected_segments)} selected segments with {'beautiful effects & transitions' if with_effects else 'basic concatenation'}...")

        success = self.concatenate_scenes(video_path, selected_segments, output_path, with_effects)

        if success:
            print("✅ Highlights video created successfully!")
            total_duration = sum(end - start for start, end in selected_segments)
            print(f"🎉 SUCCESS! Created highlights with {len(selected_segments)} segments")
            print(f" 📹 Total highlight duration: {total_duration:.1f}s")
            print(f" 📊 Percentage of original video: {total_duration/duration*100:.1f}%")
        else:
            print("❌ Failed to create highlights video")
            return {"error": "Failed to create highlights video"}

        return {
            "success": True,
            "video_description": video_description,
            "scoring_approach": "Direct segment scoring (1-10 scale)",
            "total_segments": num_segments,
            "selected_segments": len(selected_segments),
            "selected_times": selected_segments,
            "segment_scores": segment_scores,
            "average_score": sum(s['score'] for s in segment_scores) / len(segment_scores),
            "total_duration": total_duration,
            "compression_ratio": total_duration / duration,
            "output_path": output_path
        }


def main():
    parser = argparse.ArgumentParser(description='HuggingFace Segment-Based Video Highlights')
    parser.add_argument('video_path', help='Path to input video file')
    parser.add_argument('--output', required=True, help='Path to output highlights video')
    parser.add_argument('--save-analysis', action='store_true', help='Save analysis results to JSON')
    parser.add_argument('--segment-length', type=float, default=5.0, help='Length of each segment in seconds (default: 5.0)')
    parser.add_argument('--model', default='HuggingFaceTB/SmolVLM2-256M-Video-Instruct', help='SmolVLM2 model to use')
    parser.add_argument('--effects', action='store_true', default=True, help='Enable beautiful effects & transitions (default: True)')
    parser.add_argument('--no-effects', action='store_true', help='Disable effects - basic concatenation only')

    args = parser.parse_args()

    # Effects are enabled by default; --no-effects takes precedence.
    with_effects = args.effects and not args.no_effects

    print("🚀 HuggingFace Approach SmolVLM2 Video Highlights")
    print(" Based on: https://huggingface.co/spaces/HuggingFaceTB/SmolVLM2-HighlightGenerator")
    print(f" Model: {args.model}")
    print(f" Effects: {'✨ Beautiful effects & transitions enabled' if with_effects else '🔧 Basic concatenation only'}")
    print()

    detector = HuggingFaceVideoHighlightDetector(model_name=args.model)

    results = detector.process_video(
        video_path=args.video_path,
        output_path=args.output,
        segment_length=args.segment_length,
        with_effects=with_effects
    )

    if args.save_analysis and 'error' not in results:
        analysis_path = args.output.replace('.mp4', '_hf_analysis.json')
        with open(analysis_path, 'w') as f:
            json.dump(results, f, indent=2)
        print(f"💾 Analysis saved: {analysis_path}")

    if 'error' in results:
        print(f"❌ {results['error']}")
        sys.exit(1)


if __name__ == "__main__":
    main()