import base64
import logging
import aiohttp
from langchain.agents import create_agent
from langchain_core.language_models import BaseChatModel
from langchain_core.messages import BaseMessage, HumanMessage
from langchain_core.tools import BaseTool
from manolo_bot.ai.config import BotConfig
from manolo_bot.ai.llmbot import LLMBot
from manolo_bot.storage.base import BaseMessagesStorage
[docs]
class LLMAgent(LLMBot):
    """
    Advanced Telegram LLM Chat Bot using a LangGraph-based agent.

    This bot can use tools and dynamically integrate with MCP servers.

    The agent is not built in ``__init__``; it is created by
    :meth:`initialize_async_resources`, which must be awaited before any of
    the ``answer_*`` methods can produce a reply.
    """

    # NOTE(review): presumably read by LLMBot so that tools are not bound to the
    # LLM in its constructor (the agent receives the tools instead) - confirm.
    bind_tools_on_init = False

    def __init__(
        self,
        llm: BaseChatModel,
        bot_config: BotConfig,
        system_instructions: list[BaseMessage],
        messages_storage: BaseMessagesStorage,
        tools: list[BaseTool] | None = None,
    ) -> None:
        """
        Set up the underlying bot and defer agent creation.

        :param llm: Chat model handed to the agent
        :param bot_config: Bot configuration
        :param system_instructions: Messages prepended to every agent invocation
        :param messages_storage: Storage holding the chat history
        :param tools: Optional custom tools made available to the agent
        """
        super().__init__(llm, bot_config, system_instructions, messages_storage, tools=tools)
        # Don't create the agent yet - wait for async initialization.
        self.agent = None

    async def initialize_async_resources(self) -> None:
        """Initialize async resources and create agent with all tools."""
        await super().initialize_async_resources()

        # Kept as a function-level import, as in the original code.
        # NOTE(review): presumably this avoids a circular import - confirm.
        from manolo_bot.ai.tools import get_all_tools

        # Create agent with all tools (custom + MCP); the tools passed to
        # __init__ are forwarded as ``custom_tools``.
        tools = await get_all_tools(self._mcp_manager, self.bot_config, custom_tools=self.tools)
        self.agent = create_agent(
            model=self.llm,
            tools=tools,
        )
        logging.debug("Agent created with %d tools", len(tools))

    # It is probably better not to use the agent for this:
    # def generate_feedback_message(self, prompt: str, max_length: int = 200) -> str:
    #     logging.debug("Generating feedback message")
    #
    #     response = self.agent.invoke({"messages": [{"role": "user", "content": prompt}]})
    #
    #     # Clean up the response if needed
    #     feedback_message = response["messages"][-1].content.strip()
    #
    #     # Ensure the message isn't too long
    #     if len(feedback_message) > max_length:
    #         feedback_message = feedback_message[: max_length - 3] + "..."
    #
    #     logging.debug(f"Generated feedback message: {feedback_message}")
    #     return feedback_message

    async def _agent_reply_to(self, chat_id: int, message: HumanMessage) -> BaseMessage:
        """
        Store *message*, truncate the context and ask the agent for a reply.

        The system instructions are always prepended to the stored history so
        that text, image and voice messages are answered under the same prompt.

        :param chat_id: Chat ID
        :param message: Human message to append to the history
        :return: Last message of the agent's resulting state
        :raises RuntimeError: If the agent has not been created yet
        """
        self.messages_storage.add_message(message)
        self.truncate_chat_context()
        config = self._get_langchain_config(chat_id)
        if self.agent is None:
            # Fail with an explicit message instead of an AttributeError on None.
            raise RuntimeError(
                "LLMAgent.initialize_async_resources() must be awaited before answering messages"
            )
        state = await self.agent.ainvoke(
            {"messages": self.system_instructions + self.messages_storage.messages},
            config=config,
        )
        return state["messages"][-1]

    async def _agent_fetch_base64(self, url: str) -> str:
        """
        Download *url* and return its body encoded as base64 text.

        :param url: URL of the resource to download
        :return: Base64 representation of the downloaded bytes
        :raises aiohttp.ClientError: On request failure or an HTTP error status
        """
        timeout = self._get_session_timeout()
        async with aiohttp.ClientSession() as session:
            async with session.get(url, timeout=timeout) as http_response:
                http_response.raise_for_status()
                payload = await http_response.read()
        return base64.b64encode(payload).decode("utf-8")

    @staticmethod
    def _agent_no_answer(error: Exception, kind: str, url: str) -> BaseMessage:
        """
        Log *error* and build the fallback ``NO_ANSWER`` message.

        Must be called from inside an ``except`` block so that
        ``logging.exception`` can attach the active traceback.

        :param error: Exception being handled
        :param kind: Media kind used in the log line (``"image"`` or ``"audio"``)
        :param url: URL that was being processed
        :return: Message whose content is ``"NO_ANSWER"``
        """
        if isinstance(error, aiohttp.ClientError):
            logging.error("Failed to get %s: %s", kind, url)
        logging.exception(error)
        return BaseMessage(content="NO_ANSWER", type="text")

    async def answer_message(self, chat_id: int, message: str) -> BaseMessage:
        """
        Answer a text message.

        :param chat_id: Chat ID
        :param message: Text to answer
        :return: Response
        :raises RuntimeError: If the agent has not been created yet
        """
        return await self._agent_reply_to(chat_id, HumanMessage(content=message))

    async def answer_image_message(self, chat_id: int, text: str, image: str) -> BaseMessage:
        """
        Answer an image message.

        Any failure (download or agent) is logged and turned into a
        ``NO_ANSWER`` message instead of being raised.

        :param chat_id: Chat ID
        :param text: Text to answer
        :param image: URL the image is downloaded from
        :return: Response
        """
        logging.debug("Image message: %s", text)
        try:
            image_data = await self._agent_fetch_base64(image)
            llm_message = HumanMessage(
                content=[
                    {
                        "type": "text",
                        "text": text,
                    },
                    {
                        "type": "image_url",
                        "image_url": {"url": f"data:image/jpeg;base64,{image_data}"},
                    },
                ]
            )
            answer = await self._agent_reply_to(chat_id, llm_message)
        except Exception as e:  # deliberate best-effort: never propagate to the caller
            answer = self._agent_no_answer(e, "image", image)
        logging.debug("Image message response: %s", answer)
        return answer

    async def answer_voice_message(self, chat_id: int, text: str, audio: str) -> BaseMessage:
        """
        Answer a voice message.

        Any failure (download or agent) is logged and turned into a
        ``NO_ANSWER`` message instead of being raised.

        :param chat_id: Chat ID
        :param text: Text to answer
        :param audio: URL the audio is downloaded from
        :return: Response
        """
        logging.debug("Voice message: %s", text)
        try:
            audio_data = await self._agent_fetch_base64(audio)
            llm_message = HumanMessage(
                content=[
                    {
                        "type": "text",
                        "text": text,
                    },
                    {
                        "type": "media",
                        "mime_type": "audio/ogg",
                        "data": audio_data,
                    },
                ]
            )
            answer = await self._agent_reply_to(chat_id, llm_message)
        except Exception as e:  # deliberate best-effort: never propagate to the caller
            answer = self._agent_no_answer(e, "audio", audio)
        logging.debug("Voice message response: %s", answer)
        return answer