Source code for folderbot.telegram_handler

"""Telegram bot handler."""

import asyncio
import base64
import hashlib
import html
import logging
import re
from pathlib import Path
from typing import Any

from telegram import (
    Bot,
    InlineKeyboardButton,
    InlineKeyboardMarkup,
    KeyboardButton,
    Message,
    ReplyKeyboardMarkup,
    ReplyKeyboardRemove,
    Update,
)
from telegram.error import BadRequest
from telegram.ext import (
    Application,
    CallbackQueryHandler,
    CommandHandler,
    ContextTypes,
    MessageHandler,
    filters,
)

from . import __version__
from .llm_client import AskUserRequest, LLMClient
from .status_notifier import StatusNotifier
from .config import Config
from .context_builder import ContextBuilder
from .file_watcher import FileWatcher
from .migration import migrate_database
from .scheduler import TaskScheduler, TaskStore
from .session_manager import SessionManager
from .tools.upload_tools import UploadServices
from .transcription import transcribe_audio

logger = logging.getLogger(__name__)


def _markdown_to_html(text: str) -> str:
    """Convert common Markdown formatting to Telegram HTML.

    Handles:
    - **bold** or __bold__ → <b>bold</b>
    - *italic* → <i>italic</i> (``_italic_`` is deliberately NOT converted,
      so identifiers such as ``file_name`` are left untouched)
    - `code` → <code>code</code>
    - ```code block``` → <pre>code block</pre>

    The contents of code spans and code blocks are HTML-escaped and are not
    scanned for bold/italic markers, so source code such as ``a ** b ** c``
    or ``a < b`` survives intact. Text outside code is passed through
    unescaped, so callers may still embed raw Telegram HTML tags in it.
    """
    # Converted code fragments are parked here while emphasis is processed.
    protected: list[str] = []

    def _protect(tag: str, body: str) -> str:
        """Stash an escaped code fragment and return its placeholder."""
        protected.append(f"<{tag}>{html.escape(body, quote=False)}</{tag}>")
        # NUL delimiters are non-word characters, so the italic lookarounds
        # treat a placeholder the same way they would treat a "<code>" tag.
        return f"\x00{len(protected) - 1}\x00"

    # Code blocks first (``` ... ```) - must come before inline code
    text = re.sub(
        r"```(\w*)\n?(.*?)```",
        lambda m: _protect("pre", m.group(2)),
        text,
        flags=re.DOTALL,
    )

    # Inline code (` ... `)
    text = re.sub(r"`([^`]+)`", lambda m: _protect("code", m.group(1)), text)

    # Bold: **text** or __text__ (must come before italic)
    text = re.sub(r"\*\*(.+?)\*\*", r"<b>\1</b>", text)
    text = re.sub(r"__(.+?)__", r"<b>\1</b>", text)

    # Italic: only match * to avoid false positives with underscores in names
    text = re.sub(r"(?<!\w)\*([^*]+)\*(?!\w)", r"<i>\1</i>", text)

    def _restore(match: "re.Match[str]") -> str:
        """Swap a placeholder back for the fragment it stands for."""
        index = int(match.group(1))
        # Leave anything that merely looks like a placeholder untouched.
        return protected[index] if index < len(protected) else match.group(0)

    return re.sub(r"\x00(\d+)\x00", _restore, text)


