Spaces:
Running
Running
import os
import shutil
import subprocess
import tempfile

import cv2
import numpy as np
import streamlit as st
# Ensure FFmpeg is installed.
# Streamlit re-executes this script on every user interaction, so only invoke
# the package manager when the ffmpeg/ffprobe binaries are actually missing
# instead of running "apt-get update" on each rerun.
if shutil.which("ffmpeg") is None or shutil.which("ffprobe") is None:
    os.system("apt-get update && apt-get install -y ffmpeg")
# Function to remove watermark from frames
def remove_watermark(frame):
    """Inpaint the bright regions of a BGR frame and return the result.

    Every pixel whose grayscale value is above 200 is put into the mask and
    filled in from its surroundings, so any bright area is affected, not only
    a watermark.

    Args:
        frame: Image in BGR channel order, as passed to ``cv2.cvtColor`` with
            ``cv2.COLOR_BGR2GRAY``.

    Returns:
        The inpainted frame produced by ``cv2.inpaint``.
    """
    grayscale = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    # cv2.threshold returns (threshold_used, image); only the image is needed.
    bright_mask = cv2.threshold(grayscale, 200, 255, cv2.THRESH_BINARY)[1]
    # Telea inpainting with a 3-pixel neighbourhood radius.
    return cv2.inpaint(frame, bright_mask, 3, cv2.INPAINT_TELEA)
# Function to enhance resolution to 1080p
def enhance_resolution(input_path, output_path):
    """Rescale a video to 1920x1080 with FFmpeg.

    This is best-effort: a failing or missing ffmpeg does not raise.

    Args:
        input_path: Path of the video to read.
        output_path: Path the rescaled video is written to. An existing file
            at this path is overwritten.
    """
    command = [
        "ffmpeg",
        # Overwrite without asking: callers pass a path that already exists
        # (created by tempfile.NamedTemporaryFile), and without "-y" ffmpeg
        # refuses to write to it.
        "-y",
        "-i", input_path,
        "-vf", "scale=1920:1080",
        output_path,
    ]
    try:
        # Argument list, no shell: paths with spaces or shell metacharacters
        # are passed through literally.
        subprocess.run(command, check=False)
    except OSError as e:
        # Raised when the ffmpeg executable cannot be started at all.
        print("Error running ffmpeg:", e)
