""" |
|
|
Audio-Enhanced Video Highlights Generator |
|
|
Combines SmolVLM2 visual analysis with Whisper audio transcription |
|
|
Supports 99+ languages including Telugu, Hindi, English |
|
|
""" |
import os
import sys
import cv2
import argparse
import json
import subprocess
import threading
import tempfile
from pathlib import Path
from typing import List, Dict
import logging

# Make the local src/ package importable when this script is run directly
sys.path.append(str(Path(__file__).parent / "src"))

try:
    from src.smolvlm2_handler import SmolVLM2Handler
except ImportError:
    print("❌ SmolVLM2Handler not found. Make sure to install dependencies first.")
    sys.exit(1)

# Whisper is optional: without it the tool degrades to visual-only analysis,
# so a failed import must not terminate the script.
try:
    import whisper
    WHISPER_AVAILABLE = True
    print("✅ Whisper available for audio transcription")
except ImportError:
    WHISPER_AVAILABLE = False
    print("❌ Whisper not available. Install with: pip install openai-whisper")

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class AudioVisualAnalyzer:
    """Comprehensive analyzer combining visual and audio analysis."""

    def __init__(self, whisper_model_size="base", timeout_seconds=90, enable_visual=True, visual_only_mode=False):
        """Initialize with SmolVLM2 and, optionally, a Whisper model."""
        print("🔧 Initializing Visual Analyzer...")

        self.enable_visual = enable_visual
        self.visual_only_mode = visual_only_mode

        if self.enable_visual:
            print("📥 Loading SmolVLM2...")
            self.vlm_handler = SmolVLM2Handler()
        else:
            print("🚫 Visual analysis disabled")
            self.vlm_handler = None
        self.timeout_seconds = timeout_seconds

        if self.visual_only_mode:
            print("👁️ Visual-only mode enabled - skipping audio processing to optimize performance")
            self.whisper_model = None
        elif WHISPER_AVAILABLE:
            print(f"📥 Loading Whisper model ({whisper_model_size})...")
            self.whisper_model = whisper.load_model(whisper_model_size)
            print("✅ Whisper model loaded successfully")
        else:
            self.whisper_model = None
            print("⚠️ Whisper not available - audio analysis disabled")

    def extract_audio_segments(self, video_path: str, segments: List[Dict]) -> List[str]:
        """Extract audio for specific video segments."""
        audio_files = []
        temp_dir = tempfile.mkdtemp()

        for i, segment in enumerate(segments):
            start_time = segment['start_time']
            duration = segment['duration']
            audio_path = os.path.join(temp_dir, f"segment_{i}.wav")

            # Extract mono 16 kHz PCM audio, the input format Whisper works with
            cmd = [
                'ffmpeg', '-i', video_path,
                '-ss', str(start_time),
                '-t', str(duration),
                '-vn',                    # drop the video stream
                '-acodec', 'pcm_s16le',   # 16-bit PCM
                '-ar', '16000',           # 16 kHz sample rate
                '-ac', '1',               # mono
                '-f', 'wav',
                '-y',                     # overwrite without asking
                audio_path
            ]

            try:
                subprocess.run(cmd, check=True, capture_output=True, text=True)
                if os.path.exists(audio_path) and os.path.getsize(audio_path) > 0:
                    audio_files.append(audio_path)
                    logger.info(f"🔊 Extracted audio segment {i+1}: {duration:.1f}s")
                else:
                    logger.warning(f"⚠️ Audio segment {i+1} is empty or missing")
                    audio_files.append(None)
            except subprocess.CalledProcessError:
                logger.warning(f"⚠️ No audio stream in segment {i+1} (this is normal for silent videos)")
                audio_files.append(None)

        return audio_files
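
    # For one segment starting at 30s and lasting 10s, the command above is roughly
    # equivalent to this shell invocation (paths illustrative):
    #   ffmpeg -i input.mp4 -ss 30 -t 10 -vn -acodec pcm_s16le -ar 16000 -ac 1 -f wav -y segment_0.wav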
    def transcribe_audio_segment(self, audio_path: str) -> Dict:
        """Transcribe an audio segment with Whisper."""
        # whisper_model is None when Whisper is unavailable or in visual-only mode
        if self.whisper_model is None or not audio_path or not os.path.exists(audio_path):
            return {"text": "", "language": "unknown", "confidence": 0.0}

        try:
            result = self.whisper_model.transcribe(
                audio_path,
                language=None,  # let Whisper auto-detect the language
                task="transcribe"
            )

            return {
                "text": result.get("text", "").strip(),
                "language": result.get("language", "unknown"),
                # Whisper does not report an overall confidence; treat success as 1.0
                "confidence": 1.0
            }
        except Exception as e:
            logger.error(f"❌ Audio transcription failed: {e}")
            return {"text": "", "language": "unknown", "confidence": 0.0}
    def analyze_visual_content(self, frame_path: str) -> Dict:
        """Analyze visual content using SmolVLM2 with robust error handling."""
        if not self.enable_visual or self.vlm_handler is None:
            logger.info("📹 Visual analysis disabled, using audio-only mode")
            return {"description": "Audio-only analysis mode - visual analysis disabled", "score": 7.0}

        max_retries = 2
        retry_count = 0

        while retry_count < max_retries:
            try:
                def generate_with_timeout():
                    prompt = ("Analyze this video frame for interesting, engaging, or highlight-worthy content. "
                              "IMPORTANT: Start your response with 'Score: X/10' where X is a number from 1-10. "
                              "Then explain what makes it noteworthy. Focus on action, emotion, important moments, "
                              "or visually striking elements. Rate based on: Action/movement (high scores), "
                              "People talking/interacting (medium-high), Static scenes (low-medium), "
                              "Boring/empty scenes (low scores).")
                    return self.vlm_handler.generate_response(frame_path, prompt)

                # Run inference in a daemon thread so a hung model call cannot block forever
                thread_result = [None]
                exception_result = [None]

                def target():
                    try:
                        thread_result[0] = generate_with_timeout()
                    except Exception as e:
                        exception_result[0] = e

                thread = threading.Thread(target=target)
                thread.daemon = True
                thread.start()
                thread.join(self.timeout_seconds)

                if thread.is_alive():
                    logger.warning(f"⏰ Visual analysis timed out after {self.timeout_seconds}s (attempt {retry_count + 1})")
                    retry_count += 1
                    if retry_count >= max_retries:
                        logger.info("🔄 Switching to audio-only mode due to visual timeout")
                        return {"description": "Visual analysis timed out - using audio-only mode", "score": 7.0}
                    continue

                if exception_result[0]:
                    error_msg = str(exception_result[0])
                    # Numerical instability in sampling (inf/nan logits) is usually transient, so retry
                    if "probability tensor" in error_msg or "inf" in error_msg or "nan" in error_msg:
                        logger.warning(f"⚠️ Model inference error, retrying (attempt {retry_count + 1}): {error_msg}")
                        retry_count += 1
                        if retry_count >= max_retries:
                            return {"description": "Model inference failed after retries", "score": 6.0}
                        continue
                    else:
                        raise exception_result[0]

                response = thread_result[0]
                if not response or len(response.strip()) == 0:
                    logger.warning(f"⚠️ Empty response, retrying (attempt {retry_count + 1})")
                    retry_count += 1
                    if retry_count >= max_retries:
                        return {"description": "No meaningful response after retries", "score": 6.0}
                    continue

                score = self.extract_score_from_text(response)
                return {"description": response, "score": score}

            except Exception as e:
                error_msg = str(e)
                logger.warning(f"⚠️ Visual analysis error (attempt {retry_count + 1}): {error_msg}")
                retry_count += 1
                if retry_count >= max_retries:
                    return {"description": f"Analysis failed after {max_retries} attempts: {error_msg}", "score": 6.0}

        return {"description": "Analysis failed after all retry attempts", "score": 6.0}
    def extract_score_from_text(self, text: str) -> float:
        """Extract a numeric score from analysis text, clamped to [1.0, 10.0]."""
        import re

        # Ordered from most to least specific; the last pattern is a bare-number fallback
        patterns = [
            r'score:\s*(\d+(?:\.\d+)?)\s*/\s*10',
            r'(\d+(?:\.\d+)?)\s*/\s*10',
            r'(?:score|rating|rate)(?:\s*[:=]\s*)(\d+(?:\.\d+)?)',
            r'(\d+(?:\.\d+)?)\s*(?:out of|/)\s*10',
            r'(?:^|\s)(\d+(?:\.\d+)?)(?:\s*/\s*10)?(?:\s|$)',
        ]

        for pattern in patterns:
            matches = re.findall(pattern, text.lower())
            if matches:
                try:
                    score = float(matches[0])
                    return min(max(score, 1.0), 10.0)
                except ValueError:
                    continue

        # Neutral default when no score can be parsed
        return 6.0
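
    # Examples (illustrative): "Score: 8/10 - fast action" -> 8.0; "rating: 3" -> 3.0;
    # "I'd give it 12/10" -> 10.0 (clamped); text with no number -> 6.0 (default).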
    def calculate_combined_score(self, visual_score: float, audio_text: str, audio_lang: str) -> float:
        """Calculate a combined score from visual and audio analysis."""
        combined_score = visual_score

        if audio_text:
            audio_bonus = 0.0
            text_lower = audio_text.lower()

            # Keyword bonuses: +0.5 for each matched word
            excitement_words = ['amazing', 'incredible', 'wow', 'fantastic', 'awesome', 'perfect', 'excellent']
            action_words = ['goal', 'win', 'victory', 'success', 'breakthrough', 'achievement']
            emotion_words = ['happy', 'excited', 'thrilled', 'surprised', 'shocked', 'love']

            # Telugu positive words: "amazing", "very good", "wow", "super"
            telugu_positive = ['అద్భుతం', 'చాలా బాగుంది', 'వావ్', 'సూపర్']

            for word_list in [excitement_words, action_words, emotion_words, telugu_positive]:
                for word in word_list:
                    if word in text_lower:
                        audio_bonus += 0.5

            # Longer speech suggests more substantial content
            if len(audio_text) > 50:
                audio_bonus += 0.3
            elif len(audio_text) > 20:
                audio_bonus += 0.1

            # Small bonus for the target languages
            if audio_lang in ['te', 'telugu']:
                audio_bonus += 0.2
            elif audio_lang in ['hi', 'hindi']:
                audio_bonus += 0.2

            combined_score += audio_bonus

        return min(max(combined_score, 1.0), 10.0)
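
    # Worked example (illustrative): visual_score=7.5, audio_text="That goal was amazing!"
    # -> "amazing" (+0.5) and "goal" (+0.5), length 22 chars (+0.1), language "en" (+0.0)
    # -> combined = 7.5 + 1.1 = 8.6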
    def analyze_segment(self, video_path: str, segment: Dict, temp_frame_path: str) -> Dict:
        """Analyze a single video segment with both visual and audio."""
        start_time = segment['start_time']
        duration = segment['duration']

        logger.info(f"🔍 Analyzing segment at {start_time:.1f}s ({duration:.1f}s duration)")

        visual_analysis = self.analyze_visual_content(temp_frame_path)

        if self.visual_only_mode:
            logger.info("👁️ Visual-only mode: skipping audio analysis")
            audio_analysis = {"text": "", "language": "unknown", "confidence": 0.0}
            combined_score = visual_analysis['score']
        else:
            audio_files = self.extract_audio_segments(video_path, [segment])
            audio_analysis = {"text": "", "language": "unknown", "confidence": 0.0}

            if audio_files and audio_files[0]:
                audio_analysis = self.transcribe_audio_segment(audio_files[0])
                # Clean up the temporary audio file
                try:
                    os.unlink(audio_files[0])
                except OSError:
                    pass

            combined_score = self.calculate_combined_score(
                visual_analysis['score'],
                audio_analysis['text'],
                audio_analysis['language']
            )

        return {
            'start_time': start_time,
            'duration': duration,
            'visual_score': visual_analysis['score'],
            'visual_description': visual_analysis['description'],
            'audio_text': audio_analysis['text'],
            'audio_language': audio_analysis['language'],
            'combined_score': combined_score,
            'selected': False
        }


def extract_frames_at_intervals(video_path: str, interval_seconds: float = 10.0) -> List[Dict]:
    """Compute segment boundaries at regular intervals across a video."""
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise ValueError(f"Cannot open video file: {video_path}")

    fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    if fps <= 0:
        cap.release()
        raise ValueError(f"Invalid FPS reported for video: {video_path}")
    duration = total_frames / fps

    logger.info(f"📹 Video: {duration:.1f}s, {fps:.1f} FPS, {total_frames} frames")

    segments = []
    current_time = 0.0

    while current_time < duration:
        # The final segment may be shorter than the interval
        segment_duration = min(interval_seconds, duration - current_time)
        segments.append({
            'start_time': current_time,
            'duration': segment_duration,
            'frame_number': int(current_time * fps)
        })
        current_time += interval_seconds

    cap.release()
    return segments
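
# Example (illustrative): a 25s video with interval_seconds=10.0 yields three segments,
# starting at 0s, 10s, and 20s with durations 10.0, 10.0, and 5.0.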


def save_frame_at_time(video_path: str, time_seconds: float, output_path: str) -> bool:
    """Save a frame at a specific time with robust frame extraction."""
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        return False

    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        frame_number = int(time_seconds * fps)

        # Clamp to the valid frame range
        frame_number = min(frame_number, total_frames - 1)
        frame_number = max(frame_number, 0)

        # Some codecs fail to decode individual frames; probe nearby frames as a fallback
        for attempt in range(3):
            try:
                test_frame = frame_number + attempt
                if test_frame >= total_frames:
                    test_frame = frame_number - attempt
                    if test_frame < 0:
                        test_frame = frame_number

                cap.set(cv2.CAP_PROP_POS_FRAMES, test_frame)
                ret, frame = cap.read()

                if ret and frame is not None and frame.size > 0:
                    # Only accept well-formed 3-channel (BGR) frames
                    if len(frame.shape) == 3 and frame.shape[2] == 3:
                        success = cv2.imwrite(output_path, frame)
                        if success:
                            cap.release()
                            return True

            except Exception as e:
                logger.warning(f"Frame extraction attempt {attempt + 1} failed: {e}")
                continue

        cap.release()
        return False

    except Exception as e:
        logger.error(f"Critical error in frame extraction: {e}")
        cap.release()
        return False


def create_highlights_video(video_path: str, selected_segments: List[Dict], output_path: str):
    """Create a highlights video from the selected segments."""
    if not selected_segments:
        logger.error("❌ No segments selected for highlights")
        return False

    temp_files = []
    temp_dir = tempfile.mkdtemp()

    # Cut each selected segment without re-encoding (stream copy)
    for i, segment in enumerate(selected_segments):
        temp_file = os.path.join(temp_dir, f"segment_{i}.mp4")

        cmd = [
            'ffmpeg', '-i', video_path,
            '-ss', str(segment['start_time']),
            '-t', str(segment['duration']),
            '-c', 'copy',
            '-y', temp_file
        ]

        try:
            subprocess.run(cmd, check=True, capture_output=True)
            temp_files.append(temp_file)
            logger.info(f"✅ Created segment {i+1}/{len(selected_segments)}")
        except subprocess.CalledProcessError as e:
            logger.error(f"❌ Failed to create segment {i+1}: {e}")
            continue

    if not temp_files:
        logger.error("❌ No valid segments created")
        return False

    # Concatenate the segments with ffmpeg's concat demuxer
    concat_file = os.path.join(temp_dir, "concat.txt")
    with open(concat_file, 'w') as f:
        for temp_file in temp_files:
            f.write(f"file '{temp_file}'\n")

    cmd = [
        'ffmpeg', '-f', 'concat', '-safe', '0',
        '-i', concat_file,
        '-c', 'copy',
        '-y', output_path
    ]

    try:
        subprocess.run(cmd, check=True, capture_output=True)
        logger.info(f"✅ Highlights video created: {output_path}")

        # Clean up temporary files
        for temp_file in temp_files:
            try:
                os.unlink(temp_file)
            except OSError:
                pass
        try:
            os.unlink(concat_file)
            os.rmdir(temp_dir)
        except OSError:
            pass

        return True
    except subprocess.CalledProcessError as e:
        logger.error(f"❌ Failed to create highlights video: {e}")
        return False
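
# The concat list consumed by ffmpeg's concat demuxer looks like this (paths illustrative):
#   file '/tmp/tmpXXXX/segment_0.mp4'
#   file '/tmp/tmpXXXX/segment_1.mp4'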


def main():
    parser = argparse.ArgumentParser(description="Audio-Enhanced Video Highlights Generator")
    parser.add_argument("video_path", help="Path to input video file")
    parser.add_argument("--output", "-o", default="audio_enhanced_highlights.mp4",
                        help="Output highlights video path")
    parser.add_argument("--interval", "-i", type=float, default=10.0,
                        help="Analysis interval in seconds (default: 10.0)")
    parser.add_argument("--min-score", "-s", type=float, default=7.0,
                        help="Minimum score for highlights (default: 7.0)")
    parser.add_argument("--max-highlights", "-m", type=int, default=5,
                        help="Maximum number of highlights (default: 5)")
    parser.add_argument("--whisper-model", "-w", default="base",
                        choices=["tiny", "base", "small", "medium", "large"],
                        help="Whisper model size (default: base)")
    parser.add_argument("--timeout", "-t", type=int, default=30,
                        help="Timeout for each analysis in seconds (default: 30)")
    parser.add_argument("--save-analysis", action="store_true",
                        help="Save detailed analysis to JSON file")

    args = parser.parse_args()

    if not os.path.exists(args.video_path):
        print(f"❌ Video file not found: {args.video_path}")
        sys.exit(1)

    print("🎬 Audio-Enhanced Video Highlights Generator")
    print(f"📁 Input: {args.video_path}")
    print(f"📁 Output: {args.output}")
    print(f"⏱️ Analysis interval: {args.interval}s")
    print(f"🎯 Minimum score: {args.min_score}")
    print(f"📊 Max highlights: {args.max_highlights}")
    print(f"🎙️ Whisper model: {args.whisper_model}")
    print()

    try:
        analyzer = AudioVisualAnalyzer(
            whisper_model_size=args.whisper_model,
            timeout_seconds=args.timeout
        )

        segments = extract_frames_at_intervals(args.video_path, args.interval)
        print(f"🔍 Analyzing {len(segments)} segments...")

        analyzed_segments = []
        temp_frame_path = "temp_frame.jpg"

        for i, segment in enumerate(segments):
            print(f"\n🔍 Segment {i+1}/{len(segments)} (t={segment['start_time']:.1f}s)")

            if save_frame_at_time(args.video_path, segment['start_time'], temp_frame_path):
                analysis = analyzer.analyze_segment(args.video_path, segment, temp_frame_path)
                analyzed_segments.append(analysis)

                print(f"  👁️ Visual: {analysis['visual_score']:.1f}/10")
                print(f"  🎙️ Audio: '{analysis['audio_text'][:50]}...' ({analysis['audio_language']})")
                print(f"  🎯 Combined: {analysis['combined_score']:.1f}/10")
            else:
                print("  ❌ Failed to extract frame")

        # Remove the temporary frame file
        try:
            os.unlink(temp_frame_path)
        except OSError:
            pass

        if not analyzed_segments:
            print("❌ No segments analyzed successfully")
            sys.exit(1)

        # Rank segments and keep the best ones above the threshold
        analyzed_segments.sort(key=lambda x: x['combined_score'], reverse=True)
        selected_segments = [s for s in analyzed_segments if s['combined_score'] >= args.min_score]
        selected_segments = selected_segments[:args.max_highlights]

        if not selected_segments:
            print(f"❌ No segments met minimum score of {args.min_score}")
            sys.exit(1)

        print(f"\n🏆 Selected {len(selected_segments)} highlights:")
        for i, segment in enumerate(selected_segments):
            print(f"{i+1}. t={segment['start_time']:.1f}s, score={segment['combined_score']:.1f}")
            if segment['audio_text']:
                print(f"   Audio: \"{segment['audio_text'][:100]}...\"")

        print("\n🎬 Creating highlights video...")
        success = create_highlights_video(args.video_path, selected_segments, args.output)

        if success:
            print(f"✅ Audio-enhanced highlights created: {args.output}")

            if args.save_analysis:
                analysis_file = args.output.replace('.mp4', '_analysis.json')
                with open(analysis_file, 'w') as f:
                    json.dump({
                        'input_video': args.video_path,
                        'output_video': args.output,
                        'settings': {
                            'interval': args.interval,
                            'min_score': args.min_score,
                            'max_highlights': args.max_highlights,
                            'whisper_model': args.whisper_model,
                            'timeout': args.timeout
                        },
                        'segments': analyzed_segments,
                        'selected_segments': selected_segments
                    }, f, indent=2)
                print(f"📊 Analysis saved: {analysis_file}")
        else:
            print("❌ Failed to create highlights video")
            sys.exit(1)

    except KeyboardInterrupt:
        print("\n⏹️ Operation cancelled by user")
        sys.exit(1)
    except Exception as e:
        print(f"❌ Error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()