#!/usr/bin/env python3
"""
Comprehensive Test Suite for Unified AI Services
Tests the unified application and all integrated services (NER, OCR, RAG)
Combines functionality from test_rag.py and test_ner.py with new unified tests
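Usage:
    python test.py [unified_app_url]
The unified application (and its NER/OCR/RAG services) should be running before the tests start.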
"""
import asyncio
import httpx
import json
import io
import sys
import time
import tempfile
import os
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
import uuid as python_uuid
# Import configuration
try:
from configs import get_config, validate_environment
config = get_config()
except ImportError:
print("⚠️ Could not import configs. Using default values.")
config = None
# Test configuration
UNIFIED_URL = "http://localhost:8000" # Main unified app
NER_URL = "http://localhost:8500" # Direct NER service
OCR_URL = "http://localhost:8400" # Direct OCR service
RAG_URL = "http://localhost:8401" # Direct RAG service
TEST_TIMEOUT = 300  # Per-request timeout in seconds for the httpx client
# Test data (from original test files)
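# Thai-language criminal-case narrative used to exercise Thai NER
# (people, organizations, dates, locations, and monetary amounts)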
THAI_CYANIDE_MURDER_CASE = """
เหตุฆาตกรรมด้วยไซยาไนด์ พ.ศ. 2566
คดีฆาตกรรมต่อเนื่องที่สั่นสะเทือนสังคมไทย เกิดขึ้นระหว่างเดือนเมษายน-ตุลาคม พ.ศ. 2566
โดยมีนางสาวสาริณี ชัยวัฒน์ หรือ "แอม ไซยาไนด์" อายุ 36 ปี เป็นผู้ต้องหา
รายละเอียดคดี:
ผู้ต้องหาได้ทำการวางยาพิษไซยาไนด์ (Potassium Cyanide) ในอาหารและเครื่องดื่มของเหยื่อหลายราย
เหยื่อรายแรกคือ นางสิริพร บุญลาภวนิช อายุ 32 ปี เสียชีวิตเมื่อวันที่ 14 เมษายน 2566 ที่จังหวัดกาญจนบุรี
เหยื่อรายที่สอง นายสุรชัย อยู่คงคลัง อายุ 45 ปี เสียชีวิตเมื่อวันที่ 2 พฤษภาคม 2566 ที่จังหวัดราชบุรี
การสืบสวน:
ตำรวจภูธรภาค 7 ร่วมกับ สำนักงานตำรวจแห่งชาติ ทำการสืบสวน
พบหลักฐานจากกล้องวงจรปิด (CCTV) ในหลายพื้นที่
ตรวจพบสารไซยาไนด์ในร่างกายเหยื่อทุกราย
การจับกุม:
วันที่ 3 ตุลาคม 2566 ตำรวจจับกุมตัวผู้ต้องหาได้ที่โรงแรมเดอะ บายแซด ตั้งอยู่ที่ ถนนรามคำแหง กรุงเทพมหานคร
พบเอกสารปลอม บัตรประชาชนปลอม และวัตถุพยานสำคัญอื่นๆ
ยึดทรัพย์สินที่ได้จากการกระทำผิด มูลค่ารวมกว่า 2 ล้านบาท
"""
ENGLISH_CYBERSECURITY_CASE = """
Major Cybersecurity Incident Report - Operation Digital Shield
Incident Overview:
On October 15, 2024, CyberDefense Corp, a leading cybersecurity firm headquartered in Austin, Texas, detected a sophisticated Advanced Persistent Threat (APT) targeting critical infrastructure across Southeast Asia.
Key Personnel:
- Dr. Sarah Chen, Chief Security Officer at CyberDefense Corp
- Agent Michael Rodriguez, FBI Cyber Division
- Captain Lisa Thompson, US Cyber Command
Technical Details:
The attackers used a custom malware strain called "DeepStrike" developed by the Shadow Dragon group
Primary attack vector: spear-phishing emails containing weaponized PDF documents
Estimated financial damage: $50 million USD across affected organizations
"""
TEST_URLS = [
"https://httpbin.org/html",
"https://httpbin.org/json"
]
class TestResult:
"""Class to track test results"""
def __init__(self):
self.total_tests = 0
self.passed_tests = 0
self.failed_tests = 0
self.test_results = []
self.warnings = []
def add_result(self, test_name: str, passed: bool, message: str = "", details: Dict = None):
"""Add a test result"""
self.total_tests += 1
if passed:
self.passed_tests += 1
print(f"✅ {test_name}")
if message:
print(f" {message}")
else:
self.failed_tests += 1
print(f"❌ {test_name}: {message}")
self.test_results.append({
'test_name': test_name,
'passed': passed,
'message': message,
'details': details or {}
})
def add_warning(self, test_name: str, message: str):
"""Add a warning (doesn't count as pass/fail)"""
print(f"⚠️ {test_name}: {message}")
self.warnings.append({
'test_name': test_name,
'message': message
})
def print_summary(self):
"""Print test summary"""
print("\n" + "="*60)
print("UNIFIED SYSTEM TEST SUMMARY")
print("="*60)
print(f"Total Tests: {self.total_tests}")
print(f"Passed: {self.passed_tests}")
print(f"Failed: {self.failed_tests}")
print(f"Warnings: {len(self.warnings)}")
print(f"Success Rate: {(self.passed_tests/self.total_tests*100):.1f}%" if self.total_tests > 0 else "0%")
if self.failed_tests > 0:
print(f"\n❌ FAILED TESTS:")
for result in self.test_results:
if not result['passed']:
print(f" - {result['test_name']}: {result['message']}")
if self.warnings:
print(f"\n⚠️ WARNINGS:")
for warning in self.warnings:
print(f" - {warning['test_name']}: {warning['message']}")
class UnifiedSystemTester:
"""Main test class for unified system"""
def __init__(self):
self.result = TestResult()
self.session = None
self.created_documents = [] # Track for cleanup
self.created_analyses = [] # Track for cleanup
async def __aenter__(self):
self.session = httpx.AsyncClient(timeout=TEST_TIMEOUT)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.session:
await self.session.aclose()
async def make_request(self, method: str, url: str, **kwargs) -> httpx.Response:
"""Make HTTP request with error handling"""
try:
response = await self.session.request(method, url, **kwargs)
return response
except httpx.RequestError as e:
raise Exception(f"Request failed: {e}")
async def test_unified_app_health(self):
"""Test 1: Unified Application Health Check"""
print("🔍 Test 1: Unified Application Health Check")
try:
response = await self.make_request('GET', f"{UNIFIED_URL}/health")
if response.status_code == 200:
data = response.json()
status = data.get("status")
services = data.get("services", [])
healthy_services = [s for s in services if s.get("health")]
total_services = len(services)
if status in ["healthy", "degraded"] and healthy_services:
message = f"Status: {status}, Services: {len(healthy_services)}/{total_services} healthy"
for service in services:
service_status = "✅" if service.get("health") else "❌"
message += f"\n {service_status} {service.get('name')}: {service.get('status')} ({service.get('response_time', 0):.3f}s)"
self.result.add_result(
"Unified App Health Check",
True,
message,
data
)
return True
else:
self.result.add_result(
"Unified App Health Check",
False,
f"System unhealthy: {data}"
)
return False
else:
self.result.add_result(
"Unified App Health Check",
False,
f"HTTP {response.status_code}: {response.text}"
)
return False
except Exception as e:
# Provide detailed diagnostics for connection failures
if "connection" in str(e).lower():
print(f"\n🔍 Connection Diagnostics:")
print(f" Unified App URL: {UNIFIED_URL}")
print(f" Error: {e}")
print(f"\n💡 Possible Issues:")
print(f" 1. Unified app is not running")
print(f" 2. Wrong host/port in configuration")
print(f" 3. Services failed to start")
print(f"\n🚀 To Start Unified App:")
print(f" python app.py")
self.result.add_result(
"Unified App Health Check",
False,
str(e)
)
return False
async def test_individual_service_health(self):
"""Test 2: Individual Service Health Checks"""
print("🔍 Test 2: Individual Service Health Checks")
services = [
("NER", NER_URL),
("OCR", OCR_URL),
("RAG", RAG_URL)
]
all_healthy = True
service_statuses = {}
for service_name, service_url in services:
try:
response = await self.make_request('GET', f"{service_url}/health")
if response.status_code == 200:
data = response.json()
status = data.get("status", "unknown")
service_statuses[service_name] = {
"healthy": True,
"status": status,
"details": data
}
print(f" ✅ {service_name}: {status}")
else:
service_statuses[service_name] = {
"healthy": False,
"status": f"HTTP {response.status_code}",
"details": None
}
print(f" ❌ {service_name}: HTTP {response.status_code}")
all_healthy = False
except Exception as e:
service_statuses[service_name] = {
"healthy": False,
"status": f"Error: {e}",
"details": None
}
print(f" ❌ {service_name}: {e}")
all_healthy = False
self.result.add_result(
"Individual Service Health",
all_healthy,
f"Services healthy: {sum(1 for s in service_statuses.values() if s['healthy'])}/{len(services)}",
service_statuses
)
return all_healthy
async def test_unified_analysis_text(self):
"""Test 3: Unified Analysis with Text"""
print("🔍 Test 3: Unified Analysis with Text")
try:
request_data = {
"text": THAI_CYANIDE_MURDER_CASE,
"extract_relationships": True,
"include_embeddings": False,
"include_summary": True,
"generate_graph_files": True,
"export_formats": ["neo4j", "json"],
"enable_rag_indexing": True,
"rag_title": "Cyanide Murder Case Analysis",
"rag_keywords": ["cyanide", "murder", "investigation", "thai"],
"rag_metadata": {"test": True, "case_type": "criminal"}
}
response = await self.make_request('POST', f"{UNIFIED_URL}/analyze/unified", json=request_data)
if response.status_code == 200:
data = response.json()
if data.get("success"):
service_calls = data.get("service_calls", [])
ner_analysis = data.get("ner_analysis", {})
rag_document = data.get("rag_document", {})
processing_time = data.get("processing_time", 0)
# Validate NER analysis
entities = ner_analysis.get("entities", [])
relationships = ner_analysis.get("relationships", [])
# Track analysis for cleanup
if ner_analysis.get("analysis_id"):
self.created_analyses.append(ner_analysis["analysis_id"])
if rag_document and rag_document.get("document_id"):
self.created_documents.append(rag_document["document_id"])
message = f"Service calls: {', '.join(service_calls)}"
message += f"\n Processing time: {processing_time:.2f}s"
message += f"\n NER entities: {len(entities)}"
message += f"\n NER relationships: {len(relationships)}"
if rag_document:
message += f"\n RAG document ID: {rag_document.get('document_id', 'N/A')}"
message += f"\n RAG chunks: {rag_document.get('total_chunks', 0)}"
# Check if we got expected service calls
expected_calls = ["ner_text"]
if "enable_rag_indexing" in request_data and request_data["enable_rag_indexing"]:
expected_calls.append("rag_upload")
all_expected_calls = all(call in service_calls for call in expected_calls)
self.result.add_result(
"Unified Analysis (Text)",
all_expected_calls and entities and len(service_calls) > 0,
message,
data
)
return data
else:
self.result.add_result(
"Unified Analysis (Text)",
False,
data.get("error", "Analysis failed")
)
return None
else:
self.result.add_result(
"Unified Analysis (Text)",
False,
f"HTTP {response.status_code}: {response.text[:200]}"
)
return None
except Exception as e:
self.result.add_result(
"Unified Analysis (Text)",
False,
str(e)
)
return None
async def test_unified_analysis_url(self):
"""Test 4: Unified Analysis with URL"""
print("🔍 Test 4: Unified Analysis with URL")
try:
request_data = {
"url": "https://httpbin.org/html",
"extract_relationships": True,
"include_embeddings": False,
"include_summary": True,
"generate_graph_files": False,
"export_formats": ["json"],
"enable_rag_indexing": True,
"rag_title": "Test URL Document",
"rag_keywords": ["test", "url", "httpbin"],
"rag_metadata": {"test": True, "source": "httpbin"}
}
response = await self.make_request('POST', f"{UNIFIED_URL}/analyze/unified", json=request_data)
if response.status_code == 200:
data = response.json()
if data.get("success"):
service_calls = data.get("service_calls", [])
ner_analysis = data.get("ner_analysis", {})
rag_document = data.get("rag_document", {})
# Track for cleanup
if ner_analysis.get("analysis_id"):
self.created_analyses.append(ner_analysis["analysis_id"])
if rag_document and rag_document.get("document_id"):
self.created_documents.append(rag_document["document_id"])
message = f"Service calls: {', '.join(service_calls)}"
message += f"\n NER analysis ID: {ner_analysis.get('analysis_id', 'N/A')}"
if rag_document:
message += f"\n RAG document ID: {rag_document.get('document_id', 'N/A')}"
# Check for expected service calls
has_ner_url = "ner_url" in service_calls
has_rag_url = "rag_url" in service_calls
self.result.add_result(
"Unified Analysis (URL)",
has_ner_url and len(service_calls) > 0,
message,
data
)
return data
else:
self.result.add_result(
"Unified Analysis (URL)",
False,
data.get("error", "URL analysis failed")
)
return None
else:
self.result.add_result(
"Unified Analysis (URL)",
False,
f"HTTP {response.status_code}: {response.text[:200]}"
)
return None
except Exception as e:
self.result.add_result(
"Unified Analysis (URL)",
False,
str(e)
)
return None
async def test_combined_search(self):
"""Test 5: Combined Search with NER Analysis"""
print("🔍 Test 5: Combined Search with NER Analysis")
# Wait a moment for indexing to complete
await asyncio.sleep(2)
try:
request_data = {
"query": "investigation murder case",
"limit": 5,
"similarity_threshold": 0.1, # Lower threshold for better results
"include_ner_analysis": True,
"ner_export_formats": ["json"]
}
response = await self.make_request('POST', f"{UNIFIED_URL}/search/combined", json=request_data)
if response.status_code == 200:
data = response.json()
if data.get("success"):
service_calls = data.get("service_calls", [])
search_results = data.get("search_results", {})
results = search_results.get("results", [])
ner_analyses = search_results.get("ner_analyses", [])
message = f"Service calls: {', '.join(service_calls)}"
message += f"\n Search results: {len(results)}"
message += f"\n NER analyses: {len(ner_analyses)}"
message += f"\n Processing time: {data.get('processing_time', 0):.2f}s"
# Check for expected service calls
has_rag_search = "rag_search" in service_calls
has_ner_analysis = any("ner_text_" in call for call in service_calls)
success = has_rag_search and len(service_calls) > 0
if len(results) == 0:
self.result.add_warning(
"Combined Search",
"No search results found - may need more indexed content"
)
self.result.add_result(
"Combined Search",
success,
message,
data
)
return data
else:
self.result.add_result(
"Combined Search",
False,
data.get("error", "Search failed")
)
return None
else:
self.result.add_result(
"Combined Search",
False,
f"HTTP {response.status_code}: {response.text[:200]}"
)
return None
except Exception as e:
self.result.add_result(
"Combined Search",
False,
str(e)
)
return None
async def test_service_proxies(self):
"""Test 6: Service Proxy Endpoints"""
print("🔍 Test 6: Service Proxy Endpoints")
proxy_tests = []
# Test NER proxy
try:
ner_data = {
"text": "Test entity recognition with John Smith working at Microsoft in Seattle.",
"extract_relationships": True,
"include_embeddings": False,
"generate_graph_files": False
}
response = await self.make_request('POST', f"{UNIFIED_URL}/ner/analyze/text", json=ner_data)
if response.status_code == 200:
result = response.json()
if result.get("success"):
entities = result.get("entities", [])
proxy_tests.append(("NER Proxy", True, f"Found {len(entities)} entities"))
# Track for cleanup
if result.get("analysis_id"):
self.created_analyses.append(result["analysis_id"])
else:
proxy_tests.append(("NER Proxy", False, "Analysis failed"))
else:
proxy_tests.append(("NER Proxy", False, f"HTTP {response.status_code}"))
except Exception as e:
proxy_tests.append(("NER Proxy", False, str(e)))
# Test OCR proxy
try:
response = await self.make_request('GET', f"{UNIFIED_URL}/ocr/health")
if response.status_code == 200:
proxy_tests.append(("OCR Proxy", True, "Health check passed"))
else:
proxy_tests.append(("OCR Proxy", False, f"HTTP {response.status_code}"))
except Exception as e:
proxy_tests.append(("OCR Proxy", False, str(e)))
# Test RAG proxy
try:
response = await self.make_request('GET', f"{UNIFIED_URL}/rag/documents?limit=5")
if response.status_code == 200:
result = response.json()
documents = result.get("documents", [])
proxy_tests.append(("RAG Proxy", True, f"Found {len(documents)} documents"))
else:
proxy_tests.append(("RAG Proxy", False, f"HTTP {response.status_code}"))
except Exception as e:
proxy_tests.append(("RAG Proxy", False, str(e)))
# Evaluate proxy tests
passed_proxies = sum(1 for _, passed, _ in proxy_tests if passed)
total_proxies = len(proxy_tests)
for test_name, passed, message in proxy_tests:
print(f" {'✅' if passed else '❌'} {test_name}: {message}")
self.result.add_result(
"Service Proxies",
passed_proxies == total_proxies,
f"Proxies working: {passed_proxies}/{total_proxies}",
{"proxy_results": proxy_tests}
)
return passed_proxies > 0
async def test_file_upload_unified(self):
"""Test 7: File Upload through Unified Interface"""
print("🔍 Test 7: File Upload through Unified Interface")
try:
# Create test document
test_content = """
Technical Report: Advanced AI Systems
This report examines the integration of Named Entity Recognition (NER),
Optical Character Recognition (OCR), and Retrieval-Augmented Generation (RAG)
systems in a unified architecture.
Key Personnel:
- Dr. Alice Johnson, Lead AI Researcher at TechCorp
- Prof. Bob Smith, University of Technology
- Sarah Wilson, Data Scientist
Technical Components:
- Azure OpenAI for embeddings and language processing
- PostgreSQL with vector extensions for data storage
- FastAPI for microservice architecture
The system processes documents through multiple stages:
1. OCR extraction for scanned documents
2. NER analysis for entity and relationship extraction
3. RAG indexing for searchable knowledge base
Testing conducted on October 15, 2024 showed 95% accuracy.
Total budget: $250,000 for the complete implementation.
"""
# Test through NER proxy (file upload)
file_content = test_content.encode('utf-8')
files = {"file": ("test_report.txt", io.BytesIO(file_content), "text/plain")}
data = {
"extract_relationships": "true",
"include_embeddings": "false",
"include_summary": "true",
"generate_graph_files": "true",
"export_formats": "neo4j,json"
}
response = await self.make_request(
'POST',
f"{UNIFIED_URL}/ner/analyze/file",
files=files,
data=data
)
if response.status_code == 200:
result = response.json()
if result.get("success"):
entities = result.get("entities", [])
relationships = result.get("relationships", [])
# Track for cleanup
if result.get("analysis_id"):
self.created_analyses.append(result["analysis_id"])
message = f"File processed successfully"
message += f"\n Entities: {len(entities)}"
message += f"\n Relationships: {len(relationships)}"
message += f"\n Language: {result.get('language', 'unknown')}"
# Look for expected entities
person_entities = [e for e in entities if e.get('label') == 'PERSON']
org_entities = [e for e in entities if e.get('label') == 'ORGANIZATION']
money_entities = [e for e in entities if e.get('label') == 'MONEY']
message += f"\n People found: {len(person_entities)}"
message += f"\n Organizations found: {len(org_entities)}"
message += f"\n Money amounts found: {len(money_entities)}"
success = len(entities) > 0 and result.get("analysis_id")
self.result.add_result(
"File Upload (Unified)",
success,
message,
result
)
return result
else:
self.result.add_result(
"File Upload (Unified)",
False,
result.get("error", "File analysis failed")
)
return None
else:
self.result.add_result(
"File Upload (Unified)",
False,
f"HTTP {response.status_code}: {response.text[:200]}"
)
return None
except Exception as e:
self.result.add_result(
"File Upload (Unified)",
False,
str(e)
)
return None
async def test_service_discovery(self):
"""Test 8: Service Discovery and Listing"""
print("🔍 Test 8: Service Discovery and Listing")
try:
response = await self.make_request('GET', f"{UNIFIED_URL}/services")
if response.status_code == 200:
data = response.json()
services = data.get("services", {})
unified = data.get("unified", {})
expected_services = ["ner", "ocr", "rag"]
found_services = list(services.keys())
message = f"Services discovered: {', '.join(found_services)}"
message += f"\n Unified endpoint: {unified.get('url', 'N/A')}"
for service_name, service_info in services.items():
endpoints = service_info.get("endpoints", [])
message += f"\n {service_name}: {len(endpoints)} endpoints"
all_expected_found = all(service in found_services for service in expected_services)
self.result.add_result(
"Service Discovery",
all_expected_found,
message,
data
)
return data
else:
self.result.add_result(
"Service Discovery",
False,
f"HTTP {response.status_code}"
)
return None
except Exception as e:
self.result.add_result(
"Service Discovery",
False,
str(e)
)
return None
async def test_system_performance(self):
"""Test 9: System Performance and Reliability"""
print("🔍 Test 9: System Performance and Reliability")
try:
# Test multiple concurrent requests
tasks = []
test_texts = [
"Performance test with Apple Inc and CEO Tim Cook in California.",
"Reliability testing of Microsoft Azure services in Seattle.",
"Load testing with Google Cloud Platform and AI systems."
]
start_time = time.time()
for i, text in enumerate(test_texts):
task = self.make_request(
'POST',
f"{UNIFIED_URL}/ner/analyze/text",
json={
"text": text,
"extract_relationships": True,
"include_embeddings": False,
"generate_graph_files": False
}
)
tasks.append(task)
# Execute concurrent requests
responses = await asyncio.gather(*tasks, return_exceptions=True)
total_time = time.time() - start_time
# Analyze results
successful_requests = 0
total_entities = 0
for i, response in enumerate(responses):
if isinstance(response, Exception):
continue
if response.status_code == 200:
result = response.json()
if result.get("success"):
successful_requests += 1
entities = result.get("entities", [])
total_entities += len(entities)
# Track for cleanup
if result.get("analysis_id"):
self.created_analyses.append(result["analysis_id"])
avg_time_per_request = total_time / len(test_texts)
message = f"Concurrent requests: {successful_requests}/{len(test_texts)} successful"
message += f"\n Total time: {total_time:.2f}s"
message += f"\n Avg time per request: {avg_time_per_request:.2f}s"
message += f"\n Total entities found: {total_entities}"
# Performance criteria
performance_ok = (
successful_requests >= len(test_texts) * 0.8 and # 80% success rate
avg_time_per_request < 10.0 # Under 10 seconds per request
)
self.result.add_result(
"System Performance",
performance_ok,
message,
{
"successful_requests": successful_requests,
"total_requests": len(test_texts),
"total_time": total_time,
"avg_time_per_request": avg_time_per_request,
"total_entities": total_entities
}
)
return performance_ok
except Exception as e:
self.result.add_result(
"System Performance",
False,
str(e)
)
return False
async def test_error_handling(self):
"""Test 10: Error Handling and Resilience"""
print("🔍 Test 10: Error Handling and Resilience")
error_tests = []
# Test 1: Invalid unified analysis request
try:
response = await self.make_request(
'POST',
f"{UNIFIED_URL}/analyze/unified",
json={"invalid": "data"}
)
if response.status_code in [400, 422]: # Expected validation error
error_tests.append(("Invalid Request Handling", True, "Properly rejected invalid data"))
else:
error_tests.append(("Invalid Request Handling", False, f"Unexpected status: {response.status_code}"))
except Exception as e:
error_tests.append(("Invalid Request Handling", False, str(e)))
# Test 2: Empty text analysis
try:
response = await self.make_request(
'POST',
f"{UNIFIED_URL}/ner/analyze/text",
json={"text": "", "extract_relationships": True}
)
if response.status_code in [400, 422]: # Expected validation error
error_tests.append(("Empty Text Handling", True, "Properly rejected empty text"))
else:
result = response.json()
if not result.get("success"):
error_tests.append(("Empty Text Handling", True, "Failed gracefully"))
else:
error_tests.append(("Empty Text Handling", False, "Should have failed"))
except Exception as e:
error_tests.append(("Empty Text Handling", False, str(e)))
# Test 3: Invalid URL
try:
response = await self.make_request(
'POST',
f"{UNIFIED_URL}/analyze/unified",
json={
"url": "https://invalid-url-that-does-not-exist-12345.com",
"extract_relationships": True
}
)
if response.status_code == 200:
result = response.json()
if not result.get("success"):
error_tests.append(("Invalid URL Handling", True, "Failed gracefully with invalid URL"))
else:
error_tests.append(("Invalid URL Handling", False, "Should have failed"))
else:
error_tests.append(("Invalid URL Handling", True, f"Rejected invalid URL (HTTP {response.status_code})"))
except Exception as e:
error_tests.append(("Invalid URL Handling", False, str(e)))
# Evaluate error handling tests
passed_error_tests = sum(1 for _, passed, _ in error_tests if passed)
total_error_tests = len(error_tests)
for test_name, passed, message in error_tests:
print(f" {'✅' if passed else '❌'} {test_name}: {message}")
self.result.add_result(
"Error Handling",
passed_error_tests >= total_error_tests * 0.8, # 80% success rate
f"Error tests passed: {passed_error_tests}/{total_error_tests}",
{"error_test_results": error_tests}
)
return passed_error_tests > 0
async def cleanup_test_data(self):
"""Clean up test data"""
print("\n🧹 Cleaning up test data...")
cleanup_count = 0
cleanup_errors = 0
# Clean up NER analyses
for analysis_id in self.created_analyses:
try:
# Try direct service first
response = await self.make_request('DELETE', f"{NER_URL}/analysis/{analysis_id}")
if response.status_code in [200, 404]: # 404 is OK (already deleted)
cleanup_count += 1
else:
cleanup_errors += 1
except Exception as e:
cleanup_errors += 1
print(f" ⚠️ Failed to cleanup analysis {analysis_id[:8]}...: {e}")
# Clean up RAG documents
for document_id in self.created_documents:
try:
# Try through unified proxy
response = await self.make_request('DELETE', f"{UNIFIED_URL}/rag/documents/{document_id}")
if response.status_code in [200, 404]: # 404 is OK (already deleted)
cleanup_count += 1
else:
cleanup_errors += 1
except Exception as e:
cleanup_errors += 1
print(f" ⚠️ Failed to cleanup document {document_id[:8]}...: {e}")
if cleanup_count > 0:
print(f" ✅ Cleaned up {cleanup_count} test items")
if cleanup_errors > 0:
print(f" ⚠️ Failed to cleanup {cleanup_errors} items")
async def run_comprehensive_tests(self):
"""Run all comprehensive unified system tests"""
print("🚀 Unified AI Services - Comprehensive Test Suite")
print("Testing: NER + OCR + RAG Integration with Unified Workflows")
print("=" * 80)
start_time = time.time()
# Test sequence
tests = [
("Unified App Health", self.test_unified_app_health),
("Individual Service Health", self.test_individual_service_health),
("Unified Analysis (Text)", self.test_unified_analysis_text),
("Unified Analysis (URL)", self.test_unified_analysis_url),
("Combined Search", self.test_combined_search),
("Service Proxies", self.test_service_proxies),
("File Upload (Unified)", self.test_file_upload_unified),
("Service Discovery", self.test_service_discovery),
("System Performance", self.test_system_performance),
("Error Handling", self.test_error_handling)
]
for test_name, test_func in tests:
print(f"\n" + "=" * 80)
try:
await test_func()
except Exception as e:
print(f"❌ {test_name} failed with exception: {e}")
self.result.add_result(test_name, False, f"Exception: {e}")
# Cleanup
print(f"\n" + "=" * 80)
await self.cleanup_test_data()
# Final summary
total_time = time.time() - start_time
print(f"\n" + "=" * 80)
print("📊 UNIFIED SYSTEM COMPREHENSIVE TEST RESULTS")
print("=" * 80)
self.result.print_summary()
print(f"\nTEST EXECUTION:")
print(f"Total Time: {total_time:.2f} seconds")
print(f"Tests Created: NER analyses: {len(self.created_analyses)}, RAG documents: {len(self.created_documents)}")
passed = self.result.passed_tests
total = self.result.total_tests
if passed == total:
print(f"\n🎉 ALL UNIFIED SYSTEM TESTS PASSED!")
print(f"✅ Unified application is fully operational")
print(f"✅ All services are integrated and working")
print(f"✅ Combined workflows are functional")
print(f"✅ Service proxies are working")
print(f"✅ Error handling is robust")
print(f"\n🎯 UNIFIED SYSTEM CAPABILITIES VERIFIED:")
print(f" • NER + OCR + RAG service integration")
print(f" • Unified analysis workflows")
print(f" • Combined search with NER enhancement")
print(f" • Service proxy functionality")
print(f" • Multi-language support")
print(f" • Concurrent request handling")
print(f" • Comprehensive error handling")
print(f" • Real-time service health monitoring")
else:
print(f"\n⚠️ SOME UNIFIED SYSTEM TESTS FAILED")
print(f"❌ {self.result.failed_tests} out of {total} tests failed")
print(f"\n🔧 TROUBLESHOOTING STEPS:")
print(f"1. Check that all services are running:")
print(f" • NER Service: {NER_URL}/health")
print(f" • OCR Service: {OCR_URL}/health")
print(f" • RAG Service: {RAG_URL}/health")
print(f" • Unified App: {UNIFIED_URL}/health")
print(f"2. Verify configuration in .env file")
print(f"3. Check service logs for errors")
print(f"4. Ensure all dependencies are installed")
print(f"5. Verify database connectivity")
return passed == total
async def main():
"""Main test runner"""
    # Allow overriding the unified app URL from the command line
    # (the global must be declared before UNIFIED_URL is referenced in this function)
    global UNIFIED_URL
    if len(sys.argv) > 1:
        UNIFIED_URL = sys.argv[1]
print(f"🧪 Unified AI Services - Comprehensive Test Suite")
print(f"📡 Testing unified system at: {UNIFIED_URL}")
print(f"🔗 Expected services:")
print(f" • NER Service: {NER_URL}")
print(f" • OCR Service: {OCR_URL}")
print(f" • RAG Service: {RAG_URL}")
print(f" • Unified App: {UNIFIED_URL}")
print(f"\nMake sure the unified application is running before starting tests.")
print(f"Start command: python app.py")
# Wait for user confirmation
input(f"\nPress Enter to start unified system tests...")
async with UnifiedSystemTester() as tester:
success = await tester.run_comprehensive_tests()
if success:
print(f"\n🏆 UNIFIED SYSTEM VERIFICATION COMPLETE!")
print(f"✅ All services are integrated and operational")
print(f"✅ Combined workflows are working perfectly")
print(f"✅ Ready for production deployment")
sys.exit(0)
else:
print(f"\n🔧 UNIFIED SYSTEM NEEDS ATTENTION")
print(f"❌ Some functionality is not working correctly")
print(f"📋 Review the test results above for specific issues")
sys.exit(1)
if __name__ == "__main__":
asyncio.run(main())