# Function to get video duration using subprocess (fix for ffmpeg.probe error)
def get_video_duration(video_path):
    """Return the duration of a video in seconds, or None if unavailable.

    Runs ``ffprobe`` on ``video_path`` and parses the single duration value
    it prints.

    Args:
        video_path: Path of the video file to inspect.

    Returns:
        The duration in seconds as a float, or None when ffprobe cannot be
        started, exits with an error, or prints something that is not a
        number.
    """
    probe_command = [
        "ffprobe", "-v", "error", "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1", video_path,
    ]
    try:
        result = subprocess.run(
            probe_command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
    except (OSError, ValueError) as e:
        # OSError: ffprobe missing or not executable.
        # ValueError: invalid argument (e.g. NUL byte in the path) or
        # undecodable output in text mode.
        print("Error getting video duration:", e)
        return None

    if result.returncode != 0:
        # Report ffprobe's own message rather than a float-parsing error.
        print("Error getting video duration:", result.stderr.strip())
        return None

    try:
        return float(result.stdout.strip())  # Convert duration to float (seconds)
    except ValueError as e:
        # Empty output or a placeholder such as "N/A".
        print("Error getting video duration:", e)
        return None
# Function to trim last 3 seconds (removes Invideo AI branding)
def trim_invidea_ai_branding(input_path, output_path):
    """Write a copy of a video with its final 3 seconds removed.

    If the duration cannot be determined, or the clip is 3 seconds or
    shorter (so trimming would leave nothing), the input is copied to
    ``output_path`` unchanged instead.

    This is best-effort: a failing ffmpeg run or copy is reported with
    ``print`` and does not raise.

    Args:
        input_path: Path of the video to read.
        output_path: Path the result is written to. An existing file at this
            path is overwritten.
    """
    duration = get_video_duration(input_path)
    remaining = None if duration is None else duration - 3  # Trim last 3 seconds

    if remaining is not None and remaining > 0:
        command = [
            "ffmpeg",
            # Overwrite without asking: callers pass a path that already
            # exists (created by tempfile.NamedTemporaryFile).
            "-y",
            "-i", input_path,
            "-t", str(remaining),
            output_path,
        ]
        try:
            # Argument list, no shell: paths are passed through literally.
            subprocess.run(command, check=False)
        except OSError as e:
            print("Error running ffmpeg:", e)
    else:
        try:
            shutil.copyfile(input_path, output_path)  # Copy file without trimming
        except OSError as e:
            print("Error copying video:", e)
def _new_temp_mp4():
    """Create an empty temporary .mp4 file and return its path."""
    handle = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
    # Only the path is needed; close the handle so it is not leaked.
    handle.close()
    return handle.name


def _remove_quietly(path):
    """Delete a file, ignoring the error if it cannot be removed."""
    try:
        os.remove(path)
    except OSError:
        pass


# Function to process the video
def process_video(input_video):
    """Run the full pipeline on a video file and return the result's path.

    Steps: inpaint every frame with ``remove_watermark``, rescale with
    ``enhance_resolution``, then cut the ending with
    ``trim_invidea_ai_branding``. Intermediate files are deleted before
    returning; the caller owns the returned file.

    NOTE(review): frames are re-encoded through cv2.VideoWriter, which is
    given no audio input here, so the output presumably has no audio track.
    Confirm whether that is acceptable.

    Args:
        input_video: Path of the video file to process.

    Returns:
        Path of the final .mp4 file, or None if the input cannot be opened.
    """
    cap = cv2.VideoCapture(input_video)
    if not cap.isOpened():
        cap.release()
        return None

    # Keep the source frame rate so playback speed and duration are
    # preserved; fall back to 30 when the container does not report a usable
    # value ("not fps > 0" also catches NaN).
    fps = cap.get(cv2.CAP_PROP_FPS)
    if not fps > 0:
        fps = 30.0
    frame_size = (
        int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
        int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
    )

    cleaned_path = _new_temp_mp4()
    enhanced_path = _new_temp_mp4()
    try:
        out = None
        try:
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(cleaned_path, fourcc, fps, frame_size)
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                out.write(remove_watermark(frame))
        finally:
            # Release even if a frame fails to process, so the files are
            # flushed and closed.
            cap.release()
            if out is not None:
                out.release()

        # Enhance resolution
        enhance_resolution(cleaned_path, enhanced_path)

        # Trim branding
        final_path = _new_temp_mp4()
        trim_invidea_ai_branding(enhanced_path, final_path)
        return final_path
    finally:
        # The intermediates are no longer needed once the final file exists.
        _remove_quietly(cleaned_path)
        _remove_quietly(enhanced_path)
# Streamlit UI
st.title("AI Video Enhancement App 🎥")
uploaded_file = st.file_uploader("Upload a video", type=["mp4", "avi", "mov"])

if uploaded_file:
    # Persist the upload to disk: the processing pipeline works on file paths.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as temp_file:
        temp_file.write(uploaded_file.read())
        temp_file_path = temp_file.name

    st.write("Processing video...")
    try:
        processed_video_path = process_video(temp_file_path)
    finally:
        # The on-disk copy of the upload is only needed during processing.
        if os.path.exists(temp_file_path):
            os.remove(temp_file_path)

    if processed_video_path:
        st.video(processed_video_path)
        with open(processed_video_path, "rb") as file:
            st.download_button("Download Processed Video", file, "enhanced_video.mp4", "video/mp4")
    else:
        # process_video returns None when the file cannot be opened as a
        # video; tell the user instead of silently showing nothing.
        st.error("Could not process the uploaded video. Please try a different file.")