onurcopur committed
Commit e01c07b · 1 Parent(s): be7846f

change dockerfile

search_engines/__init__.py ADDED
@@ -0,0 +1,14 @@
+ """Search engines package for tattoo image discovery."""
+
+ from .base import BaseSearchEngine, ImageResult, SearchPlatform, SearchResult
+ from .pinterest import PinterestSearchEngine
+ from .manager import SearchEngineManager
+
+ __all__ = [
+     "BaseSearchEngine",
+     "ImageResult",
+     "SearchPlatform",
+     "SearchResult",
+     "PinterestSearchEngine",
+     "SearchEngineManager",
+ ]
search_engines/__pycache__/__init__.cpython-312.pyc ADDED
Binary file (536 Bytes).

search_engines/__pycache__/base.cpython-312.pyc ADDED
Binary file (5.01 kB).

search_engines/__pycache__/instagram.cpython-312.pyc ADDED
Binary file (5.03 kB).

search_engines/__pycache__/manager.cpython-312.pyc ADDED
Binary file (8.65 kB).

search_engines/__pycache__/pinterest.cpython-312.pyc ADDED
Binary file (5.45 kB).

search_engines/__pycache__/reddit.cpython-312.pyc ADDED
Binary file (5.02 kB).
 
search_engines/base.py ADDED
@@ -0,0 +1,96 @@
+ """Base classes for image search engines."""
+
+ import logging
+ from abc import ABC, abstractmethod
+ from dataclasses import dataclass
+ from enum import Enum
+ from typing import Dict, List, Optional, Set
+
+ logger = logging.getLogger(__name__)
+
+
+ class SearchPlatform(Enum):
+     """Supported search platforms."""
+     PINTEREST = "pinterest"
+     INSTAGRAM = "instagram"
+     REDDIT = "reddit"
+     FLICKR = "flickr"
+     DEVIANTART = "deviantart"
+     GENERAL = "general"
+
+
+ @dataclass
+ class ImageResult:
+     """Represents a single image search result."""
+     url: str
+     platform: SearchPlatform
+     quality_score: float = 0.0
+     width: Optional[int] = None
+     height: Optional[int] = None
+     title: Optional[str] = None
+     source_url: Optional[str] = None
+
+     @property
+     def resolution_score(self) -> float:
+         """Calculate score based on image resolution."""
+         if not self.width or not self.height:
+             return 0.5
+
+         total_pixels = self.width * self.height
+         if total_pixels >= 1000000:  # 1MP+
+             return 1.0
+         elif total_pixels >= 500000:  # 0.5MP+
+             return 0.8
+         elif total_pixels >= 250000:  # 0.25MP+
+             return 0.6
+         else:
+             return 0.3
+
+
+ @dataclass
+ class SearchResult:
+     """Container for all search results from multiple platforms."""
+     images: List[ImageResult]
+     total_found: int
+     platforms_used: Set[SearchPlatform]
+     search_duration: float
+
+     def get_top_results(self, limit: int = 50) -> List[ImageResult]:
+         """Get top results sorted by quality score."""
+         sorted_images = sorted(self.images, key=lambda x: x.quality_score, reverse=True)
+         return sorted_images[:limit]
+
+
+ class BaseSearchEngine(ABC):
+     """Abstract base class for image search engines."""
+
+     def __init__(self, platform: SearchPlatform):
+         self.platform = platform
+         self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")
+
+     @abstractmethod
+     def search(self, query: str, max_results: int = 20) -> List[ImageResult]:
+         """Search for images on the platform."""
+         pass
+
+     @abstractmethod
+     def is_valid_url(self, url: str) -> bool:
+         """Check if URL is valid for this platform."""
+         pass
+
+     def get_quality_score(self, url: str, **kwargs) -> float:
+         """Calculate quality score for a URL (0.0 to 1.0)."""
+         score = 0.5  # Base score
+
+         # URL length penalty (very long URLs often broken)
+         if len(url) > 500:
+             score -= 0.2
+         elif len(url) > 300:
+             score -= 0.1
+
+         # Image extension bonus
+         image_extensions = ['.jpg', '.jpeg', '.png', '.webp']
+         if any(ext in url.lower() for ext in image_extensions):
+             score += 0.1
+
+         return max(0.0, min(1.0, score))
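
Note: a minimal sketch of how these base classes compose; the URLs and scores below are made up for illustration.

from search_engines.base import ImageResult, SearchPlatform, SearchResult

# quality_score would normally come from an engine's get_quality_score()
hi = ImageResult(url="https://i.pinimg.com/736x/ab/cd.jpg",
                 platform=SearchPlatform.PINTEREST,
                 quality_score=0.9, width=736, height=1104)
lo = ImageResult(url="https://example.com/img.png",
                 platform=SearchPlatform.GENERAL, quality_score=0.4)

batch = SearchResult(images=[lo, hi], total_found=2,
                     platforms_used={SearchPlatform.PINTEREST, SearchPlatform.GENERAL},
                     search_duration=1.2)

print(batch.get_top_results(limit=1)[0].url)  # the higher-scored Pinterest URL
print(hi.resolution_score)  # 736*1104 ~ 0.81 MP -> 0.8
print(lo.resolution_score)  # no dimensions -> 0.5
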
search_engines/instagram.py ADDED
@@ -0,0 +1,105 @@
+ """Instagram-specific search engine implementation."""
+
+ import time
+ from typing import List
+
+ from ddgs import DDGS
+
+ from .base import BaseSearchEngine, ImageResult, SearchPlatform
+
+
+ class InstagramSearchEngine(BaseSearchEngine):
+     """Search engine for Instagram images."""
+
+     def __init__(self):
+         super().__init__(SearchPlatform.INSTAGRAM)
+         self.instagram_domains = {
+             "instagram.com",
+             "cdninstagram.com",
+             "scontent.cdninstagram.com",
+             "scontent-",  # Instagram CDN prefix
+         }
+
+     def search(self, query: str, max_results: int = 20) -> List[ImageResult]:
+         """Search Instagram for tattoo images."""
+         results = []
+
+         # Instagram hashtag-based queries
+         instagram_queries = self._build_instagram_queries(query)
+
+         try:
+             with DDGS() as ddgs:
+                 for i, instagram_query in enumerate(instagram_queries):
+                     if i > 0:
+                         time.sleep(2)  # Instagram is more sensitive to rate limiting
+
+                     try:
+                         search_results = ddgs.images(
+                             instagram_query,
+                             region="wt-wt",
+                             safesearch="off",
+                             size="Medium",
+                             max_results=max_results // len(instagram_queries)
+                         )
+
+                         for result in search_results:
+                             url = result.get("image")
+                             if url and self.is_valid_url(url):
+                                 image_result = self._create_image_result(url, result)
+                                 results.append(image_result)
+
+                             if len(results) >= max_results:
+                                 break
+
+                     except Exception as e:
+                         self.logger.warning(f"Instagram query failed: {e}")
+                         continue
+
+         except Exception as e:
+             self.logger.error(f"Instagram search failed: {e}")
+
+         return results[:max_results]
+
+     def is_valid_url(self, url: str) -> bool:
+         """Check if URL is from Instagram domains."""
+         return any(domain in url.lower() for domain in self.instagram_domains)
+
+     def get_quality_score(self, url: str, **kwargs) -> float:
+         """Calculate Instagram-specific quality score."""
+         score = super().get_quality_score(url)
+
+         # Instagram CDN URLs are generally reliable
+         if "cdninstagram.com" in url or "scontent" in url:
+             score += 0.15
+
+         # Instagram posts tend to be high quality
+         if "instagram.com/p/" in url:
+             score += 0.1
+
+         return min(1.0, score)
+
+     def _build_instagram_queries(self, query: str) -> List[str]:
+         """Build Instagram-specific search queries."""
+         queries = []
+
+         # General Instagram search
+         queries.append(f"site:instagram.com {query} tattoo")
+
+         # Hashtag-focused searches
+         hashtag_queries = [
+             f"site:instagram.com #{query.replace(' ', '')}tattoo",
+             f"site:instagram.com #tattoo {query}",
+         ]
+
+         queries.extend(hashtag_queries)
+         return queries
+
+     def _create_image_result(self, url: str, raw_result: dict) -> ImageResult:
+         """Create ImageResult from raw Instagram search result."""
+         return ImageResult(
+             url=url,
+             platform=self.platform,
+             quality_score=self.get_quality_score(url),
+             title=raw_result.get("title"),
+             source_url=raw_result.get("source")
+         )
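
Note: a quick sketch of the query expansion and domain filtering this engine performs; the URLs are hypothetical, and the expected output follows directly from the code above.

from search_engines.instagram import InstagramSearchEngine

engine = InstagramSearchEngine()

print(engine._build_instagram_queries("dragon"))
# ['site:instagram.com dragon tattoo',
#  'site:instagram.com #dragontattoo',
#  'site:instagram.com #tattoo dragon']

# Only URLs matching the configured Instagram domains pass the filter.
print(engine.is_valid_url("https://scontent.cdninstagram.com/v/t51/x.jpg"))  # True
print(engine.is_valid_url("https://example.com/x.jpg"))                      # False
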
search_engines/manager.py ADDED
@@ -0,0 +1,196 @@
+ """Search engine manager for coordinating multi-platform searches."""
+
+ import asyncio
+ import time
+ from concurrent.futures import ThreadPoolExecutor, as_completed
+ from typing import Dict, List, Optional, Set
+
+ from .base import BaseSearchEngine, ImageResult, SearchPlatform, SearchResult
+ from .instagram import InstagramSearchEngine
+ from .pinterest import PinterestSearchEngine
+ from .reddit import RedditSearchEngine
+
+
+ class SearchEngineManager:
+     """Manages and coordinates searches across multiple platforms."""
+
+     def __init__(self, max_workers: int = 5):
+         self.max_workers = max_workers
+         self.engines: Dict[SearchPlatform, BaseSearchEngine] = {
+             SearchPlatform.PINTEREST: PinterestSearchEngine(),
+             SearchPlatform.REDDIT: RedditSearchEngine(),
+             SearchPlatform.INSTAGRAM: InstagramSearchEngine(),
+         }
+
+     def search_all_platforms(
+         self,
+         query: str,
+         max_results_per_platform: int = 20,
+         platforms: Optional[Set[SearchPlatform]] = None
+     ) -> SearchResult:
+         """Search across multiple platforms concurrently."""
+         start_time = time.time()
+
+         if platforms is None:
+             platforms = set(self.engines.keys())
+
+         all_results = []
+         platforms_used = set()
+
+         with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
+             # Submit search tasks for each platform
+             future_to_platform = {
+                 executor.submit(
+                     self._search_single_platform,
+                     platform,
+                     query,
+                     max_results_per_platform
+                 ): platform
+                 for platform in platforms
+                 if platform in self.engines
+             }
+
+             # Collect results as they complete
+             for future in as_completed(future_to_platform):
+                 platform = future_to_platform[future]
+                 try:
+                     platform_results = future.result(timeout=30)  # 30s timeout per platform
+                     if platform_results:
+                         all_results.extend(platform_results)
+                         platforms_used.add(platform)
+                 except Exception as e:
+                     print(f"Platform {platform.value} search failed: {e}")
+
+         # Remove duplicates and sort by quality
+         unique_results = self._deduplicate_results(all_results)
+         sorted_results = sorted(unique_results, key=lambda x: x.quality_score, reverse=True)
+
+         search_duration = time.time() - start_time
+
+         return SearchResult(
+             images=sorted_results,
+             total_found=len(sorted_results),
+             platforms_used=platforms_used,
+             search_duration=search_duration
+         )
+
+     def search_with_fallback(
+         self,
+         query: str,
+         max_results: int = 50,
+         min_results_threshold: int = 10
+     ) -> SearchResult:
+         """Search with intelligent fallback strategies."""
+         # Try primary platforms first
+         primary_platforms = {SearchPlatform.PINTEREST, SearchPlatform.REDDIT}
+         result = self.search_all_platforms(
+             query,
+             max_results_per_platform=max_results // 2,
+             platforms=primary_platforms
+         )
+
+         # If we don't have enough results, try additional platforms
+         if len(result.images) < min_results_threshold:
+             additional_platforms = {SearchPlatform.INSTAGRAM}
+             additional_result = self.search_all_platforms(
+                 query,
+                 max_results_per_platform=max_results // 2,
+                 platforms=additional_platforms
+             )
+
+             # Merge results
+             all_images = result.images + additional_result.images
+             unique_images = self._deduplicate_results(all_images)
+             sorted_images = sorted(unique_images, key=lambda x: x.quality_score, reverse=True)
+
+             result = SearchResult(
+                 images=sorted_images,
+                 total_found=len(sorted_images),
+                 platforms_used=result.platforms_used | additional_result.platforms_used,
+                 search_duration=result.search_duration + additional_result.search_duration
+             )
+
+         # If still not enough, try simplified queries
+         if len(result.images) < min_results_threshold:
+             simplified_query = self._simplify_query(query)
+             if simplified_query != query:
+                 fallback_result = self.search_all_platforms(
+                     simplified_query,
+                     max_results_per_platform=max_results // 3
+                 )
+
+                 # Merge with existing results
+                 all_images = result.images + fallback_result.images
+                 unique_images = self._deduplicate_results(all_images)
+                 sorted_images = sorted(unique_images, key=lambda x: x.quality_score, reverse=True)
+
+                 result = SearchResult(
+                     images=sorted_images,
+                     total_found=len(sorted_images),
+                     platforms_used=result.platforms_used | fallback_result.platforms_used,
+                     search_duration=result.search_duration + fallback_result.search_duration
+                 )
+
+         return result
+
+     def _search_single_platform(
+         self,
+         platform: SearchPlatform,
+         query: str,
+         max_results: int
+     ) -> List[ImageResult]:
+         """Search a single platform (thread-safe)."""
+         engine = self.engines.get(platform)
+         if not engine:
+             return []
+
+         try:
+             return engine.search(query, max_results)
+         except Exception as e:
+             print(f"Error searching {platform.value}: {e}")
+             return []
+
+     def _deduplicate_results(self, results: List[ImageResult]) -> List[ImageResult]:
+         """Remove duplicate URLs while preserving the highest quality version."""
+         seen_urls = {}
+
+         for result in results:
+             if result.url in seen_urls:
+                 # Keep the result with higher quality score
+                 if result.quality_score > seen_urls[result.url].quality_score:
+                     seen_urls[result.url] = result
+             else:
+                 seen_urls[result.url] = result
+
+         return list(seen_urls.values())
+
+     def _simplify_query(self, query: str) -> str:
+         """Simplify query by removing complex terms and keeping core concepts."""
+         # Remove adjectives and keep main nouns
+         words = query.split()
+
+         # Common tattoo-related keywords to keep
+         core_keywords = {
+             'tattoo', 'design', 'art', 'ink', 'traditional', 'realistic', 'geometric',
+             'tribal', 'watercolor', 'minimalist', 'blackwork', 'dotwork',
+             'dragon', 'flower', 'skull', 'rose', 'bird', 'lion', 'butterfly'
+         }
+
+         # Keep important words and first few words
+         simplified_words = []
+         for i, word in enumerate(words):
+             if i < 3 or word.lower() in core_keywords:
+                 simplified_words.append(word)
+
+         simplified = ' '.join(simplified_words)
+         return simplified if simplified else 'tattoo art'
+
+     def get_platform_stats(self) -> Dict[str, Dict]:
+         """Get statistics about available platforms."""
+         stats = {}
+         for platform, engine in self.engines.items():
+             stats[platform.value] = {
+                 'available': True,
+                 'class': engine.__class__.__name__
+             }
+         return stats
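
Note: a minimal usage sketch of the fallback flow, assuming the package is importable and that ddgs can reach DuckDuckGo; actual results depend on network access.

from search_engines import SearchEngineManager

manager = SearchEngineManager(max_workers=3)

# Fans out to Pinterest + Reddit first, widening to Instagram and a
# simplified query only if fewer than 10 images come back.
result = manager.search_with_fallback("geometric wolf", max_results=30)

print(f"{result.total_found} images from "
      f"{sorted(p.value for p in result.platforms_used)} "
      f"in {result.search_duration:.1f}s")
for image in result.get_top_results(limit=5):
    print(f"{image.quality_score:.2f}  {image.url}")
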
search_engines/pinterest.py ADDED
@@ -0,0 +1,117 @@
+ """Pinterest-specific search engine implementation."""
+
+ import re
+ import time
+ from typing import List, Optional
+
+ from ddgs import DDGS
+
+ from .base import BaseSearchEngine, ImageResult, SearchPlatform
+
+
+ class PinterestSearchEngine(BaseSearchEngine):
+     """Search engine for Pinterest images."""
+
+     def __init__(self):
+         super().__init__(SearchPlatform.PINTEREST)
+         self.pinterest_domains = {
+             "pinterest.com",
+             "pinimg.com",
+             "i.pinimg.com",
+             "media.pinimg.com",
+             "s-media-cache-ak0.pinimg.com"
+         }
+
+     def search(self, query: str, max_results: int = 20) -> List[ImageResult]:
+         """Search Pinterest for tattoo images."""
+         results = []
+
+         pinterest_queries = [
+             f"site:pinterest.com {query} tattoo",
+             f"site:pinterest.com tattoo {query}",
+         ]
+
+         try:
+             with DDGS() as ddgs:
+                 for i, pinterest_query in enumerate(pinterest_queries):
+                     if i > 0:
+                         time.sleep(2)  # Rate limiting
+
+                     try:
+                         search_results = ddgs.images(
+                             pinterest_query,
+                             region="wt-wt",
+                             safesearch="off",
+                             size="Medium",
+                             max_results=max_results // 2
+                         )
+
+                         for result in search_results:
+                             url = result.get("image")
+                             if url and self.is_valid_url(url):
+                                 image_result = self._create_image_result(url, result)
+                                 results.append(image_result)
+
+                             if len(results) >= max_results:
+                                 break
+
+                     except Exception as e:
+                         self.logger.warning(f"Pinterest query failed: {e}")
+                         continue
+
+         except Exception as e:
+             self.logger.error(f"Pinterest search failed: {e}")
+
+         return results[:max_results]
+
+     def is_valid_url(self, url: str) -> bool:
+         """Check if URL is from Pinterest domains."""
+         return any(domain in url.lower() for domain in self.pinterest_domains)
+
+     def get_quality_score(self, url: str, **kwargs) -> float:
+         """Calculate Pinterest-specific quality score."""
+         score = super().get_quality_score(url)
+
+         # Pinterest size indicators (higher resolution = higher score)
+         size_patterns = {
+             "/736x/": 1.0,
+             "/564x/": 0.9,
+             "/474x/": 0.8,
+             "/236x/": 0.6
+         }
+
+         for pattern, bonus in size_patterns.items():
+             if pattern in url:
+                 score = bonus
+                 break
+
+         # Pinterest CDN reliability bonus
+         if "i.pinimg.com" in url:
+             score += 0.1
+
+         return min(1.0, score)
+
+     def _create_image_result(self, url: str, raw_result: dict) -> ImageResult:
+         """Create ImageResult from raw Pinterest search result."""
+         dimensions = self._extract_dimensions(url)
+
+         return ImageResult(
+             url=url,
+             platform=self.platform,
+             quality_score=self.get_quality_score(url),
+             width=dimensions.get("width"),
+             height=dimensions.get("height"),
+             title=raw_result.get("title"),
+             source_url=raw_result.get("source")
+         )
+
+     def _extract_dimensions(self, url: str) -> dict:
+         """Extract image dimensions from Pinterest URL patterns."""
+         # Pinterest URL pattern: .../236x/... or .../564x314/...
+         size_match = re.search(r"/(\d+)x(\d*)/", url)
+         if size_match:
+             width = int(size_match.group(1))
+             height = int(size_match.group(2)) if size_match.group(2) else None
+             return {"width": width, "height": height}
+
+         return {}
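
Note: the size-pattern scoring and dimension extraction can be exercised directly; a sketch with a made-up pin URL, with outputs following from the code above.

from search_engines.pinterest import PinterestSearchEngine

engine = PinterestSearchEngine()
url = "https://i.pinimg.com/736x/ab/cd/ef.jpg"  # hypothetical pin URL

print(engine._extract_dimensions(url))  # {'width': 736, 'height': None}

# /736x/ pins the score to 1.0; the i.pinimg.com bonus is then capped by min().
print(engine.get_quality_score(url))  # 1.0

# /236x/ sets 0.6, then +0.1 CDN bonus.
print(engine.get_quality_score("https://i.pinimg.com/236x/ab.jpg"))  # 0.7
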
search_engines/reddit.py ADDED
@@ -0,0 +1,114 @@
+ """Reddit-specific search engine implementation."""
+
+ import time
+ from typing import List
+
+ from ddgs import DDGS
+
+ from .base import BaseSearchEngine, ImageResult, SearchPlatform
+
+
+ class RedditSearchEngine(BaseSearchEngine):
+     """Search engine for Reddit images."""
+
+     def __init__(self):
+         super().__init__(SearchPlatform.REDDIT)
+         self.reddit_domains = {
+             "reddit.com",
+             "i.redd.it",
+             "i.imgur.com",
+             "imgur.com"
+         }
+         self.tattoo_subreddits = [
+             "tattoos",
+             "tattoo",
+             "traditionaltattoos",
+             "blackwork",
+             "sticknpokes",
+             "tattoodesigns"
+         ]
+
+     def search(self, query: str, max_results: int = 20) -> List[ImageResult]:
+         """Search Reddit for tattoo images."""
+         results = []
+
+         # Create Reddit-specific queries
+         reddit_queries = self._build_reddit_queries(query)
+
+         try:
+             with DDGS() as ddgs:
+                 for i, reddit_query in enumerate(reddit_queries):
+                     if i > 0:
+                         time.sleep(1.5)  # Rate limiting
+
+                     try:
+                         search_results = ddgs.images(
+                             reddit_query,
+                             region="wt-wt",
+                             safesearch="off",
+                             size="Medium",
+                             max_results=max_results // len(reddit_queries)
+                         )
+
+                         for result in search_results:
+                             url = result.get("image")
+                             if url and self.is_valid_url(url):
+                                 image_result = self._create_image_result(url, result)
+                                 results.append(image_result)
+
+                             if len(results) >= max_results:
+                                 break
+
+                     except Exception as e:
+                         self.logger.warning(f"Reddit query failed: {e}")
+                         continue
+
+         except Exception as e:
+             self.logger.error(f"Reddit search failed: {e}")
+
+         return results[:max_results]
+
+     def is_valid_url(self, url: str) -> bool:
+         """Check if URL is from Reddit or Reddit-linked domains."""
+         return any(domain in url.lower() for domain in self.reddit_domains)
+
+     def get_quality_score(self, url: str, **kwargs) -> float:
+         """Calculate Reddit-specific quality score."""
+         score = super().get_quality_score(url)
+
+         # i.redd.it is Reddit's native image host (reliable)
+         if "i.redd.it" in url:
+             score += 0.2
+
+         # Imgur is commonly used and reliable
+         elif "imgur.com" in url:
+             score += 0.1
+
+         # Reddit posts tend to be higher quality
+         if "reddit.com" in url:
+             score += 0.1
+
+         return min(1.0, score)
+
+     def _build_reddit_queries(self, query: str) -> List[str]:
+         """Build Reddit-specific search queries."""
+         queries = []
+
+         # General Reddit search
+         queries.append(f"site:reddit.com {query} tattoo")
+
+         # Subreddit-specific searches
+         for subreddit in self.tattoo_subreddits[:3]:  # Limit to top 3 subreddits
+             queries.append(f"site:reddit.com/r/{subreddit} {query}")
+
+         return queries
+
+     def _create_image_result(self, url: str, raw_result: dict) -> ImageResult:
+         """Create ImageResult from raw Reddit search result."""
+         return ImageResult(
+             url=url,
+             platform=self.platform,
+             quality_score=self.get_quality_score(url),
+             title=raw_result.get("title"),
+             source_url=raw_result.get("source")
+         )
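
Note: a sketch of the subreddit-scoped query expansion and scoring; the i.redd.it URL is hypothetical, and the expected output follows from the code above.

from search_engines.reddit import RedditSearchEngine

engine = RedditSearchEngine()
for q in engine._build_reddit_queries("dragon"):
    print(q)
# site:reddit.com dragon tattoo
# site:reddit.com/r/tattoos dragon
# site:reddit.com/r/tattoo dragon
# site:reddit.com/r/traditionaltattoos dragon

# Base 0.5 + 0.1 (.jpg extension) + 0.2 (i.redd.it host) = 0.8
print(engine.get_quality_score("https://i.redd.it/abc.jpg"))  # 0.8
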
utils/__init__.py ADDED
@@ -0,0 +1,6 @@
+ """Utilities package for the tattoo search engine."""
+
+ from .cache import SearchCache
+ from .url_validator import URLValidator
+
+ __all__ = ["URLValidator", "SearchCache"]
utils/__pycache__/__init__.cpython-312.pyc ADDED
Binary file (371 Bytes).

utils/__pycache__/cache.cpython-312.pyc ADDED
Binary file (5.47 kB).

utils/__pycache__/url_validator.cpython-312.pyc ADDED
Binary file (8.57 kB).
 
utils/cache.py ADDED
@@ -0,0 +1,105 @@
+ """Simple in-memory caching for search results."""
+
+ import hashlib
+ import time
+ from typing import Any, Dict, Optional, Tuple
+
+
+ class SearchCache:
+     """Simple in-memory cache for search results."""
+
+     def __init__(self, default_ttl: int = 3600, max_size: int = 1000):
+         """
+         Initialize cache.
+
+         Args:
+             default_ttl: Default time-to-live in seconds (1 hour)
+             max_size: Maximum number of cached items
+         """
+         self.default_ttl = default_ttl
+         self.max_size = max_size
+         self._cache: Dict[str, Tuple[Any, float]] = {}  # key -> (value, expiry_time)
+
+     def get(self, key: str) -> Optional[Any]:
+         """Get value from cache if not expired."""
+         if key not in self._cache:
+             return None
+
+         value, expiry_time = self._cache[key]
+
+         # Check if expired
+         if time.time() > expiry_time:
+             del self._cache[key]
+             return None
+
+         return value
+
+     def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
+         """Set value in cache with optional TTL."""
+         if ttl is None:
+             ttl = self.default_ttl
+
+         expiry_time = time.time() + ttl
+
+         # Evict oldest entries if cache is full
+         if len(self._cache) >= self.max_size:
+             self._evict_expired()
+
+             # If still full, remove oldest entry
+             if len(self._cache) >= self.max_size:
+                 oldest_key = min(self._cache.keys(), key=lambda k: self._cache[k][1])
+                 del self._cache[oldest_key]
+
+         self._cache[key] = (value, expiry_time)
+
+     def delete(self, key: str) -> bool:
+         """Delete key from cache. Returns True if key existed."""
+         return self._cache.pop(key, None) is not None
+
+     def clear(self) -> None:
+         """Clear all cached items."""
+         self._cache.clear()
+
+     def _evict_expired(self) -> None:
+         """Remove expired entries from cache."""
+         current_time = time.time()
+         expired_keys = [
+             key for key, (_, expiry_time) in self._cache.items()
+             if current_time > expiry_time
+         ]
+
+         for key in expired_keys:
+             del self._cache[key]
+
+     def get_stats(self) -> Dict[str, Any]:
+         """Get cache statistics."""
+         current_time = time.time()
+         expired_count = sum(
+             1 for _, expiry_time in self._cache.values()
+             if current_time > expiry_time
+         )
+
+         return {
+             'total_items': len(self._cache),
+             'expired_items': expired_count,
+             'active_items': len(self._cache) - expired_count,
+             'max_size': self.max_size,
+             'default_ttl': self.default_ttl
+         }
+
+     @staticmethod
+     def create_cache_key(query: str, max_results: int, platforms: Optional[set] = None) -> str:
+         """Create a cache key from search parameters."""
+         # Normalize query
+         normalized_query = query.lower().strip()
+
+         # Create a string representation of platforms
+         platform_str = ''
+         if platforms:
+             platform_str = '_'.join(sorted(p.value for p in platforms))
+
+         # Combine all parameters
+         key_string = f"{normalized_query}_{max_results}_{platform_str}"
+
+         # Hash to create a fixed-length key
+         return hashlib.md5(key_string.encode()).hexdigest()
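
Note: a minimal usage sketch of the cache wrapped around a search call; the cached value here is a placeholder.

from utils.cache import SearchCache

cache = SearchCache(default_ttl=600, max_size=100)

# create_cache_key lower-cases and strips the query, so
# "Dragon Tattoo " and "dragon tattoo" share one entry.
key = SearchCache.create_cache_key("Dragon Tattoo ", max_results=30)
cached = cache.get(key)
if cached is None:
    cached = ["... expensive multi-platform search would run here ..."]
    cache.set(key, cached)  # expires after default_ttl seconds

print(cache.get_stats())
# {'total_items': 1, 'expired_items': 0, 'active_items': 1,
#  'max_size': 100, 'default_ttl': 600}
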
utils/url_validator.py ADDED
@@ -0,0 +1,189 @@
+ """URL validation and health checking utilities."""
+
+ import logging
+ import random
+ import time
+ from concurrent.futures import ThreadPoolExecutor, as_completed
+ from typing import Dict, List, Optional, Set
+ from urllib.parse import urlparse
+
+ import requests
+
+ logger = logging.getLogger(__name__)
+
+
+ class URLValidator:
+     """Validates and health-checks URLs before processing."""
+
+     def __init__(self, max_workers: int = 10, timeout: int = 10):
+         self.max_workers = max_workers
+         self.timeout = timeout
+         self.session = requests.Session()
+
+         # Blocked domains that consistently fail or are problematic
+         self.blocked_domains = {
+             'bodyartguru.com',
+             'dcassetcdn.com',
+             'warvox.com',
+             'jenkins-tpp.blackboard.com',
+             'wrdsclassroom.wharton.upenn.edu',
+         }
+
+         # User agents for health checks
+         self.user_agents = [
+             'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
+             'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
+             'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',
+         ]
+
+     def validate_urls(self, urls: List[str]) -> List[str]:
+         """Validate multiple URLs concurrently."""
+         if not urls:
+             return []
+
+         # First, filter out obviously bad URLs
+         pre_filtered = self._pre_filter_urls(urls)
+
+         if not pre_filtered:
+             return []
+
+         # Health check the remaining URLs
+         valid_urls = self._health_check_urls(pre_filtered)
+
+         logger.info(f"URL validation: {len(urls)} -> {len(pre_filtered)} -> {len(valid_urls)}")
+         return valid_urls
+
+     def _pre_filter_urls(self, urls: List[str]) -> List[str]:
+         """Pre-filter URLs based on basic criteria."""
+         filtered = []
+
+         for url in urls:
+             if not self._is_valid_url_format(url):
+                 continue
+
+             if self._is_blocked_domain(url):
+                 continue
+
+             if not self._has_image_extension(url):
+                 continue
+
+             if len(url) > 500:  # Skip very long URLs
+                 continue
+
+             filtered.append(url)
+
+         return filtered
+
+     def _health_check_urls(self, urls: List[str]) -> List[str]:
+         """Perform HEAD requests to check URL accessibility."""
+         valid_urls = []
+
+         with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
+             # Submit health check tasks
+             future_to_url = {
+                 executor.submit(self._check_single_url, url): url
+                 for url in urls
+             }
+
+             # Collect results
+             for future in as_completed(future_to_url):
+                 url = future_to_url[future]
+                 try:
+                     is_valid = future.result(timeout=self.timeout + 5)
+                     if is_valid:
+                         valid_urls.append(url)
+                 except Exception as e:
+                     logger.debug(f"Health check failed for {url}: {e}")
+
+                 # Small delay to be respectful
+                 time.sleep(0.1)
+
+         return valid_urls
+
+     def _check_single_url(self, url: str) -> bool:
+         """Check if a single URL is accessible."""
+         try:
+             headers = {
+                 'User-Agent': random.choice(self.user_agents),
+                 'Accept': 'image/webp,image/apng,image/*,*/*;q=0.8',
+                 'Accept-Language': 'en-US,en;q=0.9',
+                 'Connection': 'keep-alive',
+                 'DNT': '1',
+             }
+
+             # Add platform-specific headers
+             if 'pinterest' in url.lower():
+                 headers.update({
+                     'Referer': 'https://www.pinterest.com/',
+                     'Origin': 'https://www.pinterest.com',
+                 })
+             elif 'instagram' in url.lower():
+                 headers.update({
+                     'Referer': 'https://www.instagram.com/',
+                 })
+             else:
+                 headers['Referer'] = 'https://www.google.com/'
+
+             response = self.session.head(
+                 url,
+                 headers=headers,
+                 timeout=self.timeout,
+                 allow_redirects=True
+             )
+
+             # Check status code
+             if response.status_code not in [200, 301, 302]:
+                 return False
+
+             # Check content type if available
+             content_type = response.headers.get('content-type', '').lower()
+             if content_type and not content_type.startswith('image/'):
+                 return False
+
+             # Check content length if available
+             content_length = response.headers.get('content-length')
+             if content_length:
+                 size = int(content_length)
+                 if size < 1024 or size > 10 * 1024 * 1024:  # Too small or too large
+                     return False
+
+             return True
+
+         except Exception as e:
+             logger.debug(f"URL check failed for {url}: {e}")
+             return False
+
+     def _is_valid_url_format(self, url: str) -> bool:
+         """Check if URL has valid format."""
+         try:
+             parsed = urlparse(url)
+             return all([parsed.scheme, parsed.netloc])
+         except Exception:
+             return False
+
+     def _is_blocked_domain(self, url: str) -> bool:
+         """Check if URL is from a blocked domain."""
+         try:
+             parsed = urlparse(url)
+             domain = parsed.netloc.lower()
+             return any(blocked in domain for blocked in self.blocked_domains)
+         except Exception:
+             return True  # Block malformed URLs
+
+     def _has_image_extension(self, url: str) -> bool:
+         """Check if URL appears to point to an image."""
+         image_extensions = {'.jpg', '.jpeg', '.png', '.webp', '.gif'}
+         url_lower = url.lower()
+         return any(ext in url_lower for ext in image_extensions)
+
+     def add_blocked_domain(self, domain: str) -> None:
+         """Add a domain to the blocked list."""
+         self.blocked_domains.add(domain.lower())
+
+     def remove_blocked_domain(self, domain: str) -> None:
+         """Remove a domain from the blocked list."""
+         self.blocked_domains.discard(domain.lower())
+
+     def get_blocked_domains(self) -> Set[str]:
+         """Get the set of blocked domains."""
+         return self.blocked_domains.copy()
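
Note: a minimal sketch of the two-stage validation with made-up URLs; the pre-filter runs locally, while the HEAD checks require network access, so the final list depends on connectivity.

from utils.url_validator import URLValidator

validator = URLValidator(max_workers=5, timeout=5)

urls = [
    "https://i.pinimg.com/736x/ab/cd.jpg",  # plausible image URL
    "https://bodyartguru.com/pic.jpg",      # blocked domain -> dropped
    "https://example.com/page.html",        # no image extension -> dropped
    "not-a-url",                            # fails format check -> dropped
]

# Stage 1 (local): format, blocklist, extension, and length checks.
# Stage 2 (network): concurrent HEAD requests verify status code,
# image content type, and a 1 KB - 10 MB size window.
live = validator.validate_urls(urls)
print(live)  # at most the first URL, depending on connectivity
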