|
|
import torch |
|
|
import easyocr |
|
|
import numpy as np |
|
|
import cv2 |
|
|
from PIL import Image |
|
|
from typing import List, Dict |
|
|
import re |
|
|
|
|
|
class OCREngineManager:
    """Text extraction using EasyOCR with brand-optimized preprocessing.

    Owns a single ``easyocr.Reader`` for English + Traditional Chinese and
    exposes helpers to run OCR on an image, normalize the recognized text,
    and pre-process image crops before brand/logo OCR.

    Attributes:
        reader: The ``easyocr.Reader`` instance created in ``__init__``.
    """

    # Language codes handed to ``easyocr.Reader``.
    _LANGUAGES = ('en', 'ch_tra')

    # PIL modes passed to the OCR pipeline without conversion; any other
    # mode is converted to RGB first (see ``_normalize_mode``).
    _PASSTHROUGH_MODES = ('RGB', 'RGBA', 'L')

    # ``readtext`` tuning for ordinary images.
    _DEFAULT_READTEXT_KWARGS = {
        'min_size': 20,
        'text_threshold': 0.7,
        'link_threshold': 0.4,
    }

    # ``readtext`` tuning for brand-preprocessed images: smaller minimum
    # size and lower thresholds than the default profile, plus explicit
    # contrast settings.
    _BRAND_READTEXT_KWARGS = {
        'min_size': 10,
        'text_threshold': 0.5,
        'link_threshold': 0.3,
        'contrast_ths': 0.1,
        'adjust_contrast': 0.8,
    }

    # Everything that is neither a word character, whitespace, nor in the
    # U+4E00-U+9FFF range is stripped by ``clean_and_normalize``.
    _NON_TEXT_CHARS = re.compile(r'[^\w\s\u4e00-\u9fff]')

    def __init__(self):
        """Create the EasyOCR reader, preferring GPU and falling back to CPU.

        Only the GPU attempt is guarded: if it raises, the reader is rebuilt
        on CPU. A failure of the CPU initialization itself propagates to the
        caller instead of being reported as a GPU problem.
        """
        print("Loading EasyOCR (English + Traditional Chinese)...")

        languages = list(self._LANGUAGES)

        if torch.cuda.is_available():
            print(" Attempting GPU initialization...")
            try:
                self.reader = easyocr.Reader(languages, gpu=True)
                print(" ✓ EasyOCR loaded with GPU")
            except Exception as e:
                # Best-effort: any GPU-side failure degrades to CPU.
                print(f" ⚠️ GPU initialization failed: {e}")
                print(" Falling back to CPU...")
                self.reader = easyocr.Reader(languages, gpu=False)
                print(" ✓ EasyOCR loaded with CPU (fallback)")
        else:
            print(" CUDA not available, using CPU...")
            self.reader = easyocr.Reader(languages, gpu=False)
            print(" ✓ EasyOCR loaded with CPU")

        print("✓ EasyOCR loaded")

    @staticmethod
    def _normalize_mode(image):
        """Return ``image`` in a mode the OCR pipeline can consume.

        RGB, RGBA and L images are returned unchanged, as is any object
        without a ``mode`` attribute (e.g. an array passed instead of a PIL
        image). Every other PIL mode is converted to RGB.
        """
        mode = getattr(image, 'mode', None)
        if mode is None or mode in OCREngineManager._PASSTHROUGH_MODES:
            return image
        return image.convert('RGB')

    def extract_text(self, image: "Image.Image", use_brand_preprocessing: bool = False) -> List[Dict]:
        """Extract text from image with optional brand-optimized preprocessing.

        Args:
            image: PIL Image to run OCR on.
            use_brand_preprocessing: When True, the image is first passed
                through ``preprocess_for_brand_ocr`` and OCR runs with the
                brand ``readtext`` profile; otherwise the default profile
                is used on the image as-is.

        Returns:
            One dict per detection, in the order EasyOCR returned them, with
            keys ``'bbox'``, ``'text'`` (cleaned via ``clean_and_normalize``),
            ``'confidence'`` and ``'raw_text'`` (the unmodified OCR string).
        """
        if use_brand_preprocessing:
            source = self.preprocess_for_brand_ocr(image)
            readtext_kwargs = self._BRAND_READTEXT_KWARGS
        else:
            source = self._normalize_mode(image)
            readtext_kwargs = self._DEFAULT_READTEXT_KWARGS

        img_array = np.array(source)

        # detail=1 with paragraph=False yields (bbox, text, confidence)
        # triples, which the comprehension below unpacks.
        results = self.reader.readtext(
            img_array,
            detail=1,
            paragraph=False,
            **readtext_kwargs
        )

        return [
            {
                'bbox': bbox,
                'text': self.clean_and_normalize(text),
                'confidence': confidence,
                'raw_text': text,
            }
            for bbox, text, confidence in results
        ]

    def clean_and_normalize(self, text: str) -> str:
        """Clean and normalize text.

        Removes punctuation/symbol characters, collapses every run of
        whitespace to a single space (trimming both ends), and upper-cases
        the result.

        Args:
            text: Raw string to normalize.

        Returns:
            The normalized string; may be empty.
        """
        stripped = self._NON_TEXT_CHARS.sub('', text)
        collapsed = ' '.join(stripped.split())
        return collapsed.upper()

    def preprocess_for_brand_ocr(self, image_region: "Image.Image") -> "Image.Image":
        """Preprocess image for brand OCR recognition.

        Optimizes for detecting brand logos and text on products (especially
        metallic logos). Pipeline: grayscale -> CLAHE contrast enhancement ->
        non-local-means denoising -> adaptive threshold -> morphological
        closing -> 3x3 sharpening filter.

        Args:
            image_region: PIL Image (typically a cropped region).

        Returns:
            Preprocessed PIL Image.
        """
        # Modes other than RGB/RGBA/L are converted to RGB so the grayscale
        # step below always receives an array layout it can handle.
        img_array = np.array(self._normalize_mode(image_region))

        # Multi-channel input is reduced to one channel; 2-D input is
        # treated as already grayscale.
        if img_array.ndim == 3:
            gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)
        else:
            gray = img_array

        # Local contrast enhancement on 8x8 tiles.
        clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
        enhanced = clahe.apply(gray)

        # Noise removal before thresholding.
        denoised = cv2.fastNlMeansDenoising(
            enhanced, None, h=8, templateWindowSize=7, searchWindowSize=21
        )

        # Binarize against a Gaussian-weighted 15x15 neighborhood mean
        # with constant 2 subtracted.
        binary = cv2.adaptiveThreshold(
            denoised, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
            cv2.THRESH_BINARY, 15, 2
        )

        # 3x3 closing on the thresholded image.
        kernel = np.ones((3, 3), np.uint8)
        morph = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kernel)

        # NOTE(review): on a strictly 0/255 image this kernel (center 11,
        # neighbors -1) appears to leave pixels unchanged once the result is
        # saturated back to uint8 - confirm whether the step is still needed.
        kernel_sharp = np.array([[-1, -1, -1], [-1, 11, -1], [-1, -1, -1]])
        sharpened = cv2.filter2D(morph, -1, kernel_sharp)

        return Image.fromarray(sharpened)
|
|
|
|
|
# Import-time confirmation that the class definition above was executed.
print("✓ OCREngineManager (with brand OCR preprocessing) defined")
|
|
|