"""
|
|
|
Comprehensive Test Suite for Unified AI Services
|
|
|
Tests the unified application and all integrated services (NER, OCR, RAG)
|
|
|
Combines functionality from test_rag.py and test_ner.py with new unified tests
|
|
|
"""

import asyncio
import httpx
import json
import io
import sys
import time
import tempfile
import os
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
import uuid as python_uuid

try:
    from configs import get_config, validate_environment
    config = get_config()
except ImportError:
    print("⚠️ Could not import configs. Using default values.")
    config = None

UNIFIED_URL = "http://localhost:8000"
NER_URL = "http://localhost:8500"
OCR_URL = "http://localhost:8400"
RAG_URL = "http://localhost:8401"
TEST_TIMEOUT = 300
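
# Optional: these defaults could be overridden from the environment when the services
# run elsewhere. The variable names below are assumptions, not part of the original
# configuration:
#   UNIFIED_URL = os.getenv("UNIFIED_URL", UNIFIED_URL)
#   NER_URL = os.getenv("NER_URL", NER_URL)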

# Thai-language test fixture describing a 2023 cyanide-poisoning case; kept in Thai to
# exercise Thai NER (people, places, dates, monetary amounts).
THAI_CYANIDE_MURDER_CASE = """
เหตุฆาตกรรมด้วยไซยาไนด์ พ.ศ. 2566

คดีฆาตกรรมต่อเนื่องที่สั่นสะเทือนสังคมไทย เกิดขึ้นระหว่างเดือนเมษายน-ตุลาคม พ.ศ. 2566
โดยมีนางสาวสาริณี ชัยวัฒน์ หรือ "แอม ไซยาไนด์" อายุ 36 ปี เป็นผู้ต้องหา

รายละเอียดคดี:
ผู้ต้องหาได้ทำการวางยาพิษไซยาไนด์ (Potassium Cyanide) ในอาหารและเครื่องดื่มของเหยื่อหลายราย
เหยื่อรายแรกคือ นางสิริพร บุญลาภวนิช อายุ 32 ปี เสียชีวิตเมื่อวันที่ 14 เมษายน 2566 ที่จังหวัดกาญจนบุรี
เหยื่อรายที่สอง นายสุรชัย อยู่คงคลัง อายุ 45 ปี เสียชีวิตเมื่อวันที่ 2 พฤษภาคม 2566 ที่จังหวัดราชบุรี

การสืบสวน:
ตำรวจภูธรภาค 7 ร่วมกับ สำนักงานตำรวจแห่งชาติ ทำการสืบสวน
พบหลักฐานจากกล้องวงจรปิด (CCTV) ในหลายพื้นที่
ตรวจพบสารไซยาไนด์ในร่างกายเหยื่อทุกราย

การจับกุม:
วันที่ 3 ตุลาคม 2566 ตำรวจจับกุมตัวผู้ต้องหาได้ที่โรงแรมเดอะ บายแซด ตั้งอยู่ที่ ถนนรามคำแหง กรุงเทพมหานคร
พบเอกสารปลอม บัตรประชาชนปลอม และวัตถุพยานสำคัญอื่นๆ
ยึดทรัพย์สินที่ได้จากการกระทำผิด มูลค่ารวมกว่า 2 ล้านบาท
"""

ENGLISH_CYBERSECURITY_CASE = """
Major Cybersecurity Incident Report - Operation Digital Shield

Incident Overview:
On October 15, 2024, CyberDefense Corp, a leading cybersecurity firm headquartered in Austin, Texas, detected a sophisticated Advanced Persistent Threat (APT) targeting critical infrastructure across Southeast Asia.

Key Personnel:
- Dr. Sarah Chen, Chief Security Officer at CyberDefense Corp
- Agent Michael Rodriguez, FBI Cyber Division
- Captain Lisa Thompson, US Cyber Command

Technical Details:
The attackers used a custom malware strain called "DeepStrike" developed by the Shadow Dragon group
Primary attack vector: spear-phishing emails containing weaponized PDF documents
Estimated financial damage: $50 million USD across affected organizations
"""

TEST_URLS = [
    "https://httpbin.org/html",
    "https://httpbin.org/json"
]
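
# NOTE: TEST_URLS is kept for ad-hoc URL experiments; test_unified_analysis_url below
# currently hardcodes the first entry.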


class TestResult:
    """Class to track test results"""

    def __init__(self):
        self.total_tests = 0
        self.passed_tests = 0
        self.failed_tests = 0
        self.test_results = []
        self.warnings = []

    def add_result(self, test_name: str, passed: bool, message: str = "", details: Dict = None):
        """Add a test result"""
        self.total_tests += 1
        if passed:
            self.passed_tests += 1
            print(f"✅ {test_name}")
            if message:
                print(f"   {message}")
        else:
            self.failed_tests += 1
            print(f"❌ {test_name}: {message}")

        self.test_results.append({
            'test_name': test_name,
            'passed': passed,
            'message': message,
            'details': details or {}
        })

    def add_warning(self, test_name: str, message: str):
        """Add a warning (doesn't count as pass/fail)"""
        print(f"⚠️ {test_name}: {message}")
        self.warnings.append({
            'test_name': test_name,
            'message': message
        })

    def print_summary(self):
        """Print test summary"""
        print("\n" + "=" * 60)
        print("UNIFIED SYSTEM TEST SUMMARY")
        print("=" * 60)
        print(f"Total Tests: {self.total_tests}")
        print(f"Passed: {self.passed_tests}")
        print(f"Failed: {self.failed_tests}")
        print(f"Warnings: {len(self.warnings)}")
        success_rate = (self.passed_tests / self.total_tests * 100) if self.total_tests > 0 else 0.0
        print(f"Success Rate: {success_rate:.1f}%")

        if self.failed_tests > 0:
            print("\n❌ FAILED TESTS:")
            for result in self.test_results:
                if not result['passed']:
                    print(f"  - {result['test_name']}: {result['message']}")

        if self.warnings:
            print("\n⚠️ WARNINGS:")
            for warning in self.warnings:
                print(f"  - {warning['test_name']}: {warning['message']}")


class UnifiedSystemTester:
    """Main test class for unified system"""

    def __init__(self):
        self.result = TestResult()
        self.session = None
        self.created_documents = []
        self.created_analyses = []

    async def __aenter__(self):
        self.session = httpx.AsyncClient(timeout=TEST_TIMEOUT)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.aclose()

    async def make_request(self, method: str, url: str, **kwargs) -> httpx.Response:
        """Make HTTP request with error handling"""
        try:
            response = await self.session.request(method, url, **kwargs)
            return response
        except httpx.RequestError as e:
            raise Exception(f"Request failed: {e}")
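
    # Minimal usage sketch (mirrors main() below): the tester is an async context
    # manager, so individual checks can also be run ad hoc:
    #
    #     async with UnifiedSystemTester() as tester:
    #         await tester.test_unified_app_health()
    #         tester.result.print_summary()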

    async def test_unified_app_health(self):
        """Test 1: Unified Application Health Check"""
        print("🔍 Test 1: Unified Application Health Check")
        try:
            response = await self.make_request('GET', f"{UNIFIED_URL}/health")

            if response.status_code == 200:
                data = response.json()
                status = data.get("status")
                services = data.get("services", [])

                healthy_services = [s for s in services if s.get("health")]
                total_services = len(services)

                if status in ["healthy", "degraded"] and healthy_services:
                    message = f"Status: {status}, Services: {len(healthy_services)}/{total_services} healthy"
                    for service in services:
                        service_status = "✅" if service.get("health") else "❌"
                        message += f"\n   {service_status} {service.get('name')}: {service.get('status')} ({service.get('response_time', 0):.3f}s)"

                    self.result.add_result(
                        "Unified App Health Check",
                        True,
                        message,
                        data
                    )
                    return True
                else:
                    self.result.add_result(
                        "Unified App Health Check",
                        False,
                        f"System unhealthy: {data}"
                    )
                    return False
            else:
                self.result.add_result(
                    "Unified App Health Check",
                    False,
                    f"HTTP {response.status_code}: {response.text}"
                )
                return False
        except Exception as e:
            if "connection" in str(e).lower():
                print("\n🔍 Connection Diagnostics:")
                print(f"   Unified App URL: {UNIFIED_URL}")
                print(f"   Error: {e}")
                print("\n💡 Possible Issues:")
                print("   1. Unified app is not running")
                print("   2. Wrong host/port in configuration")
                print("   3. Services failed to start")
                print("\n🚀 To Start Unified App:")
                print("   python app.py")

            self.result.add_result(
                "Unified App Health Check",
                False,
                str(e)
            )
            return False

    async def test_individual_service_health(self):
        """Test 2: Individual Service Health Checks"""
        print("🔍 Test 2: Individual Service Health Checks")

        services = [
            ("NER", NER_URL),
            ("OCR", OCR_URL),
            ("RAG", RAG_URL)
        ]

        all_healthy = True
        service_statuses = {}

        for service_name, service_url in services:
            try:
                response = await self.make_request('GET', f"{service_url}/health")

                if response.status_code == 200:
                    data = response.json()
                    status = data.get("status", "unknown")
                    service_statuses[service_name] = {
                        "healthy": True,
                        "status": status,
                        "details": data
                    }
                    print(f"   ✅ {service_name}: {status}")
                else:
                    service_statuses[service_name] = {
                        "healthy": False,
                        "status": f"HTTP {response.status_code}",
                        "details": None
                    }
                    print(f"   ❌ {service_name}: HTTP {response.status_code}")
                    all_healthy = False

            except Exception as e:
                service_statuses[service_name] = {
                    "healthy": False,
                    "status": f"Error: {e}",
                    "details": None
                }
                print(f"   ❌ {service_name}: {e}")
                all_healthy = False

        self.result.add_result(
            "Individual Service Health",
            all_healthy,
            f"Services healthy: {sum(1 for s in service_statuses.values() if s['healthy'])}/{len(services)}",
            service_statuses
        )

        return all_healthy

    async def test_unified_analysis_text(self):
        """Test 3: Unified Analysis with Text"""
        print("🔍 Test 3: Unified Analysis with Text")

        try:
            request_data = {
                "text": THAI_CYANIDE_MURDER_CASE,
                "extract_relationships": True,
                "include_embeddings": False,
                "include_summary": True,
                "generate_graph_files": True,
                "export_formats": ["neo4j", "json"],
                "enable_rag_indexing": True,
                "rag_title": "Cyanide Murder Case Analysis",
                "rag_keywords": ["cyanide", "murder", "investigation", "thai"],
                "rag_metadata": {"test": True, "case_type": "criminal"}
            }

            response = await self.make_request('POST', f"{UNIFIED_URL}/analyze/unified", json=request_data)

            if response.status_code == 200:
                data = response.json()
                if data.get("success"):
                    service_calls = data.get("service_calls", [])
                    ner_analysis = data.get("ner_analysis", {})
                    rag_document = data.get("rag_document", {})
                    processing_time = data.get("processing_time", 0)

                    entities = ner_analysis.get("entities", [])
                    relationships = ner_analysis.get("relationships", [])

                    if ner_analysis.get("analysis_id"):
                        self.created_analyses.append(ner_analysis["analysis_id"])
                    if rag_document and rag_document.get("document_id"):
                        self.created_documents.append(rag_document["document_id"])

                    message = f"Service calls: {', '.join(service_calls)}"
                    message += f"\n   Processing time: {processing_time:.2f}s"
                    message += f"\n   NER entities: {len(entities)}"
                    message += f"\n   NER relationships: {len(relationships)}"
                    if rag_document:
                        message += f"\n   RAG document ID: {rag_document.get('document_id', 'N/A')}"
                        message += f"\n   RAG chunks: {rag_document.get('total_chunks', 0)}"

                    expected_calls = ["ner_text"]
                    if "enable_rag_indexing" in request_data and request_data["enable_rag_indexing"]:
                        expected_calls.append("rag_upload")

                    all_expected_calls = all(call in service_calls for call in expected_calls)

                    self.result.add_result(
                        "Unified Analysis (Text)",
                        all_expected_calls and entities and len(service_calls) > 0,
                        message,
                        data
                    )
                    return data
                else:
                    self.result.add_result(
                        "Unified Analysis (Text)",
                        False,
                        data.get("error", "Analysis failed")
                    )
                    return None
            else:
                self.result.add_result(
                    "Unified Analysis (Text)",
                    False,
                    f"HTTP {response.status_code}: {response.text[:200]}"
                )
                return None
        except Exception as e:
            self.result.add_result(
                "Unified Analysis (Text)",
                False,
                str(e)
            )
            return None

    async def test_unified_analysis_url(self):
        """Test 4: Unified Analysis with URL"""
        print("🔍 Test 4: Unified Analysis with URL")

        try:
            request_data = {
                "url": "https://httpbin.org/html",
                "extract_relationships": True,
                "include_embeddings": False,
                "include_summary": True,
                "generate_graph_files": False,
                "export_formats": ["json"],
                "enable_rag_indexing": True,
                "rag_title": "Test URL Document",
                "rag_keywords": ["test", "url", "httpbin"],
                "rag_metadata": {"test": True, "source": "httpbin"}
            }

            response = await self.make_request('POST', f"{UNIFIED_URL}/analyze/unified", json=request_data)

            if response.status_code == 200:
                data = response.json()
                if data.get("success"):
                    service_calls = data.get("service_calls", [])
                    ner_analysis = data.get("ner_analysis", {})
                    rag_document = data.get("rag_document", {})

                    if ner_analysis.get("analysis_id"):
                        self.created_analyses.append(ner_analysis["analysis_id"])
                    if rag_document and rag_document.get("document_id"):
                        self.created_documents.append(rag_document["document_id"])

                    message = f"Service calls: {', '.join(service_calls)}"
                    message += f"\n   NER analysis ID: {ner_analysis.get('analysis_id', 'N/A')}"
                    if rag_document:
                        message += f"\n   RAG document ID: {rag_document.get('document_id', 'N/A')}"

                    has_ner_url = "ner_url" in service_calls
                    has_rag_url = "rag_url" in service_calls

                    self.result.add_result(
                        "Unified Analysis (URL)",
                        has_ner_url and len(service_calls) > 0,
                        message,
                        data
                    )
                    return data
                else:
                    self.result.add_result(
                        "Unified Analysis (URL)",
                        False,
                        data.get("error", "URL analysis failed")
                    )
                    return None
            else:
                self.result.add_result(
                    "Unified Analysis (URL)",
                    False,
                    f"HTTP {response.status_code}: {response.text[:200]}"
                )
                return None
        except Exception as e:
            self.result.add_result(
                "Unified Analysis (URL)",
                False,
                str(e)
            )
            return None

    async def test_combined_search(self):
        """Test 5: Combined Search with NER Analysis"""
        print("🔍 Test 5: Combined Search with NER Analysis")

        # Give the services a moment to finish indexing documents created by earlier tests.
        await asyncio.sleep(2)

        try:
            request_data = {
                "query": "investigation murder case",
                "limit": 5,
                "similarity_threshold": 0.1,
                "include_ner_analysis": True,
                "ner_export_formats": ["json"]
            }

            response = await self.make_request('POST', f"{UNIFIED_URL}/search/combined", json=request_data)

            if response.status_code == 200:
                data = response.json()
                if data.get("success"):
                    service_calls = data.get("service_calls", [])
                    search_results = data.get("search_results", {})
                    results = search_results.get("results", [])
                    ner_analyses = search_results.get("ner_analyses", [])

                    message = f"Service calls: {', '.join(service_calls)}"
                    message += f"\n   Search results: {len(results)}"
                    message += f"\n   NER analyses: {len(ner_analyses)}"
                    message += f"\n   Processing time: {data.get('processing_time', 0):.2f}s"

                    has_rag_search = "rag_search" in service_calls
                    has_ner_analysis = any("ner_text_" in call for call in service_calls)

                    success = has_rag_search and len(service_calls) > 0
                    if len(results) == 0:
                        self.result.add_warning(
                            "Combined Search",
                            "No search results found - may need more indexed content"
                        )

                    self.result.add_result(
                        "Combined Search",
                        success,
                        message,
                        data
                    )
                    return data
                else:
                    self.result.add_result(
                        "Combined Search",
                        False,
                        data.get("error", "Search failed")
                    )
                    return None
            else:
                self.result.add_result(
                    "Combined Search",
                    False,
                    f"HTTP {response.status_code}: {response.text[:200]}"
                )
                return None
        except Exception as e:
            self.result.add_result(
                "Combined Search",
                False,
                str(e)
            )
            return None

    async def test_service_proxies(self):
        """Test 6: Service Proxy Endpoints"""
        print("🔍 Test 6: Service Proxy Endpoints")

        proxy_tests = []

        # NER proxy: analyze a short English sentence
        try:
            ner_data = {
                "text": "Test entity recognition with John Smith working at Microsoft in Seattle.",
                "extract_relationships": True,
                "include_embeddings": False,
                "generate_graph_files": False
            }

            response = await self.make_request('POST', f"{UNIFIED_URL}/ner/analyze/text", json=ner_data)

            if response.status_code == 200:
                result = response.json()
                if result.get("success"):
                    entities = result.get("entities", [])
                    proxy_tests.append(("NER Proxy", True, f"Found {len(entities)} entities"))

                    if result.get("analysis_id"):
                        self.created_analyses.append(result["analysis_id"])
                else:
                    proxy_tests.append(("NER Proxy", False, "Analysis failed"))
            else:
                proxy_tests.append(("NER Proxy", False, f"HTTP {response.status_code}"))
        except Exception as e:
            proxy_tests.append(("NER Proxy", False, str(e)))

        # OCR proxy: health check only
        try:
            response = await self.make_request('GET', f"{UNIFIED_URL}/ocr/health")

            if response.status_code == 200:
                proxy_tests.append(("OCR Proxy", True, "Health check passed"))
            else:
                proxy_tests.append(("OCR Proxy", False, f"HTTP {response.status_code}"))
        except Exception as e:
            proxy_tests.append(("OCR Proxy", False, str(e)))

        # RAG proxy: list documents
        try:
            response = await self.make_request('GET', f"{UNIFIED_URL}/rag/documents?limit=5")

            if response.status_code == 200:
                result = response.json()
                documents = result.get("documents", [])
                proxy_tests.append(("RAG Proxy", True, f"Found {len(documents)} documents"))
            else:
                proxy_tests.append(("RAG Proxy", False, f"HTTP {response.status_code}"))
        except Exception as e:
            proxy_tests.append(("RAG Proxy", False, str(e)))

        passed_proxies = sum(1 for _, passed, _ in proxy_tests if passed)
        total_proxies = len(proxy_tests)

        for test_name, passed, message in proxy_tests:
            print(f"   {'✅' if passed else '❌'} {test_name}: {message}")

        self.result.add_result(
            "Service Proxies",
            passed_proxies == total_proxies,
            f"Proxies working: {passed_proxies}/{total_proxies}",
            {"proxy_results": proxy_tests}
        )

        return passed_proxies > 0

    async def test_file_upload_unified(self):
        """Test 7: File Upload through Unified Interface"""
        print("🔍 Test 7: File Upload through Unified Interface")

        try:
            # Build a small plain-text report in memory and upload it through the NER proxy.
            test_content = """
Technical Report: Advanced AI Systems

This report examines the integration of Named Entity Recognition (NER),
Optical Character Recognition (OCR), and Retrieval-Augmented Generation (RAG)
systems in a unified architecture.

Key Personnel:
- Dr. Alice Johnson, Lead AI Researcher at TechCorp
- Prof. Bob Smith, University of Technology
- Sarah Wilson, Data Scientist

Technical Components:
- Azure OpenAI for embeddings and language processing
- PostgreSQL with vector extensions for data storage
- FastAPI for microservice architecture

The system processes documents through multiple stages:
1. OCR extraction for scanned documents
2. NER analysis for entity and relationship extraction
3. RAG indexing for searchable knowledge base

Testing conducted on October 15, 2024 showed 95% accuracy.
Total budget: $250,000 for the complete implementation.
"""

            file_content = test_content.encode('utf-8')
            files = {"file": ("test_report.txt", io.BytesIO(file_content), "text/plain")}
            data = {
                "extract_relationships": "true",
                "include_embeddings": "false",
                "include_summary": "true",
                "generate_graph_files": "true",
                "export_formats": "neo4j,json"
            }

            response = await self.make_request(
                'POST',
                f"{UNIFIED_URL}/ner/analyze/file",
                files=files,
                data=data
            )

            if response.status_code == 200:
                result = response.json()
                if result.get("success"):
                    entities = result.get("entities", [])
                    relationships = result.get("relationships", [])

                    if result.get("analysis_id"):
                        self.created_analyses.append(result["analysis_id"])

                    message = "File processed successfully"
                    message += f"\n   Entities: {len(entities)}"
                    message += f"\n   Relationships: {len(relationships)}"
                    message += f"\n   Language: {result.get('language', 'unknown')}"

                    person_entities = [e for e in entities if e.get('label') == 'PERSON']
                    org_entities = [e for e in entities if e.get('label') == 'ORGANIZATION']
                    money_entities = [e for e in entities if e.get('label') == 'MONEY']

                    message += f"\n   People found: {len(person_entities)}"
                    message += f"\n   Organizations found: {len(org_entities)}"
                    message += f"\n   Money amounts found: {len(money_entities)}"

                    success = len(entities) > 0 and result.get("analysis_id")

                    self.result.add_result(
                        "File Upload (Unified)",
                        success,
                        message,
                        result
                    )
                    return result
                else:
                    self.result.add_result(
                        "File Upload (Unified)",
                        False,
                        result.get("error", "File analysis failed")
                    )
                    return None
            else:
                self.result.add_result(
                    "File Upload (Unified)",
                    False,
                    f"HTTP {response.status_code}: {response.text[:200]}"
                )
                return None
        except Exception as e:
            self.result.add_result(
                "File Upload (Unified)",
                False,
                str(e)
            )
            return None

    async def test_service_discovery(self):
        """Test 8: Service Discovery and Listing"""
        print("🔍 Test 8: Service Discovery and Listing")

        try:
            response = await self.make_request('GET', f"{UNIFIED_URL}/services")

            if response.status_code == 200:
                data = response.json()
                services = data.get("services", {})
                unified = data.get("unified", {})

                expected_services = ["ner", "ocr", "rag"]
                found_services = list(services.keys())

                message = f"Services discovered: {', '.join(found_services)}"
                message += f"\n   Unified endpoint: {unified.get('url', 'N/A')}"

                for service_name, service_info in services.items():
                    endpoints = service_info.get("endpoints", [])
                    message += f"\n   {service_name}: {len(endpoints)} endpoints"

                all_expected_found = all(service in found_services for service in expected_services)

                self.result.add_result(
                    "Service Discovery",
                    all_expected_found,
                    message,
                    data
                )
                return data
            else:
                self.result.add_result(
                    "Service Discovery",
                    False,
                    f"HTTP {response.status_code}"
                )
                return None
        except Exception as e:
            self.result.add_result(
                "Service Discovery",
                False,
                str(e)
            )
            return None

    async def test_system_performance(self):
        """Test 9: System Performance and Reliability"""
        print("🔍 Test 9: System Performance and Reliability")

        try:
            # Fire several NER requests concurrently and measure throughput.
            tasks = []
            test_texts = [
                "Performance test with Apple Inc and CEO Tim Cook in California.",
                "Reliability testing of Microsoft Azure services in Seattle.",
                "Load testing with Google Cloud Platform and AI systems."
            ]

            start_time = time.time()

            for i, text in enumerate(test_texts):
                task = self.make_request(
                    'POST',
                    f"{UNIFIED_URL}/ner/analyze/text",
                    json={
                        "text": text,
                        "extract_relationships": True,
                        "include_embeddings": False,
                        "generate_graph_files": False
                    }
                )
                tasks.append(task)

            responses = await asyncio.gather(*tasks, return_exceptions=True)
            total_time = time.time() - start_time

            # Tally successful responses and extracted entities.
            successful_requests = 0
            total_entities = 0

            for i, response in enumerate(responses):
                if isinstance(response, Exception):
                    continue

                if response.status_code == 200:
                    result = response.json()
                    if result.get("success"):
                        successful_requests += 1
                        entities = result.get("entities", [])
                        total_entities += len(entities)

                        if result.get("analysis_id"):
                            self.created_analyses.append(result["analysis_id"])

            avg_time_per_request = total_time / len(test_texts)

            message = f"Concurrent requests: {successful_requests}/{len(test_texts)} successful"
            message += f"\n   Total time: {total_time:.2f}s"
            message += f"\n   Avg time per request: {avg_time_per_request:.2f}s"
            message += f"\n   Total entities found: {total_entities}"

            # At least 80% of requests must succeed, averaging under 10 seconds each.
            performance_ok = (
                successful_requests >= len(test_texts) * 0.8 and
                avg_time_per_request < 10.0
            )

            self.result.add_result(
                "System Performance",
                performance_ok,
                message,
                {
                    "successful_requests": successful_requests,
                    "total_requests": len(test_texts),
                    "total_time": total_time,
                    "avg_time_per_request": avg_time_per_request,
                    "total_entities": total_entities
                }
            )

            return performance_ok

        except Exception as e:
            self.result.add_result(
                "System Performance",
                False,
                str(e)
            )
            return False

    async def test_error_handling(self):
        """Test 10: Error Handling and Resilience"""
        print("🔍 Test 10: Error Handling and Resilience")

        error_tests = []

        # Invalid request body
        try:
            response = await self.make_request(
                'POST',
                f"{UNIFIED_URL}/analyze/unified",
                json={"invalid": "data"}
            )

            if response.status_code in [400, 422]:
                error_tests.append(("Invalid Request Handling", True, "Properly rejected invalid data"))
            else:
                error_tests.append(("Invalid Request Handling", False, f"Unexpected status: {response.status_code}"))
        except Exception as e:
            error_tests.append(("Invalid Request Handling", False, str(e)))

        # Empty text
        try:
            response = await self.make_request(
                'POST',
                f"{UNIFIED_URL}/ner/analyze/text",
                json={"text": "", "extract_relationships": True}
            )

            if response.status_code in [400, 422]:
                error_tests.append(("Empty Text Handling", True, "Properly rejected empty text"))
            else:
                result = response.json()
                if not result.get("success"):
                    error_tests.append(("Empty Text Handling", True, "Failed gracefully"))
                else:
                    error_tests.append(("Empty Text Handling", False, "Should have failed"))
        except Exception as e:
            error_tests.append(("Empty Text Handling", False, str(e)))

        # Unreachable URL
        try:
            response = await self.make_request(
                'POST',
                f"{UNIFIED_URL}/analyze/unified",
                json={
                    "url": "https://invalid-url-that-does-not-exist-12345.com",
                    "extract_relationships": True
                }
            )

            if response.status_code == 200:
                result = response.json()
                if not result.get("success"):
                    error_tests.append(("Invalid URL Handling", True, "Failed gracefully with invalid URL"))
                else:
                    error_tests.append(("Invalid URL Handling", False, "Should have failed"))
            else:
                error_tests.append(("Invalid URL Handling", True, f"Rejected invalid URL (HTTP {response.status_code})"))
        except Exception as e:
            error_tests.append(("Invalid URL Handling", False, str(e)))

        passed_error_tests = sum(1 for _, passed, _ in error_tests if passed)
        total_error_tests = len(error_tests)

        for test_name, passed, message in error_tests:
            print(f"   {'✅' if passed else '❌'} {test_name}: {message}")

        self.result.add_result(
            "Error Handling",
            passed_error_tests >= total_error_tests * 0.8,
            f"Error tests passed: {passed_error_tests}/{total_error_tests}",
            {"error_test_results": error_tests}
        )

        return passed_error_tests > 0

    async def cleanup_test_data(self):
        """Clean up test data"""
        print("\n🧹 Cleaning up test data...")

        cleanup_count = 0
        cleanup_errors = 0

        # Delete NER analyses created during the tests (404 means already gone).
        for analysis_id in self.created_analyses:
            try:
                response = await self.make_request('DELETE', f"{NER_URL}/analysis/{analysis_id}")
                if response.status_code in [200, 404]:
                    cleanup_count += 1
                else:
                    cleanup_errors += 1
            except Exception as e:
                cleanup_errors += 1
                print(f"   ⚠️ Failed to cleanup analysis {analysis_id[:8]}...: {e}")

        # Delete RAG documents created during the tests.
        for document_id in self.created_documents:
            try:
                response = await self.make_request('DELETE', f"{UNIFIED_URL}/rag/documents/{document_id}")
                if response.status_code in [200, 404]:
                    cleanup_count += 1
                else:
                    cleanup_errors += 1
            except Exception as e:
                cleanup_errors += 1
                print(f"   ⚠️ Failed to cleanup document {document_id[:8]}...: {e}")

        if cleanup_count > 0:
            print(f"   ✅ Cleaned up {cleanup_count} test items")
        if cleanup_errors > 0:
            print(f"   ⚠️ Failed to cleanup {cleanup_errors} items")

    async def run_comprehensive_tests(self):
        """Run all comprehensive unified system tests"""
        print("🚀 Unified AI Services - Comprehensive Test Suite")
        print("Testing: NER + OCR + RAG Integration with Unified Workflows")
        print("=" * 80)

        start_time = time.time()

        tests = [
            ("Unified App Health", self.test_unified_app_health),
            ("Individual Service Health", self.test_individual_service_health),
            ("Unified Analysis (Text)", self.test_unified_analysis_text),
            ("Unified Analysis (URL)", self.test_unified_analysis_url),
            ("Combined Search", self.test_combined_search),
            ("Service Proxies", self.test_service_proxies),
            ("File Upload (Unified)", self.test_file_upload_unified),
            ("Service Discovery", self.test_service_discovery),
            ("System Performance", self.test_system_performance),
            ("Error Handling", self.test_error_handling)
        ]

        for test_name, test_func in tests:
            print("\n" + "=" * 80)
            try:
                await test_func()
            except Exception as e:
                print(f"❌ {test_name} failed with exception: {e}")
                self.result.add_result(test_name, False, f"Exception: {e}")

        print("\n" + "=" * 80)
        await self.cleanup_test_data()

        total_time = time.time() - start_time
        print("\n" + "=" * 80)
        print("📊 UNIFIED SYSTEM COMPREHENSIVE TEST RESULTS")
        print("=" * 80)

        self.result.print_summary()

        print("\nTEST EXECUTION:")
        print(f"Total Time: {total_time:.2f} seconds")
        print(f"Tests Created: NER analyses: {len(self.created_analyses)}, RAG documents: {len(self.created_documents)}")

        passed = self.result.passed_tests
        total = self.result.total_tests

        if passed == total:
            print("\n🎉 ALL UNIFIED SYSTEM TESTS PASSED!")
            print("✅ Unified application is fully operational")
            print("✅ All services are integrated and working")
            print("✅ Combined workflows are functional")
            print("✅ Service proxies are working")
            print("✅ Error handling is robust")

            print("\n🎯 UNIFIED SYSTEM CAPABILITIES VERIFIED:")
            print("   • NER + OCR + RAG service integration")
            print("   • Unified analysis workflows")
            print("   • Combined search with NER enhancement")
            print("   • Service proxy functionality")
            print("   • Multi-language support")
            print("   • Concurrent request handling")
            print("   • Comprehensive error handling")
            print("   • Real-time service health monitoring")

        else:
            print("\n⚠️ SOME UNIFIED SYSTEM TESTS FAILED")
            print(f"❌ {self.result.failed_tests} out of {total} tests failed")

            print("\n🔧 TROUBLESHOOTING STEPS:")
            print("1. Check that all services are running:")
            print(f"   • NER Service: {NER_URL}/health")
            print(f"   • OCR Service: {OCR_URL}/health")
            print(f"   • RAG Service: {RAG_URL}/health")
            print(f"   • Unified App: {UNIFIED_URL}/health")
            print("2. Verify configuration in .env file")
            print("3. Check service logs for errors")
            print("4. Ensure all dependencies are installed")
            print("5. Verify database connectivity")

        return passed == total


async def main():
    """Main test runner"""
    global UNIFIED_URL

    # An optional command-line argument overrides the unified app URL.
    if len(sys.argv) > 1:
        UNIFIED_URL = sys.argv[1]

    print("🧪 Unified AI Services - Comprehensive Test Suite")
    print(f"📡 Testing unified system at: {UNIFIED_URL}")
    print("🔗 Expected services:")
    print(f"   • NER Service: {NER_URL}")
    print(f"   • OCR Service: {OCR_URL}")
    print(f"   • RAG Service: {RAG_URL}")
    print(f"   • Unified App: {UNIFIED_URL}")

    print("\nMake sure the unified application is running before starting tests.")
    print("Start command: python app.py")

    input("\nPress Enter to start unified system tests...")

    async with UnifiedSystemTester() as tester:
        success = await tester.run_comprehensive_tests()

        if success:
            print("\n🏆 UNIFIED SYSTEM VERIFICATION COMPLETE!")
            print("✅ All services are integrated and operational")
            print("✅ Combined workflows are working perfectly")
            print("✅ Ready for production deployment")

            sys.exit(0)
        else:
            print("\n🔧 UNIFIED SYSTEM NEEDS ATTENTION")
            print("❌ Some functionality is not working correctly")
            print("📋 Review the test results above for specific issues")

            sys.exit(1)
if __name__ == "__main__":
|
|
|
asyncio.run(main()) |
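
# Note: main() waits for an Enter key press before running the suite. For unattended
# runs (e.g. CI) the prompt can be satisfied by piping input, for example (the script
# filename is an assumption):
#   echo | python test_unified_system.py http://localhost:8000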