Spaces:
Sleeping
Sleeping
Update app.py
Browse files
app.py
CHANGED
|
@@ -27,10 +27,11 @@ except Exception:
|
|
| 27 |
logger.exception("Failed to load GLiNER model")
|
| 28 |
raise
|
| 29 |
|
| 30 |
-
# Regex patterns
|
| 31 |
EMAIL_REGEX = re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b")
|
| 32 |
-
WEBSITE_REGEX = re.compile(r"
|
| 33 |
-
|
|
|
|
| 34 |
SAUDI_CODE = '+966'
|
| 35 |
UAE_CODE = '+971'
|
| 36 |
PHONE_REGEX = re.compile(r'^(?:\+9665\d{8}|\+9715\d{8}|05\d{8}|5\d{8})$')
|
|
@@ -42,6 +43,10 @@ def extract_emails(text: str) -> list[str]:
|
|
| 42 |
def extract_websites(text: str) -> list[str]:
|
| 43 |
return [m.lower() for m in WEBSITE_REGEX.findall(text)]
|
| 44 |
|
|
|
|
|
|
|
|
|
|
|
|
|
| 45 |
def clean_phone_number(phone: str) -> str | None:
|
| 46 |
cleaned = re.sub(r"[^\d+]", "", phone)
|
| 47 |
# International formats
|
|
@@ -51,11 +56,12 @@ def clean_phone_number(phone: str) -> str | None:
|
|
| 51 |
return cleaned
|
| 52 |
# Local to international
|
| 53 |
if cleaned.startswith('05') and len(cleaned) == 10:
|
| 54 |
-
|
|
|
|
| 55 |
if cleaned.startswith('5') and len(cleaned) == 9:
|
| 56 |
-
return
|
| 57 |
if cleaned.startswith('9665') and len(cleaned) == 12:
|
| 58 |
-
return
|
| 59 |
return None
|
| 60 |
|
| 61 |
def process_phone_numbers(text: str) -> list[str]:
|
|
@@ -66,12 +72,6 @@ def process_phone_numbers(text: str) -> list[str]:
|
|
| 66 |
found.append(c)
|
| 67 |
return list(set(found))
|
| 68 |
|
| 69 |
-
def normalize_website(url: str) -> str | None:
|
| 70 |
-
u = url.lower().replace('www.', '').split('/')[0]
|
| 71 |
-
if re.match(r"^[a-z0-9-]+\.[a-z]{2,}$", u):
|
| 72 |
-
return f"www.{u}"
|
| 73 |
-
return None
|
| 74 |
-
|
| 75 |
def extract_address(ocr_texts: list[str]) -> str | None:
|
| 76 |
keywords = ["block","street","ave","area","industrial","road"]
|
| 77 |
parts = [t for t in ocr_texts if any(kw in t.lower() for kw in keywords)]
|
|
@@ -119,9 +119,9 @@ def deduplicate_data(results: dict[str, list[str]]) -> None:
|
|
| 119 |
seen.add(norm); out.append(norm)
|
| 120 |
return out
|
| 121 |
# Normalize lists
|
| 122 |
-
results['Email Address'] = clean_list(results
|
| 123 |
-
results['Website'] = clean_list(results
|
| 124 |
-
results['Phone Number'] = clean_list(results
|
| 125 |
# Others: simple dedupe
|
| 126 |
for key in ['Person Name','Company Name','Job Title','Address','QR Code']:
|
| 127 |
seen = set(); out = []
|
|
@@ -150,27 +150,35 @@ def inference(img: Image.Image, confidence: float):
|
|
| 150 |
# Entity processing
|
| 151 |
for ent in entities:
|
| 152 |
txt, lbl = ent['text'].strip(), ent['label'].lower()
|
| 153 |
-
if lbl == 'person name':
|
| 154 |
-
|
| 155 |
-
elif lbl == '
|
|
|
|
|
|
|
|
|
|
| 156 |
elif lbl == 'phone number':
|
| 157 |
-
if (c:=clean_phone_number(txt)):
|
|
|
|
| 158 |
elif lbl == 'email address' and EMAIL_REGEX.fullmatch(txt):
|
| 159 |
results['Email Address'].append(txt.lower())
|
| 160 |
-
elif lbl == 'website' and WEBSITE_REGEX.
|
| 161 |
-
if (n:=normalize_website(txt)):
|
| 162 |
-
|
|
|
|
|
|
|
| 163 |
# Regex fallbacks
|
| 164 |
results['Email Address'] += extract_emails(full_text)
|
| 165 |
results['Website'] += extract_websites(full_text)
|
| 166 |
# Phone regex fallback
|
| 167 |
results['Phone Number'] += process_phone_numbers(full_text)
|
| 168 |
-
# QR
|
| 169 |
-
if qr := scan_qr_code(img):
|
|
|
|
| 170 |
# Address fallback
|
| 171 |
if not results['Address']:
|
| 172 |
-
if addr := extract_address(ocr_texts):
|
| 173 |
-
|
|
|
|
| 174 |
deduplicate_data(results)
|
| 175 |
# Company fallback
|
| 176 |
if not results['Company Name']:
|
|
@@ -184,17 +192,18 @@ def inference(img: Image.Image, confidence: float):
|
|
| 184 |
if not results['Person Name']:
|
| 185 |
for t in ocr_texts:
|
| 186 |
if re.match(r'^(?:[A-Z][a-z]+\s?){2,}$', t):
|
| 187 |
-
results['Person Name'].append(t)
|
| 188 |
-
|
| 189 |
-
|
|
|
|
| 190 |
with tempfile.NamedTemporaryFile(suffix='.csv', delete=False, mode='w') as f:
|
| 191 |
pd.DataFrame([csv_map]).to_csv(f, index=False)
|
| 192 |
csv_path = f.name
|
| 193 |
-
return full_text,
|
| 194 |
except Exception:
|
| 195 |
err = traceback.format_exc()
|
| 196 |
logger.error(f"Processing failed: {err}")
|
| 197 |
-
return '', {}, None, f"Error:\n{err}"
|
| 198 |
|
| 199 |
# Gradio Interface
|
| 200 |
if __name__ == '__main__':
|
|
@@ -211,4 +220,3 @@ if __name__ == '__main__':
|
|
| 211 |
css=".gr-interface {max-width: 800px !important;}"
|
| 212 |
)
|
| 213 |
demo.launch()
|
| 214 |
-
|
|
|
|
| 27 |
logger.exception("Failed to load GLiNER model")
|
| 28 |
raise
|
| 29 |
|
| 30 |
+
# Regex patterns for emails and websites
|
| 31 |
EMAIL_REGEX = re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b")
|
| 32 |
+
WEBSITE_REGEX = re.compile(r"(?:https?://)?(?:www\.)?([A-Za-z0-9-]+\.[A-Za-z]{2,})")
|
| 33 |
+
|
| 34 |
+
# Phone number constants and regex for Saudi/UAE support
|
| 35 |
SAUDI_CODE = '+966'
|
| 36 |
UAE_CODE = '+971'
|
| 37 |
PHONE_REGEX = re.compile(r'^(?:\+9665\d{8}|\+9715\d{8}|05\d{8}|5\d{8})$')
|
|
|
|
| 43 |
def extract_websites(text: str) -> list[str]:
|
| 44 |
return [m.lower() for m in WEBSITE_REGEX.findall(text)]
|
| 45 |
|
| 46 |
+
def normalize_website(url: str) -> str | None:
|
| 47 |
+
u = url.lower().replace('www.', '').split('/')[0]
|
| 48 |
+
return f"www.{u}" if re.match(r"^[a-z0-9-]+\.[a-z]{2,}$", u) else None
|
| 49 |
+
|
| 50 |
def clean_phone_number(phone: str) -> str | None:
|
| 51 |
cleaned = re.sub(r"[^\d+]", "", phone)
|
| 52 |
# International formats
|
|
|
|
| 56 |
return cleaned
|
| 57 |
# Local to international
|
| 58 |
if cleaned.startswith('05') and len(cleaned) == 10:
|
| 59 |
+
# Determine country by leading digit after 0 (6 Saudi, 5 UAE)
|
| 60 |
+
return (SAUDI_CODE if cleaned[1]=='5' and cleaned[1:2] == '5' else UAE_CODE) + cleaned[1:]
|
| 61 |
if cleaned.startswith('5') and len(cleaned) == 9:
|
| 62 |
+
return UAE_CODE + cleaned
|
| 63 |
if cleaned.startswith('9665') and len(cleaned) == 12:
|
| 64 |
+
return '+' + cleaned
|
| 65 |
return None
|
| 66 |
|
| 67 |
def process_phone_numbers(text: str) -> list[str]:
|
|
|
|
| 72 |
found.append(c)
|
| 73 |
return list(set(found))
|
| 74 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 75 |
def extract_address(ocr_texts: list[str]) -> str | None:
|
| 76 |
keywords = ["block","street","ave","area","industrial","road"]
|
| 77 |
parts = [t for t in ocr_texts if any(kw in t.lower() for kw in keywords)]
|
|
|
|
| 119 |
seen.add(norm); out.append(norm)
|
| 120 |
return out
|
| 121 |
# Normalize lists
|
| 122 |
+
results['Email Address'] = clean_list(results.get('Email Address', []), lambda e: e.lower())
|
| 123 |
+
results['Website'] = clean_list(results.get('Website', []), normalize_website)
|
| 124 |
+
results['Phone Number'] = clean_list(results.get('Phone Number', []), clean_phone_number)
|
| 125 |
# Others: simple dedupe
|
| 126 |
for key in ['Person Name','Company Name','Job Title','Address','QR Code']:
|
| 127 |
seen = set(); out = []
|
|
|
|
| 150 |
# Entity processing
|
| 151 |
for ent in entities:
|
| 152 |
txt, lbl = ent['text'].strip(), ent['label'].lower()
|
| 153 |
+
if lbl == 'person name':
|
| 154 |
+
results['Person Name'].append(txt)
|
| 155 |
+
elif lbl == 'company name':
|
| 156 |
+
results['Company Name'].append(txt)
|
| 157 |
+
elif lbl == 'job title':
|
| 158 |
+
results['Job Title'].append(txt.title())
|
| 159 |
elif lbl == 'phone number':
|
| 160 |
+
if (c:=clean_phone_number(txt)):
|
| 161 |
+
results['Phone Number'].append(c)
|
| 162 |
elif lbl == 'email address' and EMAIL_REGEX.fullmatch(txt):
|
| 163 |
results['Email Address'].append(txt.lower())
|
| 164 |
+
elif lbl == 'website' and WEBSITE_REGEX.search(txt):
|
| 165 |
+
if (n:=normalize_website(txt)):
|
| 166 |
+
results['Website'].append(n)
|
| 167 |
+
elif lbl == 'address':
|
| 168 |
+
results['Address'].append(txt)
|
| 169 |
# Regex fallbacks
|
| 170 |
results['Email Address'] += extract_emails(full_text)
|
| 171 |
results['Website'] += extract_websites(full_text)
|
| 172 |
# Phone regex fallback
|
| 173 |
results['Phone Number'] += process_phone_numbers(full_text)
|
| 174 |
+
# QR code
|
| 175 |
+
if qr := scan_qr_code(img):
|
| 176 |
+
results['QR Code'].append(qr)
|
| 177 |
# Address fallback
|
| 178 |
if not results['Address']:
|
| 179 |
+
if addr := extract_address(ocr_texts):
|
| 180 |
+
results['Address'].append(addr)
|
| 181 |
+
# Deduplicate
|
| 182 |
deduplicate_data(results)
|
| 183 |
# Company fallback
|
| 184 |
if not results['Company Name']:
|
|
|
|
| 192 |
if not results['Person Name']:
|
| 193 |
for t in ocr_texts:
|
| 194 |
if re.match(r'^(?:[A-Z][a-z]+\s?){2,}$', t):
|
| 195 |
+
results['Person Name'].append(t)
|
| 196 |
+
break
|
| 197 |
+
# Build CSV map including all keys
|
| 198 |
+
csv_map = {k: '; '.join(v) for k,v in results.items()}
|
| 199 |
with tempfile.NamedTemporaryFile(suffix='.csv', delete=False, mode='w') as f:
|
| 200 |
pd.DataFrame([csv_map]).to_csv(f, index=False)
|
| 201 |
csv_path = f.name
|
| 202 |
+
return full_text, results, csv_path, ''
|
| 203 |
except Exception:
|
| 204 |
err = traceback.format_exc()
|
| 205 |
logger.error(f"Processing failed: {err}")
|
| 206 |
+
return '', {k: [] for k in ['Person Name','Company Name','Job Title','Phone Number','Email Address','Address','Website','QR Code']}, None, f"Error:\n{err}"
|
| 207 |
|
| 208 |
# Gradio Interface
|
| 209 |
if __name__ == '__main__':
|
|
|
|
| 220 |
css=".gr-interface {max-width: 800px !important;}"
|
| 221 |
)
|
| 222 |
demo.launch()
|
|
|