Spaces:
Runtime error
Runtime error
| import os | |
| import cv2 | |
| import numpy as np | |
| from PIL import Image, ImageDraw | |
| import ffmpeg | |
| import gradio as gr | |
| from tqdm import tqdm | |
| import zstandard as zstd | |
| import brotli | |
| import torch | |
| import torchvision.transforms as transforms | |
| from torch.nn import functional as F | |
| import cupy as cp | |
| import io | |
| import mimetypes | |
| from pydub import AudioSegment | |
| from PyPDF2 import PdfFileReader, PdfFileWriter | |
| import docx | |
| import openpyxl | |
class GPUAcceleratedCompressionToolkit:
    """Compress images, video, audio, documents, spreadsheets and other files.

    A file's category is detected from its extension (see
    ``supported_formats``) with a MIME-type fallback, and each category is
    routed to a dedicated ``compress_*`` method. GPU use is limited to the
    torch-based image down-scaling and the NVENC video encoder; every other
    path runs on the CPU.
    """

    # Image modes that are saved to JPEG unchanged; anything else (for
    # example RGBA or palette images) is converted to RGB first.
    _JPEG_MODES = ('RGB', 'L', 'CMYK')
    # Pillow's format names differ from the usual file extensions.
    _PIL_FORMAT_ALIASES = {'JPG': 'JPEG', 'TIF': 'TIFF'}
    # Image modes that survive the tensor round trip of the GPU path.
    _TENSOR_MODES = ('L', 'RGB', 'RGBA')
    # Workbook extensions openpyxl is able to load.
    _OPENPYXL_EXTENSIONS = ('xlsx', 'xlsm', 'xltx', 'xltm')
    _DOCUMENT_MIME_TYPES = (
        'application/pdf',
        'application/msword',
        'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
    )
    _SPREADSHEET_MIME_TYPES = (
        'application/vnd.ms-excel',
        'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
    )

    def __init__(self):
        """Declare the known extensions per category and pick the torch device."""
        self.supported_formats = {
            'image': ['.jpg', '.jpeg', '.png', '.bmp', '.tiff'],
            'video': ['.mp4', '.avi', '.mov', '.mkv'],
            'audio': ['.mp3', '.wav', '.ogg', '.flac'],
            'document': ['.txt', '.pdf', '.doc', '.docx'],
            'spreadsheet': ['.xlsx', '.xls', '.csv'],
        }
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    def _category_for_extension(self, extension):
        """Return the category listing *extension* (e.g. ``'.png'``), or None."""
        extension = extension.lower()
        for category, extensions in self.supported_formats.items():
            if extension in extensions:
                return category
        return None

    def detect_file_type(self, file_path):
        """Classify *file_path* by extension, falling back to its MIME type.

        Returns:
            One of ``'image'``, ``'video'``, ``'audio'``, ``'document'``,
            ``'spreadsheet'`` or ``'other'``.
        """
        # The extension table is checked first so the result does not depend
        # on which MIME entries the host happens to know about.
        category = self._category_for_extension(os.path.splitext(file_path)[1])
        if category is not None:
            return category

        mime_type, _ = mimetypes.guess_type(file_path)
        if mime_type:
            type_category = mime_type.split('/')[0]
            if type_category in ('image', 'video', 'audio'):
                return type_category
            if mime_type in self._DOCUMENT_MIME_TYPES:
                return 'document'
            if mime_type in self._SPREADSHEET_MIME_TYPES:
                return 'spreadsheet'
        return 'other'

    def _resolve_output_format(self, file_type, output_format):
        """Return the format to convert to, or ``'original'`` to keep the input's.

        A single requested format applies to a whole batch, so a request that
        belongs to a different category than the file (say ``'mp3'`` for an
        image) is ignored. Documents, spreadsheets and other files are never
        converted because no conversion is implemented for them.
        """
        if output_format in (None, 'original'):
            return 'original'
        requested = output_format.lower().lstrip('.')
        if file_type in ('image', 'video', 'audio'):
            if self._category_for_extension('.' + requested) in (None, file_type):
                return requested
        return 'original'

    @staticmethod
    def _degrade_tensor(tensor, scale):
        """Down-scale a (N, C, H, W) tensor by *scale* and scale it back up.

        The reduced size is clamped to at least one pixel per side so small
        images at low levels do not produce an empty tensor.
        """
        height, width = tensor.shape[2:]
        reduced = (max(1, int(height * scale)), max(1, int(width * scale)))
        small = F.interpolate(tensor, size=reduced, mode='bilinear', align_corners=False)
        return F.interpolate(small, size=(height, width), mode='bilinear', align_corners=False)

    def compress_image(self, input_path, output_path, compression_level, use_gpu=False, output_format='original'):
        """Re-encode an image, optionally blurring it on the torch device first.

        Args:
            input_path: Image to read.
            output_path: Where the result is written.
            compression_level: Passed to Pillow as ``quality``; on the GPU
                path it also sets the down-scale factor (``level / 100``).
            use_gpu: Down-scale and up-scale the pixels with torch before
                saving.
            output_format: ``'original'`` or a format/extension such as
                ``'jpg'`` or ``'png'``.
        """
        quality = int(compression_level)
        with Image.open(input_path) as source:
            if output_format == 'original':
                save_format = source.format if source.format else 'JPEG'
            else:
                save_format = output_format.upper().lstrip('.')
            # 'JPG' is an extension, not a Pillow format name.
            save_format = self._PIL_FORMAT_ALIASES.get(save_format, save_format)

            image = source
            if save_format == 'JPEG' and image.mode not in self._JPEG_MODES:
                image = image.convert('RGB')
            if use_gpu and image.mode not in self._TENSOR_MODES:
                # Palette indices and CMYK planes cannot be interpolated and
                # turned back into an image meaningfully.
                image = image.convert('RGB')

            if use_gpu:
                tensor = transforms.ToTensor()(image).unsqueeze(0).to(self.device)
                degraded = self._degrade_tensor(tensor, quality / 100)
                image = transforms.ToPILImage()(degraded.squeeze(0).cpu())
            image.save(output_path, format=save_format, quality=quality)

    def compress_video(self, input_path, output_path, compression_level, use_gpu=False, output_format=None):
        """Re-encode a video as H.264 with AAC audio through ffmpeg.

        Args:
            input_path: Video to read.
            output_path: Destination; its extension selects the container.
            compression_level: 1-100, higher keeps more quality. It is mapped
                linearly onto the encoder's 51..0 quantizer scale.
            use_gpu: Encode with ``h264_nvenc`` instead of ``libx264``.
            output_format: Unused; kept for interface compatibility.
        """
        level = min(100.0, max(0.0, float(compression_level)))
        quantizer = round((100 - level) * 51 / 100)
        if use_gpu:
            # NVENC has no CRF mode; constant quality is requested with -cq
            # under VBR rate control, where 0 means "automatic".
            encoder_args = {
                'vcodec': 'h264_nvenc',
                'rc': 'vbr',
                'cq': str(max(1, quantizer)),
                'b:v': '0',
            }
        else:
            encoder_args = {'vcodec': 'libx264', 'crf': str(quantizer)}
        (
            ffmpeg
            .input(input_path)
            .output(output_path, acodec='aac', preset='slow', **encoder_args)
            .overwrite_output()
            .run(capture_stdout=True, capture_stderr=True)
        )

    def compress_audio(self, input_path, output_path, compression_level, output_format=None):
        """Re-export an audio file at a bitrate of ``compression_level * 3.2`` kbit/s.

        Args:
            input_path: Audio file to read.
            output_path: Where the result is written.
            compression_level: Scaled to the export bitrate.
            output_format: Export format; defaults to the input's extension.
        """
        if output_format is None:
            output_format = os.path.splitext(input_path)[1][1:]
        output_format = output_format.lower()
        bitrate = f"{int(compression_level * 3.2)}k"  # Scale compression_level to bitrate
        audio = AudioSegment.from_file(input_path)
        audio.export(output_path, format=output_format, bitrate=bitrate)

    @staticmethod
    def _rewrite_pdf(input_path, output_path):
        """Copy a PDF page by page, compressing each page's content stream."""
        try:
            # Current PyPDF2 API; the PdfFile* classes were removed in 3.x.
            from PyPDF2 import PdfReader, PdfWriter
        except ImportError:
            PdfReader = PdfWriter = None

        # The output is written while the input is still open because the
        # pages copied into the writer still refer to the reader's file.
        with open(input_path, 'rb') as source:
            if PdfReader is not None:
                reader = PdfReader(source)
                writer = PdfWriter()
                for page in reader.pages:
                    page.compress_content_streams()  # This is CPU intensive!
                    writer.add_page(page)
            else:
                reader = PdfFileReader(source)
                writer = PdfFileWriter()
                for index in range(reader.getNumPages()):
                    page = reader.getPage(index)
                    page.compressContentStreams()  # This is CPU intensive!
                    writer.addPage(page)
            with open(output_path, 'wb') as output_file:
                writer.write(output_file)

    def compress_document(self, input_path, output_path, compression_level, output_format=None):
        """Compress a document in a way that suits its format.

        PDFs get their content streams compressed, ``.docx`` files are
        re-saved through python-docx, and everything else (including legacy
        ``.doc``) goes through the generic byte compressor.

        Raises:
            ValueError: If ``pdf`` or ``docx`` output is requested for an
                input of a different format; no conversion is implemented.
        """
        source_format = os.path.splitext(input_path)[1][1:].lower()
        if output_format is None:
            target_format = source_format
        else:
            target_format = output_format.lower().lstrip('.')

        if target_format == 'pdf' and source_format == 'pdf':
            self._rewrite_pdf(input_path, output_path)
        elif target_format == 'docx' and source_format == 'docx':
            docx.Document(input_path).save(output_path)
        elif target_format in ('pdf', 'docx'):
            raise ValueError(
                f"Cannot convert a '.{source_format}' document to '.{target_format}'"
            )
        else:
            # For other document types, use generic file compression
            self.compress_file_gpu(input_path, output_path, compression_level)

    def compress_spreadsheet(self, input_path, output_path, compression_level, output_format=None):
        """Re-save a workbook with openpyxl, or byte-compress other formats.

        ``output_format`` is accepted for interface compatibility but no
        conversion is performed.
        """
        source_format = os.path.splitext(input_path)[1][1:].lower()
        if source_format in self._OPENPYXL_EXTENSIONS:
            wb = openpyxl.load_workbook(input_path)
            wb.save(output_path)
        else:
            # openpyxl cannot open .xls or .csv, so compress the raw bytes.
            self.compress_file_gpu(input_path, output_path, compression_level)

    def compress_file_gpu(self, input_path, output_path, compression_level):
        """Compress an arbitrary file into a Zstandard frame.

        Despite the name this runs on the CPU: the earlier copy of the bytes
        to the GPU and back did not change them and only made the method fail
        on hosts without CUDA.

        Args:
            input_path: File to read.
            output_path: Where the compressed frame is written.
            compression_level: Divided by 10 to get the Zstandard level, then
                clamped to 1..22 (level 0 would silently mean "default").
        """
        level = max(1, min(22, int(compression_level / 10)))
        with open(input_path, 'rb') as f_in:
            data = f_in.read()
        compressed = zstd.ZstdCompressor(level=level).compress(data)
        with open(output_path, 'wb') as f_out:
            f_out.write(compressed)

    def batch_compress_gpu(self, input_files, output_dir, use_gpu, output_format, compression_level):
        """Compress every file in *input_files* into *output_dir*.

        Args:
            input_files: Paths, or objects exposing the path as ``.name``.
                ``None`` or an empty sequence yields an empty result.
            output_dir: Created if it does not exist.
            use_gpu: Forwarded to the image and video compressors.
            output_format: ``'original'`` or a target extension; applied only
                to files whose category can be converted to it.
            compression_level: Forwarded to each compressor.

        Returns:
            The list of output paths, in input order.
        """
        os.makedirs(output_dir, exist_ok=True)
        if not input_files:
            return []

        results = []
        for file in tqdm(input_files):
            input_path = getattr(file, 'name', file)
            file_type = self.detect_file_type(input_path)
            base_name = os.path.basename(input_path)

            # Determine output format and path
            target_format = self._resolve_output_format(file_type, output_format)
            if target_format == 'original':
                output_path = os.path.join(output_dir, f"compressed_{base_name}")
                explicit_format = None
            else:
                stem = os.path.splitext(base_name)[0]
                output_path = os.path.join(output_dir, f"compressed_{stem}.{target_format}")
                explicit_format = target_format

            if file_type == 'image':
                self.compress_image(input_path, output_path, compression_level, use_gpu=use_gpu, output_format=target_format)
            elif file_type == 'video':
                self.compress_video(input_path, output_path, compression_level, use_gpu=use_gpu, output_format=explicit_format)
            elif file_type == 'audio':
                self.compress_audio(input_path, output_path, compression_level, output_format=explicit_format)
            elif file_type == 'document':
                self.compress_document(input_path, output_path, compression_level, output_format=explicit_format)
            elif file_type == 'spreadsheet':
                self.compress_spreadsheet(input_path, output_path, compression_level, output_format=explicit_format)
            else:
                self.compress_file_gpu(input_path, output_path, compression_level)
            results.append(output_path)
        return results

    def real_time_preview_gpu(self, input_path, compression_level, use_gpu=False):
        """Build a 300x300 preview image of the compression result.

        Images are degraded the same way as in ``compress_image``; videos
        show their first frame as a JPEG at the given quality; every other
        category gets a grey placeholder.

        Returns:
            A PIL image, or None when a video yields no frame.
        """
        file_type = self.detect_file_type(input_path)
        if file_type == 'image':
            with Image.open(input_path) as source:
                # JPEG cannot store alpha or palette data, and interpolating
                # palette indices is meaningless, so normalise to RGB.
                img = source.convert('RGB')
            if use_gpu:
                tensor = transforms.ToTensor()(img).unsqueeze(0).to(self.device)
                tensor = F.interpolate(tensor, size=(300, 300), mode='bilinear', align_corners=False)
                compressed = self._degrade_tensor(tensor, compression_level / 100)
                return transforms.ToPILImage()(compressed.squeeze(0).cpu())
            img = img.resize((300, 300))
            buffer = io.BytesIO()
            img.save(buffer, format='JPEG', quality=int(compression_level))
            buffer.seek(0)
            return Image.open(buffer)

        if file_type == 'video':
            video = cv2.VideoCapture(input_path)
            try:
                ret, frame = video.read()
            finally:
                video.release()
            if not ret:
                return None
            # Resizing and JPEG encoding are OpenCV calls either way, so
            # use_gpu has no effect on video previews.
            frame = cv2.resize(frame, (300, 300))
            _, encoded = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, int(compression_level)])
            return Image.open(io.BytesIO(encoded.tobytes()))

        if file_type in ['audio', 'document', 'spreadsheet', 'other']:
            placeholder = Image.new('RGB', (300, 300), color='lightgray')
            draw = ImageDraw.Draw(placeholder)
            draw.text((10, 150), f"{file_type.capitalize()} Preview\nNot Available", fill='black')
            return placeholder
        return None