class TelegramBot:
    """Main Telegram bot class.

    Wires together the context builder, session manager, task scheduler,
    LLM client and file watcher, and exposes the Telegram command/message
    handlers that are registered in :meth:`run`.

    Incoming user content is queued per user; a processing task per user
    calls the LLM and replies. A new text message cancels the in-flight task
    for that user and re-queues the messages it was working on.
    """

    # Maximum number of characters sent in one Telegram message; longer
    # texts are split into consecutive chunks of this size.
    TELEGRAM_MAX_MESSAGE_LENGTH = 4096

    # Seconds to wait for the user to answer an ask_user question.
    ASK_USER_TIMEOUT_SECONDS = 120

    def __init__(self, config: Config):
        """Build all collaborators from *config* and reset per-user state."""
        self.config = config
        self.context_builder = ContextBuilder(config)

        # Migrate data from old global database if needed
        migrate_database(config.db_path, config.allowed_user_ids)

        self.session_manager = SessionManager(config.db_path)

        # Initialize scheduler
        self._task_store = TaskStore(config.db_path)
        self._scheduler = TaskScheduler(
            task_store=self._task_store,
            send_message=self._send_scheduler_message,
            summarize=self._summarize_for_scheduler,
        )

        # Create LLM client (scheduler tools registered on folder_bot)
        self.llm_client = LLMClient(config)

        # Wire up dependencies:
        # - Scheduler needs folder_tools to execute tools
        # - FolderTools needs scheduler for SchedulerServices
        # - FolderTools needs session_manager for file notification preferences
        self._scheduler.set_folder_tools(self.llm_client.tools)
        self.llm_client.tools.set_scheduler(self._scheduler)
        self.llm_client.tools.set_session_manager(self.session_manager)

        # Wire up upload services
        self.llm_client.tools.set_upload_services(
            UploadServices(
                session_manager=self.session_manager,
                uploads_dir=self._ensure_uploads_dir(),
                send_document=self._send_upload_document,
            )
        )

        # Initialize file watcher (sends notifications on file changes)
        self._file_watcher = FileWatcher(
            config.root_folder,
            config.watch_config,
            self._send_file_change_notification,
        )

        # Application reference (set in run())
        self._application: Application | None = None  # type: ignore[type-arg]

        # Track pending messages per user (accumulated while processing)
        self._pending_messages: dict[int, list[str]] = {}
        # Track pending images per user (for multimodal LLM input)
        self._pending_images: dict[int, list[dict[str, str]]] = {}
        # Track current processing task per user (for cancellation)
        self._processing_tasks: dict[int, asyncio.Task] = {}
        # Track the latest update object for sending responses
        self._pending_updates: dict[int, Update] = {}
        # Track cancelled state per user (to prevent responses after cancellation)
        self._cancelled_users: set[int] = set()
        # Track messages currently being processed (to restore on cancellation)
        self._processing_messages: dict[int, list[str]] = {}
        # Pending ask_user futures per user (user_id -> Future[str])
        self._ask_user_futures: dict[int, asyncio.Future[str]] = {}
        # Track expected input type per user (for routing responses)
        self._ask_user_input_types: dict[int, str] = {}
        # Track option mappings per user (index -> option text)
        self._ask_user_options: dict[int, list[str]] = {}

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    def _is_authorized(self, user_id: int) -> bool:
        """Check if user is authorized."""
        return user_id in self.config.allowed_user_ids

    def _ensure_uploads_dir(self) -> Path:
        """Return the uploads directory, creating it if it does not exist."""
        uploads_dir = self.config.root_folder / ".folderbot" / "uploads"
        uploads_dir.mkdir(parents=True, exist_ok=True)
        return uploads_dir

    def _store_upload_bytes(self, file_bytes: "bytes | bytearray") -> str:
        """Write *file_bytes* to the uploads directory under its SHA-256 name.

        Returns the hex digest, which is also the stored file name.
        """
        file_hash = hashlib.sha256(file_bytes).hexdigest()
        (self._ensure_uploads_dir() / file_hash).write_bytes(file_bytes)
        return file_hash

    @staticmethod
    def _chunk_text(text: str, limit: int = TELEGRAM_MAX_MESSAGE_LENGTH) -> list[str]:
        """Split *text* into consecutive pieces of at most *limit* characters.

        A text that already fits (including the empty string) is returned as
        a single-element list.
        """
        if len(text) <= limit:
            return [text]
        return [text[i : i + limit] for i in range(0, len(text), limit)]

    @staticmethod
    async def _reply_text(message: Message, text: str, **kwargs: Any) -> None:
        """Reply with HTML parse_mode, falling back to plain text on failure."""
        try:
            await message.reply_text(
                _markdown_to_html(text), parse_mode="HTML", **kwargs
            )
        except BadRequest:
            # Telegram rejected the markup: resend the text as the caller
            # wrote it rather than exposing generated HTML tags to the user.
            await message.reply_text(text, **kwargs)

    @staticmethod
    async def _send_text(bot: Bot, chat_id: int, text: str, **kwargs: Any) -> None:
        """Send message with HTML parse_mode, falling back to plain text on failure."""
        try:
            await bot.send_message(
                chat_id=chat_id,
                text=_markdown_to_html(text),
                parse_mode="HTML",
                **kwargs,
            )
        except BadRequest:
            # Same fallback as _reply_text: send the unconverted text.
            await bot.send_message(chat_id=chat_id, text=text, **kwargs)

    async def _check_version_notification(self, user_id: int, update: Update) -> None:
        """Check if user needs to be notified about a new version."""
        if not update.message:
            return

        last_version = self.session_manager.get_last_notified_version(user_id)
        if last_version != __version__:
            logger.info(
                f"[{user_id}] New version notification: {last_version} -> {__version__}"
            )
            await self._reply_text(
                update.message, f"Folderbot updated to v{__version__}"
            )
            self.session_manager.set_last_notified_version(user_id, __version__)

    # ------------------------------------------------------------------
    # Command handlers
    # ------------------------------------------------------------------

    async def start_command(
        self, update: Update, context: ContextTypes.DEFAULT_TYPE
    ) -> None:
        """Handle /start command."""
        if not update.effective_user or not update.message:
            return

        user_id = update.effective_user.id
        if not self._is_authorized(user_id):
            await self._reply_text(update.message, "Unauthorized.")
            return

        await self._reply_text(
            update.message,
            "Self-bot ready. Send me a message to chat with your folder.\n\n"
            "Commands:\n"
            "/clear - Clear conversation history\n"
            "/new - Start new topic\n"
            "/status - Show session info\n"
            "/files - List files in context\n"
            "/tasks - List scheduled tasks",
        )

    async def clear_command(
        self, update: Update, context: ContextTypes.DEFAULT_TYPE
    ) -> None:
        """Handle /clear command."""
        if not update.effective_user or not update.message:
            return

        user_id = update.effective_user.id
        if not self._is_authorized(user_id):
            await self._reply_text(update.message, "Unauthorized.")
            return

        self.session_manager.clear_session(user_id)
        await self._reply_text(update.message, "Conversation history cleared.")

    async def new_command(
        self, update: Update, context: ContextTypes.DEFAULT_TYPE
    ) -> None:
        """Handle /new command."""
        if not update.effective_user or not update.message:
            return

        user_id = update.effective_user.id
        if not self._is_authorized(user_id):
            await self._reply_text(update.message, "Unauthorized.")
            return

        # Clear session and signal new topic
        self.session_manager.clear_session(user_id)
        await self._reply_text(update.message, "New topic started. History cleared.")

    async def status_command(
        self, update: Update, context: ContextTypes.DEFAULT_TYPE
    ) -> None:
        """Handle /status command."""
        if not update.effective_user or not update.message:
            return

        user_id = update.effective_user.id
        if not self._is_authorized(user_id):
            await self._reply_text(update.message, "Unauthorized.")
            return

        session_info = self.session_manager.get_session_info(user_id)
        context_stats = self.context_builder.get_context_stats()

        status = (
            f"Session:\n"
            f" Messages: {session_info['message_count']}\n"
            f" Last update: {session_info['updated_at'] or 'Never'}\n\n"
            f"Context:\n"
            f" Files: {context_stats['file_count']}\n"
            f" Size: {context_stats['total_chars']:,} chars\n"
            f" Cache age: {context_stats['cache_age_seconds']}s"
        )
        await self._reply_text(update.message, status)

    async def files_command(
        self, update: Update, context: ContextTypes.DEFAULT_TYPE
    ) -> None:
        """Handle /files command."""
        if not update.effective_user or not update.message:
            return

        user_id = update.effective_user.id
        if not self._is_authorized(user_id):
            await self._reply_text(update.message, "Unauthorized.")
            return

        files = self.context_builder.get_file_list()
        if not files:
            await self._reply_text(update.message, "No files in context.")
            return

        # Truncate if too many files
        if len(files) > 50:
            file_list = "\n".join(files[:50])
            file_list += f"\n... and {len(files) - 50} more"
        else:
            file_list = "\n".join(files)

        await self._reply_text(
            update.message, f"Files in context ({len(files)}):\n\n{file_list}"
        )

    async def tasks_command(
        self, update: Update, context: ContextTypes.DEFAULT_TYPE
    ) -> None:
        """Handle /tasks command."""
        if not update.effective_user or not update.message:
            return

        user_id = update.effective_user.id
        if not self._is_authorized(user_id):
            await self._reply_text(update.message, "Unauthorized.")
            return

        tasks = self._scheduler.list_tasks(user_id)
        if not tasks:
            await self._reply_text(update.message, "No scheduled tasks.")
            return

        # Only the first 20 tasks are listed.
        lines = [
            f" {t.task_id} [{t.status.value}] {t.description}" for t in tasks[:20]
        ]
        await self._reply_text(update.message, "Scheduled tasks:\n" + "\n".join(lines))

    # ------------------------------------------------------------------
    # Message handlers
    # ------------------------------------------------------------------

    async def handle_message(
        self, update: Update, context: ContextTypes.DEFAULT_TYPE
    ) -> None:
        """Handle regular messages with immediate processing and restart on new message."""
        if not update.effective_user or not update.message or not update.message.text:
            return

        user_id = update.effective_user.id
        user_message = update.message.text
        logger.info(f"[{user_id}] Message received: {user_message[:50]}...")

        # Check if there's a pending ask_user text input for this user
        if (
            user_id in self._ask_user_futures
            and self._ask_user_input_types.get(user_id) == "text"
        ):
            future = self._ask_user_futures[user_id]
            if not future.done():
                # The message answers the pending question; it is not queued.
                future.set_result(user_message)
                return

        if not self._is_authorized(user_id):
            await self._reply_text(update.message, "Unauthorized.")
            return

        # Check for version update notification (only on first message)
        await self._check_version_notification(user_id, update)

        # Add message to pending list
        pending = self._pending_messages.setdefault(user_id, [])
        pending.append(user_message)
        self._pending_updates[user_id] = update
        logger.info(f"[{user_id}] Added to pending. Total pending: {len(pending)}")

        # Check if already processing for this user
        if user_id in self._processing_tasks:
            task = self._processing_tasks[user_id]
            if not task.done():
                # Already processing - cancel it, messages are accumulated
                logger.info(f"[{user_id}] Existing task found, cancelling...")
                self._cancelled_users.add(user_id)
                task.cancel()
                # Wait for cancellation to complete before starting new task
                try:
                    await task
                except asyncio.CancelledError:
                    pass
                logger.info(f"[{user_id}] Previous task cancelled successfully")

                # Restore messages that were being processed
                if user_id in self._processing_messages:
                    cancelled_msgs = self._processing_messages.pop(user_id)
                    logger.info(
                        f"[{user_id}] Restoring {len(cancelled_msgs)} cancelled message(s)"
                    )
                    # Prepend to pending so they come first
                    self._pending_messages[user_id] = (
                        cancelled_msgs + self._pending_messages.get(user_id, [])
                    )
            else:
                logger.info(f"[{user_id}] Existing task already done")

        # Clear cancelled state before starting new processing
        self._cancelled_users.discard(user_id)

        # Start processing with all accumulated messages
        await self._start_processing(user_id)

    async def handle_document(
        self, update: Update, context: ContextTypes.DEFAULT_TYPE
    ) -> None:
        """Handle document uploads."""
        if (
            not update.effective_user
            or not update.message
            or not update.message.document
        ):
            return

        user_id = update.effective_user.id
        document = update.message.document
        logger.info(f"[{user_id}] Document received: {document.file_name}")

        if not self._is_authorized(user_id):
            await self._reply_text(update.message, "Unauthorized.")
            return

        try:
            file = await document.get_file()
            file_bytes = await file.download_as_bytearray()
        except Exception as e:
            logger.exception("Error downloading document")
            await self._reply_text(update.message, f"Error downloading file: {e}")
            return

        # Stored under its content hash; the original name is kept in the DB.
        file_hash = self._store_upload_bytes(file_bytes)
        original_filename = document.file_name or "unknown"

        record = self.session_manager.save_upload(
            user_id=user_id,
            original_filename=original_filename,
            hash_filename=file_hash,
            extension=Path(original_filename).suffix,
            file_size=len(file_bytes),
            mime_type=document.mime_type or "application/octet-stream",
        )
        logger.info(
            f"[{user_id}] Upload saved: {original_filename} -> {file_hash} "
            f"({record.file_size} bytes)"
        )
        await self._reply_text(
            update.message,
            f"Uploaded: {original_filename} ({record.file_size:,} bytes)",
        )

    async def handle_photo(
        self, update: Update, context: ContextTypes.DEFAULT_TYPE
    ) -> None:
        """Handle photo uploads: save to uploads and process with LLM vision."""
        if not update.effective_user or not update.message or not update.message.photo:
            return

        user_id = update.effective_user.id
        logger.info(f"[{user_id}] Photo received")

        if not self._is_authorized(user_id):
            await self._reply_text(update.message, "Unauthorized.")
            return

        try:
            # Get the largest photo resolution (last in the list)
            photo = update.message.photo[-1]
            file = await photo.get_file()
            file_bytes = bytes(await file.download_as_bytearray())
        except Exception as e:
            logger.exception("Error downloading photo")
            await self._reply_text(update.message, f"Error downloading photo: {e}")
            return

        # Save to uploads
        file_hash = self._store_upload_bytes(file_bytes)
        self.session_manager.save_upload(
            user_id=user_id,
            original_filename=f"photo_{file_hash[:8]}.jpg",
            hash_filename=file_hash,
            extension=".jpg",
            file_size=len(file_bytes),
            mime_type="image/jpeg",
        )
        logger.info(f"[{user_id}] Photo saved: {file_hash} ({len(file_bytes)} bytes)")

        # Queue for LLM processing with image data
        caption = update.message.caption or "Describe this image."
        image_b64 = base64.b64encode(file_bytes).decode("utf-8")

        self._pending_messages.setdefault(user_id, []).append(caption)
        self._pending_updates[user_id] = update
        self._pending_images.setdefault(user_id, []).append(
            {"data": image_b64, "media_type": "image/jpeg"}
        )

        # NOTE(review): unlike handle_message, an in-flight processing task
        # for this user is not cancelled before a new one is started.
        await self._start_processing(user_id)

    async def handle_voice(
        self, update: Update, context: ContextTypes.DEFAULT_TYPE
    ) -> None:
        """Handle voice messages and audio files by transcribing and processing as text."""
        if not update.effective_user or not update.message:
            return

        user_id = update.effective_user.id
        if not self._is_authorized(user_id):
            await self._reply_text(update.message, "Unauthorized.")
            return

        # Voice notes take precedence over audio files when both are present.
        media = update.message.voice or update.message.audio
        if not media:
            return

        try:
            file = await media.get_file()
            file_bytes = bytes(await file.download_as_bytearray())
            result = await transcribe_audio(
                audio_bytes=file_bytes,
                model_name=self.config.whisper_model,
            )
        except Exception as e:
            logger.exception("Error during voice transcription")
            await self._reply_text(update.message, f"Transcription failed: {e}")
            return

        self._save_voice_recording(file_bytes)

        user_message = f"[Transcribed from voice message]\n{result.text}"
        logger.info(
            f"[{user_id}] Voice transcribed ({len(result.text)} chars): "
            f"{result.text[:50]}..."
        )

        self._pending_messages.setdefault(user_id, []).append(user_message)
        self._pending_updates[user_id] = update

        # NOTE(review): unlike handle_message, an in-flight processing task
        # for this user is not cancelled before a new one is started.
        await self._start_processing(user_id)

    def _save_voice_recording(self, file_bytes: bytes) -> Path:
        """Save a voice recording to the voice directory.

        The file is named after the current UTC time with one-second
        resolution and always given an ``.ogg`` suffix. Returns its path.
        """
        from datetime import datetime, timezone

        voice_dir = self.config.root_folder / ".folderbot" / "voice"
        voice_dir.mkdir(parents=True, exist_ok=True)

        timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
        file_path = voice_dir / f"{timestamp}.ogg"
        file_path.write_bytes(file_bytes)
        logger.info(f"Saved voice recording to {file_path}")
        return file_path

    # ------------------------------------------------------------------
    # Processing pipeline
    # ------------------------------------------------------------------

    async def _start_processing(self, user_id: int) -> None:
        """Start processing accumulated messages for a user."""
        # Collect all pending messages and images
        messages = self._pending_messages.pop(user_id, [])
        images = self._pending_images.pop(user_id, [])
        update = self._pending_updates.pop(user_id, None)

        if not messages or not update or not update.message:
            logger.info(f"[{user_id}] _start_processing: no messages to process")
            return

        # Track messages being processed (for restoration if cancelled)
        self._processing_messages[user_id] = messages

        # Combine multiple messages into one
        combined_message = "\n".join(messages)
        logger.info(f"[{user_id}] Starting processing with {len(messages)} message(s)")

        # Start processing task (can be cancelled if new message arrives)
        task = asyncio.create_task(
            self._process_message(
                user_id, combined_message, update, images=images or None
            )
        )
        self._processing_tasks[user_id] = task
        logger.info(f"[{user_id}] Processing task created")

    async def _process_message(
        self,
        user_id: int,
        user_message: str,
        update: Update,
        images: list[dict[str, str]] | None = None,
    ) -> None:
        """Process a message and send response.

        Calls the LLM, records token usage and activity, stores the exchange
        in the session history and replies to ``update.message``. The
        cancelled flag for the user is re-checked between steps so that a
        superseded request neither stores history nor sends a reply.
        """
        if not update.message:
            return

        logger.info(f"[{user_id}] _process_message started")
        chat_id = update.message.chat_id  # type: ignore[union-attr]

        notifier = StatusNotifier(
            chat_id=chat_id,
            bot=self._application.bot,  # type: ignore[union-attr]
        )
        await notifier.start()

        try:
            if user_id in self._cancelled_users:
                logger.info(f"[{user_id}] Cancelled before API call, aborting")
                return

            history = self.session_manager.get_history(user_id)
            context = self.llm_client.tools.create_context(user_id)
            context.services["session"] = self.session_manager

            logger.info(f"[{user_id}] Calling LLM API...")

            async def on_ask_user(request: AskUserRequest) -> str:
                return await self._handle_ask_user(user_id, chat_id, request)

            response, tools_used, topic, usage = await self.llm_client.chat(
                user_message,
                context,
                history,
                on_tool_use=notifier.update,
                on_ask_user=on_ask_user,
                chat_id=chat_id,
                images=images,
            )

            self.session_manager.record_token_usage(
                user_id=user_id,
                input_tokens=usage.input_tokens,
                output_tokens=usage.output_tokens,
                model=self.config.model,
                topic=topic,
            )

            display_response = response
            if tools_used:
                # De-duplicate while keeping first-use order.
                unique_tools = list(dict.fromkeys(tools_used))
                display_response += f"\n\n<i>Tools: {', '.join(unique_tools)}</i>"

            self.llm_client.tools.activity_logger.log_message(
                direction="user",
                content=user_message,
                user_id=user_id,
            )
            self.llm_client.tools.activity_logger.log_message(
                direction="assistant",
                content=response,
                user_id=user_id,
                tools_used=tools_used if tools_used else None,
            )

            logger.info(
                f"[{user_id}] LLM API returned, response length: {len(response)}"
            )

            # NOTE(review): Task.cancelled() only becomes true once the task
            # has finished, so this check cannot fire from inside the running
            # task; the _cancelled_users flag below is what actually guards.
            if asyncio.current_task() and asyncio.current_task().cancelled():  # type: ignore[union-attr]
                logger.info(f"[{user_id}] Task cancelled after API call, aborting")
                return

            if user_id in self._cancelled_users:
                logger.info(f"[{user_id}] Cancelled flag set after API call, aborting")
                return

            self.session_manager.add_message(user_id, "user", user_message, topic=topic)
            self.session_manager.add_message(
                user_id, "assistant", response, topic=topic
            )

            if user_id in self._cancelled_users:
                logger.info(f"[{user_id}] Cancelled before send, aborting")
                return

            logger.info(f"[{user_id}] Sending response to Telegram...")
            chunks = self._chunk_text(display_response)
            for chunk in chunks:
                # Only a multi-part send can be interrupted between parts.
                if len(chunks) > 1 and user_id in self._cancelled_users:
                    logger.info(
                        f"[{user_id}] Cancelled during split send, aborting"
                    )
                    return
                await self._reply_text(update.message, chunk)

            logger.info(f"[{user_id}] Response sent successfully")
            self._processing_messages.pop(user_id, None)

        except asyncio.CancelledError:
            # Swallowed on purpose: the canceller awaits this task and then
            # re-queues the messages it was handling.
            logger.info(f"[{user_id}] CancelledError caught, response cancelled")
        except Exception as e:
            logger.exception("Error handling message")
            await self._reply_text(update.message, f"Error: {e}")
        finally:
            await notifier.stop()

    async def wait_for_processing(self, user_id: int) -> None:
        """Wait for any pending processing to complete. For testing."""
        task = self._processing_tasks.get(user_id)
        if task is None or task.done():
            return
        try:
            await task
        except asyncio.CancelledError:
            pass

    # ------------------------------------------------------------------
    # ask_user support
    # ------------------------------------------------------------------

    async def _handle_ask_user(
        self, user_id: int, chat_id: int, request: AskUserRequest
    ) -> str:
        """Handle an ask_user tool call from the agent loop.

        Sends the appropriate Telegram UI and waits for the user's response.
        Returns the user's answer as a string.

        Raises RuntimeError if the application has not been created yet, and
        asyncio.TimeoutError if the user doesn't respond in time.
        """
        if not self._application:
            raise RuntimeError("Application not initialized")

        bot = self._application.bot
        loop = asyncio.get_running_loop()
        future: asyncio.Future[str] = loop.create_future()
        self._ask_user_futures[user_id] = future
        self._ask_user_input_types[user_id] = request.input_type

        try:
            if request.input_type == "choice":
                options = request.options
                self._ask_user_options[user_id] = options
                # One button per row.
                buttons = [
                    [InlineKeyboardButton(opt, callback_data=f"ask:{user_id}:{i}")]
                    for i, opt in enumerate(options)
                ]
                markup = InlineKeyboardMarkup(buttons)
                await self._send_text(
                    bot, chat_id, request.question, reply_markup=markup
                )

            elif request.input_type == "confirm":
                options = request.options if request.options else ["Yes", "No"]
                self._ask_user_options[user_id] = options
                # All buttons on a single row.
                buttons = [
                    [
                        InlineKeyboardButton(opt, callback_data=f"ask:{user_id}:{i}")
                        for i, opt in enumerate(options)
                    ]
                ]
                markup = InlineKeyboardMarkup(buttons)
                await self._send_text(
                    bot, chat_id, request.question, reply_markup=markup
                )

            elif request.input_type == "text":
                await self._send_text(bot, chat_id, request.question)

            elif request.input_type == "location":
                button = KeyboardButton("Share Location", request_location=True)
                location_markup = ReplyKeyboardMarkup(
                    [[button]], one_time_keyboard=True, resize_keyboard=True
                )
                await self._send_text(
                    bot, chat_id, request.question, reply_markup=location_markup
                )

            return await asyncio.wait_for(future, timeout=self.ASK_USER_TIMEOUT_SECONDS)

        except asyncio.TimeoutError:
            await self._send_text(bot, chat_id, "(Question timed out)")
            raise
        finally:
            # Always drop the per-user question state, answered or not.
            self._ask_user_futures.pop(user_id, None)
            self._ask_user_input_types.pop(user_id, None)
            self._ask_user_options.pop(user_id, None)

    async def _handle_callback_query(
        self, update: Update, context: ContextTypes.DEFAULT_TYPE
    ) -> None:
        """Handle inline keyboard button presses for ask_user."""
        query = update.callback_query
        if not query or not query.data:
            return

        await query.answer()

        # Parse callback data: "ask:<user_id>:<index>"
        parts = query.data.split(":", 2)
        if len(parts) != 3 or parts[0] != "ask":
            return

        try:
            user_id = int(parts[1])
            option_index = int(parts[2])
        except ValueError:
            return

        # Look up the option text from the stored mapping. The lower bound
        # stops a negative index from silently wrapping around the list.
        options = self._ask_user_options.get(user_id, [])
        if 0 <= option_index < len(options):
            answer = options[option_index]
        else:
            answer = str(option_index)

        # Resolve the pending future
        future = self._ask_user_futures.get(user_id)
        msg = query.message if isinstance(query.message, Message) else None

        if future and not future.done():
            future.set_result(answer)
            # Edit the original message to show the selection
            if msg:
                # Question and answer are plain text here, so they must be
                # escaped before being embedded in an HTML-mode message.
                question = html.escape(msg.text or "", quote=False)
                chosen = html.escape(answer, quote=False)
                try:
                    await msg.edit_text(
                        f"{question}\n\n<b>Selected: {chosen}</b>",
                        parse_mode="HTML",
                    )
                except Exception:
                    # Best effort: the answer has already been delivered.
                    logger.debug(
                        f"[{user_id}] Could not edit answered question",
                        exc_info=True,
                    )
        elif msg:
            try:
                await msg.edit_text("(No pending question)")
            except Exception:
                logger.debug(
                    f"[{user_id}] Could not edit stale question", exc_info=True
                )

    async def _handle_location(
        self, update: Update, context: ContextTypes.DEFAULT_TYPE
    ) -> None:
        """Handle location messages for ask_user location type."""
        if (
            not update.effective_user
            or not update.message
            or not update.message.location
        ):
            return

        user_id = update.effective_user.id
        location = update.message.location

        future = self._ask_user_futures.get(user_id)
        if (
            future
            and not future.done()
            and self._ask_user_input_types.get(user_id) == "location"
        ):
            future.set_result(f"lat={location.latitude}, lon={location.longitude}")
            # Also removes the one-off "Share Location" keyboard.
            await update.message.reply_text(
                "Location received.",
                reply_markup=ReplyKeyboardRemove(),
            )

    # ------------------------------------------------------------------
    # Callbacks handed to collaborators
    # ------------------------------------------------------------------

    async def _send_upload_document(
        self, chat_id: int, file_path: Path, filename: str
    ) -> None:
        """Send an uploaded document back to a Telegram chat.

        Raises RuntimeError if the application has not been created yet.
        """
        if not self._application:
            raise RuntimeError("Application not initialized")

        bot = self._application.bot
        with open(file_path, "rb") as f:
            await bot.send_document(chat_id=chat_id, document=f, filename=filename)

    async def _send_scheduler_message(self, chat_id: int, text: str) -> None:
        """Send a message from the scheduler to a Telegram chat."""
        if not self._application:
            logger.warning("Cannot send scheduler message: application not set")
            return

        bot = self._application.bot
        for chunk in self._chunk_text(text):
            await self._send_text(bot, chat_id, chunk)

    async def _summarize_for_scheduler(self, prompt: str, chat_id: int) -> str:
        """Call Claude to summarize task results.

        ``chat_id`` is accepted for the scheduler callback signature but is
        not used here.
        """
        # Create a minimal context for summarization
        context = self.llm_client.tools.create_context(user_id=0)
        response, _, _topic, _ = await self.llm_client.chat(
            user_message=prompt,
            context=context,
            history=[],
        )
        return response

    async def _send_file_change_notification(self, message: str) -> None:
        """Send file change notification to users who have enabled them."""
        if not self._application:
            logger.warning("Cannot send file notification: application not set")
            return

        # Only notify users who have enabled file notifications
        enabled_users = self.session_manager.get_users_with_file_notifications()
        # Filter to allowed users only (in case DB has stale data)
        users_to_notify = [
            uid for uid in enabled_users if uid in self.config.allowed_user_ids
        ]
        if not users_to_notify:
            return

        bot = self._application.bot
        for user_id in users_to_notify:
            try:
                await self._send_text(bot, user_id, message)
            except Exception as e:
                logger.warning(f"[{user_id}] Could not send file notification: {e}")

    # ------------------------------------------------------------------
    # Application lifecycle
    # ------------------------------------------------------------------

    async def _post_init(self, application: Application) -> None:  # type: ignore[type-arg]
        """Called after application is initialized."""
        await self._scheduler.start()
        logger.info("Task scheduler started")
        await self._file_watcher.start()
        # Send startup message to all allowed users
        await self._send_startup_messages()

    async def _send_startup_messages(self) -> None:
        """Send startup message to all allowed users with version update if applicable."""
        if not self._application:
            return

        bot = self._application.bot
        for user_id in self.config.allowed_user_ids:
            try:
                # Check for version update
                last_version = self.session_manager.get_last_notified_version(user_id)

                if last_version and last_version != __version__:
                    # Version changed - notify about update
                    message = f"Hi! Folderbot is back online.\n\nUpdated: v{last_version} → v{__version__}"
                    self.session_manager.set_last_notified_version(user_id, __version__)
                    logger.info(f"[{user_id}] Startup with version update notification")
                elif last_version:
                    # Same version - just say hi
                    message = f"Hi! Folderbot is back online. (v{__version__})"
                    logger.info(f"[{user_id}] Startup message sent")
                else:
                    # First time user - set version, skip message
                    # (they haven't used the bot yet, so don't spam them)
                    self.session_manager.set_last_notified_version(user_id, __version__)
                    logger.info(
                        f"[{user_id}] First time user, version set, no startup message"
                    )
                    continue

                # Send to user (user_id == chat_id for private chats)
                await self._send_text(
                    bot,
                    user_id,
                    message,
                    connect_timeout=10,
                    read_timeout=10,
                )
            except Exception as e:
                # User may have blocked bot or never started it
                logger.warning(f"[{user_id}] Could not send startup message: {e}")

    async def _post_shutdown(self, application: Application) -> None:  # type: ignore[type-arg]
        """Called during application shutdown."""
        await self._file_watcher.stop()
        await self._scheduler.shutdown()
        logger.info("Task scheduler stopped")

    def run(self) -> None:
        """Run the bot."""
        application = Application.builder().token(self.config.telegram_token).build()
        self._application = application

        # Scheduler lifecycle hooks
        application.post_init = self._post_init
        application.post_shutdown = self._post_shutdown

        # Add handlers
        application.add_handler(CommandHandler("start", self.start_command))
        application.add_handler(CommandHandler("clear", self.clear_command))
        application.add_handler(CommandHandler("new", self.new_command))
        application.add_handler(CommandHandler("status", self.status_command))
        application.add_handler(CommandHandler("files", self.files_command))
        application.add_handler(CommandHandler("tasks", self.tasks_command))
        application.add_handler(CallbackQueryHandler(self._handle_callback_query))
        application.add_handler(MessageHandler(filters.LOCATION, self._handle_location))
        application.add_handler(
            MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message)
        )
        application.add_handler(
            MessageHandler(filters.Document.ALL, self.handle_document)
        )
        application.add_handler(MessageHandler(filters.PHOTO, self.handle_photo))
        application.add_handler(MessageHandler(filters.VOICE, self.handle_voice))
        application.add_handler(MessageHandler(filters.AUDIO, self.handle_voice))

        # Run the bot
        logger.info("Starting bot...")
        application.run_polling(allowed_updates=Update.ALL_TYPES)