import os
import threading
import requests
import logging
import queue
import json
import asyncio
from typing import List, Optional, Literal, get_args
from collections import defaultdict, deque
from concurrent.futures import ThreadPoolExecutor

from telegram import Update, Message, Bot
from telegram.ext import (
    ApplicationBuilder,
    ContextTypes,
    CommandHandler,
    MessageHandler,
    filters,
)
from pydantic import BaseModel, Field, ValidationError

from FLUX import generate_image
from VoiceReply import generate_voice_reply
from polLLM import generate_llm, LLMBadRequestError

# --- Logging Setup ---------------------------------------------------------
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
logger = logging.getLogger("eve_bot")
logger.setLevel(LOG_LEVEL)
handler = logging.StreamHandler()
formatter = logging.Formatter(
    "%(asctime)s [%(levelname)s] [%(chat_id)s/%(user_id)s] %(message)s"
)
handler.setFormatter(formatter)


class ContextFilter(logging.Filter):
    def filter(self, record):
        record.chat_id = getattr(record, "chat_id", "-")
        record.user_id = getattr(record, "user_id", "-")
        return True


handler.addFilter(ContextFilter())
logger.handlers = [handler]
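# Note: chat_id/user_id render as "-" unless a logging call supplies them via
# `extra`, e.g. logger.info("handled", extra={"chat_id": cid, "user_id": uid}).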

# Thread-local to carry context through helpers
_thread_ctx = threading.local()


def set_thread_context(chat_id, user_id, message_id):
    _thread_ctx.chat_id = chat_id
    _thread_ctx.user_id = user_id
    _thread_ctx.message_id = message_id


def get_thread_context():
    return (
        getattr(_thread_ctx, "chat_id", None),
        getattr(_thread_ctx, "user_id", None),
        getattr(_thread_ctx, "message_id", None),
    )


# --- Conversation History --------------------------------------------------
# Keys are normalized to strings so int and str chat/user ids share one entry.
history = defaultdict(lambda: deque(maxlen=20))


def record_user_message(chat_id, user_id, message):
    history[(str(chat_id), str(user_id))].append(f"User: {message}")


def record_bot_message(chat_id, user_id, message):
    history[(str(chat_id), str(user_id))].append(f"Assistant: {message}")


def get_history_text(chat_id, user_id):
    return "\n".join(history[(str(chat_id), str(user_id))])


def clear_history(chat_id, user_id):
    history[(str(chat_id), str(user_id))].clear()


# --- Config ----------------------------------------------------------------
class BotConfig:
    TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN")
    IMAGE_DIR = "/tmp/images"
    AUDIO_DIR = "/tmp/audio"
    DEFAULT_IMAGE_COUNT = 4

    @classmethod
    def validate(cls):
        if not cls.TELEGRAM_TOKEN:
            raise ValueError("Missing TELEGRAM_TOKEN")


BotConfig.validate()


# --- Threading & Queues -----------------------------------------------------
task_queue = queue.Queue()
polls = {}
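# polls maps chat_id -> {"question": str, "options": [str],
#                        "votes": {choice: count}, "voters": {voter: choice}}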


def worker():
    while True:
        task = task_queue.get()
        try:
            if task["type"] == "image":
                _fn_generate_images(**task)
        except Exception as e:
            logger.error(f"Worker error {task}: {e}")
        finally:
            task_queue.task_done()


for _ in range(4):
    threading.Thread(target=worker, daemon=True).start()
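# Image tasks carry type="image", message_id, chat_id, prompt and, optionally,
# count, width and height; extra keys are absorbed by _fn_generate_images(**_).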


# --- Core Handlers ----------------------------------------------------------
async def _fn_send_text(mid: int, cid: int, text: str, context: ContextTypes.DEFAULT_TYPE):
    chat_id, user_id, _ = get_thread_context()
    msg: Message = await context.bot.send_message(
        chat_id=cid,
        text=text,
        reply_to_message_id=mid,
    )
    record_bot_message(chat_id, user_id, text)
    # Enqueue the spoken version of this reply on the application's event loop.
    context.application.create_task(_fn_voice_reply_async(
        msg.message_id, cid, text, context
    ))


async def _fn_send_text_wrapper(mid, cid, message, context):
    await _fn_send_text(mid, cid, message, context)


async def _fn_voice_reply_async(mid: int, cid: int, prompt: str, context: ContextTypes.DEFAULT_TYPE):
    proc = (
        f"Just say this exactly as written in a friendly, playful, "
        f"happy and helpful but a little bit clumsy-cute way: {prompt}"
    )
    res = generate_voice_reply(proc, model="openai-audio", voice="coral", audio_dir=BotConfig.AUDIO_DIR)
    if res and res[0]:
        path, _ = res
        with open(path, "rb") as f:
            await context.bot.send_audio(chat_id=cid, audio=f, reply_to_message_id=mid)
        os.remove(path)
    else:
        # The text was already sent by _fn_send_text; calling it again here would
        # duplicate the message and re-queue another voice attempt, so just log.
        logger.warning("Voice reply generation failed; text-only reply was sent.")


async def _fn_summarize(mid, cid, text, context):
    summary = generate_llm(f"Summarize:\n\n{text}")
    await _fn_send_text(mid, cid, summary, context)


async def _fn_translate(mid, cid, lang, text, context):
    resp = generate_llm(f"Translate to {lang}:\n\n{text}")
    await _fn_send_text(mid, cid, resp, context)


async def _fn_joke(mid, cid, context):
    try:
        j = requests.get("https://official-joke-api.appspot.com/random_joke", timeout=5).json()
        joke = f"{j['setup']}\n\n{j['punchline']}"
    except Exception:
        joke = generate_llm("Tell me a short joke.")
    await _fn_send_text(mid, cid, joke, context)


async def _fn_weather(mid, cid, location, context):
    # Parameter is named `location` to match WeatherIntent when dispatched by keyword.
    raw = requests.get(f"http://sl.wttr.in/{location}?format=4", timeout=5).text
    report = generate_llm(f"Give a weather report in °C:\n\n{raw}")
    await _fn_send_text(mid, cid, report, context)


async def _fn_inspire(mid, cid, context):
    quote = generate_llm("Give me a unique, random short inspirational quote.")
    await _fn_send_text(mid, cid, f"✨ {quote}", context)


async def _fn_meme(mid, cid, text, context):
    # Parameter is named `text` to match MemeIntent when dispatched by keyword.
    await context.bot.send_message(chat_id=cid, text="🎨 Generating meme…", reply_to_message_id=mid)
    task_queue.put({"type": "image", "message_id": mid, "chat_id": cid, "prompt": f"meme: {text}"})


async def _fn_poll_create(mid, cid, question, options, context):
    votes = {i + 1: 0 for i in range(len(options))}
    polls[cid] = {"question": question, "options": options, "votes": votes, "voters": {}}
    text = f"📊 *Poll:* {question}\n" + "\n".join(f"{i+1}. {o}" for i, o in enumerate(options))
    await context.bot.send_message(chat_id=cid, text=text, reply_to_message_id=mid, parse_mode="Markdown")
    record_bot_message(cid, get_thread_context()[1], text)


async def _fn_poll_vote(mid, cid, voter, choice, context):
    poll = polls.get(cid)
    if not poll or choice < 1 or choice > len(poll["options"]):
        return
    prev = poll["voters"].get(voter)
    if prev:
        poll["votes"][prev] -= 1
    poll["votes"][choice] += 1
    poll["voters"][voter] = choice
    await _fn_send_text(mid, cid, f"✅ Voted for {poll['options'][choice - 1]}", context)


async def _fn_poll_results(mid, cid, context):
    poll = polls.get(cid)
    if not poll:
        await _fn_send_text(mid, cid, "No active poll.", context)
        return
    txt = f"📊 *Results:* {poll['question']}\n" + "\n".join(
        f"{i}. {o}: {poll['votes'][i]}" for i, o in enumerate(poll["options"], 1)
    )
    await _fn_send_text(mid, cid, txt, context)


async def _fn_poll_end(mid, cid, context):
    poll = polls.pop(cid, None)
    if not poll:
        await _fn_send_text(mid, cid, "No active poll.", context)
        return
    txt = f"📊 *Final Results:* {poll['question']}\n" + "\n".join(
        f"{i}. {o}: {poll['votes'][i]}" for i, o in enumerate(poll["options"], 1)
    )
    await _fn_send_text(mid, cid, txt, context)


def _fn_generate_images(message_id, chat_id, prompt, count=1, width=None, height=None, **_):
    """
    Runs in a background thread. Spins up its own asyncio loop
    so we can `await` the bot's send_message / send_photo coroutines.
    """
    b = Bot(BotConfig.TELEGRAM_TOKEN)
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(
            b.send_message(
                chat_id=chat_id,
                text=f"✨ Generating {count} image(s)…",
                reply_to_message_id=message_id,
            )
        )
        for i in range(1, count + 1):
            try:
                img, path, ret_p, url = generate_image(
                    prompt, str(message_id), str(message_id),
                    BotConfig.IMAGE_DIR, width=width, height=height
                )
                caption = f"✨ Image {i}/{count}: {url}\n\n{ret_p}"
                with open(path, "rb") as f:
                    loop.run_until_complete(
                        b.send_photo(
                            chat_id=chat_id,
                            photo=f,
                            caption=caption,
                            reply_to_message_id=message_id,
                        )
                    )
                os.remove(path)
            except Exception as e:
                logger.warning(f"Img {i}/{count} failed: {e}")
                loop.run_until_complete(
                    b.send_message(
                        chat_id=chat_id,
                        text=f"😢 Failed to generate image {i}/{count}.",
                        reply_to_message_id=message_id,
                    )
                )
    finally:
        loop.close()


# --- Pydantic Models & Intent Routing ---------------------------------------
class BaseIntent(BaseModel):
    action: str


class SummarizeIntent(BaseIntent):
    action: Literal["summarize"]
    text: str


class TranslateIntent(BaseIntent):
    action: Literal["translate"]
    lang: str
    text: str


class JokeIntent(BaseIntent):
    action: Literal["joke"]


class WeatherIntent(BaseIntent):
    action: Literal["weather"]
    location: str


class InspireIntent(BaseIntent):
    action: Literal["inspire"]


class MemeIntent(BaseIntent):
    action: Literal["meme"]
    text: str


class PollCreateIntent(BaseIntent):
    action: Literal["poll_create"]
    question: str
    options: List[str]


class PollVoteIntent(BaseIntent):
    action: Literal["poll_vote"]
    voter: str
    choice: int


class PollResultsIntent(BaseIntent):
    action: Literal["poll_results"]


class PollEndIntent(BaseIntent):
    action: Literal["poll_end"]


class GenerateImageIntent(BaseIntent):
    action: Literal["generate_image"]
    prompt: str
    count: int = Field(default=1, ge=1)
    # Default to None so the LLM may omit width/height.
    width: Optional[int] = None
    height: Optional[int] = None


class SendTextIntent(BaseIntent):
    action: Literal["send_text"]
    message: str


INTENT_MODELS = [
    SummarizeIntent, TranslateIntent, JokeIntent, WeatherIntent,
    InspireIntent, MemeIntent, PollCreateIntent, PollVoteIntent,
    PollResultsIntent, PollEndIntent, GenerateImageIntent, SendTextIntent,
]


async def _fn_enqueue_image(mid, cid, prompt, count, width, height, context):
    task_queue.put({
        "type": "image",
        "message_id": mid,
        "chat_id": cid,
        "prompt": prompt,
        "count": count,
        "width": width,
        "height": height,
    })


ACTION_HANDLERS = {
    "summarize": _fn_summarize,
    "translate": _fn_translate,
    "joke": _fn_joke,
    "weather": _fn_weather,
    "inspire": _fn_inspire,
    "meme": _fn_meme,
    "poll_create": _fn_poll_create,
    "poll_vote": _fn_poll_vote,
    "poll_results": _fn_poll_results,
    "poll_end": _fn_poll_end,
    "generate_image": _fn_enqueue_image,
    "send_text": _fn_send_text_wrapper,
}
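# Handlers are awaited as handler(mid, cid, **intent_fields, context=context),
# so each handler's keyword parameters must match its intent model's field names.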


def route_intent(user_input: str, chat_id: str, sender: str):
    history_text = get_history_text(chat_id, sender)
    sys_prompt = (
        "You never perform work yourself—you only invoke one of the available functions. "
        "When the user asks for something that matches a function signature, you must return exactly one JSON object matching that function’s parameters—and nothing else. "
        "Do not wrap it in markdown, do not add extra text, and do not show the JSON to the user. "
        "If the user’s request does not match any function, reply in plain text, and never mention JSON or internal logic.\n\n"
        "- summarize(text)\n"
        "- translate(lang, text)\n"
        "- joke()\n"
        "- weather(location)\n"
        "- inspire()\n"
        "- meme(text)\n"
        "- poll_create(question, options)\n"
        "- poll_vote(voter, choice)\n"
        "- poll_results()\n"
        "- poll_end()\n"
        "- generate_image(prompt, count, width, height)\n"
        "- send_text(message)\n\n"
        "Return only raw JSON matching one of these shapes. For example:\n"
        "  {\"action\":\"generate_image\",\"prompt\":\"a red fox\",\"count\":3,\"width\":512,\"height\":512}\n"
        "Otherwise, use send_text to reply with plain chat. Return exactly one JSON object for the current message only, never for previous turns.\n"
        f"Conversation so far:\n{history_text}\n\nCurrent message: User: {user_input}"
    )
    try:
        raw = generate_llm(sys_prompt)
    except LLMBadRequestError:
        clear_history(chat_id, sender)
        return SendTextIntent(action="send_text", message="Oops, let’s start fresh!")

    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        # Not JSON: treat the raw LLM output as a plain chat reply.
        return SendTextIntent(action="send_text", message=raw)

    # First, try strict validation against every intent model.
    for M in INTENT_MODELS:
        try:
            return M.model_validate(parsed)
        except ValidationError:
            continue

    # Strict validation failed: rebuild the payload leniently with defaults.
    action = parsed.get("action")
    if action in ACTION_HANDLERS:
        data = parsed
        kwargs = {}
        if action == "generate_image":
            kwargs = {
                "prompt": data.get("prompt", ""),
                "count": int(data.get("count", BotConfig.DEFAULT_IMAGE_COUNT)),
                "width": data.get("width"),
                "height": data.get("height"),
            }
        elif action == "send_text":
            kwargs = {"message": data.get("message", "")}
        elif action == "translate":
            kwargs = {"lang": data.get("lang", ""), "text": data.get("text", "")}
        elif action == "summarize":
            kwargs = {"text": data.get("text", "")}
        elif action == "weather":
            kwargs = {"location": data.get("location", "")}
        elif action == "meme":
            kwargs = {"text": data.get("text", "")}
        elif action == "poll_create":
            kwargs = {"question": data.get("question", ""), "options": data.get("options", [])}
        elif action == "poll_vote":
            kwargs = {"voter": sender, "choice": int(data.get("choice", 0))}
        try:
            # Find the model whose `action` Literal matches (Pydantic v2: model_fields).
            model = next(
                m for m in INTENT_MODELS
                if action in get_args(m.model_fields["action"].annotation)
            )
            return model.model_validate({"action": action, **kwargs})
        except Exception:
            return SendTextIntent(action="send_text", message=raw)

    return SendTextIntent(action="send_text", message=raw)


# --- Telegram Handlers -------------------------------------------------------
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    await update.message.reply_text("🌟 Eve is online! Type /help to see commands.")


async def help_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE):
    await update.message.reply_markdown(
        "🤖 *Eve* commands:\n"
        "• /help\n"
        "• /summarize <text>\n"
        "• /translate <lang>|<text>\n"
        "• /joke\n"
        "• /weather <loc>\n"
        "• /inspire\n"
        "• /meme <text>\n"
        "• /poll <Q>|opt1|opt2 …\n"
        "• /results\n"
        "• /endpoll\n"
        "• /gen <prompt>|<count>|<width>|<height>\n"
        "Otherwise just chat with me."
    )


async def message_router(update: Update, context: ContextTypes.DEFAULT_TYPE):
    msg: Message = update.message
    chat_id = msg.chat.id
    user_id = msg.from_user.id
    mid = msg.message_id
    text = msg.text or ""

    set_thread_context(chat_id, user_id, mid)
    record_user_message(chat_id, user_id, text)
    low = text.lower().strip()

    if low == "/help":
        return await help_cmd(update, context)
    if low.startswith("/summarize "):
        return await _fn_summarize(mid, chat_id, text[11:].strip(), context)
    if low.startswith("/translate "):
        lang, txt = text[11:].split("|", 1)
        return await _fn_translate(mid, chat_id, lang.strip(), txt.strip(), context)
    if low == "/joke":
        return await _fn_joke(mid, chat_id, context)
    if low.startswith("/weather "):
        return await _fn_weather(mid, chat_id, text[9:].strip().replace(" ", "+"), context)
    if low == "/inspire":
        return await _fn_inspire(mid, chat_id, context)
    if low.startswith("/meme "):
        return await _fn_meme(mid, chat_id, text[6:].strip(), context)
    if low.startswith("/poll "):
        parts = [p.strip() for p in text[6:].split("|")]
        return await _fn_poll_create(mid, chat_id, parts[0], parts[1:], context)
    if chat_id in polls and low.isdigit():
        return await _fn_poll_vote(mid, chat_id, str(user_id), int(low), context)
    if low == "/results":
        return await _fn_poll_results(mid, chat_id, context)
    if low == "/endpoll":
        return await _fn_poll_end(mid, chat_id, context)
    if low.startswith("/gen"):
        parts = text[4:].split("|")
        pr = parts[0].strip()
        ct = int(parts[1]) if len(parts) > 1 and parts[1].isdigit() else BotConfig.DEFAULT_IMAGE_COUNT
        width = int(parts[2]) if len(parts) > 2 and parts[2].isdigit() else None
        height = int(parts[3]) if len(parts) > 3 and parts[3].isdigit() else None
        task_queue.put({
            "type": "image", "message_id": mid, "chat_id": chat_id,
            "prompt": pr, "count": ct, "width": width, "height": height,
        })
        return

    intent = route_intent(text, str(chat_id), str(user_id))
    handler = ACTION_HANDLERS.get(intent.action)
    kwargs = intent.model_dump(exclude={"action"})
    await handler(mid, chat_id, **kwargs, context=context)


def main():
    app = ApplicationBuilder().token(BotConfig.TELEGRAM_TOKEN).build()
    app.add_handler(CommandHandler("start", start))
    app.add_handler(CommandHandler("help", help_cmd))
    # message_router parses slash commands like /summarize itself, so commands are
    # not filtered out here; /start and /help are caught by their CommandHandlers first.
    app.add_handler(MessageHandler(filters.TEXT, message_router))
    logger.info("Starting Telegram bot…")
    app.run_polling()


if __name__ == "__main__":
    main()