def gradio_interface(toolkit):
    """Build the two-tab Gradio UI ("Compress" and "Preview") around *toolkit*.

    Args:
        toolkit: Object providing ``batch_compress_gpu`` and
            ``real_time_preview_gpu``.

    Returns:
        A ``gr.TabbedInterface`` ready to be launched.
    """

    def process_files(files, use_gpu, output_format, compression_level):
        """Compress the uploaded files into ``compressed_output``."""
        # The form can be submitted with nothing uploaded, in which case
        # there is nothing to iterate over.
        if not files:
            return None
        output_dir = "compressed_output"
        return toolkit.batch_compress_gpu(files, output_dir, use_gpu, output_format, compression_level)

    def update_preview(file, compression_level, use_gpu):
        """Return the preview image for the selected file, if any."""
        if file is None:
            return None
        return toolkit.real_time_preview_gpu(file.name, compression_level, use_gpu)

    iface = gr.Interface(
        fn=process_files,
        inputs=[
            gr.File(label="Input Files", file_count="multiple"),
            gr.Checkbox(label="Use GPU Acceleration"),
            gr.Dropdown(
                choices=["original", "jpg", "png", "mp4", "mp3", "pdf", "docx", "xlsx"],
                label="Output Format",
                value="original"
            ),
            gr.Slider(1, 100, 50, step=1, label="Compression Level")
        ],
        outputs=gr.File(label="Compressed Files", file_count="multiple"),
        title="GPU-Accelerated Compression Toolkit",
        description="Drag and drop files for compression of images, videos, audio, documents, spreadsheets, and other files. File type is automatically detected.",
        allow_flagging="never"
    )
    preview = gr.Interface(
        fn=update_preview,
        inputs=[
            gr.File(label="Input File"),
            gr.Slider(1, 100, 50, step=1, label="Compression Level"),
            gr.Checkbox(label="Use GPU Acceleration")
        ],
        outputs=gr.Image(label="Preview"),
        title="Real-time Compression Preview",
        live=True,  # re-run the preview whenever an input changes
        allow_flagging="never"
    )
    return gr.TabbedInterface([iface, preview], ["Compress", "Preview"])
if __name__ == "__main__":
    # Script entry point: wire a fresh toolkit into the UI and serve it
    # with a public share link.
    app = gradio_interface(GPUAcceleratedCompressionToolkit())
    app.launch(share=True)