from mcp.server.fastmcp import FastMCP
import time
from litellm import completion
import os
import glob
import http.client
import json
import openpyxl
import shutil
from google import genai
import pexpect

client = genai.Client(api_key="AIzaSyDtP05TyoIy9j0uPL7_wLEhgQEE75AZQSc")

source_dir = "/app/uploads/temp"
destination_dir = "/app/code_interpreter"
files_list = []
downloaded_files = []

from openai import OpenAI

clienty = OpenAI(api_key="xyz", base_url="https://akiko19191-backend.hf.space/")

mcp = FastMCP("code_sandbox")
data = {}
result = ""

import requests
from bs4 import BeautifulSoup  # For parsing HTML

Parent = pexpect.spawn('bash')
def transfer_files():
    """Move newly uploaded files from the temp upload folders into /app/code_interpreter."""
    try:
        for item in os.listdir(source_dir):
            item_path = os.path.join(source_dir, item)
            if os.path.isdir(item_path):  # Check if it's a directory
                for filename in os.listdir(item_path):
                    source_file_path = os.path.join(item_path, filename)
                    destination_file_path = os.path.join(destination_dir, filename)
                    if not os.path.exists(destination_file_path):
                        shutil.move(source_file_path, destination_file_path)
    except Exception:
        pass
def transfer_files2():
    """Move user uploads from the non-temp folders under /app/uploads into /app/code_interpreter, stripping the "<id>__" prefix from each filename."""
    try:
        for item in os.listdir("/app/uploads"):
            if "temp" not in item:
                item_path = os.path.join("/app/uploads", item)
                if os.path.isdir(item_path):  # Check if it's a directory
                    for filename in os.listdir(item_path):
                        source_file_path = os.path.join(item_path, filename)
                        destination_file_path = os.path.join(destination_dir, filename.split("__")[1])
                        if not os.path.exists(destination_file_path):
                            shutil.move(source_file_path, destination_file_path)
    except Exception:
        pass
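
# Illustrative sketch (not part of the original tool set, never called): the split("__")[1]
# above assumes uploaded names look like "<upload_id>__<original_name>". The hypothetical
# helper below only documents that naming assumption.
def _example_strip_upload_prefix(uploaded_name: str) -> str:
    # "abc123__report.xlsx" -> "report.xlsx"; names without "__" are returned unchanged.
    parts = uploaded_name.split("__", 1)
    return parts[1] if len(parts) == 2 else uploaded_name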
def upload_file(file_path, upload_url):
    """Uploads a file to the specified server endpoint."""
    try:
        # Check if the file exists
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")

        # Prepare the file for upload
        with open(file_path, "rb") as file:
            files = {"file": (os.path.basename(file_path), file)}  # Important: provide the filename

            # Send the POST request
            response = requests.post(upload_url, files=files)

        # Check the response status code
        response.raise_for_status()  # Raise an exception for bad status codes (4xx or 5xx)

        # Parse and print the response
        if response.status_code == 200:
            print(f"File uploaded successfully. Filename returned by server: {response.text}")
            return response.text  # Return the filename returned by the server
        else:
            print(f"Upload failed. Status code: {response.status_code}, Response: {response.text}")
            return None

    except FileNotFoundError as e:
        print(e)
        return None  # or re-raise the exception if you want the program to halt
    except requests.exceptions.RequestException as e:
        print(f"Upload failed. Network error: {e}")
        return None
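
# Illustrative, never-called sketch of how upload_file is used elsewhere in this module:
# the upload server returns a bare filename, which is then turned into a /static/ download
# URL. The local file path below is hypothetical.
def _example_upload_and_link():
    uploaded_filename = upload_file("/app/code_interpreter/example_output.png",
                                    "https://opengpt-4ik5.onrender.com/upload")
    if uploaded_filename is None:
        return None
    return f"https://opengpt-4ik5.onrender.com/static/{uploaded_filename}"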
TOKEN = "5182224145:AAEjkSlPqV-Q3rH8A9X8HfCDYYEQ44v_qy0"
chat_id = "5075390513"

from requests_futures.sessions import FuturesSession
session = FuturesSession()
def run(cmd, timeout_sec, forever_cmd):
    """Run `cmd` from /app/code_interpreter in a bash shell.

    forever_cmd='true' restarts the persistent global shell (used for servers) and returns
    only the first line of output; forever_cmd='false' spawns a fresh shell, sets
    PROMPT_COMMAND to echo an END sentinel, and captures output until END appears or
    timeout_sec elapses."""
    global Parent
    if forever_cmd == 'true':
        Parent.close()
        Parent = pexpect.spawn("bash")
        command = "cd /app/code_interpreter/ && " + cmd
        Parent.sendline(command)
        Parent.readline().decode()
        return str(Parent.readline().decode())

    t = time.time()
    child = pexpect.spawn("bash")
    output = ""
    command = "cd /app/code_interpreter/ && " + cmd
    child.sendline('PROMPT_COMMAND="echo END"')
    child.readline().decode()
    child.readline().decode()
    child.sendline(command)
    while (not child.eof()) and (time.time() - t < timeout_sec):
        x = child.readline().decode()
        output = output + x
        print(x)
        if "END" in x:
            output = output.replace("END", "")
            child.close()
            break
        if "true" in forever_cmd:
            break
    return output
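
# Illustrative, never-called examples of the two run() modes (the commands are hypothetical).
# With forever_cmd='false' the END sentinel marks where output capture stops; with
# forever_cmd='true' only the first line of output is returned and the spawned shell stays
# alive for long-running servers.
def _example_run_modes():
    listing = run("ls -l", timeout_sec=30, forever_cmd='false')            # captured stdout
    first_line = run("python app.py", timeout_sec=30, forever_cmd='true')  # server keeps running
    return listing, first_line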
def analyse_audio(audiopath, query) -> dict:
    """Ask another AI model about audio files. The model can listen to the audio and answer questions.
    Eg: query='Generate detailed minutes of meeting from the audio clip', audiopath='/app/code_interpreter/<audioname>'.
    Note: The audio files are automatically present in the /app/code_interpreter directory."""
    transfer_files2()
    myfile = client.files.upload(file=audiopath)

    response = client.models.generate_content(
        model='gemini-2.0-flash',
        contents=[query, myfile]
    )
    return {"Output": str(response.text)}
def analyse_video(videopath, query) -> dict:
    """Ask another AI model about videos. The model can watch the video and answer questions.
    Eg: query='Create a very detailed transcript and summary of the video', videopath='/app/code_interpreter/<videoname>'.
    Note: The videos are automatically present in the /app/code_interpreter directory."""
    transfer_files2()
    video_file = client.files.upload(file=videopath)

    # Wait for the uploaded file to finish server-side processing before querying it.
    while video_file.state.name == "PROCESSING":
        print('.', end='')
        time.sleep(1)
        video_file = client.files.get(name=video_file.name)

    if video_file.state.name == "FAILED":
        raise ValueError(video_file.state.name)

    response = client.models.generate_content(
        model='gemini-2.0-flash',
        contents=[query, video_file]
    )
    return {"Output": str(response.text)}
def analyse_images(imagepath, query) -> dict:
    """Ask another AI model about images. The model can see the image and answer questions.
    Eg: query='Who is the person in this image?', imagepath='/app/code_interpreter/<imagename>'.
    Note: The images are automatically present in the /app/code_interpreter directory."""
    transfer_files2()
    image_file = client.files.upload(file=imagepath)

    response = client.models.generate_content(
        model='gemini-2.0-flash',
        contents=[query, image_file]
    )
    return {"Output": str(response.text)}
# @mcp.tool()
# def generate_images(imagepath, query) -> dict:
#     """Ask another AI model to generate images based on the query and the image path. Set imagepath to an empty string if you don't want to edit an image but rather generate one. Eg: query='Generate a cartoon version of this image', imagepath='/app/code_interpreter/<imagename>'. Note: The images are automatically present in the /app/code_interpreter directory."""
#     transfer_files2()
#     video_file = client.files.upload(file=imagepath)
#     response = client.models.generate_content(
#         model='gemini-2.0-flash',
#         contents=[query, video_file]
#     )
#     return {"Output": str(response.text)}
def create_code_files(filename: str, code) -> dict:
    """Create a code file by passing the filename as well as the entire code to write. The file is created by default in the /app/code_interpreter directory. Note: All user-uploaded files that you might need to work on are stored in the /app/code_interpreter directory."""
    global destination_dir
    transfer_files()
    transfer_files2()

    filepath = os.path.join(destination_dir, filename)
    # Existing files are simply overwritten.
    if isinstance(code, dict):
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(code, f, ensure_ascii=False, indent=4)
    else:
        with open(filepath, "w") as f:
            f.write(str(code))
    return {"info": "The referenced code files were created successfully."}
    # return {"info":"The referenced code files already exist. Please rename the file or delete the existing one."}
def run_code(language: str, packages: str, filename: str, code: str, start_cmd: str, forever_cmd: str) -> dict:
    """
    Execute code in a controlled environment with package installation and file handling.
    Args:
        language: Programming language of the code (e.g. "python", "nodejs", "bash", "html", etc.).
        packages: Space-separated list of packages to install (python packages are installed if language is set to python; npm packages are installed if language is set to nodejs).
                  Preinstalled python packages: gradio, XlsxWriter, openpyxl, mpxj, jpype1.
                  Preinstalled npm packages: express, ejs, chart.js.
        filename: Name of the file to create (stored in /app/code_interpreter/).
        code: Full code to write to the file.
        start_cmd: Command to execute the file (e.g., "python /app/code_interpreter/app.py"
                   or "bash /app/code_interpreter/app.py").
                   Leave blank ('') if only file creation is needed / start_cmd is not required.
        forever_cmd: If 'true', the command will run indefinitely. Set to 'true' when running a website/server. Run all servers/websites on port 1337. If 'false', the command will time out after 300 seconds and the result will be returned.
    Notes:
        - All user-uploaded files are in /app/code_interpreter/.
        - After execution, embed a download link (or display images/gifs/videos directly in markdown format) in your response.
        - bash/apk packages cannot be installed.
        - When editing and subsequently re-executing the server with forever_cmd='true', the previous server instance is automatically terminated and the updated server commences operation. This removes the need for manual process termination commands such as pkill node.
        - The opened ports can be externally accessed at https://suitable-liked-ibex.ngrok-free.app/ (ONLY if the website is running successfully).
        - Do not use `plt.show()` in this headless environment. Save visualizations directly (e.g., `plt.savefig("happiness_img.png")` or export GIFs/videos).
    """
    global destination_dir
    package_names = packages.strip()

    if "python" in language:
        command = "pip install --break-system-packages "
    elif "node" in language:
        command = "npm install "
    else:
        command = "ls"

    if packages != "" and packages != " ":
        package_logs = run(
            f"{command} {package_names}", timeout_sec=300, forever_cmd='false'
        )
        if "ERROR" in package_logs:
            return {"package_installation_log": package_logs,
                    "info": "Package installation failed. Please check the package names. Tip: Try using another package/method to accomplish the task."}

    transfer_files2()
    transfer_files()
    with open(os.path.join(destination_dir, filename), "w") as f:
        f.write(code)

    global files_list
    if start_cmd != "" and start_cmd != " ":
        stdot = run(start_cmd, 120, forever_cmd)
    else:
        stdot = "File created successfully."

    onlyfiles = glob.glob("/app/code_interpreter/*")
    onlyfiles = list(set(onlyfiles) - set(files_list))
    uploaded_filenames = []
    for files in onlyfiles:
        try:
            uploaded_filename = upload_file(files, "https://opengpt-4ik5.onrender.com/upload")
            uploaded_filenames.append(f"https://opengpt-4ik5.onrender.com/static/{uploaded_filename}")
        except Exception:
            pass
    files_list = onlyfiles
    return {"output": stdot, "Files_download_link": uploaded_filenames}
def run_code_files(start_cmd: str, forever_cmd: str) -> dict:
    """Executes a shell command to run code files from /app/code_interpreter.

    Runs the given `start_cmd`. The execution behavior depends on `forever_cmd`.
    Any server/website started should use port 1337.
    Args:
        start_cmd (str): The shell command to execute the code
            (e.g., ``python /app/code_interpreter/app.py`` or ``node /app/code_interpreter/server.js``).
            Files must be in ``/app/code_interpreter``.
        forever_cmd (str): Execution mode.
            - ``'true'``: Runs indefinitely (for servers/websites).
            - ``'false'``: Runs up to 300s, captures output.
    Returns:
        dict: A dictionary containing:
            - ``'output'`` (str): Captured stdout (mainly when forever_cmd='false').
            - ``'Files_download_link'`` (Any): Links/identifiers for downloadable files.
    Notes:
        - After execution, embed a download link (or display images/gifs/videos directly in markdown format) in your response.
        - When editing and subsequently re-executing the server with forever_cmd='true', the previous server instance is automatically terminated and the updated server commences operation. This removes the need for manual process termination commands such as pkill node.
        - The opened ports can be externally accessed at https://suitable-liked-ibex.ngrok-free.app/ (ONLY if the website is running successfully).
    """
    global files_list
    stdot = run(start_cmd, 300, forever_cmd)

    onlyfiles = glob.glob("/app/code_interpreter/*")
    onlyfiles = list(set(onlyfiles) - set(files_list))
    uploaded_filenames = []
    for files in onlyfiles:
        try:
            uploaded_filename = upload_file(files, "https://opengpt-4ik5.onrender.com/upload")
            uploaded_filenames.append(f"https://opengpt-4ik5.onrender.com/static/{uploaded_filename}")
        except Exception:
            pass
    files_list = onlyfiles
    return {"output": stdot, "Files_download_link": uploaded_filenames}
def run_shell_command(cmd: str, forever_cmd: str) -> dict:
    """Executes a shell command in a sandboxed Alpine Linux environment.

    Runs the provided `cmd` string within a bash shell. Commands are executed
    relative to the `/app/code_interpreter/` working directory by default.
    The execution behavior (indefinite run vs. timeout) is controlled by
    the `forever_cmd` parameter.
    Important Environment Notes:
        - The execution environment is **Alpine Linux**. Commands should be compatible.
        - `sudo` commands are restricted for security reasons. Hence commands which require elevated privileges, such as `apk add`, CANNOT be executed. Instead, try to use `pip install` or `npm install` commands.
        - Standard bash features like `&&`, `||`, pipes (`|`), etc., are supported.
    Args:
        cmd (str): The shell command to execute.
            Example: ``mkdir test_dir && ls -l``
        forever_cmd (str): Determines the execution mode.
            - ``'true'``: Runs the command indefinitely. Suitable for starting servers or long-running processes. Output capture might be limited.
            - ``'false'``: Runs the command until completion or a 300-second timeout, whichever comes first. Captures standard output.
    Returns:
        dict: A dictionary containing the execution results:
            - ``'output'`` (str): The captured standard output (stdout) and potentially standard error (stderr) from the command.
    """
    transfer_files()
    transfer_files2()
    output = run(cmd, 300, forever_cmd)
    return {"output": output}
def install_python_packages(python_packages: str) -> dict:
    """python_packages to install, separated by spaces, e.g. python_packages='numpy matplotlib'. The following python packages are preinstalled: gradio XlsxWriter openpyxl."""
    package_names = python_packages.strip()
    command = "pip install"
    if not package_names:
        return {"stdout": "", "info": "No packages specified; nothing was installed."}

    stdot = run(
        f"{command} --break-system-packages {package_names}", timeout_sec=300, forever_cmd='false'
    )
    return {"stdout": stdot, "info": "Ran package installation command"}
def get_youtube_transcript(videoid: str) -> dict:
    """Get the transcript of a YouTube video by passing the video id. Eg: videoid='ZacjOVVgoLY'."""
    conn = http.client.HTTPSConnection("youtube-transcript3.p.rapidapi.com")
    headers = {
        'x-rapidapi-key': "2a155d4498mshd52b7d6b7a2ff86p10cdd0jsn6252e0f2f529",
        'x-rapidapi-host': "youtube-transcript3.p.rapidapi.com"
    }
    conn.request("GET", f"/api/transcript?videoId={videoid}", headers=headers)
    res = conn.getresponse()
    data = res.read()
    return json.loads(data)
def read_excel_file(filename) -> dict:
    """Reads the contents of an Excel file. Returns a dict mapping cell location to cell content (e.g. 'A1' -> value). Always run this command first when working with Excel files. The Excel file is automatically present in the /app/code_interpreter directory."""
    global destination_dir
    transfer_files2()
    transfer_files()
    workbook = openpyxl.load_workbook(os.path.join(destination_dir, filename))

    # Create an empty dictionary to store the data
    excel_data_dict = {}

    # Iterate over all sheets
    for sheet_name in workbook.sheetnames:
        sheet = workbook[sheet_name]
        # Iterate over all rows and columns
        for row in sheet.iter_rows():
            for cell in row:
                # Get cell coordinate (e.g., 'A1') and value
                cell_coordinate = cell.coordinate
                cell_value = cell.value
                if cell_value is not None:
                    excel_data_dict[cell_coordinate] = str(cell_value)
    return excel_data_dict
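
# Illustrative, never-called example of read_excel_file. For a sheet whose first row holds
# the headers "Name" and "Age", the returned mapping looks roughly like
# {"A1": "Name", "B1": "Age", "A2": "Alice", "B2": "30"}. The filename is hypothetical.
def _example_read_excel_file():
    return read_excel_file("report.xlsx")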
def scrape_websites(url_list: list, query: str) -> dict:
    """Scrapes specific website content. `query` is the question you want to ask about the content of the websites, e.g. 'Give .pptx links in the website', 'Summarise the content in very great detail', etc. A maximum of 4 urls can be passed at a time."""
    conn = http.client.HTTPSConnection("scrapeninja.p.rapidapi.com")
    headers = {
        'x-rapidapi-key': "2a155d4498mshd52b7d6b7a2ff86p10cdd0jsn6252e0f2f529",
        'x-rapidapi-host': "scrapeninja.p.rapidapi.com",
        'Content-Type': "application/json"
    }

    Output = ""
    links = ""
    content = ""
    for urls in url_list:
        payload = {"url": urls}
        payload = json.dumps(payload)
        conn.request("POST", "/scrape", payload, headers)
        res = conn.getresponse()
        data = res.read()
        content = content + str(data.decode("utf-8"))

    # Only thing llama 4 is good for.
    response = clienty.chat.completions.create(
        model="meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8",
        messages=[
            {"role": "user", "content": f"{query} [CONTENT]:{content}"}
        ],
        stream=True
    )
    for chunk in response:
        Output = Output + str(chunk.choices[0].delta.content)

    # --------------
    response2 = clienty.chat.completions.create(
        model="meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8",
        messages=[
            {"role": "user", "content": f"Give all relevant and different types of links in this content. The links may be relevant image links, file links, video links, website links, etc. You must give a minimum of 30 links and a maximum of 50 links. [CONTENT]:{content}"}
        ],
        stream=True
    )
    for chunk in response2:
        links = links + str(chunk.choices[0].delta.content)

    return {"website_content": Output, "relevant_links": links}
if __name__ == "__main__":
    # Initialize and run the server
    Ngrok = pexpect.spawn('bash')
    Ngrok.sendline("ngrok http --url=suitable-liked-ibex.ngrok-free.app 1337 --config /home/node/.config/ngrok/ngrok.yml")
    Ngrok.readline().decode()
    mcp.run(transport='stdio')