From d1435a958b8b901010d01add70985c1a7c4136af Mon Sep 17 00:00:00 2001 From: Petr Lavrov Date: Mon, 17 Mar 2025 06:34:00 +0300 Subject: [PATCH 1/8] 0.5.2 - draft chat fetcher --- botspot/__init__.py | 5 +- botspot/components/new/chat_fetcher.py | 513 +++++++++++++++++- botspot/core/bot_manager.py | 8 +- botspot/core/botspot_settings.py | 2 + botspot/core/dependency_manager.py | 12 + botspot/utils/deps_getters.py | 14 + dev/todo.md | 22 + example.env | 8 +- .../chat_fetcher_demo/README.md | 65 +++ .../chat_fetcher_demo/bot.py | 276 ++++++++++ .../chat_fetcher_demo/sample.env | 28 + pyproject.toml | 2 +- 12 files changed, 948 insertions(+), 7 deletions(-) create mode 100644 examples/components_examples/chat_fetcher_demo/README.md create mode 100644 examples/components_examples/chat_fetcher_demo/bot.py create mode 100644 examples/components_examples/chat_fetcher_demo/sample.env diff --git a/botspot/__init__.py b/botspot/__init__.py index fd55786..316d1ad 100644 --- a/botspot/__init__.py +++ b/botspot/__init__.py @@ -19,7 +19,7 @@ from .components.features import multi_forward_handler from .components.main import event_scheduler, single_user_mode, telethon_manager from .components.middlewares import error_handler -from .components.new import llm_provider +from .components.new import chat_fetcher, llm_provider from .components.qol import bot_commands_menu, bot_info, print_bot_url from .core import get_dependency_manager from .utils import ( @@ -71,6 +71,9 @@ "get_telethon_manager", # LLM Provider "llm_provider", + # ChatFetcher + "chat_fetcher", + "get_chat_fetcher", # utils "get_dependency_manager", "get_bot", diff --git a/botspot/components/new/chat_fetcher.py b/botspot/components/new/chat_fetcher.py index 482ed61..14e969b 100644 --- a/botspot/components/new/chat_fetcher.py +++ b/botspot/components/new/chat_fetcher.py @@ -1,8 +1,25 @@ +from datetime import datetime +from pathlib import Path +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union + +from aiogram import Dispatcher +from pydantic import BaseModel, Field from pydantic_settings import BaseSettings +from botspot.utils.internal import get_logger + +if TYPE_CHECKING: + from motor.motor_asyncio import AsyncIOMotorCollection + from telethon import TelegramClient + +logger = get_logger() + class ChatFetcherSettings(BaseSettings): enabled: bool = False + cache_enabled: bool = True + collection_name: str = "telegram_messages" + max_messages_per_request: int = 100 class Config: env_prefix = "BOTSPOT_CHAT_FETCHER_" @@ -11,19 +28,509 @@ class Config: extra = "ignore" +class ChatModel(BaseModel): + """Basic model for storing chat information""" + + id: int + title: str + username: Optional[str] = None + type: str = "private" # private, group, channel, etc. + last_message_date: Optional[datetime] = None + + +class MessageModel(BaseModel): + """Basic model for storing message information""" + + id: int + chat_id: int + text: Optional[str] = None + date: datetime + from_id: Optional[int] = None + reply_to_message_id: Optional[int] = None + media: bool = False + media_type: Optional[str] = None + file_path: Optional[str] = None + metadata: Dict[str, Any] = Field(default_factory=dict) + + class ChatFetcher: - pass + """Component for fetching and caching Telegram chats and messages""" + + def __init__( + self, + settings: ChatFetcherSettings, + collection: Optional["AsyncIOMotorCollection"] = None, + single_user_id: Optional[str] = None, + ): + self.settings = settings + self.collection = collection + self.single_user_id = single_user_id + logger.info(f"ChatFetcher initialized with cache_enabled={settings.cache_enabled}") + + async def get_client(self, user_id: Optional[int] = None) -> "TelegramClient": + """Get telethon client for the specified user or for single user mode""" + from botspot.components.main.telethon_manager import get_telethon_manager + from botspot.core.dependency_manager import get_dependency_manager + + deps = get_dependency_manager() + telethon_manager = get_telethon_manager() + + # If user_id is not provided and single user mode is enabled, use the single user + if user_id is None: + if not deps.botspot_settings.single_user_mode.enabled: + raise ValueError("User ID must be provided when not in single user mode") + user = deps.botspot_settings.single_user_mode.user + # Convert username to numeric ID if needed + if user.startswith("@"): + # TODO: Implement username to ID conversion + raise NotImplementedError("Username conversion not implemented yet") + user_id = int(user) + + return await telethon_manager.get_client(user_id) + + async def get_chats(self, user_id: Optional[int] = None, limit: int = 100) -> List[ChatModel]: + """Get chats for the specified user or for single user mode""" + client = await self.get_client(user_id) + + # Get dialogs from Telethon + dialogs = await client.get_dialogs(limit=limit) + + # Convert to ChatModel + chats = [] + for dialog in dialogs: + entity = dialog.entity + + # Determine chat type + chat_type = "private" + if hasattr(entity, "megagroup") and entity.megagroup: + chat_type = "group" + elif hasattr(entity, "broadcast") and entity.broadcast: + chat_type = "channel" + + chat = ChatModel( + id=dialog.id, + title=dialog.title, + username=getattr(entity, "username", None), + type=chat_type, + last_message_date=dialog.date, + ) + chats.append(chat) + + # Cache the chat if caching is enabled + if self.settings.cache_enabled and self.collection: + await self._cache_chat(chat) + + return chats + + async def find_chat( + self, query: str, user_id: Optional[int] = None, limit: int = 5 + ) -> List[ChatModel]: + """Find chats by title or username""" + # First try to get from cache + if self.settings.cache_enabled and self.collection: + cached_chats = await self._find_chats_in_cache(query, limit) + if cached_chats: + return cached_chats + + # If not found in cache, get all chats and filter + chats = await self.get_chats(user_id, limit=100) # Get more to search through + + # Filter by title or username + query = query.lower() + filtered_chats = [ + chat + for chat in chats + if (query in chat.title.lower() or (chat.username and query in chat.username.lower())) + ] + + return filtered_chats[:limit] + + async def get_messages( + self, + chat_id: int, + user_id: Optional[int] = None, + limit: int = 100, + offset_id: int = 0, + min_id: Optional[int] = None, + max_id: Optional[int] = None, + reverse: bool = False, + ) -> List[MessageModel]: + """Get messages from a specific chat""" + client = await self.get_client(user_id) + + # Try to get from cache first if appropriate + if ( + self.settings.cache_enabled + and self.collection + and not min_id + and not max_id + and offset_id == 0 + ): + cached_messages = await self._get_cached_messages(chat_id, limit) + if cached_messages: + return cached_messages + + # Limit the number of messages per request + limit = min(limit, self.settings.max_messages_per_request) + + # Get messages from Telethon + messages = await client.get_messages( + chat_id, limit=limit, offset_id=offset_id, min_id=min_id, max_id=max_id, reverse=reverse + ) + + # Convert to MessageModel + result = [] + for msg in messages: + # Determine media type + media_type = None + has_media = False + + if hasattr(msg, "media") and msg.media: + has_media = True + media_type = type(msg.media).__name__ + + message = MessageModel( + id=msg.id, + chat_id=chat_id, + text=msg.text if hasattr(msg, "text") else None, + date=msg.date, + from_id=msg.from_id.user_id if hasattr(msg, "from_id") and msg.from_id else None, + reply_to_message_id=( + msg.reply_to.reply_to_msg_id + if hasattr(msg, "reply_to") and msg.reply_to + else None + ), + media=has_media, + media_type=media_type, + ) + result.append(message) + + # Cache the message if caching is enabled + if self.settings.cache_enabled and self.collection: + await self._cache_message(message) + + return result + + async def find_messages( + self, + query: str, + chat_id: int, + user_id: Optional[int] = None, + limit: int = 5, + search_in_cached: bool = True, + ) -> List[MessageModel]: + """Find messages by text content""" + # First try to search in cache + if search_in_cached and self.settings.cache_enabled and self.collection: + cached_messages = await self._find_messages_in_cache(query, chat_id, limit) + if cached_messages: + return cached_messages + + # If not found in cache, use telethon's search function + client = await self.get_client(user_id) + messages = await client.get_messages(chat_id, search=query, limit=limit) + + # Convert to MessageModel + result = [] + for msg in messages: + # Determine media type + media_type = None + has_media = False + + if hasattr(msg, "media") and msg.media: + has_media = True + media_type = type(msg.media).__name__ + + message = MessageModel( + id=msg.id, + chat_id=chat_id, + text=msg.text if hasattr(msg, "text") else None, + date=msg.date, + from_id=msg.from_id.user_id if hasattr(msg, "from_id") and msg.from_id else None, + reply_to_message_id=( + msg.reply_to.reply_to_msg_id + if hasattr(msg, "reply_to") and msg.reply_to + else None + ), + media=has_media, + media_type=media_type, + ) + result.append(message) + + # Cache the message if caching is enabled + if self.settings.cache_enabled and self.collection: + await self._cache_message(message) + + return result + + async def download_media( + self, + message_id: int, + chat_id: int, + user_id: Optional[int] = None, + target_path: Optional[Union[str, Path]] = None, + ) -> Optional[str]: + """Download media from a message""" + client = await self.get_client(user_id) + + # Get the message + message = await client.get_messages(chat_id, ids=message_id) + + if not message or not message.media: + return None + + # Download the media + file_path = await message.download(file=target_path) + + # Update the cache with file path + if self.settings.cache_enabled and self.collection and file_path: + await self._update_message_file_path(message_id, chat_id, str(file_path)) + + return str(file_path) if file_path else None + # Cache-related methods + async def _cache_chat(self, chat: ChatModel) -> None: + """Cache chat information""" + if not self.collection: + return + + try: + # Check if the chat already exists + existing = await self.collection.find_one({"type": "chat", "chat_id": chat.id}) + + chat_dict = chat.dict() + chat_dict["type"] = "chat" + chat_dict["chat_id"] = chat.id + + if existing: + # Update existing chat + await self.collection.update_one( + {"type": "chat", "chat_id": chat.id}, {"$set": chat_dict} + ) + else: + # Insert new chat + await self.collection.insert_one(chat_dict) + + except Exception as e: + logger.error(f"Error caching chat: {e}") + + async def _cache_message(self, message: MessageModel) -> None: + """Cache message information""" + if not self.collection: + return + + try: + # Check if the message already exists + existing = await self.collection.find_one( + {"type": "message", "message_id": message.id, "chat_id": message.chat_id} + ) + + msg_dict = message.dict() + msg_dict["type"] = "message" + msg_dict["message_id"] = message.id + + if existing: + # Update existing message + await self.collection.update_one( + {"type": "message", "message_id": message.id, "chat_id": message.chat_id}, + {"$set": msg_dict}, + ) + else: + # Insert new message + await self.collection.insert_one(msg_dict) + + except Exception as e: + logger.error(f"Error caching message: {e}") + + async def _update_message_file_path( + self, message_id: int, chat_id: int, file_path: str + ) -> None: + """Update file path in cached message""" + if not self.collection: + return + + try: + await self.collection.update_one( + {"type": "message", "message_id": message_id, "chat_id": chat_id}, + {"$set": {"file_path": file_path}}, + ) + except Exception as e: + logger.error(f"Error updating message file path: {e}") + + async def _get_cached_messages(self, chat_id: int, limit: int) -> List[MessageModel]: + """Get cached messages for a chat""" + if not self.collection: + return [] + + try: + cursor = ( + self.collection.find({"type": "message", "chat_id": chat_id}) + .sort("date", -1) + .limit(limit) + ) + + messages = [] + async for doc in cursor: + # Remove the "type" field added for cache + if "type" in doc: + doc.pop("type") + if "message_id" in doc: + doc.pop("message_id") + + messages.append(MessageModel(**doc)) + + return messages + + except Exception as e: + logger.error(f"Error getting cached messages: {e}") + return [] + + async def _find_chats_in_cache(self, query: str, limit: int) -> List[ChatModel]: + """Find chats in cache by title or username""" + if not self.collection: + return [] + + try: + cursor = self.collection.find( + { + "type": "chat", + "$or": [ + {"title": {"$regex": query, "$options": "i"}}, + {"username": {"$regex": query, "$options": "i"}}, + ], + } + ).limit(limit) + + chats = [] + async for doc in cursor: + # Remove the "type" field added for cache + if "type" in doc: + doc.pop("type") + if "chat_id" in doc: + doc.pop("chat_id") + + chats.append(ChatModel(**doc)) + + return chats + + except Exception as e: + logger.error(f"Error finding chats in cache: {e}") + return [] + + async def _find_messages_in_cache( + self, query: str, chat_id: int, limit: int + ) -> List[MessageModel]: + """Find messages in cache by text content""" + if not self.collection: + return [] + + try: + cursor = self.collection.find( + {"type": "message", "chat_id": chat_id, "text": {"$regex": query, "$options": "i"}} + ).limit(limit) + + messages = [] + async for doc in cursor: + # Remove the "type" field added for cache + if "type" in doc: + doc.pop("type") + if "message_id" in doc: + doc.pop("message_id") + + messages.append(MessageModel(**doc)) + + return messages + + except Exception as e: + logger.error(f"Error finding messages in cache: {e}") + return [] + + +def setup_dispatcher(dp: Dispatcher) -> Dispatcher: + """Setup dispatcher with ChatFetcher commands""" + from aiogram.filters import Command + from aiogram.types import Message + + from botspot.commands_menu import add_command + from botspot.utils.deps_getters import get_chat_fetcher + + @add_command("list_chats", "List your Telegram chats", visibility="hidden") + @dp.message(Command("list_chats")) + async def list_chats_command(message: Message) -> None: + try: + chat_fetcher = get_chat_fetcher() + chats = await chat_fetcher.get_chats(message.from_user.id, limit=10) + + if not chats: + await message.reply("No chats found.") + return + + result = "Your recent chats:\n\n" + for chat in chats: + username = f" (@{chat.username})" if chat.username else "" + result += f"• {chat.title}{username} [{chat.type}]\n" + + await message.reply(result) + except Exception as e: + logger.error(f"Error in list_chats_command: {e}") + await message.reply(f"Error listing chats: {str(e)}") + + @add_command("search_chat", "Search for a chat by name", visibility="hidden") + @dp.message(Command("search_chat")) + async def search_chat_command(message: Message) -> None: + try: + parts = message.text.split(maxsplit=1) + if len(parts) < 2: + await message.reply("Please provide a search query, e.g. /search_chat Python") + return + + query = parts[1] + + chat_fetcher = get_chat_fetcher() + chats = await chat_fetcher.find_chat(query, message.from_user.id) + + if not chats: + await message.reply(f"No chats found matching '{query}'.") + return + + result = f"Chats matching '{query}':\n\n" + for chat in chats: + username = f" (@{chat.username})" if chat.username else "" + result += f"• {chat.title}{username} [{chat.type}]\n" + + await message.reply(result) + except Exception as e: + logger.error(f"Error in search_chat_command: {e}") + await message.reply(f"Error searching chats: {str(e)}") -def setup_dispatcher(dp): return dp def initialize(settings: ChatFetcherSettings) -> ChatFetcher: - pass + """Initialize ChatFetcher component""" + from botspot.core.dependency_manager import get_dependency_manager + + deps = get_dependency_manager() + + # Get MongoDB collection if available + collection = None + if settings.cache_enabled and deps.botspot_settings.mongo_database.enabled: + try: + collection = deps.mongo_database[settings.collection_name] + logger.info(f"ChatFetcher using MongoDB collection: {settings.collection_name}") + except Exception as e: + logger.error(f"Error getting MongoDB collection: {e}") + + # Get single user mode settings if enabled + single_user_id = None + if deps.botspot_settings.single_user_mode.enabled: + single_user_id = deps.botspot_settings.single_user_mode.user + + return ChatFetcher(settings, collection, single_user_id) def get_chat_fetcher() -> ChatFetcher: + """Get ChatFetcher instance from dependency manager""" from botspot.core.dependency_manager import get_dependency_manager deps = get_dependency_manager() diff --git a/botspot/core/bot_manager.py b/botspot/core/bot_manager.py index 885fe8f..8414edb 100644 --- a/botspot/core/bot_manager.py +++ b/botspot/core/bot_manager.py @@ -13,7 +13,7 @@ from botspot.components.features import user_interactions from botspot.components.main import event_scheduler, single_user_mode, telethon_manager, trial_mode from botspot.components.middlewares import error_handler -from botspot.components.new import chat_binder, llm_provider +from botspot.components.new import chat_binder, chat_fetcher, llm_provider from botspot.components.qol import bot_commands_menu, bot_info, print_bot_url from botspot.core.botspot_settings import BotspotSettings from botspot.core.dependency_manager import DependencyManager @@ -61,6 +61,9 @@ def __init__( if self.settings.llm_provider.enabled: self.deps.llm_provider = llm_provider.initialize(self.settings.llm_provider) + if self.settings.chat_fetcher.enabled: + self.deps.chat_fetcher = chat_fetcher.initialize(self.settings.chat_fetcher) + def setup_dispatcher(self, dp: Dispatcher): """Setup dispatcher with components""" # Remove global bot check - each component handles its own requirements @@ -105,3 +108,6 @@ def setup_dispatcher(self, dp: Dispatcher): if self.settings.llm_provider.enabled: llm_provider.setup_dispatcher(dp) + + if self.settings.chat_fetcher.enabled: + chat_fetcher.setup_dispatcher(dp) diff --git a/botspot/core/botspot_settings.py b/botspot/core/botspot_settings.py index 7c1d186..5f97c01 100644 --- a/botspot/core/botspot_settings.py +++ b/botspot/core/botspot_settings.py @@ -13,6 +13,7 @@ from botspot.components.main.trial_mode import TrialModeSettings from botspot.components.middlewares.error_handler import ErrorHandlerSettings from botspot.components.new.chat_binder import ChatBinderSettings +from botspot.components.new.chat_fetcher import ChatFetcherSettings from botspot.components.new.llm_provider import LLMProviderSettings from botspot.components.qol.bot_commands_menu import BotCommandsMenuSettings from botspot.components.qol.bot_info import BotInfoSettings @@ -58,6 +59,7 @@ def friends(self) -> List[str]: send_safe: SendSafeSettings = SendSafeSettings() admin_filter: AdminFilterSettings = AdminFilterSettings() chat_binder: ChatBinderSettings = ChatBinderSettings() + chat_fetcher: ChatFetcherSettings = ChatFetcherSettings() llm_provider: LLMProviderSettings = LLMProviderSettings() class Config: diff --git a/botspot/core/dependency_manager.py b/botspot/core/dependency_manager.py index d15ef83..f89ff2b 100644 --- a/botspot/core/dependency_manager.py +++ b/botspot/core/dependency_manager.py @@ -15,6 +15,7 @@ from botspot.components.data.user_data import UserManager from botspot.components.main.telethon_manager import TelethonManager + from botspot.components.new.chat_fetcher import ChatFetcher from botspot.components.new.chat_binder import ChatBinder from botspot.components.new.llm_provider import LLMProvider @@ -39,6 +40,7 @@ def __init__( self._user_manager = None self._chat_binder = None self._llm_provider = None + self._chat_fetcher = None self.__dict__.update(kwargs) @property @@ -139,6 +141,16 @@ def llm_provider(self) -> "LLMProvider": def llm_provider(self, value): self._llm_provider = value + @property + def chat_fetcher(self) -> "ChatFetcher": + if self._chat_fetcher is None: + raise RuntimeError("Chat Fetcher is not initialized") + return self._chat_fetcher + + @chat_fetcher.setter + def chat_fetcher(self, value): + self._chat_fetcher = value + @classmethod def is_initialized(cls) -> bool: return cls in cls._instances diff --git a/botspot/utils/deps_getters.py b/botspot/utils/deps_getters.py index fd845ca..26aa3f8 100644 --- a/botspot/utils/deps_getters.py +++ b/botspot/utils/deps_getters.py @@ -20,6 +20,8 @@ from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase # noqa: F401 from telethon import TelegramClient + from botspot.components.main.telethon_manager import TelethonManager + from botspot.components.new.chat_fetcher import ChatFetcher # Core getters for bot and dispatcher def get_bot() -> "Bot": @@ -73,3 +75,15 @@ async def get_telethon_client( "get_mongo_client", "get_chat_binder", ] + + +def get_chat_fetcher() -> "ChatFetcher": + """Get ChatFetcher instance from dependency manager.""" + from botspot.core.dependency_manager import get_dependency_manager + + chat_fetcher = get_dependency_manager().chat_fetcher + if chat_fetcher is None: + raise RuntimeError( + "ChatFetcher is not initialized. Make sure chat_fetcher component is enabled in settings." + ) + return chat_fetcher diff --git a/dev/todo.md b/dev/todo.md index 621be9a..80d9e5b 100644 --- a/dev/todo.md +++ b/dev/todo.md @@ -22,6 +22,28 @@ ## Components: +## Chat Fetcher + +- Main features (1-2): + - [ ] Get chats for user + - [ ] Get messages for user from specific chat + - [ ] Find chats and messages with simple lightweight interface + +- Secondary features: + - [ ] Use Telethon Manager to get client + - [ ] Use MongoDB to cache messages and chats + - [ ] Single user mode support (auto-detect user ID) + - [ ] Handle pagination, delays, and API limitations + - [ ] Adapt from existing telegram_downloader implementation + +Notes: + +- Implement caching to work around Telegram API limitations +- Consider importing telegram_downloader as dependency vs. moving code +- Reference implementation in projects/telegram_downloader +- Draft in calmmage/seasonal/season_1_winter_2024/draft/download-telegram-messages +- Should adapt telegram_downloader to use custom DB and telethon client + ## LLM Provider - ✅ DONE - Main features (1-2): diff --git a/example.env b/example.env index cadd83c..15269ae 100644 --- a/example.env +++ b/example.env @@ -13,4 +13,10 @@ BOTSPOT_LLM_PROVIDER_DEFAULT_MAX_TOKENS=1000 BOTSPOT_LLM_PROVIDER_DEFAULT_TIMEOUT=30 # API Keys for different providers BOTSPOT_LLM_PROVIDER_OPENAI_API_KEY= -BOTSPOT_LLM_PROVIDER_ANTHROPIC_API_KEY= \ No newline at end of file +BOTSPOT_LLM_PROVIDER_ANTHROPIC_API_KEY= + +# ChatFetcher settings +BOTSPOT_CHAT_FETCHER_ENABLED=False +BOTSPOT_CHAT_FETCHER_CACHE_ENABLED=True +BOTSPOT_CHAT_FETCHER_COLLECTION_NAME=telegram_messages +BOTSPOT_CHAT_FETCHER_MAX_MESSAGES_PER_REQUEST=100 \ No newline at end of file diff --git a/examples/components_examples/chat_fetcher_demo/README.md b/examples/components_examples/chat_fetcher_demo/README.md new file mode 100644 index 0000000..3a2d2d2 --- /dev/null +++ b/examples/components_examples/chat_fetcher_demo/README.md @@ -0,0 +1,65 @@ +# ChatFetcher Demo Bot + +This demo showcases the ChatFetcher component from Botspot, which allows your bot to +access and search Telegram chats and messages. + +## Features + +- Access user's Telegram chats using Telethon +- List and search chats +- Fetch messages from specific chats +- Search message content +- Cache chats and messages in MongoDB (optional) +- Single user mode support (optional) + +## Prerequisites + +- Python 3.8+ +- Telegram Bot Token +- Telegram API ID and API Hash (from https://my.telegram.org/apps) +- MongoDB (optional, for caching) + +## Setup + +1. Clone the repository +2. Copy `sample.env` to `.env` and fill in the required values +3. Install dependencies: + +```bash +pip install -r requirements.txt # or use poetry +``` + +4. Run the bot: + +```bash +python bot.py +``` + +## Usage + +1. Start the bot with `/start` +2. Setup your Telethon client with `/setup_telethon` +3. List your chats with `/list_chats` +4. Fetch messages from a chat with `/fetch_messages` +5. Search for specific messages with `/search_messages` + +## Component Details + +The ChatFetcher component is designed to: + +- Use Telethon to access user's Telegram account +- Fetch and search chats and messages +- Cache results in MongoDB for faster access +- Handle pagination and API limitations +- Support single user mode + +## Environment Variables + +See `sample.env` for all available configuration options. + +## Dependencies + +- telethon +- motor (MongoDB driver) +- aiogram +- botspot \ No newline at end of file diff --git a/examples/components_examples/chat_fetcher_demo/bot.py b/examples/components_examples/chat_fetcher_demo/bot.py new file mode 100644 index 0000000..25fdb95 --- /dev/null +++ b/examples/components_examples/chat_fetcher_demo/bot.py @@ -0,0 +1,276 @@ +import asyncio +import os + +from aiogram import Bot, Dispatcher, F +from aiogram.filters import Command +from aiogram.fsm.context import FSMContext +from aiogram.fsm.state import State, StatesGroup +from aiogram.types import Message +from dotenv import load_dotenv +from loguru import logger + +import botspot + +# Load environment variables +load_dotenv() + +# Initialize bot and dispatcher +token = os.getenv("TELEGRAM_BOT_TOKEN") +if not token: + raise ValueError("TELEGRAM_BOT_TOKEN is not set in environment variables") + +bot = Bot(token=token) +dp = Dispatcher() + + +class ChatStates(StatesGroup): + waiting_for_chat = State() + waiting_for_message_count = State() + waiting_for_search_query = State() + + +@dp.message(Command("start")) +async def cmd_start(message: Message): + """Start command handler""" + username = message.from_user.username or message.from_user.first_name + await message.answer( + f"Hello, {username}! I'm a Chat Fetcher demo bot.\n\n" + "I can help you access your Telegram chats and messages using the Telethon API.\n\n" + "Available commands:\n" + "/setup_telethon - Set up Telethon client\n" + "/check_telethon - Check if Telethon client is active\n" + "/list_chats - List your recent chats\n" + "/search_chat - Search for chats by name\n" + "/fetch_messages - Fetch messages from a chat\n" + "/search_messages - Search for messages in a chat\n" + "/help - Show this help message" + ) + + +@dp.message(Command("help")) +async def cmd_help(message: Message): + """Help command handler""" + await message.answer( + "Chat Fetcher Demo Bot Help:\n\n" + "This bot demonstrates the ChatFetcher component from Botspot.\n\n" + "Steps to use this bot:\n" + "1. Use /setup_telethon to authorize the bot to access your Telegram account\n" + "2. Use /list_chats to see your available chats\n" + "3. Try /fetch_messages to get messages from a specific chat\n" + "4. Use /search_chat and /search_messages to find content\n\n" + "Available commands:\n" + "/setup_telethon - Set up Telethon client\n" + "/check_telethon - Check if Telethon client is active\n" + "/list_chats - List your recent chats\n" + "/search_chat - Search for chats by name\n" + "/fetch_messages - Fetch messages from a chat\n" + "/search_messages - Search for messages in a chat\n" + "/help - Show this help message" + ) + + +@dp.message(Command("fetch_messages")) +async def cmd_fetch_messages(message: Message, state: FSMContext): + """Fetch messages from a specific chat""" + # Check if user has Telethon client + from botspot import get_telethon_manager + + try: + telethon_manager = get_telethon_manager() + client = await telethon_manager.get_client(message.from_user.id) + if not client or not await client.is_user_authorized(): + await message.reply( + "You need to set up Telethon client first. Use /setup_telethon command." + ) + return + except Exception as e: + await message.reply(f"Error checking Telethon client: {str(e)}") + return + + await message.reply( + "Please enter chat ID or username to fetch messages from.\n" + "You can use @username format or numeric chat ID." + ) + await state.set_state(ChatStates.waiting_for_chat) + + +@dp.message(ChatStates.waiting_for_chat) +async def process_chat_id(message: Message, state: FSMContext): + """Process chat ID input""" + chat_input = message.text.strip() + + # Store chat input in state + await state.update_data(chat_input=chat_input) + + await message.reply("How many messages do you want to fetch? (Enter a number between 1-50)") + await state.set_state(ChatStates.waiting_for_message_count) + + +@dp.message(ChatStates.waiting_for_message_count) +async def process_message_count(message: Message, state: FSMContext): + """Process message count input and fetch messages""" + try: + count = int(message.text.strip()) + if count < 1 or count > 50: + await message.reply("Please enter a number between 1 and 50.") + return + except ValueError: + await message.reply("Please enter a valid number.") + return + + # Get chat input from state + data = await state.get_data() + chat_input = data.get("chat_input") + + # Reset state + await state.clear() + + # Fetch messages + try: + # Determine if input is username or chat ID + chat_id = chat_input + if chat_input.startswith("@"): + # This is a username, need to resolve + # For simplicity, we'll ask user to provide chat ID directly in this demo + await message.reply( + "Username resolution not implemented in this demo. " + "Please use numeric chat ID. You can find chat IDs using /list_chats command." + ) + return + else: + try: + chat_id = int(chat_input) + except ValueError: + await message.reply("Invalid chat ID format. Please provide a numeric chat ID.") + return + + # Use ChatFetcher to get messages + from botspot import get_chat_fetcher + + chat_fetcher = get_chat_fetcher() + messages = await chat_fetcher.get_messages( + chat_id=chat_id, user_id=message.from_user.id, limit=count + ) + + if not messages: + await message.reply(f"No messages found in chat {chat_id}.") + return + + # Format and send results + result = f"Found {len(messages)} messages in chat {chat_id}:\n\n" + for i, msg in enumerate(messages[:10], 1): # Limit display to 10 messages + date_str = msg.date.strftime("%Y-%m-%d %H:%M:%S") + text = msg.text or "[No text]" + if len(text) > 50: + text = text[:47] + "..." + + result += f"{i}. [{date_str}] {text}\n" + + if len(messages) > 10: + result += f"\n... and {len(messages) - 10} more messages." + + await message.reply(result) + + except Exception as e: + logger.exception(f"Error fetching messages: {e}") + await message.reply(f"Error fetching messages: {str(e)}") + + +@dp.message(Command("search_messages")) +async def cmd_search_messages(message: Message, state: FSMContext): + """Search for messages in a specific chat""" + # Check if user has Telethon client + from botspot import get_telethon_manager + + try: + telethon_manager = get_telethon_manager() + client = await telethon_manager.get_client(message.from_user.id) + if not client or not await client.is_user_authorized(): + await message.reply( + "You need to set up Telethon client first. Use /setup_telethon command." + ) + return + except Exception as e: + await message.reply(f"Error checking Telethon client: {str(e)}") + return + + await message.reply( + "Please enter chat ID to search messages in.\n" "You can use numeric chat ID." + ) + await state.set_state(ChatStates.waiting_for_chat) + + +@dp.message(ChatStates.waiting_for_chat, F.text) +async def process_chat_for_search(message: Message, state: FSMContext): + """Process chat ID input for message search""" + try: + chat_id = int(message.text.strip()) + except ValueError: + await message.reply("Invalid chat ID format. Please provide a numeric chat ID.") + return + + # Store chat ID in state + await state.update_data(chat_id=chat_id) + + await message.reply("What text would you like to search for in messages?") + await state.set_state(ChatStates.waiting_for_search_query) + + +@dp.message(ChatStates.waiting_for_search_query) +async def process_search_query(message: Message, state: FSMContext): + """Process search query and search messages""" + search_query = message.text.strip() + + # Get chat ID from state + data = await state.get_data() + chat_id = data.get("chat_id") + + # Reset state + await state.clear() + + # Search messages + try: + # Use ChatFetcher to search messages + from botspot import get_chat_fetcher + + chat_fetcher = get_chat_fetcher() + messages = await chat_fetcher.find_messages( + query=search_query, chat_id=chat_id, user_id=message.from_user.id, limit=10 + ) + + if not messages: + await message.reply(f"No messages found matching '{search_query}' in chat {chat_id}.") + return + + # Format and send results + result = f"Found {len(messages)} messages matching '{search_query}' in chat {chat_id}:\n\n" + for i, msg in enumerate(messages, 1): + date_str = msg.date.strftime("%Y-%m-%d %H:%M:%S") + text = msg.text or "[No text]" + if len(text) > 50: + text = text[:47] + "..." + + result += f"{i}. [{date_str}] {text}\n" + + await message.reply(result) + + except Exception as e: + logger.exception(f"Error searching messages: {e}") + await message.reply(f"Error searching messages: {str(e)}") + + +async def main(): + """Main function to start the bot""" + # Initialize Botspot + bot_manager = botspot.core.bot_manager.BotManager(bot=bot, dispatcher=dp) + + # Setup Botspot components with the dispatcher + bot_manager.setup_dispatcher(dp) + + # Start polling + logger.info("Starting bot...") + await dp.start_polling(bot) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/components_examples/chat_fetcher_demo/sample.env b/examples/components_examples/chat_fetcher_demo/sample.env new file mode 100644 index 0000000..b6f0e6f --- /dev/null +++ b/examples/components_examples/chat_fetcher_demo/sample.env @@ -0,0 +1,28 @@ +# t.me/your_bot_name +TELEGRAM_BOT_TOKEN= + +# Telethon Manager settings (required for ChatFetcher) +BOTSPOT_TELETHON_MANAGER_ENABLED=True +BOTSPOT_TELETHON_MANAGER_API_ID= +BOTSPOT_TELETHON_MANAGER_API_HASH= +BOTSPOT_TELETHON_MANAGER_SESSIONS_DIR=sessions +BOTSPOT_TELETHON_MANAGER_AUTO_AUTH=True + +# MongoDB settings (optional, for caching) +BOTSPOT_MONGO_DATABASE_ENABLED=True +BOTSPOT_MONGO_DATABASE_CONN_STR=mongodb://localhost:27017 +BOTSPOT_MONGO_DATABASE_DATABASE=botspot_test + +# ChatFetcher settings +BOTSPOT_CHAT_FETCHER_ENABLED=True +BOTSPOT_CHAT_FETCHER_CACHE_ENABLED=True +BOTSPOT_CHAT_FETCHER_COLLECTION_NAME=telegram_messages +BOTSPOT_CHAT_FETCHER_MAX_MESSAGES_PER_REQUEST=100 + +# Optional: Single User Mode +# BOTSPOT_SINGLE_USER_MODE_ENABLED=True +# BOTSPOT_SINGLE_USER_MODE_USER=@your_username + +# Error handling +BOTSPOT_ERROR_HANDLING_ENABLED=True +BOTSPOT_ERROR_HANDLING_DEVELOPER_CHAT_ID=your_chat_id \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 90b2a83..a4873ab 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "botspot" -version = "0.6.0" +version = "0.6.1" description = "" authors = ["Petr Lavrov "] readme = "README.md" From 5cce9bc1430513d93c3f820c0d4273be741e7fd4 Mon Sep 17 00:00:00 2001 From: Petr Lavrov Date: Mon, 17 Mar 2025 07:56:44 +0300 Subject: [PATCH 2/8] dump changes --- botspot/components/new/chat_fetcher.py | 619 +++++++++++++++++-------- dev/todo.md | 8 +- dev/workalong.md | 17 + dev/workalong_chat_fetcher.md | 111 +++++ pyproject.toml | 1 + 5 files changed, 559 insertions(+), 197 deletions(-) create mode 100644 dev/workalong_chat_fetcher.md diff --git a/botspot/components/new/chat_fetcher.py b/botspot/components/new/chat_fetcher.py index 14e969b..0b10a4d 100644 --- a/botspot/components/new/chat_fetcher.py +++ b/botspot/components/new/chat_fetcher.py @@ -1,16 +1,19 @@ -from datetime import datetime -from pathlib import Path -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union - from aiogram import Dispatcher +from datetime import datetime, timedelta, timezone +from pathlib import Path from pydantic import BaseModel, Field from pydantic_settings import BaseSettings +from telegram_downloader.config import StorageMode +from telegram_downloader.data_model import ChatData +from telegram_downloader.telegram_downloader import TelegramDownloader +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union from botspot.utils.internal import get_logger if TYPE_CHECKING: from motor.motor_asyncio import AsyncIOMotorCollection from telethon import TelegramClient + from telethon.types import Message logger = get_logger() @@ -19,7 +22,15 @@ class ChatFetcherSettings(BaseSettings): enabled: bool = False cache_enabled: bool = True collection_name: str = "telegram_messages" + chats_collection_name: str = "telegram_chats" max_messages_per_request: int = 100 + storage_mode: str = "mongo" + recent_threshold_days: int = 30 + + # Chat download limits + backdays: Optional[int] = 30 # How far back to download messages (days) + limit: Optional[int] = 1000 # Maximum number of messages to download per chat + skip_big: bool = True # Skip chats with many participants class Config: env_prefix = "BOTSPOT_CHAT_FETCHER_" @@ -36,6 +47,38 @@ class ChatModel(BaseModel): username: Optional[str] = None type: str = "private" # private, group, channel, etc. last_message_date: Optional[datetime] = None + finished_downloading: bool = False + + @classmethod + def from_chat_data(cls, chat_data: ChatData) -> "ChatModel": + """Convert ChatData from telegram-downloader to ChatModel""" + chat_type = chat_data.entity_category + + # Get username if available + username = None + if hasattr(chat_data.entity, "username"): + username = chat_data.entity.username + + # Get title or generate from name + if hasattr(chat_data.entity, "title"): + title = chat_data.entity.title + else: + # For users, create title from first/last name + name_parts = [] + if hasattr(chat_data.entity, "first_name") and chat_data.entity.first_name: + name_parts.append(chat_data.entity.first_name) + if hasattr(chat_data.entity, "last_name") and chat_data.entity.last_name: + name_parts.append(chat_data.entity.last_name) + title = " ".join(name_parts) if name_parts else f"Chat {chat_data.id}" + + return cls( + id=chat_data.id, + title=title, + username=username, + type=chat_type, + last_message_date=chat_data.last_message_date, + finished_downloading=chat_data.finished_downloading, + ) class MessageModel(BaseModel): @@ -52,73 +95,150 @@ class MessageModel(BaseModel): file_path: Optional[str] = None metadata: Dict[str, Any] = Field(default_factory=dict) + @classmethod + def from_telethon_message(cls, message: "Message", chat_id: int) -> "MessageModel": + """Convert Telethon message to MessageModel""" + # Determine media type + media_type = None + has_media = False + + if hasattr(message, "media") and message.media: + has_media = True + media_type = type(message.media).__name__ + + return cls( + id=message.id, + chat_id=chat_id, + text=message.text if hasattr(message, "text") else None, + date=message.date, + from_id=( + message.from_id.user_id if hasattr(message, "from_id") and message.from_id else None + ), + reply_to_message_id=( + message.reply_to.reply_to_msg_id + if hasattr(message, "reply_to") and message.reply_to + else None + ), + media=has_media, + media_type=media_type, + ) + class ChatFetcher: - """Component for fetching and caching Telegram chats and messages""" + """Component for fetching and caching Telegram chats and messages using telegram-downloader""" def __init__( self, settings: ChatFetcherSettings, - collection: Optional["AsyncIOMotorCollection"] = None, + messages_collection: Optional["AsyncIOMotorCollection"] = None, + chats_collection: Optional["AsyncIOMotorCollection"] = None, single_user_id: Optional[str] = None, ): self.settings = settings - self.collection = collection + self.messages_collection = messages_collection + self.chats_collection = chats_collection self.single_user_id = single_user_id + self._telegram_downloader = None + self._telethon_client = None logger.info(f"ChatFetcher initialized with cache_enabled={settings.cache_enabled}") + @property + def storage_mode(self) -> StorageMode: + """Get storage mode (mongo or local)""" + if self.settings.storage_mode.lower() == "mongo": + return StorageMode.MONGO + else: + return StorageMode.LOCAL + + async def get_telegram_downloader(self, user_id: Optional[int] = None) -> TelegramDownloader: + """Get or create TelegramDownloader instance""" + if self._telegram_downloader is not None: + return self._telegram_downloader + + # Resolve user_id + resolved_user_id = await self._resolve_user_id(user_id) + + # Configure environment for TelegramDownloader + from botspot.core.dependency_manager import get_dependency_manager + + deps = get_dependency_manager() + telegram_settings = deps.botspot_settings.telethon_manager + mongo_settings = deps.botspot_settings.mongo_database + + # Create config for TelegramDownloader + config = { + "storage_mode": self.storage_mode.value, + "TELEGRAM_API_ID": telegram_settings.api_id, + "TELEGRAM_API_HASH": telegram_settings.api_hash, + "TELEGRAM_USER_ID": str(resolved_user_id), + } + + if self.storage_mode == StorageMode.MONGO and mongo_settings.enabled: + config.update( + { + "MONGO_CONN_STR": mongo_settings.mongo_uri, + "MONGO_DB_NAME": mongo_settings.db_name, + "MONGO_MESSAGES_COLLECTION": self.settings.collection_name, + "MONGO_CHATS_COLLECTION": self.settings.chats_collection_name, + } + ) + + # Create downloader + self._telegram_downloader = TelegramDownloader(**config) + return self._telegram_downloader + async def get_client(self, user_id: Optional[int] = None) -> "TelegramClient": """Get telethon client for the specified user or for single user mode""" + if self._telethon_client is not None: + return self._telethon_client + + # Try to get client from TelegramDownloader + try: + downloader = await self.get_telegram_downloader(user_id) + self._telethon_client = await downloader.get_telethon_client() + return self._telethon_client + except Exception as e: + logger.error(f"Error getting client from TelegramDownloader: {e}") + + # Fallback to telethon_manager from botspot.components.main.telethon_manager import get_telethon_manager - from botspot.core.dependency_manager import get_dependency_manager - deps = get_dependency_manager() telethon_manager = get_telethon_manager() + resolved_user_id = await self._resolve_user_id(user_id) + self._telethon_client = await telethon_manager.get_client(resolved_user_id) + return self._telethon_client - # If user_id is not provided and single user mode is enabled, use the single user - if user_id is None: - if not deps.botspot_settings.single_user_mode.enabled: - raise ValueError("User ID must be provided when not in single user mode") - user = deps.botspot_settings.single_user_mode.user - # Convert username to numeric ID if needed - if user.startswith("@"): - # TODO: Implement username to ID conversion - raise NotImplementedError("Username conversion not implemented yet") - user_id = int(user) + async def _resolve_user_id(self, user_id: Optional[int] = None) -> int: + """Resolve user_id, using single user mode if needed""" + if user_id is not None: + return user_id - return await telethon_manager.get_client(user_id) + from botspot.core.dependency_manager import get_dependency_manager + + deps = get_dependency_manager() + if not deps.botspot_settings.single_user_mode.enabled: + raise ValueError("User ID must be provided when not in single user mode") + + user = deps.botspot_settings.single_user_mode.user + # Convert username to numeric ID if needed + if user.startswith("@"): + # TODO: Implement username to ID conversion + raise NotImplementedError("Username conversion not implemented yet") + return int(user) async def get_chats(self, user_id: Optional[int] = None, limit: int = 100) -> List[ChatModel]: - """Get chats for the specified user or for single user mode""" - client = await self.get_client(user_id) + """Get chats for the specified user or for single user mode using telegram-downloader""" + downloader = await self.get_telegram_downloader(user_id) - # Get dialogs from Telethon - dialogs = await client.get_dialogs(limit=limit) + # Use telegram-downloader to get chats + chat_data_list = await downloader._get_chats() - # Convert to ChatModel - chats = [] - for dialog in dialogs: - entity = dialog.entity - - # Determine chat type - chat_type = "private" - if hasattr(entity, "megagroup") and entity.megagroup: - chat_type = "group" - elif hasattr(entity, "broadcast") and entity.broadcast: - chat_type = "channel" - - chat = ChatModel( - id=dialog.id, - title=dialog.title, - username=getattr(entity, "username", None), - type=chat_type, - last_message_date=dialog.date, - ) - chats.append(chat) + # Limit the number of chats if needed + if limit: + chat_data_list = chat_data_list[:limit] - # Cache the chat if caching is enabled - if self.settings.cache_enabled and self.collection: - await self._cache_chat(chat) + # Convert to ChatModel + chats = [ChatModel.from_chat_data(chat) for chat in chat_data_list] return chats @@ -126,14 +246,36 @@ async def find_chat( self, query: str, user_id: Optional[int] = None, limit: int = 5 ) -> List[ChatModel]: """Find chats by title or username""" - # First try to get from cache - if self.settings.cache_enabled and self.collection: - cached_chats = await self._find_chats_in_cache(query, limit) - if cached_chats: - return cached_chats + downloader = await self.get_telegram_downloader(user_id) + + # Try to find chats using telegram-downloader's built-in functions + try: + # Get all chats and filter locally + chats_data = await downloader._get_chats() + + # Filter chats by query in name + query = query.lower() + filtered_chats = [] + + for chat in chats_data: + chat_name = chat.name.lower() + if query in chat_name: + filtered_chats.append(chat) + + # Convert to ChatModel and limit + return [ChatModel.from_chat_data(chat) for chat in filtered_chats[:limit]] + + except Exception as e: + logger.error(f"Error finding chats in telegram-downloader: {e}") + + # Fallback to original cache method + if self.settings.cache_enabled and self.messages_collection: + cached_chats = await self._find_chats_in_cache(query, limit) + if cached_chats: + return cached_chats - # If not found in cache, get all chats and filter - chats = await self.get_chats(user_id, limit=100) # Get more to search through + # If all else fails, get all chats and filter manually + chats = await self.get_chats(user_id, limit=100) # Filter by title or username query = query.lower() @@ -155,61 +297,82 @@ async def get_messages( max_id: Optional[int] = None, reverse: bool = False, ) -> List[MessageModel]: - """Get messages from a specific chat""" - client = await self.get_client(user_id) + """Get messages from a specific chat using telegram-downloader""" + downloader = await self.get_telegram_downloader(user_id) - # Try to get from cache first if appropriate - if ( - self.settings.cache_enabled - and self.collection - and not min_id - and not max_id - and offset_id == 0 - ): - cached_messages = await self._get_cached_messages(chat_id, limit) - if cached_messages: - return cached_messages + # Get chat entity + chats = await downloader._get_chats() + chat_data = next((chat for chat in chats if chat.id == chat_id), None) - # Limit the number of messages per request - limit = min(limit, self.settings.max_messages_per_request) + if not chat_data: + logger.error(f"Chat with ID {chat_id} not found") + return [] - # Get messages from Telethon - messages = await client.get_messages( - chat_id, limit=limit, offset_id=offset_id, min_id=min_id, max_id=max_id, reverse=reverse - ) + # Use telegram-downloader to load messages + kwargs = {"limit": min(limit, self.settings.max_messages_per_request)} - # Convert to MessageModel - result = [] - for msg in messages: - # Determine media type - media_type = None - has_media = False - - if hasattr(msg, "media") and msg.media: - has_media = True - media_type = type(msg.media).__name__ - - message = MessageModel( - id=msg.id, - chat_id=chat_id, - text=msg.text if hasattr(msg, "text") else None, - date=msg.date, - from_id=msg.from_id.user_id if hasattr(msg, "from_id") and msg.from_id else None, - reply_to_message_id=( - msg.reply_to.reply_to_msg_id - if hasattr(msg, "reply_to") and msg.reply_to - else None - ), - media=has_media, - media_type=media_type, + if offset_id: + kwargs["offset_id"] = offset_id + + if min_id: + kwargs["min_id"] = min_id + + if max_id: + kwargs["max_id"] = max_id + + if reverse: + kwargs["reverse"] = reverse + + try: + messages = await downloader._load_messages(chat_data, **kwargs) + + # Convert to MessageModel + result = [] + for msg in messages: + message = MessageModel.from_telethon_message(msg, chat_id) + result.append(message) + + return result + + except Exception as e: + logger.error(f"Error loading messages with telegram-downloader: {e}") + + # Fallback to direct Telethon access + client = await self.get_client(user_id) + + # Try to get from cache first if appropriate + if ( + self.settings.cache_enabled + and self.messages_collection + and not min_id + and not max_id + and offset_id == 0 + ): + cached_messages = await self._get_cached_messages(chat_id, limit) + if cached_messages: + return cached_messages + + # Get messages from Telethon + messages = await client.get_messages( + chat_id, + limit=limit, + offset_id=offset_id, + min_id=min_id, + max_id=max_id, + reverse=reverse, ) - result.append(message) - # Cache the message if caching is enabled - if self.settings.cache_enabled and self.collection: - await self._cache_message(message) + # Convert to MessageModel + result = [] + for msg in messages: + message = MessageModel.from_telethon_message(msg, chat_id) + result.append(message) - return result + # Cache the message + if self.settings.cache_enabled and self.messages_collection: + await self._cache_message(message) + + return result async def find_messages( self, @@ -221,7 +384,7 @@ async def find_messages( ) -> List[MessageModel]: """Find messages by text content""" # First try to search in cache - if search_in_cached and self.settings.cache_enabled and self.collection: + if search_in_cached and self.settings.cache_enabled and self.messages_collection: cached_messages = await self._find_messages_in_cache(query, chat_id, limit) if cached_messages: return cached_messages @@ -233,32 +396,11 @@ async def find_messages( # Convert to MessageModel result = [] for msg in messages: - # Determine media type - media_type = None - has_media = False - - if hasattr(msg, "media") and msg.media: - has_media = True - media_type = type(msg.media).__name__ - - message = MessageModel( - id=msg.id, - chat_id=chat_id, - text=msg.text if hasattr(msg, "text") else None, - date=msg.date, - from_id=msg.from_id.user_id if hasattr(msg, "from_id") and msg.from_id else None, - reply_to_message_id=( - msg.reply_to.reply_to_msg_id - if hasattr(msg, "reply_to") and msg.reply_to - else None - ), - media=has_media, - media_type=media_type, - ) + message = MessageModel.from_telethon_message(msg, chat_id) result.append(message) # Cache the message if caching is enabled - if self.settings.cache_enabled and self.collection: + if self.settings.cache_enabled and self.messages_collection: await self._cache_message(message) return result @@ -283,61 +425,102 @@ async def download_media( file_path = await message.download(file=target_path) # Update the cache with file path - if self.settings.cache_enabled and self.collection and file_path: + if self.settings.cache_enabled and self.messages_collection and file_path: await self._update_message_file_path(message_id, chat_id, str(file_path)) return str(file_path) if file_path else None + async def load_chat_history( + self, + chat_id: int, + user_id: Optional[int] = None, + backdays: Optional[int] = None, + limit: Optional[int] = None, + ignore_finished: bool = False, + ) -> List[MessageModel]: + """ + Load full chat history using telegram-downloader's advanced features + + This method uses telegram-downloader to efficiently download all messages from a chat, + with support for incremental updates, pagination, and rate limiting. + + Args: + chat_id: ID of the chat to download messages from + user_id: User ID to use for authentication (optional if single_user_mode is enabled) + backdays: Number of days to look back (default from settings) + limit: Maximum messages to download (default from settings) + ignore_finished: Whether to ignore the "finished_downloading" flag + + Returns: + List of downloaded messages as MessageModel objects + """ + downloader = await self.get_telegram_downloader(user_id) + + # Get chat entity + chats = await downloader._get_chats() + chat_data = next((chat for chat in chats if chat.id == chat_id), None) + + if not chat_data: + logger.error(f"Chat with ID {chat_id} not found") + return [] + + # Configure download limits + from telegram_downloader.config import ChatCategoryConfig + + chat_config = ChatCategoryConfig( + enabled=True, + backdays=backdays or self.settings.backdays, + limit=limit or self.settings.limit, + skip_big=self.settings.skip_big, + ) + + # Download messages + raw_messages = await downloader._download_messages( + chat_data, chat_config, ignore_finished=ignore_finished + ) + + # Convert to MessageModel + result = [] + for msg in raw_messages: + try: + message = MessageModel.from_telethon_message(msg, chat_id) + result.append(message) + except Exception as e: + logger.error(f"Error converting message {msg.id}: {e}") + + return result + # Cache-related methods async def _cache_chat(self, chat: ChatModel) -> None: """Cache chat information""" - if not self.collection: + if not self.chats_collection: return try: - # Check if the chat already exists - existing = await self.collection.find_one({"type": "chat", "chat_id": chat.id}) - + # Use dict() to convert to dictionary chat_dict = chat.dict() - chat_dict["type"] = "chat" - chat_dict["chat_id"] = chat.id - if existing: - # Update existing chat - await self.collection.update_one( - {"type": "chat", "chat_id": chat.id}, {"$set": chat_dict} - ) - else: - # Insert new chat - await self.collection.insert_one(chat_dict) + # Upsert the chat + await self.chats_collection.update_one( + {"id": chat.id}, {"$set": chat_dict}, upsert=True + ) except Exception as e: logger.error(f"Error caching chat: {e}") async def _cache_message(self, message: MessageModel) -> None: """Cache message information""" - if not self.collection: + if not self.messages_collection: return try: - # Check if the message already exists - existing = await self.collection.find_one( - {"type": "message", "message_id": message.id, "chat_id": message.chat_id} - ) - + # Convert to dict and add metadata for easier querying msg_dict = message.dict() - msg_dict["type"] = "message" - msg_dict["message_id"] = message.id - - if existing: - # Update existing message - await self.collection.update_one( - {"type": "message", "message_id": message.id, "chat_id": message.chat_id}, - {"$set": msg_dict}, - ) - else: - # Insert new message - await self.collection.insert_one(msg_dict) + + # Upsert the message + await self.messages_collection.update_one( + {"id": message.id, "chat_id": message.chat_id}, {"$set": msg_dict}, upsert=True + ) except Exception as e: logger.error(f"Error caching message: {e}") @@ -346,12 +529,12 @@ async def _update_message_file_path( self, message_id: int, chat_id: int, file_path: str ) -> None: """Update file path in cached message""" - if not self.collection: + if not self.messages_collection: return try: - await self.collection.update_one( - {"type": "message", "message_id": message_id, "chat_id": chat_id}, + await self.messages_collection.update_one( + {"id": message_id, "chat_id": chat_id}, {"$set": {"file_path": file_path}}, ) except Exception as e: @@ -359,23 +542,19 @@ async def _update_message_file_path( async def _get_cached_messages(self, chat_id: int, limit: int) -> List[MessageModel]: """Get cached messages for a chat""" - if not self.collection: + if not self.messages_collection: return [] try: cursor = ( - self.collection.find({"type": "message", "chat_id": chat_id}) - .sort("date", -1) - .limit(limit) + self.messages_collection.find({"chat_id": chat_id}).sort("date", -1).limit(limit) ) messages = [] async for doc in cursor: - # Remove the "type" field added for cache - if "type" in doc: - doc.pop("type") - if "message_id" in doc: - doc.pop("message_id") + # Remove the MongoDB _id field + if "_id" in doc: + doc.pop("_id") messages.append(MessageModel(**doc)) @@ -387,13 +566,12 @@ async def _get_cached_messages(self, chat_id: int, limit: int) -> List[MessageMo async def _find_chats_in_cache(self, query: str, limit: int) -> List[ChatModel]: """Find chats in cache by title or username""" - if not self.collection: + if not self.chats_collection: return [] try: - cursor = self.collection.find( + cursor = self.chats_collection.find( { - "type": "chat", "$or": [ {"title": {"$regex": query, "$options": "i"}}, {"username": {"$regex": query, "$options": "i"}}, @@ -403,11 +581,9 @@ async def _find_chats_in_cache(self, query: str, limit: int) -> List[ChatModel]: chats = [] async for doc in cursor: - # Remove the "type" field added for cache - if "type" in doc: - doc.pop("type") - if "chat_id" in doc: - doc.pop("chat_id") + # Remove the MongoDB _id field + if "_id" in doc: + doc.pop("_id") chats.append(ChatModel(**doc)) @@ -421,21 +597,19 @@ async def _find_messages_in_cache( self, query: str, chat_id: int, limit: int ) -> List[MessageModel]: """Find messages in cache by text content""" - if not self.collection: + if not self.messages_collection: return [] try: - cursor = self.collection.find( - {"type": "message", "chat_id": chat_id, "text": {"$regex": query, "$options": "i"}} + cursor = self.messages_collection.find( + {"chat_id": chat_id, "text": {"$regex": query, "$options": "i"}} ).limit(limit) messages = [] async for doc in cursor: - # Remove the "type" field added for cache - if "type" in doc: - doc.pop("type") - if "message_id" in doc: - doc.pop("message_id") + # Remove the MongoDB _id field + if "_id" in doc: + doc.pop("_id") messages.append(MessageModel(**doc)) @@ -503,6 +677,61 @@ async def search_chat_command(message: Message) -> None: logger.error(f"Error in search_chat_command: {e}") await message.reply(f"Error searching chats: {str(e)}") + @add_command("download_chat", "Download all messages from a chat", visibility="hidden") + @dp.message(Command("download_chat")) + async def download_chat_command(message: Message) -> None: + try: + parts = message.text.split(maxsplit=2) + if len(parts) < 2: + await message.reply( + "Please provide a chat ID or search query, e.g. /download_chat 123456789 or /download_chat Python Group" + ) + return + + chat_fetcher = get_chat_fetcher() + + # Try to parse chat_id directly + try: + chat_id = int(parts[1]) + chat_name = str(chat_id) + limit = int(parts[2]) if len(parts) > 2 else None + except ValueError: + # If not an ID, treat as a search query + query = parts[1] + chats = await chat_fetcher.find_chat(query, message.from_user.id, limit=1) + if not chats: + await message.reply(f"No chats found matching '{query}'.") + return + chat_id = chats[0].id + chat_name = chats[0].title + limit = int(parts[2]) if len(parts) > 2 else None + + # Send initial response + status_message = await message.reply( + f"Starting download of messages from '{chat_name}'..." + ) + + # Use the load_chat_history method from telegram-downloader + messages = await chat_fetcher.load_chat_history( + chat_id=chat_id, user_id=message.from_user.id, limit=limit + ) + + # Send completion message + if messages: + await status_message.edit_text( + f"Successfully downloaded {len(messages)} messages from '{chat_name}'.\n\n" + f"Date range: {min(msg.date for msg in messages).strftime('%d %b %Y')} - " + f"{max(msg.date for msg in messages).strftime('%d %b %Y')}" + ) + else: + await status_message.edit_text( + f"No messages found in '{chat_name}' or download was skipped." + ) + + except Exception as e: + logger.error(f"Error in download_chat_command: {e}") + await message.reply(f"Error downloading chat: {str(e)}") + return dp @@ -512,21 +741,25 @@ def initialize(settings: ChatFetcherSettings) -> ChatFetcher: deps = get_dependency_manager() - # Get MongoDB collection if available - collection = None + # Get MongoDB collections if available + messages_collection = None + chats_collection = None if settings.cache_enabled and deps.botspot_settings.mongo_database.enabled: try: - collection = deps.mongo_database[settings.collection_name] - logger.info(f"ChatFetcher using MongoDB collection: {settings.collection_name}") + messages_collection = deps.mongo_database[settings.collection_name] + chats_collection = deps.mongo_database[settings.chats_collection_name] + logger.info( + f"ChatFetcher using MongoDB collections: {settings.collection_name} and {settings.chats_collection_name}" + ) except Exception as e: - logger.error(f"Error getting MongoDB collection: {e}") + logger.error(f"Error getting MongoDB collections: {e}") # Get single user mode settings if enabled single_user_id = None if deps.botspot_settings.single_user_mode.enabled: single_user_id = deps.botspot_settings.single_user_mode.user - return ChatFetcher(settings, collection, single_user_id) + return ChatFetcher(settings, messages_collection, chats_collection, single_user_id) def get_chat_fetcher() -> ChatFetcher: diff --git a/dev/todo.md b/dev/todo.md index 80d9e5b..66581da 100644 --- a/dev/todo.md +++ b/dev/todo.md @@ -39,10 +39,10 @@ Notes: - Implement caching to work around Telegram API limitations -- Consider importing telegram_downloader as dependency vs. moving code -- Reference implementation in projects/telegram_downloader -- Draft in calmmage/seasonal/season_1_winter_2024/draft/download-telegram-messages -- Should adapt telegram_downloader to use custom DB and telethon client +- Imported telegram_downloader as dependency +- Reference implementation from projects/telegram_downloader +- Adapted telegram_downloader to use custom DB and telethon client +- Added advanced load_chat_history method with proper incremental updates ## LLM Provider - ✅ DONE diff --git a/dev/workalong.md b/dev/workalong.md index e69de29..043228e 100644 --- a/dev/workalong.md +++ b/dev/workalong.md @@ -0,0 +1,17 @@ +What is the plan? + +Idea 1: just make a bot and run it + +- Done! [basic_bot](../examples/basic_bot) + +idea 2: add 'register post link on startup' util +idea 3: try teletalk + new-bot-lib +idea 4: commit +Idea 5: Chat Fetcher - Adapt from existing telegram_downloader implementation + +## Task: Integrate telegram-downloader into chat_fetcher component + +- Use telegram-downloader from GitHub as dependency (added to pyproject.toml) +- Adapt the chat_fetcher.py to use telegram-downloader features +- Ensure compatibility with MongoDB caching and Telethon client +- Update the component to leverage advanced features from telegram-downloader diff --git a/dev/workalong_chat_fetcher.md b/dev/workalong_chat_fetcher.md new file mode 100644 index 0000000..698ed85 --- /dev/null +++ b/dev/workalong_chat_fetcher.md @@ -0,0 +1,111 @@ +# Chat Fetcher Component Integration + +## Task + +Integrate telegram-downloader as a dependency into the chat_fetcher component, similar +to how calmlib is integrated. + +## Current Implementation Status + +1. Added telegram-downloader as a dependency in pyproject.toml: + +``` +telegram-downloader = { git = "https://github.com/calmmage/telegram-downloader.git", branch = "main" } +``` + +2. Enhanced ChatFetcherSettings with telegram-downloader specific configuration: + +- Added separate collections for messages and chats +- Added configuration for storage mode +- Added parameters for download limits (backdays, message limits, etc.) + +3. Created a wrapper around telegram-downloader in the ChatFetcher class: + +- Created conversion methods between ChatData and ChatModel +- Added methods to initialize and use TelegramDownloader +- Implemented fallback mechanisms for reliability +- Added separate messages and chats collections + +## Current Limitations + +Currently, the integration has a major limitation: telegram-downloader doesn't support: + +- Accepting a custom MongoDB connection +- Using an external Telethon client + +Instead, it creates its own connections internally, which creates inefficiency and +potential conflicts. + +## Required Changes to telegram-downloader + +To properly integrate telegram-downloader with botspot, we need to modify the +telegram-downloader package: + +1. In `TelegramDownloader.__init__()`: + - Add parameters for external Telethon client: + `external_client: Optional[TelegramClient] = None` + - Add parameters for external MongoDB database: + `external_db: Optional[Database] = None` + +2. In `get_telethon_client()` method: + - Check if an external client was provided and use it if available + - Fall back to creating a new client only if needed + +3. In database-related methods: + - Use the external database connection if provided + - Only create a new connection if not provided + +4. Update the `TelethonClientManager` class: + - Add support for using an existing client + - Add method to register an external client + +## Implementation Plan + +1. Create a fork of telegram-downloader +2. Make the necessary modifications: + - Update TelegramDownloader class + - Update TelethonClientManager + - Add tests for external client/DB support +3. Submit a pull request to the original repository +4. Update the botspot chat_fetcher component to use these new features + +## Example Code Changes (Draft) + +```python +# In telegram_downloader.py +def __init__(self, config_path: Path | str = Path("config.yaml"), + external_client: Optional[TelegramClient] = None, + external_db: Optional[Database] = None, + **kwargs): + self.env = TelegramDownloaderEnvSettings(**kwargs) + + config_path = Path(config_path) + self.config = TelegramDownloaderConfig.from_yaml(config_path, **kwargs) + + self._external_client = external_client + self._external_db = external_db + self._db = None + self._telethon_client = None + self.reset_properties() + + +async def get_telethon_client(self) -> TelegramClient: + if self._telethon_client is not None: + return self._telethon_client + + if self._external_client is not None: + self._telethon_client = self._external_client + return self._telethon_client + + # Original code for creating a client... +``` + +## Next Steps + +1. Discuss this approach - confirm if this matches the intended integration +2. Determine whether to: + - Fork telegram-downloader and implement these changes + - Create a pull request to upstream repository + - Or use a simpler approach like copying select code +3. Test the implementation with a real bot +4. Document the API and provide usage examples \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index a4873ab..32b7134 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,6 +12,7 @@ packages=[ [tool.poetry.dependencies] python = ">=3.11,<4.0" calmlib = { git = "https://github.com/calmmage/calmlib.git", branch = "main" } +telegram-downloader = { git = "https://github.com/calmmage/telegram-downloader.git", branch = "main" } loguru = ">=0.7" # for parsing version toml = ">=0.10" From 2bd4be30c6bd4f0e33e5790a8204f5cf9f217525 Mon Sep 17 00:00:00 2001 From: Petr Lavrov Date: Fri, 21 Mar 2025 03:35:07 +0300 Subject: [PATCH 3/8] update CLAUDE.md --- CLAUDE.md | 1 - dev/workalong_chat_fetcher.md | 50 ++++++++++++++++++++++++++++++----- 2 files changed, 44 insertions(+), 7 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 327d0bc..3dfad56 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -1,4 +1,3 @@ - # Botspot Development Guide ## Build & Test Commands diff --git a/dev/workalong_chat_fetcher.md b/dev/workalong_chat_fetcher.md index 698ed85..34b8fae 100644 --- a/dev/workalong_chat_fetcher.md +++ b/dev/workalong_chat_fetcher.md @@ -102,10 +102,48 @@ async def get_telethon_client(self) -> TelegramClient: ## Next Steps -1. Discuss this approach - confirm if this matches the intended integration -2. Determine whether to: - - Fork telegram-downloader and implement these changes - - Create a pull request to upstream repository - - Or use a simpler approach like copying select code +1. Continue work on the botspot-integration branch of the telegram-downloader repo +2. Implement these changes: + - Add external client support to TelegramDownloader + - Add external database support to TelegramDownloader + - Update TelethonClientManager to support external clients 3. Test the implementation with a real bot -4. Document the API and provide usage examples \ No newline at end of file +4. Document the API and provide usage examples + +## Implementation Tasks + +1. Modify `TelegramDownloader.__init__()`: + ```python + def __init__(self, + config_path: Path | str = Path("config.yaml"), + external_client: Optional[TelegramClient] = None, + external_db: Optional[Database] = None, + **kwargs): + ``` + +2. Update `get_telethon_client()` to use the external client if provided + ```python + async def get_telethon_client(self) -> TelegramClient: + if self._telethon_client is not None: + return self._telethon_client + + if self._external_client is not None: + self._telethon_client = self._external_client + return self._telethon_client + + # Original code for creating a client... + ``` + +3. Update the database property to use external_db if provided + ```python + @property + def db(self): + if self._external_db is not None: + return self._external_db + + if self._db is None: + self._db = self._get_database() + return self._db + ``` + +4. Finally, update the botspot chat_fetcher to use these new features once they're implemented \ No newline at end of file From 1980dbe088708d7dfcc598b570d8efa97b5efb20 Mon Sep 17 00:00:00 2001 From: Petr Lavrov Date: Mon, 24 Mar 2025 01:36:34 +0300 Subject: [PATCH 4/8] chat fetcher: Spec and workalong --- dev/workalong_chat_fetcher.md | 23 +++++++- dev/workalong_chat_fetcher/rework_protocol.md | 58 ++++++++++++++++++ dev/workalong_chat_fetcher/spec.md | 20 +++++++ dev/workalong_chat_fetcher/workalong.md | 59 +++++++++++++++++++ 4 files changed, 159 insertions(+), 1 deletion(-) create mode 100644 dev/workalong_chat_fetcher/rework_protocol.md create mode 100644 dev/workalong_chat_fetcher/spec.md create mode 100644 dev/workalong_chat_fetcher/workalong.md diff --git a/dev/workalong_chat_fetcher.md b/dev/workalong_chat_fetcher.md index 34b8fae..207e180 100644 --- a/dev/workalong_chat_fetcher.md +++ b/dev/workalong_chat_fetcher.md @@ -26,6 +26,10 @@ telegram-downloader = { git = "https://github.com/calmmage/telegram-downloader.g - Implemented fallback mechanisms for reliability - Added separate messages and chats collections +4. Created a demo bot (chat_analyzer_demo) that showcases the main features: + - `/ingest [chat_id]` to load messages from a chat into MongoDB cache + - `/stats` to get statistics about the cached messages + ## Current Limitations Currently, the integration has a major limitation: telegram-downloader doesn't support: @@ -146,4 +150,21 @@ async def get_telethon_client(self) -> TelegramClient: return self._db ``` -4. Finally, update the botspot chat_fetcher to use these new features once they're implemented \ No newline at end of file +4. Finally, update the botspot chat_fetcher to use these new features once they're implemented + +## Demo Bots + +### 1. Chat Fetcher Demo (chat_fetcher_demo) +This demo shows the basic ChatFetcher functionality: +- Listing and searching chats +- Fetching messages from chats +- Searching message content + +### 2. Chat Analyzer Demo (chat_analyzer_demo) +This demo focuses on caching and analysis: +- `/ingest [chat_id]` - Loads messages from a specific chat into MongoDB +- `/stats` - Shows statistics about cached chats (message count, last message) +- Efficiently handles data by only downloading new messages when ingesting + +The Chat Analyzer demo demonstrates the "name of the game is caching" concept - ingesting data once +into MongoDB and then performing all operations on the cached data for speed. \ No newline at end of file diff --git a/dev/workalong_chat_fetcher/rework_protocol.md b/dev/workalong_chat_fetcher/rework_protocol.md new file mode 100644 index 0000000..e28f6e4 --- /dev/null +++ b/dev/workalong_chat_fetcher/rework_protocol.md @@ -0,0 +1,58 @@ +This is a protocol for refactoring / reworking code components with claude-code or other +ai tools. +The main goal of this protocol is to ensure the codebase doesn't bloat and grow riddled +with duplicated code during AI refactoring / reworks. + +# Pre-rework + +- Commit. Make sure everything is committed + - Optional: Create a new branch for the rework +- Optional: Check tests, if present. + +# Simple protocol: + +If the task is clear, go directly to coding phase + +# Complex protocol: + +If there's a complex task - first, read spec.md or ask user for a list of requirements + +- After that, evaluate the current solution on the match with specs + - Which additional featuers were implemented that are not present in the spec + - What is the likely reason were they added + - + - Which components / features explicitly listed in the spec are missing + - How difficult it is to add this + - write to workalong.md + - proceed to coding + +## Coding: + +- Before coding, lay out a plan to the user in clear terms. + - Which components / features will be added + - Which modified + - Which moved / removed + - Make an explicit enumeration for user to specify which steps are approved and + which are declined + - Each item should be formulated in as simple terms as possible, 1 line maximum per + item, not longer than a few words +- Always remove duplicate code or alternative / previous implementations of the same + feature + - After making a change, call git diff and make sure file is in a desired target + state and the changes you planned are correctly reflected +- proceed with implementing each item one by one and track progress with checkboxes in + workalong.md + - [x] item 1 - keep to original item descriptions, DO NOT ADD SUB-ITEMS. List which + files were affected for this feature. + +AI Issue resolution + +1) Failed deletions + Sometimes AI applier fails to delete code according to instructions + This results in really confusing situations with duplicate functions / methods + present in multiple places across the codebase + To mitigate that + +- Check file diff after each modifications and make sure it reflects the changes you've + made + diff --git a/dev/workalong_chat_fetcher/spec.md b/dev/workalong_chat_fetcher/spec.md new file mode 100644 index 0000000..e119a43 --- /dev/null +++ b/dev/workalong_chat_fetcher/spec.md @@ -0,0 +1,20 @@ +# Chat Fetcher + +- **Main Feature**: Fetches Telegram chat messages with caching in MongoDB to avoid re-downloading the same messages. +- **Demo Bot**: `/ingest [chat_id]` - Loads messages from the specified chat into MongoDB (skipping already-cached ones), then `/stats` instantly returns message count and last message from the cache. +- **Useful Bot**: **Chat Analyzer** - Ingests a chat once with `/ingest`, caches it, and provides instant stats (e.g., active users, message frequency) or summaries via LLM, all from the cached data. + +**Notes**: The "name of the game is caching"—so `/ingest` checks MongoDB first, only fetching new messages (e.g., by comparing timestamps or message IDs). Subsequent operations like stats pull directly from the cache for speed. + +## Developer Notes + +0) this is painful. or not. I already have several drafts at it + +1) not sure how to start - i can simply implement this standalone as simple feature on top of telethon. or i can use my lib telegram-downloader - beacuse i already did a lot of work there, including caching. For small bot would be better to start small, feel that I can do it. On the other hand. i need to fix and use telegram-downloader. There is no way around using cache, so I will need to do that, and I don't want to do it a second time - why would I? + +2) main features - get_chat_messages, get_chats, find_chat, find_messages - and corresponding telegram command handlers (setup visibility through settings) + +3) this should be on top of telethon manager - get client from it and pass all relevant arguments + +4) link of telegram downloader: /Users/petrlavrov/work/projects/telegram-downloader +IMPORT AND USE IT FOR ALL FEATURES!!! \ No newline at end of file diff --git a/dev/workalong_chat_fetcher/workalong.md b/dev/workalong_chat_fetcher/workalong.md new file mode 100644 index 0000000..7393842 --- /dev/null +++ b/dev/workalong_chat_fetcher/workalong.md @@ -0,0 +1,59 @@ + +# Chat Fetcher Implementation Analysis + +## Features Not in Original Spec But Implemented + +### Settings +1. `storage_mode` and related functionality for different storage methods besides MongoDB +2. `skip_big` flag for skipping chats with many participants +3. `recent_threshold_days` setting +4. Detailed configuration for chat download limits (`backdays`, `limit`) + +### Models +1. `ChatModel` with detailed fields beyond what was needed for basic caching + - `finished_downloading` field + - `type` classification (private, group, channel) + - Conversion methods from telegram-downloader objects + +2. `MessageModel` with fields beyond basic text caching: + - Media handling (`media`, `media_type`, `file_path`) + - Detailed metadata storage + - Reply tracking + - Conversion methods from telethon messages + +### Methods +1. `download_media` - Media file downloading and caching +2. `_update_message_file_path` - File path updating in cached messages +3. Complex fallback mechanisms when telegram-downloader operations fail +4. `find_chat` and `find_messages` - Search functionality +5. `get_chats` - Listing all available chats + +### Commands +1. `/list_chats` and `/search_chat` commands (not in original spec) +2. `/download_chat` command (beyond the simpler `/ingest` in spec) + +## Features Mentioned in Spec But Not Fully Implemented + +1. Chat Analyzer functionality: + - No implementation for "active users" statistics + - No implementation for "message frequency" statistics + - No LLM integration for chat summaries (though LLM Provider exists) + +2. The command handlers in the chat_analyzer_demo are simplified compared to what was described: + - Statistics are basic (just message counts and last messages) + - No analysis of user activity + - No message frequency analysis + +3. Missing explicit MongoDB query optimization for fast stats retrieval: + - No aggregation pipeline for complex statistics + - No indexing strategy mentioned for large chat databases + +## Potential Features to Cut + +Based on the analysis above, these features could potentially be removed to simplify: + +1. Media handling (`download_media` method and related fields) +2. Storage mode flexibility (just focus on MongoDB) +3. Complex fallback mechanisms (simplify error handling) +4. Some of the detailed fields in MessageModel and ChatModel +5. `/download_chat` command (focus on simpler `/ingest`) From 0f7313cdc028fce7134574d9dbc8c73dbc50edc6 Mon Sep 17 00:00:00 2001 From: Petr Lavrov Date: Mon, 24 Mar 2025 01:39:38 +0300 Subject: [PATCH 5/8] chat fetcher: dump changes --- .../chat_analyzer_demo/README.md | 84 ++++++ .../chat_analyzer_demo/bot.py | 240 ++++++++++++++++++ .../chat_analyzer_demo/sample.env | 27 ++ 3 files changed, 351 insertions(+) create mode 100644 examples/components_examples/chat_analyzer_demo/README.md create mode 100644 examples/components_examples/chat_analyzer_demo/bot.py create mode 100644 examples/components_examples/chat_analyzer_demo/sample.env diff --git a/examples/components_examples/chat_analyzer_demo/README.md b/examples/components_examples/chat_analyzer_demo/README.md new file mode 100644 index 0000000..92fab39 --- /dev/null +++ b/examples/components_examples/chat_analyzer_demo/README.md @@ -0,0 +1,84 @@ +# Chat Analyzer Demo Bot + +This demo showcases the ChatFetcher component from Botspot, focusing on the ability to ingest +and cache chat messages in MongoDB for fast access and analysis. + +## Features + +- `/ingest [chat_id]` - Load messages from a specific chat into MongoDB cache +- `/stats` - Display statistics from the cached data (message count, chat count, last messages) +- Efficiently downloads only new messages when ingesting a chat +- Uses MongoDB for persistent storage of chat messages + +## Prerequisites + +- Python 3.8+ +- Telegram Bot Token +- Telegram API ID and API Hash (from https://my.telegram.org/apps) +- MongoDB server (required for caching) + +## Setup + +1. Clone the repository +2. Copy `sample.env` to `.env` and fill in the required values +3. Install dependencies: + +```bash +pip install -r requirements.txt # or use poetry +``` + +4. Run the bot: + +```bash +python bot.py +``` + +## Usage + +1. Start the bot with `/start` +2. Setup your Telethon client with `/setup_telethon` +3. Ingest messages from a chat with `/ingest [chat_id]` + - This loads messages from the specified chat into MongoDB + - If the chat was previously ingested, only new messages will be downloaded +4. Get statistics about ingested chats with `/stats` + - See total message count + - See last message from each cached chat + +## How It Works + +The key component used in this bot is `ChatFetcher`, which: + +1. Uses Telethon to access the Telegram API +2. Efficiently downloads messages with pagination and rate limiting +3. Stores messages and chat information in MongoDB +4. Provides methods to search and access the cached data + +When you use `/ingest`, the bot: +1. Checks if the chat is already in the cache +2. Downloads only new messages that aren't already cached +3. Updates the cache with the new messages + +When you use `/stats`, the bot: +1. Queries the MongoDB cache directly +2. Aggregates information about all cached chats +3. Returns statistics without needing to access the Telegram API + +## Extending This Demo + +This demo can be extended in several ways: + +1. Add more advanced analytics using aggregation queries on the MongoDB collection +2. Integrate with LLMProvider to generate summaries of chats +3. Add text analysis features (e.g., word frequency, sentiment analysis) +4. Create a dashboard for visualizing chat statistics + +## Environment Variables + +See `sample.env` for all available configuration options. + +## Dependencies + +- telethon +- motor (MongoDB driver) +- aiogram +- botspot \ No newline at end of file diff --git a/examples/components_examples/chat_analyzer_demo/bot.py b/examples/components_examples/chat_analyzer_demo/bot.py new file mode 100644 index 0000000..f40eae2 --- /dev/null +++ b/examples/components_examples/chat_analyzer_demo/bot.py @@ -0,0 +1,240 @@ +import asyncio +import os +from datetime import datetime + +from aiogram import Bot, Dispatcher, F +from aiogram.filters import Command +from aiogram.fsm.context import FSMContext +from aiogram.fsm.state import State, StatesGroup +from aiogram.types import Message +from dotenv import load_dotenv +from loguru import logger + +import botspot + +# Load environment variables +load_dotenv() + +# Initialize bot and dispatcher +token = os.getenv("TELEGRAM_BOT_TOKEN") +if not token: + raise ValueError("TELEGRAM_BOT_TOKEN is not set in environment variables") + +bot = Bot(token=token) +dp = Dispatcher() + + +class ChatStates(StatesGroup): + waiting_for_chat = State() + + +@dp.message(Command("start")) +async def cmd_start(message: Message): + """Start command handler""" + username = message.from_user.username or message.from_user.first_name + await message.answer( + f"Hello, {username}! I'm a Chat Analyzer bot.\n\n" + "I can help you analyze Telegram chats by first ingesting messages " + "and then providing statistics from the cached data.\n\n" + "Available commands:\n" + "/setup_telethon - Set up Telethon client\n" + "/ingest - Ingest messages from a specific chat\n" + "/stats - Show statistics about ingested chats\n" + "/help - Show this help message" + ) + + +@dp.message(Command("help")) +async def cmd_help(message: Message): + """Help command handler""" + await message.answer( + "Chat Analyzer Bot Help:\n\n" + "This bot demonstrates the ChatFetcher component with message caching.\n\n" + "Steps to use this bot:\n" + "1. Use /setup_telethon to authorize the bot to access your Telegram account\n" + "2. Use /ingest to load messages from a specific chat into the cache\n" + "3. Use /stats to view statistics about cached chats and messages\n\n" + "Available commands:\n" + "/setup_telethon - Set up Telethon client\n" + "/ingest - Ingest messages from a specific chat\n" + "/stats - Show statistics about ingested chats and messages\n" + "/help - Show this help message" + ) + + +@dp.message(Command("ingest")) +async def cmd_ingest(message: Message, state: FSMContext): + """Command to ingest messages from a chat and store them in MongoDB""" + # Check if user has Telethon client + from botspot import get_telethon_manager + + try: + telethon_manager = get_telethon_manager() + client = await telethon_manager.get_client(message.from_user.id) + if not client or not await client.is_user_authorized(): + await message.reply( + "You need to set up Telethon client first. Use /setup_telethon command." + ) + return + except Exception as e: + await message.reply(f"Error checking Telethon client: {str(e)}") + return + + # Check if command has a chat_id parameter + parts = message.text.split(maxsplit=1) + if len(parts) > 1: + # Use provided chat_id + try: + chat_id = int(parts[1]) + await ingest_chat(message, chat_id) + except ValueError: + await message.reply( + "Invalid chat ID format. Please provide a numeric chat ID or use /ingest without parameters to enter interactive mode." + ) + else: + # Ask user for chat ID + await message.reply( + "Please enter chat ID to ingest messages from.\n" + "You can use numeric chat ID or use /list_chats to see available chats." + ) + await state.set_state(ChatStates.waiting_for_chat) + + +@dp.message(ChatStates.waiting_for_chat) +async def process_chat_for_ingestion(message: Message, state: FSMContext): + """Process chat ID input for message ingestion""" + await state.clear() + + try: + chat_id = int(message.text.strip()) + await ingest_chat(message, chat_id) + except ValueError: + await message.reply("Invalid chat ID format. Please provide a numeric chat ID.") + + +async def ingest_chat(message: Message, chat_id: int): + """Ingest messages from the specified chat""" + try: + # Use ChatFetcher to download messages + from botspot import get_chat_fetcher + + chat_fetcher = get_chat_fetcher() + + # Send initial response + status_message = await message.reply( + f"Starting ingestion of messages from chat {chat_id}...\n" + "This may take a few minutes depending on chat size." + ) + + # Use load_chat_history to efficiently fetch and cache messages + messages = await chat_fetcher.load_chat_history( + chat_id=chat_id, + user_id=message.from_user.id, + # The method automatically checks the cache first and only fetches new messages + ) + + # Send completion message + if messages: + min_date = min(msg.date for msg in messages).strftime("%d %b %Y") + max_date = max(msg.date for msg in messages).strftime("%d %b %Y") + + await status_message.edit_text( + f"✅ Successfully ingested {len(messages)} messages from chat {chat_id}.\n\n" + f"Date range: {min_date} - {max_date}\n\n" + "Use /stats to see information about the cached messages." + ) + else: + await status_message.edit_text( + f"No messages found in chat {chat_id} or ingestion was skipped." + ) + + except Exception as e: + logger.exception(f"Error in ingest_chat: {e}") + await message.reply(f"Error ingesting chat: {str(e)}") + + +@dp.message(Command("stats")) +async def cmd_stats(message: Message): + """Show statistics about ingested chats and messages""" + try: + # Use ChatFetcher to get statistics from the cache + from botspot import get_chat_fetcher, get_dependency_manager + + chat_fetcher = get_chat_fetcher() + deps = get_dependency_manager() + + if not deps.mongo_database or not chat_fetcher.messages_collection: + await message.reply("MongoDB cache is not available. Make sure it's properly configured.") + return + + # Get message count from MongoDB + message_count = await chat_fetcher.messages_collection.count_documents({}) + + if message_count == 0: + await message.reply( + "No messages found in the cache. Use /ingest to load messages first." + ) + return + + # Get distinct chat count + chats = await chat_fetcher.messages_collection.distinct("chat_id") + chat_count = len(chats) + + # Get latest message from each chat (up to 5 chats) + stats_text = f"📊 Chat Statistics\n\n" + stats_text += f"Total messages in cache: {message_count}\n" + stats_text += f"Total chats in cache: {chat_count}\n\n" + + # For each chat, show message count and latest message + for i, chat_id in enumerate(chats[:5]): # Limit to first 5 chats + # Get count of messages for this chat + chat_message_count = await chat_fetcher.messages_collection.count_documents({"chat_id": chat_id}) + + # Get latest message + latest_message = await chat_fetcher.messages_collection.find_one( + {"chat_id": chat_id}, + sort=[("date", -1)] + ) + + if latest_message: + # Try to get chat name + chat_obj = await chat_fetcher.chats_collection.find_one({"id": chat_id}) + chat_name = chat_obj["title"] if chat_obj else f"Chat {chat_id}" + + # Format the message text + message_text = latest_message.get("text", "[No text]") + if message_text and len(message_text) > 50: + message_text = message_text[:47] + "..." + + # Format date + date_str = datetime.fromisoformat(str(latest_message["date"])).strftime("%Y-%m-%d %H:%M") + + stats_text += f"Chat: {chat_name}\n" + stats_text += f"- Messages: {chat_message_count}\n" + stats_text += f"- Latest [{date_str}]: {message_text}\n\n" + + if len(chats) > 5: + stats_text += f"... and {len(chats) - 5} more chats" + + await message.reply(stats_text) + + except Exception as e: + logger.exception(f"Error in stats command: {e}") + await message.reply(f"Error fetching statistics: {str(e)}") + + +async def main(): + """Main function to start the bot""" + # Initialize Botspot + bot_manager = botspot.core.bot_manager.BotManager(bot=bot, dispatcher=dp) + + # Setup Botspot components with the dispatcher + bot_manager.setup_dispatcher(dp) + + # Start polling + logger.info("Starting bot...") + await dp.start_polling(bot) + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/examples/components_examples/chat_analyzer_demo/sample.env b/examples/components_examples/chat_analyzer_demo/sample.env new file mode 100644 index 0000000..f311f8e --- /dev/null +++ b/examples/components_examples/chat_analyzer_demo/sample.env @@ -0,0 +1,27 @@ +# t.me/your_bot_name +TELEGRAM_BOT_TOKEN= + +# Telethon Manager settings (required for ChatFetcher) +BOTSPOT_TELETHON_MANAGER_ENABLED=True +BOTSPOT_TELETHON_MANAGER_API_ID= +BOTSPOT_TELETHON_MANAGER_API_HASH= +BOTSPOT_TELETHON_MANAGER_SESSIONS_DIR=sessions +BOTSPOT_TELETHON_MANAGER_AUTO_AUTH=True + +# MongoDB settings (required for caching) +BOTSPOT_MONGO_DATABASE_ENABLED=True +BOTSPOT_MONGO_DATABASE_CONN_STR=mongodb://localhost:27017 +BOTSPOT_MONGO_DATABASE_DATABASE=botspot_chat_analyzer + +# ChatFetcher settings +BOTSPOT_CHAT_FETCHER_ENABLED=True +BOTSPOT_CHAT_FETCHER_CACHE_ENABLED=True +BOTSPOT_CHAT_FETCHER_COLLECTION_NAME=telegram_messages +BOTSPOT_CHAT_FETCHER_CHATS_COLLECTION_NAME=telegram_chats +BOTSPOT_CHAT_FETCHER_MAX_MESSAGES_PER_REQUEST=100 +BOTSPOT_CHAT_FETCHER_BACKDAYS=30 +BOTSPOT_CHAT_FETCHER_LIMIT=1000 + +# Error handling +BOTSPOT_ERROR_HANDLING_ENABLED=True +BOTSPOT_ERROR_HANDLING_DEVELOPER_CHAT_ID=your_chat_id \ No newline at end of file From 751f91a7f75ee80685a36e1c4791a9c579a8e2b6 Mon Sep 17 00:00:00 2001 From: Petr Lavrov Date: Mon, 24 Mar 2025 22:15:29 +0300 Subject: [PATCH 6/8] Optimize imports --- botspot/__init__.py | 1 + botspot/components/new/chat_fetcher.py | 7 ++++--- botspot/utils/__init__.py | 12 +++++++++++ botspot/utils/deps_getters.py | 28 ++++++++++++++------------ 4 files changed, 32 insertions(+), 16 deletions(-) diff --git a/botspot/__init__.py b/botspot/__init__.py index 316d1ad..be553ba 100644 --- a/botspot/__init__.py +++ b/botspot/__init__.py @@ -26,6 +26,7 @@ answer_safe, compare_users, get_bot, + get_chat_fetcher, get_database, get_dispatcher, get_message_text, diff --git a/botspot/components/new/chat_fetcher.py b/botspot/components/new/chat_fetcher.py index 0b10a4d..4b8eff6 100644 --- a/botspot/components/new/chat_fetcher.py +++ b/botspot/components/new/chat_fetcher.py @@ -1,12 +1,13 @@ -from aiogram import Dispatcher -from datetime import datetime, timedelta, timezone +from datetime import datetime from pathlib import Path +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union + +from aiogram import Dispatcher from pydantic import BaseModel, Field from pydantic_settings import BaseSettings from telegram_downloader.config import StorageMode from telegram_downloader.data_model import ChatData from telegram_downloader.telegram_downloader import TelegramDownloader -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union from botspot.utils.internal import get_logger diff --git a/botspot/utils/__init__.py b/botspot/utils/__init__.py index 6c07268..8ddea0c 100644 --- a/botspot/utils/__init__.py +++ b/botspot/utils/__init__.py @@ -7,6 +7,7 @@ # Then import deps_getters which depends on component functions from .deps_getters import ( get_bot, + get_chat_fetcher, get_database, get_dispatcher, get_scheduler, @@ -14,3 +15,14 @@ get_telethon_manager, get_user_manager, ) + +__all__ = [ + "get_bot", + "get_database", + "get_dispatcher", + "get_scheduler", + "get_telethon_client", + "get_telethon_manager", + "get_user_manager", + "get_chat_fetcher", +] diff --git a/botspot/utils/deps_getters.py b/botspot/utils/deps_getters.py index 26aa3f8..7b32616 100644 --- a/botspot/utils/deps_getters.py +++ b/botspot/utils/deps_getters.py @@ -20,9 +20,9 @@ from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase # noqa: F401 from telethon import TelegramClient - from botspot.components.main.telethon_manager import TelethonManager from botspot.components.new.chat_fetcher import ChatFetcher + # Core getters for bot and dispatcher def get_bot() -> "Bot": from botspot.core.dependency_manager import get_dependency_manager @@ -63,6 +63,19 @@ async def get_telethon_client( return client +# todo: move to chat_fetcher.py +def get_chat_fetcher() -> "ChatFetcher": + """Get ChatFetcher instance from dependency manager.""" + from botspot.core.dependency_manager import get_dependency_manager + + chat_fetcher = get_dependency_manager().chat_fetcher + if chat_fetcher is None: + raise RuntimeError( + "ChatFetcher is not initialized. Make sure chat_fetcher component is enabled in settings." + ) + return chat_fetcher + + # Re-export all for convenience __all__ = [ "get_bot", @@ -74,16 +87,5 @@ async def get_telethon_client( "get_telethon_client", "get_mongo_client", "get_chat_binder", + "get_chat_fetcher", ] - - -def get_chat_fetcher() -> "ChatFetcher": - """Get ChatFetcher instance from dependency manager.""" - from botspot.core.dependency_manager import get_dependency_manager - - chat_fetcher = get_dependency_manager().chat_fetcher - if chat_fetcher is None: - raise RuntimeError( - "ChatFetcher is not initialized. Make sure chat_fetcher component is enabled in settings." - ) - return chat_fetcher From a05a3fd89360e70cc8cf63343a7b8b5bc255cfdc Mon Sep 17 00:00:00 2001 From: Petr Lavrov Date: Mon, 24 Mar 2025 22:54:54 +0300 Subject: [PATCH 7/8] v0.6.2 - Preparation for chat_fetcher rework - fix tests - assert dependency_manager.py is not initialized before bot manager - DependencyManager requires botspot settings - archive old workalong notes --- botspot/components/new/chat_fetcher.py | 4 +- botspot/core/bot_manager.py | 1 + botspot/core/dependency_manager.py | 8 +-- botspot/utils/internal.py | 8 ++- .../workalong_chat_binder/rework_protocol.md | 0 .../workalong_chat_binder/spec.md | 0 .../workalong_chat_binder/workalong.md | 0 dev/workalong_chat_fetcher/workalong.md | 27 ++++++++++ pyproject.toml | 2 +- tests/core/test_bot_manager.py | 51 +++++++++++++++++- tests/core/test_dependency_manager.py | 52 +++++++++++-------- 11 files changed, 121 insertions(+), 32 deletions(-) rename dev/{ => archive}/workalong_chat_binder/rework_protocol.md (100%) rename dev/{ => archive}/workalong_chat_binder/spec.md (100%) rename dev/{ => archive}/workalong_chat_binder/workalong.md (100%) diff --git a/botspot/components/new/chat_fetcher.py b/botspot/components/new/chat_fetcher.py index 4b8eff6..6f6c056 100644 --- a/botspot/components/new/chat_fetcher.py +++ b/botspot/components/new/chat_fetcher.py @@ -12,7 +12,7 @@ from botspot.utils.internal import get_logger if TYPE_CHECKING: - from motor.motor_asyncio import AsyncIOMotorCollection + from motor.motor_asyncio import AsyncIOMotorCollection # noqa: F401 from telethon import TelegramClient from telethon.types import Message @@ -362,6 +362,7 @@ async def get_messages( max_id=max_id, reverse=reverse, ) + assert messages is not None # Convert to MessageModel result = [] @@ -393,6 +394,7 @@ async def find_messages( # If not found in cache, use telethon's search function client = await self.get_client(user_id) messages = await client.get_messages(chat_id, search=query, limit=limit) + assert messages is not None # Convert to MessageModel result = [] diff --git a/botspot/core/bot_manager.py b/botspot/core/bot_manager.py index 8414edb..9215545 100644 --- a/botspot/core/bot_manager.py +++ b/botspot/core/bot_manager.py @@ -33,6 +33,7 @@ def __init__( logger.info( f"Initializing BotManager with config: {self.settings.model_dump_json(indent=2)}" ) + assert DependencyManager.is_initialized() is False, "BotManager is already initialized" self.deps = DependencyManager( botspot_settings=self.settings, bot=bot, dispatcher=dispatcher ) diff --git a/botspot/core/dependency_manager.py b/botspot/core/dependency_manager.py index f89ff2b..4257f4b 100644 --- a/botspot/core/dependency_manager.py +++ b/botspot/core/dependency_manager.py @@ -15,22 +15,22 @@ from botspot.components.data.user_data import UserManager from botspot.components.main.telethon_manager import TelethonManager - from botspot.components.new.chat_fetcher import ChatFetcher from botspot.components.new.chat_binder import ChatBinder + from botspot.components.new.chat_fetcher import ChatFetcher from botspot.components.new.llm_provider import LLMProvider class DependencyManager(metaclass=Singleton): def __init__( self, - botspot_settings: Optional[BotspotSettings] = None, + botspot_settings: BotspotSettings, bot: Optional[Bot] = None, dispatcher: Optional[Dispatcher] = None, mongo_client: Optional["AsyncIOMotorClient"] = None, mongo_database: Optional["AsyncIOMotorDatabase"] = None, **kwargs ): - self._botspot_settings = botspot_settings or BotspotSettings() + self._botspot_settings = botspot_settings self._bot = bot self._dispatcher = dispatcher self._mongo_client = mongo_client @@ -159,4 +159,4 @@ def is_initialized(cls) -> bool: def get_dependency_manager() -> DependencyManager: if not DependencyManager.is_initialized(): raise ValueError("Dependency manager is not initialized") - return DependencyManager() + return Singleton.get_instance(DependencyManager) diff --git a/botspot/utils/internal.py b/botspot/utils/internal.py index fb271b1..ce4cfba 100644 --- a/botspot/utils/internal.py +++ b/botspot/utils/internal.py @@ -1,3 +1,5 @@ +from typing import Type + from loguru import logger @@ -20,5 +22,9 @@ class MyClass(BaseClass, metaclass=Singleton): def __call__(cls, *args, **kwargs): if cls not in cls._instances: - cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs) + cls._instances[cls] = super().__call__(*args, **kwargs) return cls._instances[cls] + + @classmethod + def get_instance(cls, TargetClass: Type): + return cls._instances[TargetClass] diff --git a/dev/workalong_chat_binder/rework_protocol.md b/dev/archive/workalong_chat_binder/rework_protocol.md similarity index 100% rename from dev/workalong_chat_binder/rework_protocol.md rename to dev/archive/workalong_chat_binder/rework_protocol.md diff --git a/dev/workalong_chat_binder/spec.md b/dev/archive/workalong_chat_binder/spec.md similarity index 100% rename from dev/workalong_chat_binder/spec.md rename to dev/archive/workalong_chat_binder/spec.md diff --git a/dev/workalong_chat_binder/workalong.md b/dev/archive/workalong_chat_binder/workalong.md similarity index 100% rename from dev/workalong_chat_binder/workalong.md rename to dev/archive/workalong_chat_binder/workalong.md diff --git a/dev/workalong_chat_fetcher/workalong.md b/dev/workalong_chat_fetcher/workalong.md index 7393842..18783a6 100644 --- a/dev/workalong_chat_fetcher/workalong.md +++ b/dev/workalong_chat_fetcher/workalong.md @@ -57,3 +57,30 @@ Based on the analysis above, these features could potentially be removed to simp 3. Complex fallback mechanisms (simplify error handling) 4. Some of the detailed fields in MessageModel and ChatModel 5. `/download_chat` command (focus on simpler `/ingest`) + +# ChatFetcher Component Rework Analysis + +## Spec vs Implementation Analysis + +### Additional Features Implemented (Not in Spec) +- Storage mode switching (mongo/local) when spec only mentioned MongoDB +- Media handling and downloading (`download_media` method) +- Complex fallback to direct Telethon usage when telegram-downloader fails +- `/list_chats` and `/search_chat` commands not mentioned in spec +- Advanced message searching with `find_messages` + +### Missing Features (In Spec But Not Implemented) +- Chat Analyzer features (active users stats, message frequency analysis) +- LLM integration for chat summaries +- Optimized MongoDB queries for fast stats retrieval + +## Proposed Changes + +1. Remove direct Telethon usage in ChatFetcher class (use only telegram-downloader) +2. Remove storage_mode option and simplify to MongoDB-only +3. Remove media handling code (download_media method) +4. Rename `/download_chat` command to `/ingest` to match spec +5. Add basic analytics methods for chat stats +6. Add LLM integration for chat summaries +7. Add MongoDB indexing for optimized query performance +8. Add ChatAnalyzer demo features (message frequency, active users) \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 32b7134..164edff 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "botspot" -version = "0.6.1" +version = "0.6.2" description = "" authors = ["Petr Lavrov "] readme = "README.md" diff --git a/tests/core/test_bot_manager.py b/tests/core/test_bot_manager.py index 2c72a3e..af776b1 100644 --- a/tests/core/test_bot_manager.py +++ b/tests/core/test_bot_manager.py @@ -30,6 +30,10 @@ def test_singleton_pattern(self): ) as mock_user_data, patch( "botspot.core.bot_manager.chat_binder" ) as mock_chat_binder, patch( + "botspot.core.bot_manager.chat_fetcher" + ) as mock_chat_fetcher, patch( + "botspot.core.bot_manager.llm_provider" + ) as mock_llm_provider, patch( "botspot.core.bot_manager.logger" ): @@ -42,6 +46,8 @@ def test_singleton_pattern(self): "user_data", "single_user_mode", "chat_binder", + "chat_fetcher", + "llm_provider", ]: component_settings = MagicMock() component_settings.enabled = False @@ -49,10 +55,17 @@ def test_singleton_pattern(self): mock_settings_class.return_value = mock_settings + # Configure dependency manager mock + mock_deps_class.return_value = MagicMock() + # During initialization, is_initialized should return False, then True after initialization + mock_deps_class.is_initialized.side_effect = [False, True, True] + # Mock components to avoid real initialization mock_telethon_manager.initialize.return_value = None mock_user_data.initialize.return_value = None mock_chat_binder.initialize.return_value = None + mock_chat_fetcher.initialize.return_value = None + mock_llm_provider.initialize.return_value = None bm1 = BotManager() bm2 = BotManager() @@ -67,6 +80,10 @@ def test_initialization_without_parameters(self): ) as mock_user_data, patch( "botspot.core.bot_manager.chat_binder" ) as mock_chat_binder, patch( + "botspot.core.bot_manager.chat_fetcher" + ) as mock_chat_fetcher, patch( + "botspot.core.bot_manager.llm_provider" + ) as mock_llm_provider, patch( "botspot.core.bot_manager.logger" ), patch( "botspot.core.bot_manager.DependencyManager" @@ -81,6 +98,8 @@ def test_initialization_without_parameters(self): "user_data", "single_user_mode", "chat_binder", + "chat_fetcher", + "llm_provider", ]: component_settings = MagicMock() component_settings.enabled = False @@ -89,11 +108,15 @@ def test_initialization_without_parameters(self): mock_deps = MagicMock() mock_deps_class.return_value = mock_deps + # During initialization, is_initialized should return False, then True after initialization + mock_deps_class.is_initialized.side_effect = [False, True, True] # Mock components to avoid real initialization mock_telethon_manager.initialize.return_value = None mock_user_data.initialize.return_value = None mock_chat_binder.initialize.return_value = None + mock_chat_fetcher.initialize.return_value = None + mock_llm_provider.initialize.return_value = None bm = BotManager() @@ -113,6 +136,10 @@ def test_initialization_with_parameters(self): ) as mock_user_data, patch( "botspot.core.bot_manager.chat_binder" ) as mock_chat_binder, patch( + "botspot.core.bot_manager.chat_fetcher" + ) as mock_chat_fetcher, patch( + "botspot.core.bot_manager.llm_provider" + ) as mock_llm_provider, patch( "botspot.core.bot_manager.logger" ): @@ -125,6 +152,8 @@ def test_initialization_with_parameters(self): "user_data", "single_user_mode", "chat_binder", + "chat_fetcher", + "llm_provider", ]: component_settings = MagicMock() component_settings.enabled = False @@ -133,11 +162,15 @@ def test_initialization_with_parameters(self): mock_deps = MagicMock() mock_deps_class.return_value = mock_deps + # During initialization, is_initialized should return False, then True after initialization + mock_deps_class.is_initialized.side_effect = [False, True, True] # Mock components to avoid real initialization mock_telethon_manager.initialize.return_value = None mock_user_data.initialize.return_value = None mock_chat_binder.initialize.return_value = None + mock_chat_fetcher.initialize.return_value = None + mock_llm_provider.initialize.return_value = None # Create mock bot, dispatcher, and user_class mock_bot = MagicMock(spec=Bot) @@ -169,6 +202,10 @@ class TestUser(User): ("user_data", False, False, False), ("single_user_mode", True, True, True), ("single_user_mode", False, False, False), + ("chat_fetcher", True, True, True), + ("chat_fetcher", False, False, False), + ("llm_provider", True, True, True), + ("llm_provider", False, False, False), ], ) def test_component_initialization( @@ -184,6 +221,8 @@ def test_component_initialization( "user_data", "single_user_mode", "chat_binder", + "chat_fetcher", + "llm_provider", ]: if comp != component_name: patches[comp] = patch(f"botspot.core.bot_manager.{comp}") @@ -253,7 +292,8 @@ def test_component_initialization( mock_deps = MagicMock() mock_deps.botspot_settings = mock_settings mocks["DependencyManager"].return_value = mock_deps - mocks["DependencyManager"].is_initialized.return_value = True + # During initialization, is_initialized should return False, then True after initialization + mocks["DependencyManager"].is_initialized.side_effect = [False, True, True] mocks["get_dependency_manager"].return_value = mock_deps # Create BotManager @@ -293,6 +333,10 @@ def test_ask_user_requires_bot(self): ) as mock_user_data, patch( "botspot.core.bot_manager.chat_binder" ) as mock_chat_binder, patch( + "botspot.core.bot_manager.chat_fetcher" + ) as mock_chat_fetcher, patch( + "botspot.core.bot_manager.llm_provider" + ) as mock_llm_provider, patch( "botspot.core.bot_manager.user_interactions" ) as mock_user_interactions, patch( "botspot.core.bot_manager.error_handler" @@ -326,6 +370,8 @@ def test_ask_user_requires_bot(self): "print_bot_url", "bot_commands_menu", "bot_info", + "chat_fetcher", + "llm_provider", ]: comp_settings = MagicMock() comp_settings.enabled = False @@ -342,7 +388,8 @@ def test_ask_user_requires_bot(self): mock_deps.bot = None mock_deps.botspot_settings = mock_settings mock_deps_class.return_value = mock_deps - mock_deps_class.is_initialized.return_value = True + # During initialization, is_initialized should return False, then True after initialization + mock_deps_class.is_initialized.side_effect = [False, True, True] # Mock dependency_manager in all components with patch("botspot.core.dependency_manager.get_dependency_manager") as mock_get_deps: diff --git a/tests/core/test_dependency_manager.py b/tests/core/test_dependency_manager.py index a73f4d9..36a3974 100644 --- a/tests/core/test_dependency_manager.py +++ b/tests/core/test_dependency_manager.py @@ -21,8 +21,9 @@ def clean_singleton(): class TestDependencyManager: def test_singleton_pattern(self): """Test that DependencyManager follows singleton pattern""" - dm1 = DependencyManager() - dm2 = DependencyManager() + mock_settings = MagicMock(spec=BotspotSettings) + dm1 = DependencyManager(botspot_settings=mock_settings) + dm2 = DependencyManager(botspot_settings=mock_settings) assert dm1 is dm2 def test_initialization(self): @@ -48,7 +49,8 @@ def test_initialization(self): def test_property_setters(self): """Test that properties can be set after initialization""" - dm = DependencyManager() + mock_settings = MagicMock(spec=BotspotSettings) + dm = DependencyManager(botspot_settings=mock_settings) # Create mock dependencies mock_bot = MagicMock(spec=Bot) @@ -76,7 +78,8 @@ def test_property_setters(self): def test_custom_attributes(self): """Test that custom attributes can be set at initialization""" - dm = DependencyManager(custom_service=MagicMock()) + mock_settings = MagicMock(spec=BotspotSettings) + dm = DependencyManager(botspot_settings=mock_settings, custom_service=MagicMock()) assert hasattr(dm, "custom_service") def test_is_initialized(self): @@ -85,14 +88,16 @@ def test_is_initialized(self): assert not DependencyManager.is_initialized() # After initialization - DependencyManager() + mock_settings = MagicMock(spec=BotspotSettings) + DependencyManager(botspot_settings=mock_settings) assert DependencyManager.is_initialized() def test_default_attributes(self): """Test that default attributes are set when not provided""" - dm = DependencyManager() + mock_settings = MagicMock(spec=BotspotSettings) + dm = DependencyManager(botspot_settings=mock_settings) - assert isinstance(dm.botspot_settings, BotspotSettings) + assert dm.botspot_settings is mock_settings # Check internal attributes directly instead of properties that raise exceptions assert dm._bot is None assert dm._dispatcher is None @@ -101,17 +106,13 @@ def test_default_attributes(self): assert dm._telethon_manager is None assert dm._user_manager is None - def test_botspot_settings_default_initialization(self): - """Test that botspot_settings is initialized with BotspotSettings if not provided""" - with patch("botspot.core.dependency_manager.BotspotSettings") as mock_settings_class: - mock_settings = MagicMock() - mock_settings_class.return_value = mock_settings - - dm = DependencyManager() + def test_botspot_settings_initialization(self): + """Test that botspot_settings is properly initialized""" + mock_settings = MagicMock(spec=BotspotSettings) + dm = DependencyManager(botspot_settings=mock_settings) - # Should create a default settings object - mock_settings_class.assert_called_once() - assert dm.botspot_settings is mock_settings + # Check that the provided settings object is used + assert dm.botspot_settings is mock_settings def test_multiple_dependency_instances(self): """Test that attributes from first instance persist in subsequent instances due to singleton""" @@ -120,8 +121,9 @@ def test_multiple_dependency_instances(self): Singleton._instances = {} - # Create a fresh instance - dm1 = DependencyManager() + # Create a fresh instance with settings + mock_settings = MagicMock(spec=BotspotSettings) + dm1 = DependencyManager(botspot_settings=mock_settings) mock_bot = MagicMock(spec=Bot) # Set a property on the first instance @@ -131,7 +133,9 @@ def test_multiple_dependency_instances(self): assert dm1._bot is mock_bot # Both instances should be the same object due to singleton - dm2 = DependencyManager() + dm2 = DependencyManager( + botspot_settings=MagicMock() + ) # Different settings should be ignored due to singleton assert dm1 is dm2 assert dm2._bot is mock_bot @@ -147,16 +151,18 @@ def test_get_uninitialized_manager(self): def test_get_initialized_manager(self): """Test that get_dependency_manager returns the singleton instance""" - dm = DependencyManager() + mock_settings = MagicMock(spec=BotspotSettings) + dm = DependencyManager(botspot_settings=mock_settings) retrieved_dm = get_dependency_manager() assert retrieved_dm is dm def test_dependency_manager_functionality(self): """Test that get_dependency_manager returns a fully functional DependencyManager""" + mock_settings = MagicMock(spec=BotspotSettings) mock_bot = MagicMock(spec=Bot) - # Initialize with a bot - DependencyManager(bot=mock_bot) + # Initialize with botspot_settings and bot + DependencyManager(botspot_settings=mock_settings, bot=mock_bot) # Get the manager dm = get_dependency_manager() From 298957783b1bb8c4e8639b5025fd45eb57a792fc Mon Sep 17 00:00:00 2001 From: Petr Lavrov Date: Wed, 26 Mar 2025 04:41:33 +0300 Subject: [PATCH 8/8] rename workalong.md --- .../workalong_stage_1.md} | 0 dev/workalong_chat_fetcher/{workalong.md => workalong_stage_2.md} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename dev/{workalong_chat_fetcher.md => workalong_chat_fetcher/workalong_stage_1.md} (100%) rename dev/workalong_chat_fetcher/{workalong.md => workalong_stage_2.md} (100%) diff --git a/dev/workalong_chat_fetcher.md b/dev/workalong_chat_fetcher/workalong_stage_1.md similarity index 100% rename from dev/workalong_chat_fetcher.md rename to dev/workalong_chat_fetcher/workalong_stage_1.md diff --git a/dev/workalong_chat_fetcher/workalong.md b/dev/workalong_chat_fetcher/workalong_stage_2.md similarity index 100% rename from dev/workalong_chat_fetcher/workalong.md rename to dev/workalong_chat_fetcher/workalong_stage_2.md