Source code for manolo_bot.storage.messages.redis

import logging

import redis

from manolo_bot.storage.messages.base import (
    BaseDBHelper,
    BaseMessagesStorage,
    StorageMessage,
    convert_json_to_message,
    get_messages_key,
)


[docs] class RedisDBHelper(BaseDBHelper): """ Helper class for Redis database operations. """ def __init__(self, db_url) -> None: """ Initializes the Redis database helper. :param db_url: The URL of the Redis database. """ self.pool = redis.asyncio.ConnectionPool.from_url(db_url) self.client = None
[docs] async def disconnect(self) -> None: """ Disconnects from the Redis database. """ logging.debug("Disconnecting from the database") await self.client.aclose()
[docs] async def connect(self) -> None: """ Connects to the Redis database. """ logging.debug("Connecting to the database") self.client = redis.asyncio.Redis.from_pool(self.pool)
[docs] class RedisMessagesStorage(BaseMessagesStorage): """ Redis-based implementation of message storage. """ def __init__(self, db: RedisDBHelper, bot_uuid: str, chat_id: int) -> None: """ Initializes the Redis messages storage. :param db: The RedisDBHelper instance. :param bot_uuid: The UUID of the bot. :param chat_id: The ID of the chat. """ super().__init__(bot_uuid, chat_id) self.client = db.client
[docs] async def refresh_messages(self) -> None: """ Updates the messages list from the Redis database. """ key = get_messages_key(self.bot_uuid, self.chat_id) raw_messages = await self.client.lrange(key, 0, -1) self._messages = [StorageMessage(message=convert_json_to_message(raw_message)) for raw_message in raw_messages]
[docs] async def clear_messages(self) -> None: """ Clears all messages from the Redis database for the current chat. """ await self.client.delete(get_messages_key(self.bot_uuid, self.chat_id)) self._messages = []
[docs] async def commit(self) -> None: """ Include new messages and remove deleted messages from the Redis database. """ key = get_messages_key(self.bot_uuid, self.chat_id) for storage_message in self._messages: if storage_message.new: await self.client.rpush(key, storage_message.message.model_dump_json()) elif storage_message.deleted: await self.client.lrem(key, 1, storage_message.message.model_dump_json())
# await self.refresh_messages()