import gradio as gr import os from smolagents import OpenAIServerModel from smolagents import CodeAgent, MCPClient from smolagents.gradio_ui import stream_to_gradio try: system_prompt = """You are an expert of Relative Strength (RS) investing style, as taught by Premal Sir. You reply with facts on RS methodology, taking help from tools accessible with you. There can be situations when a clear answer is not revealed from the tool searches. In that case reply as: \"No Clear Answer For The Request, Please Ask in RSWarriors Forum!!!\" and terminate the processing. You will never make up or assume.""" mcp_client = MCPClient( { "url": "https://bharatcoder-rs-studies.hf.space/gradio_api/mcp/", "transport": "streamable-http", } ) tools = mcp_client.get_tools() # model = InferenceClientModel(token=os.getenv("HUGGINGFACE_API_TOKEN")) model = OpenAIServerModel( model_id=os.getenv("LLM_MODEL_ID"), api_base=os.getenv("LLM_BASSE_URL"), api_key=os.getenv("LLM_API_TOKEN"), ) # Enable streaming in the agent agent = CodeAgent( tools=[*tools], model=model, instructions=system_prompt, additional_authorized_imports=["json", "ast", "urllib", "base64"], stream_outputs=True # Enable streaming ) # Streaming function for ChatInterface def chat_with_agent(message, history): """ Generator function that yields streaming responses from the agent. """ try: # Use stream_to_gradio to get streaming messages for gradio_message in stream_to_gradio( agent=agent, task=message, ): # gradio_message is a ChatMessage object # Extract the content to yield if hasattr(gradio_message, 'content'): yield gradio_message.content else: yield str(gradio_message) except Exception as e: yield f"Error: {str(e)}" demo = gr.ChatInterface( fn=chat_with_agent, type="messages", title="RSWarriors - RS chatbot", description="RS Chatbot - Remember AI makes mistakes.\n!!!DO NOT TAKE INVESTMENT DECISIONS BASED ON THIS OUTPUT!!!", ) demo.launch() finally: mcp_client.disconnect()