import asyncio
import http.client
import json
import os
import uuid
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Optional, Union

import requests
from qwen_agent.tools.base import BaseTool, register_tool

# API key sent in the X-API-KEY header; read once at import time from the
# SERPER_KEY_ID environment variable (None when the variable is unset).
SERPER_KEY = os.environ.get('SERPER_KEY_ID')

_SERPER_HOST = "google.serper.dev"
_MAX_ATTEMPTS = 5
# Without an explicit timeout http.client falls back to the global socket
# default, which normally means "block forever".
_REQUEST_TIMEOUT_S = 30

_INVALID_REQUEST = (
    "[Search] Invalid request format: Input must be a JSON object "
    "containing 'query' field"
)


@register_tool("search", allow_overwrite=True)
class Search(BaseTool):
    """Web-search tool that POSTs each query to ``google.serper.dev/search``.

    ``call`` accepts one query string or a list of them and returns the
    formatted results as a single string.
    """

    name = "search"
    description = "Performs batched web searches: supply an array 'query'; the tool retrieves the top 10 results for each query in one call."
    parameters = {
        "type": "object",
        "properties": {
            "query": {
                "type": "array",
                "items": {
                    "type": "string"
                },
                "description": "Array of query strings. Include multiple complementary search queries in a single call."
            },
        },
        "required": ["query"],
    }

    def __init__(self, cfg: Optional[dict] = None):
        super().__init__(cfg)

    @staticmethod
    def _contains_chinese(text: str) -> bool:
        """Return True if *text* has a character in U+4E00..U+9FFF."""
        return any('\u4E00' <= char <= '\u9FFF' for char in text)

    @staticmethod
    def _format_page(idx: int, page: dict) -> str:
        """Render one entry of the ``organic`` result list as a text snippet.

        Raises:
            KeyError: if ``title`` or ``link`` is missing from *page*.
            TypeError: if *page* is not a mapping or a field is not a string.
        """
        date_published = ("\nDate published: " + page["date"]) if "date" in page else ""
        source = ("\nSource: " + page["source"]) if "source" in page else ""
        snippet = ("\n" + page["snippet"]) if "snippet" in page else ""
        entry = f"{idx}. [{page['title']}]({page['link']}){date_published}{source}\n{snippet}"
        return entry.replace("Your browser can't play this video.", "")

    def google_search_with_serp(self, query: str):
        """Run one search and return the results formatted as text.

        Args:
            query: The search string. Queries containing Chinese characters
                are sent with China / zh-cn locale settings, all others with
                United States / en.

        Returns:
            A formatted result listing, or a human-readable message when the
            API key is missing, the request keeps failing, or the response
            has no usable ``organic`` results.
        """
        if not SERPER_KEY:
            # A None header value makes every request attempt fail, which
            # would otherwise surface as a misleading "Timeout" message.
            return "[Search] SERPER_KEY_ID environment variable is not set; cannot query Serper."

        if self._contains_chinese(query):
            locale = {"location": "China", "gl": "cn", "hl": "zh-cn"}
        else:
            locale = {"location": "United States", "gl": "us", "hl": "en"}
        payload = json.dumps({"q": query, **locale})
        headers = {
            'X-API-KEY': SERPER_KEY,
            'Content-Type': 'application/json'
        }

        raw = None
        for _ in range(_MAX_ATTEMPTS):
            # A fresh connection per attempt: a connection that failed
            # mid-request is not reliably reusable.
            conn = http.client.HTTPSConnection(_SERPER_HOST, timeout=_REQUEST_TIMEOUT_S)
            try:
                conn.request("POST", "/search", payload, headers)
                raw = conn.getresponse().read()
                break
            except Exception as e:
                # Deliberately broad: any failure here is treated as
                # transient and retried (best-effort tool call).
                print(e)
            finally:
                conn.close()
        else:
            return "Google search Timeout, return None, Please try again later."

        no_results = f"No results found for '{query}'. Try with a more general query."
        try:
            # JSONDecodeError and UnicodeDecodeError are both ValueErrors.
            results = json.loads(raw.decode("utf-8"))
            print(results)
            if "organic" not in results:
                print(f"No results found for query: '{query}'. Use a less specific query.")
                return no_results
            web_snippets = [
                self._format_page(idx, page)
                for idx, page in enumerate(results["organic"], start=1)
            ]
        except (ValueError, KeyError, TypeError) as e:
            print(e)
            return no_results

        return (
            f"A Google search for '{query}' found {len(web_snippets)} results:\n\n## Web Results\n"
            + "\n\n".join(web_snippets)
        )

    def search_with_serp(self, query: str):
        """Thin wrapper around :meth:`google_search_with_serp`."""
        return self.google_search_with_serp(query)

    def call(self, params: Union[str, dict], **kwargs) -> str:
        """Tool entry point.

        Args:
            params: Either a JSON string or an already-parsed dict with a
                ``query`` field holding one query string or a list of them.
            **kwargs: Accepted for interface compatibility; unused here.

        Returns:
            The search result text. Results of multiple queries are joined
            with a ``=======`` separator line. Malformed input yields an
            "Invalid request format" message instead of raising.
        """
        try:
            print(params)
            # Only decode when given serialized JSON; a dict is used as-is.
            if isinstance(params, (str, bytes, bytearray)):
                params = json.loads(params)
            print(params)
            query = params["query"]
            print("query:\n", query)
        except (ValueError, KeyError, TypeError):
            return _INVALID_REQUEST

        if isinstance(query, str):
            # Single query.
            return self.search_with_serp(query)

        # Multiple queries: every item must be a string.
        if not isinstance(query, (list, tuple)) or not all(isinstance(q, str) for q in query):
            return _INVALID_REQUEST
        return "\n=======\n".join(self.search_with_serp(q) for q in query)