from langchain_core.tools import tool, Tool import math @tool def calculator_tool(expression: str) -> str: """ Evaluate a mathematical expression. """ # Define the restricted global and local namespace safe_globals = {"__builtins__": {}} safe_locals = { # Math functions 'sqrt': math.sqrt, 'sin': math.sin, 'cos': math.cos, 'tan': math.tan, 'log': math.log10, # log base 10 'ln': math.log, # natural log 'exp': math.exp, 'pow': pow, # Constants 'pi': math.pi, 'e': math.e, # Built-in math utilities 'abs': abs, 'round': round, 'max': max, 'min': min, 'sum': sum, } try: # Evaluate the expression in a restricted environment result = eval(expression, safe_globals, safe_locals) # Handle None explicitly if result is None: return "calculator tool produced no valid result" # Optional: Round very small floats to avoid scientific notation if isinstance(result, float) and abs(result) < 1e-9: result = round(result, 10) return str(result) except SyntaxError as se: return f"Syntax error in expression: {str(se)}" except NameError as ne: return f"Undefined variable or function used: {str(ne)}" except ZeroDivisionError: return "Error: Division by zero" except Exception as e: return f"Evaluation error: {str(e)}" from langchain_tavily import TavilySearch @tool def web_search(query: str) -> str: """ Searches the web and returns a list of the most relevant URLs. Use this FIRST for complex queries, metadata questions, or to find the right sources. Then follow up with get_webdoc_content or get_website_content on the most promising URL. """ try: tavily_search = TavilySearch( max_results=5, topic="general", search_depth="advanced", include_raw_content=False, # Just URLs and snippets ) results = tavily_search.invoke(query) # Format results to show URLs and brief descriptions web_search_results = "Search Results:\n" for i, result in enumerate(results["results"], 1): web_search_results += f"{i}. {result['title']}: {result['url']}\n {result['content'][:150]}...\n\n" return web_search_results except Exception as e: return f"web_search tool error: {str(e)}" import os import tempfile import requests import easyocr from io import BytesIO from PIL import Image from openai import OpenAI @tool def query_image(query: str, source: str, need_ocr: bool = True, need_reasoning: bool = False) -> str: """Use ONLY to answer question about an image using a Vision Language Model. NOT used to perform image processing or other tasks EXCEPT asking question about an image. Args: query (str): The question about the image, e.g. how many persons are on the image? source (str): URL to the image need_reasoning (bool): Set to True for complex query that require a reasoning model to answer properly. Set to False otherwise. need_ocr (bool): If True, also extract visible text from the image. Set to False otherwise. """ try: # OCR Extraction (optional) ocr_text = "" if need_ocr: try: # Download image from URL response = requests.get(source, stream=True, timeout=10) response.raise_for_status() # Load image into PIL image = Image.open(BytesIO(response.content)) # Save to temporary file with tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') as tmpfile: image.save(tmpfile, format=image.format) file_to_use = tmpfile.name # Perform OCR reader = easyocr.Reader(['en']) results = reader.readtext(file_to_use) ocr_text = "\n".join([res[1] for res in results]) ocr_text = f"\n\n[OCR Extracted Text]:\n{ocr_text}" except Exception as ocr_error: ocr_text = f"\n\n[OCR Error]: {str(ocr_error)}" finally: # Clean up temporary file if file_to_use and os.path.exists(file_to_use): os.unlink(file_to_use) # Query Vision Language Model client = OpenAI() if need_reasoning: model_name = "o4-mini" else: model_name = "gpt-4o-mini" response = client.chat.completions.create( model=model_name, messages=[ { "role": "user", "content": [ {"type": "text", "text": query}, {"type": "image_url", "image_url": {"url": source}}, ], } ], max_tokens=512, ) content = response.choices[0].message.content # Combine OCR and VLM output final_response = content if need_ocr and ocr_text: final_response += ocr_text return final_response except Exception as e: return f"Image query failed: {str(e)}" from pydantic import BaseModel, Field from e2b import Sandbox import re import os class PythonCodeInput(BaseModel): code: str = Field(description="The Python code string to execute.") @tool(args_schema=PythonCodeInput) def python_repl(code: str) -> str: """ Use this to execute single or multi-line Python commands to perform tasks like: sort a list in ascending or descending order, reverse input string, draw a table, photo processing, etc. Input should be syntactically valid Python code. Make sure to include required imports in the code. Always include in your code `print(...)` or `image.save(...)` to return outputs that can be seen. You are allowed to access internet and download files from URLs via code (e.g., using requests) Avoid using any system-level commands or libraries that could harm the host system. Avoid commands that require user input or block indefinitely (e.g., `input()`). """ # List of forbidden patterns in code FORBIDDEN_PATTERNS = [ r'\bimport\s+(os|sys|subprocess|shutil|socket)', r'\b(eval|exec|input|open)\s*$(?=.*\w)', r'\b__import__', r'\bos\.', r'\bsys\.', r'\bsubprocess\.', ] # Step 1: Keyword-based security check for pattern in FORBIDDEN_PATTERNS: if re.search(pattern, code): match = re.search(pattern, code).group() return f"Blocked unsafe operation: {match}" # Step 2: Create E2B sandbox try: with Sandbox(api_key=os.getenv("E2B_API_KEY")) as sandbox: # Known mismatches: import name -> pip package name import_to_pip = { "PIL": "pillow", "cv2": "opencv-python", "yaml": "PyYAML", "bs4": "beautifulsoup4", "tkinter": "tk", } # Built-in modules that don't need installation built_in_modules = { "math", "re", "json", "csv", "os", "sys", "time", "datetime", "random", "itertools", "functools", "__future__", "collections", "pathlib", "io", } # Step 1: Extract import statements import_matches = re.findall( r'(?:import\s+([a-zA-Z0-9_]+)(?!\.)|\bfrom\s+([a-zA-Z0-9_]+)(?=\s+import\b))', code ) base_imports = set() base_imports = set(match[0] or match[1] for match in import_matches) # match[0] = 'import X', match[1] = 'from X import Y' # Step 2: Determine which packages to install packages_to_install = set() for imp in base_imports: # Skip known built-ins if imp in built_in_modules: continue # Use mapped name if exists, else use import name package_name = import_to_pip.get(imp, imp) # Avoid installing system-specific modules like __pycache__ if imp.startswith("__"): continue packages_to_install.add(package_name) # Step 3: Install necessary packages if packages_to_install: install_cmd = f"pip install {' '.join(packages_to_install)}" result = sandbox.commands.run(install_cmd) if result.stderr: return f"Failed to install packages:\n{result.stderr}" # Step 4: Write and run the user code CODE_FILE_PATH = "/tmp/code.py" sandbox.files.write(CODE_FILE_PATH, code) # Step 5: Execute the code using the new API result = sandbox.commands.run(f"python {CODE_FILE_PATH}") stdout = result.stdout.strip() stderr = result.stderr.strip() # Step 6: Return output if stderr: return f"Execution error:\n{stderr}" return stdout or "No output" except Exception as e: return f"Sandbox error: {str(e)}" import requests from bs4 import BeautifulSoup from PyPDF2 import PdfReader from io import BytesIO from markdownify import markdownify @tool def get_webdoc_content(url: str) -> str: """ Extracts content from PDFs or document-like URLs (academic papers, reports) Can be used after web_search to get detailed information. Args: url (str): the URL of web page to extract the content from """ try: headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" } response = requests.get(url, headers=headers, timeout=10) response.raise_for_status() content_type = response.headers.get('Content-Type', '') # PDF Handling if 'application/pdf' in content_type: pdf_file = BytesIO(response.content) reader = PdfReader(pdf_file) text = "\n".join(page.extract_text() for page in reader.pages) # return f"## PDF Content from {page_url}\n\n{text[:15000]}" return f"## PDF Content from {url}\n\n```\n{text[:15000]}\n```" # HTML Document Handling elif 'text/html' in content_type: soup = BeautifulSoup(response.text, 'html.parser') cleaned_html = soup.body or soup # Fallback to full document return markdownify(str(cleaned_html), strip=['a']) # Fallback: Raw text extraction else: return f"## Raw Content from {url}\n\n{response.text[:15000]}" except requests.exceptions.RequestException as e: return f"HTTP error in get_webpage_content: {str(e)}" except Exception as e: return f"Unexpected error in get_webpage_content: {str(e)}" import requests from bs4 import BeautifulSoup from markdownify import markdownify @tool def get_website_content(url: str) -> str: """ Extracts contents from HTML-based URLs. Specializes in Wikipedia, technical documentation, and discussion pages. NOT used for document-based URLs (academic papers, reports). Used after web_search to get detailed information. Args: url (str): The URL of the web page to extract content from """ try: headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" } response = requests.get(url, headers=headers, timeout=10) response.raise_for_status() soup = BeautifulSoup(response.text, 'html.parser') # Remove non-content elements for element in soup.select('script, style, footer, nav, header, aside'): element.decompose() # Convert cleaned HTML to markdown cleaned_html = str(soup.body) if soup.body else str(soup) markdown_content = markdownify(cleaned_html, strip=['a']) # Optional: strip links return f"## Extracted Content from {url}\n\n{markdown_content[:15000]}" # Limit length except requests.exceptions.RequestException as e: return f"HTTP error in web_content_extract: {str(e)}" except Exception as e: return f"Unexpected error in web_content_extract: {str(e)}" import os from langchain_openai import OpenAIEmbeddings from langchain_community.vectorstores import FAISS from langchain_text_splitters import RecursiveCharacterTextSplitter @tool def extract_answer_from_content(content: str | dict, query: str) -> str: """ Extract relevant information from content based on user query. Args: content (str/dict): Raw text or transcribed test from audio or structured content from any source query (str): Natural language question to answer Returns: str: Concise answer extracted from content """ try: # Normalize content format if isinstance(content, dict): text_content = "" if "summary" in content: text_content += f"SUMMARY: {content['summary']}\n\n" if "infobox" in content: text_content += "INFOBOX:\n" for k, v in content["infobox"].items(): text_content += f"{k}: {v}\n" text_content += "\n" if "sections" in content: for section, text in content["sections"].items(): text_content += f"{section}:\n{text}\n\n" else: text_content = content # Initialize OpenAI embeddings embeddings = OpenAIEmbeddings( openai_api_key=os.getenv("OPENAI_API_KEY"), model="text-embedding-3-large" ) # Split content into manageable chunks text_splitter = RecursiveCharacterTextSplitter( chunk_size=500, chunk_overlap=100 ) chunks = text_splitter.split_text(text_content) # Create vector store vectorstore = FAISS.from_texts(chunks, embeddings) retriever = vectorstore.as_retriever(search_kwargs={"k": 3}) # Get most relevant content relevant_docs = retriever.invoke(query) combined_text = " ".join([doc.page_content for doc in relevant_docs]) # Return relevant content with context return f"Relevant information found:\n{combined_text[:1500]}" except Exception as e: return f"Content extraction failed: {str(e)}" import os import requests from openai import OpenAI @tool def transcribe_audio(source: str, file_extension: str) -> str: """ Transcribes an audio to text from local path or URL. Args: source (str): URL to an audio file. Returns: str: The transcribed text, or error message. """ # If file is not existing use download_file_from_url tool to download the file first. client = OpenAI() try: # download the audio file response = requests.get(source) response.raise_for_status() # write to disk file_extension = file_extension.replace('.','') with open(f'tmp.{file_extension}', 'wb') as file: file.write(response.content) audio_file = open(f'tmp.{file_extension}', "rb") client = OpenAI() transcription = client.audio.transcriptions.create( model="whisper-1", file=audio_file ) return transcription.text except Exception as e: return f"Transcription error: {str(e)}" from youtube_transcript_api import YouTubeTranscriptApi from pytube import extract @tool def get_youtube_transcript(page_url: str) -> str: """Get the transcript of audio component of YouTube video. Use this for Youtube videos with available transcripts Args: page_url (str): YouTube URL of the video """ try: # Get video ID from URL video_id = extract.video_id(page_url) # Get transcript using correct method transcript = YouTubeTranscriptApi.get_transcript(video_id) # Return concatenated text return '\n'.join([s['text'] for s in transcript]) except Exception as e: return f"get_youtube_transcript failed: {str(e)}" from tabulate import tabulate from typing import Dict, Any, List @tool def generate_table_from_data(data: List[Dict[str, Any]]) -> str: """ Convert list of dictionaries to markdown table Args: data (List[Dict]): List of objects with common keys Returns: str: Markdown-formatted table """ if not data: return "No data available" headers = data[0].keys() rows = [list(item.values()) for item in data] return tabulate(rows, headers=headers, tablefmt="pipe") from pydantic import BaseModel, Field from typing import List, Dict class CommutativeCheckInput(BaseModel): table_str: str = Field(..., description="Markdown-formatted string of the operation table (e.g., |*|a|b|c|...)") elements: List[str] = Field(..., description="List of elements in the set S") @tool(args_schema=CommutativeCheckInput) def check_commutative(table_str: str, elements: List[str]) -> str: """ Analyzes a binary operation table for commutativity. Args: table_str (str): Markdown-formatted string of the operation table. elements (List[str]): List of elements in the set S. Returns: str: Comma-separated list of element pairs (e.g., "b,e") where x*y ≠ y*x. """ # Parse the table string into a 2D list lines = [line.strip() for line in table_str.strip().split('\n') if line.strip()] header = [cell.strip() for cell in lines[0].split('|') if cell.strip()][1:] # Skip the first cell (operator) rows = [] for line in lines[2:]: cells = [cell.strip() for cell in line.split('|') if cell.strip()] # Remove empty cells if cells: rows.append(cells) # Validate that all rows have the correct number of cells expected_length = len(header) + 1 # x + one for each header for row in rows: if len(row) < expected_length: return f"Error: Row '{row[0]}' has {len(row)} cells, but expected {expected_length}." # Build a dictionary for the operation: op[x][y] = result operation: Dict[str, Dict[str, str]] = {} for row in rows: x = row[0] operation[x] = {} for i, y in enumerate(header): operation[x][y] = row[i + 1] # Check all pairs (x, y) for x*y == y*x counterexamples = [] for x in elements: for y in elements: if x < y: # Avoid redundant checks and self-comparison try: xy = operation[x][y] yx = operation[y][x] if xy != yx: counterexamples.append(f"{x},{y}") except KeyError as e: return f"Error: Missing data for pair ({x}, {y}) in table." return "\n".join(counterexamples) if counterexamples else "The operation is commutative."