"""Telegram bot handler."""
import asyncio
import base64
import hashlib
import logging
import re
from pathlib import Path
from typing import Any
from telegram import (
Bot,
InlineKeyboardButton,
InlineKeyboardMarkup,
KeyboardButton,
Message,
ReplyKeyboardMarkup,
ReplyKeyboardRemove,
Update,
)
from telegram.error import BadRequest
from telegram.ext import (
Application,
CallbackQueryHandler,
CommandHandler,
ContextTypes,
MessageHandler,
filters,
)
from . import __version__
from .llm_client import AskUserRequest, LLMClient
from .status_notifier import StatusNotifier
from .config import Config
from .context_builder import ContextBuilder
from .file_watcher import FileWatcher
from .migration import migrate_database
from .scheduler import TaskScheduler, TaskStore
from .session_manager import SessionManager
from .tools.upload_tools import UploadServices
from .transcription import transcribe_audio
logger = logging.getLogger(__name__)
def _markdown_to_html(text: str) -> str:
"""Convert common Markdown formatting to Telegram HTML.
Handles:
- **bold** or __bold__ → <b>bold</b>
- *italic* or _italic_ → <i>italic</i>
- `code` → <code>code</code>
- ```code block``` → <pre>code block</pre>
"""
# Code blocks first (``` ... ```) - must come before inline code
text = re.sub(r"```(\w*)\n?(.*?)```", r"<pre>\2</pre>", text, flags=re.DOTALL)
# Inline code (` ... `)
text = re.sub(r"`([^`]+)`", r"<code>\1</code>", text)
# Bold: **text** or __text__ (must come before italic)
text = re.sub(r"\*\*(.+?)\*\*", r"<b>\1</b>", text)
text = re.sub(r"__(.+?)__", r"<b>\1</b>", text)
# Italic: *text* or _text_ (but not inside words like file_name)
# Only match * for italic to avoid false positives with underscores in names
text = re.sub(r"(?<!\w)\*([^*]+)\*(?!\w)", r"<i>\1</i>", text)
return text
[docs]
class TelegramBot:
"""Main Telegram bot class."""
[docs]
def __init__(self, config: Config):
self.config = config
self.context_builder = ContextBuilder(config)
# Migrate data from old global database if needed
migrate_database(config.db_path, config.allowed_user_ids)
self.session_manager = SessionManager(config.db_path)
# Initialize scheduler
self._task_store = TaskStore(config.db_path)
self._scheduler = TaskScheduler(
task_store=self._task_store,
send_message=self._send_scheduler_message,
summarize=self._summarize_for_scheduler,
)
# Create LLM client (scheduler tools registered on folder_bot)
self.llm_client = LLMClient(config)
# Wire up dependencies:
# - Scheduler needs folder_tools to execute tools
# - FolderTools needs scheduler for SchedulerServices
# - FolderTools needs session_manager for file notification preferences
self._scheduler.set_folder_tools(self.llm_client.tools)
self.llm_client.tools.set_scheduler(self._scheduler)
self.llm_client.tools.set_session_manager(self.session_manager)
# Wire up upload services
uploads_dir = config.root_folder / ".folderbot" / "uploads"
uploads_dir.mkdir(parents=True, exist_ok=True)
self.llm_client.tools.set_upload_services(
UploadServices(
session_manager=self.session_manager,
uploads_dir=uploads_dir,
send_document=self._send_upload_document,
)
)
# Initialize file watcher (sends notifications on file changes)
self._file_watcher = FileWatcher(
config.root_folder,
config.watch_config,
self._send_file_change_notification,
)
# Application reference (set in run())
self._application: Application | None = None # type: ignore[type-arg]
# Track pending messages per user (accumulated while processing)
self._pending_messages: dict[int, list[str]] = {}
# Track pending images per user (for multimodal LLM input)
self._pending_images: dict[int, list[dict[str, str]]] = {}
# Track current processing task per user (for cancellation)
self._processing_tasks: dict[int, asyncio.Task] = {}
# Track the latest update object for sending responses
self._pending_updates: dict[int, Update] = {}
# Track cancelled state per user (to prevent responses after cancellation)
self._cancelled_users: set[int] = set()
# Track messages currently being processed (to restore on cancellation)
self._processing_messages: dict[int, list[str]] = {}
# Pending ask_user futures per user (user_id -> Future[str])
self._ask_user_futures: dict[int, asyncio.Future[str]] = {}
# Track expected input type per user (for routing responses)
self._ask_user_input_types: dict[int, str] = {}
# Track option mappings per user (index -> option text)
self._ask_user_options: dict[int, list[str]] = {}
def _is_authorized(self, user_id: int) -> bool:
"""Check if user is authorized."""
return user_id in self.config.allowed_user_ids
@staticmethod
async def _reply_text(message: Message, text: str, **kwargs: Any) -> None:
"""Reply with HTML parse_mode, falling back to plain text on failure."""
text = _markdown_to_html(text)
try:
await message.reply_text(text, parse_mode="HTML", **kwargs)
except BadRequest:
await message.reply_text(text, **kwargs)
@staticmethod
async def _send_text(bot: Bot, chat_id: int, text: str, **kwargs: Any) -> None:
"""Send message with HTML parse_mode, falling back to plain text on failure."""
text = _markdown_to_html(text)
try:
await bot.send_message(
chat_id=chat_id, text=text, parse_mode="HTML", **kwargs
)
except BadRequest:
await bot.send_message(chat_id=chat_id, text=text, **kwargs)
async def _check_version_notification(self, user_id: int, update: Update) -> None:
"""Check if user needs to be notified about a new version."""
if not update.message:
return
last_version = self.session_manager.get_last_notified_version(user_id)
if last_version != __version__:
logger.info(
f"[{user_id}] New version notification: {last_version} -> {__version__}"
)
await self._reply_text(
update.message, f"Folderbot updated to v{__version__}"
)
self.session_manager.set_last_notified_version(user_id, __version__)
[docs]
async def start_command(
self, update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
"""Handle /start command."""
if not update.effective_user or not update.message:
return
user_id = update.effective_user.id
if not self._is_authorized(user_id):
await self._reply_text(update.message, "Unauthorized.")
return
await self._reply_text(
update.message,
"Self-bot ready. Send me a message to chat with your folder.\n\n"
"Commands:\n"
"/clear - Clear conversation history\n"
"/new - Start new topic\n"
"/status - Show session info\n"
"/files - List files in context\n"
"/tasks - List scheduled tasks",
)
[docs]
async def clear_command(
self, update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
"""Handle /clear command."""
if not update.effective_user or not update.message:
return
user_id = update.effective_user.id
if not self._is_authorized(user_id):
await self._reply_text(update.message, "Unauthorized.")
return
self.session_manager.clear_session(user_id)
await self._reply_text(update.message, "Conversation history cleared.")
[docs]
async def new_command(
self, update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
"""Handle /new command."""
if not update.effective_user or not update.message:
return
user_id = update.effective_user.id
if not self._is_authorized(user_id):
await self._reply_text(update.message, "Unauthorized.")
return
# Clear session and signal new topic
self.session_manager.clear_session(user_id)
await self._reply_text(update.message, "New topic started. History cleared.")
[docs]
async def status_command(
self, update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
"""Handle /status command."""
if not update.effective_user or not update.message:
return
user_id = update.effective_user.id
if not self._is_authorized(user_id):
await self._reply_text(update.message, "Unauthorized.")
return
session_info = self.session_manager.get_session_info(user_id)
context_stats = self.context_builder.get_context_stats()
status = (
f"Session:\n"
f" Messages: {session_info['message_count']}\n"
f" Last update: {session_info['updated_at'] or 'Never'}\n\n"
f"Context:\n"
f" Files: {context_stats['file_count']}\n"
f" Size: {context_stats['total_chars']:,} chars\n"
f" Cache age: {context_stats['cache_age_seconds']}s"
)
await self._reply_text(update.message, status)
[docs]
async def files_command(
self, update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
"""Handle /files command."""
if not update.effective_user or not update.message:
return
user_id = update.effective_user.id
if not self._is_authorized(user_id):
await self._reply_text(update.message, "Unauthorized.")
return
files = self.context_builder.get_file_list()
if not files:
await self._reply_text(update.message, "No files in context.")
return
# Truncate if too many files
if len(files) > 50:
file_list = "\n".join(files[:50])
file_list += f"\n... and {len(files) - 50} more"
else:
file_list = "\n".join(files)
await self._reply_text(
update.message, f"Files in context ({len(files)}):\n\n{file_list}"
)
[docs]
async def handle_message(
self, update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
"""Handle regular messages with immediate processing and restart on new message."""
if not update.effective_user or not update.message or not update.message.text:
return
user_id = update.effective_user.id
user_message = update.message.text
logger.info(f"[{user_id}] Message received: {user_message[:50]}...")
# Check if there's a pending ask_user text input for this user
if (
user_id in self._ask_user_futures
and self._ask_user_input_types.get(user_id) == "text"
):
future = self._ask_user_futures[user_id]
if not future.done():
future.set_result(user_message)
return
if not self._is_authorized(user_id):
await self._reply_text(update.message, "Unauthorized.")
return
# Check for version update notification (only on first message)
await self._check_version_notification(user_id, update)
# Add message to pending list
if user_id not in self._pending_messages:
self._pending_messages[user_id] = []
self._pending_messages[user_id].append(user_message)
self._pending_updates[user_id] = update
logger.info(
f"[{user_id}] Added to pending. Total pending: {len(self._pending_messages[user_id])}"
)
# Check if already processing for this user
if user_id in self._processing_tasks:
task = self._processing_tasks[user_id]
if not task.done():
# Already processing - cancel it, messages are accumulated
logger.info(f"[{user_id}] Existing task found, cancelling...")
self._cancelled_users.add(user_id)
task.cancel()
# Wait for cancellation to complete before starting new task
try:
await task
except asyncio.CancelledError:
pass
logger.info(f"[{user_id}] Previous task cancelled successfully")
# Restore messages that were being processed
if user_id in self._processing_messages:
cancelled_msgs = self._processing_messages.pop(user_id)
logger.info(
f"[{user_id}] Restoring {len(cancelled_msgs)} cancelled message(s)"
)
# Prepend to pending so they come first
if user_id in self._pending_messages:
self._pending_messages[user_id] = (
cancelled_msgs + self._pending_messages[user_id]
)
else:
self._pending_messages[user_id] = cancelled_msgs
else:
logger.info(f"[{user_id}] Existing task already done")
# Clear cancelled state before starting new processing
self._cancelled_users.discard(user_id)
# Start processing with all accumulated messages
await self._start_processing(user_id)
[docs]
async def handle_document(
self, update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
"""Handle document uploads."""
if (
not update.effective_user
or not update.message
or not update.message.document
):
return
user_id = update.effective_user.id
document = update.message.document
logger.info(f"[{user_id}] Document received: {document.file_name}")
if not self._is_authorized(user_id):
await self._reply_text(update.message, "Unauthorized.")
return
try:
file = await document.get_file()
file_bytes = await file.download_as_bytearray()
except Exception as e:
logger.exception("Error downloading document")
await self._reply_text(update.message, f"Error downloading file: {e}")
return
file_hash = hashlib.sha256(file_bytes).hexdigest()
original_filename = document.file_name or "unknown"
extension = Path(original_filename).suffix
uploads_dir = self.config.root_folder / ".folderbot" / "uploads"
uploads_dir.mkdir(parents=True, exist_ok=True)
file_path = uploads_dir / file_hash
file_path.write_bytes(file_bytes)
record = self.session_manager.save_upload(
user_id=user_id,
original_filename=original_filename,
hash_filename=file_hash,
extension=extension,
file_size=len(file_bytes),
mime_type=document.mime_type or "application/octet-stream",
)
logger.info(
f"[{user_id}] Upload saved: {original_filename} -> {file_hash} "
f"({record.file_size} bytes)"
)
await self._reply_text(
update.message,
f"Uploaded: {original_filename} ({record.file_size:,} bytes)",
)
[docs]
async def handle_photo(
self, update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
"""Handle photo uploads: save to uploads and process with LLM vision."""
if not update.effective_user or not update.message or not update.message.photo:
return
user_id = update.effective_user.id
logger.info(f"[{user_id}] Photo received")
if not self._is_authorized(user_id):
await self._reply_text(update.message, "Unauthorized.")
return
try:
# Get the largest photo resolution (last in the list)
photo = update.message.photo[-1]
file = await photo.get_file()
file_bytes = bytes(await file.download_as_bytearray())
except Exception as e:
logger.exception("Error downloading photo")
await self._reply_text(update.message, f"Error downloading photo: {e}")
return
# Save to uploads
file_hash = hashlib.sha256(file_bytes).hexdigest()
uploads_dir = self.config.root_folder / ".folderbot" / "uploads"
uploads_dir.mkdir(parents=True, exist_ok=True)
file_path = uploads_dir / file_hash
file_path.write_bytes(file_bytes)
self.session_manager.save_upload(
user_id=user_id,
original_filename=f"photo_{file_hash[:8]}.jpg",
hash_filename=file_hash,
extension=".jpg",
file_size=len(file_bytes),
mime_type="image/jpeg",
)
logger.info(f"[{user_id}] Photo saved: {file_hash} ({len(file_bytes)} bytes)")
# Queue for LLM processing with image data
caption = update.message.caption or "Describe this image."
image_b64 = base64.b64encode(file_bytes).decode("utf-8")
if user_id not in self._pending_messages:
self._pending_messages[user_id] = []
self._pending_messages[user_id].append(caption)
self._pending_updates[user_id] = update
if user_id not in self._pending_images:
self._pending_images[user_id] = []
self._pending_images[user_id].append(
{"data": image_b64, "media_type": "image/jpeg"}
)
await self._start_processing(user_id)
[docs]
async def handle_voice(
self, update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
"""Handle voice messages and audio files by transcribing and processing as text."""
if not update.effective_user or not update.message:
return
user_id = update.effective_user.id
if not self._is_authorized(user_id):
await self._reply_text(update.message, "Unauthorized.")
return
voice = update.message.voice
audio = update.message.audio
try:
if voice:
file = await voice.get_file()
elif audio:
file = await audio.get_file()
else:
return
file_bytes = bytes(await file.download_as_bytearray())
result = await transcribe_audio(
audio_bytes=file_bytes,
model_name=self.config.whisper_model,
)
except Exception as e:
logger.exception("Error during voice transcription")
await self._reply_text(update.message, f"Transcription failed: {e}")
return
self._save_voice_recording(file_bytes)
user_message = f"[Transcribed from voice message]\n{result.text}"
logger.info(
f"[{user_id}] Voice transcribed ({len(result.text)} chars): "
f"{result.text[:50]}..."
)
if user_id not in self._pending_messages:
self._pending_messages[user_id] = []
self._pending_messages[user_id].append(user_message)
self._pending_updates[user_id] = update
await self._start_processing(user_id)
def _save_voice_recording(self, file_bytes: bytes) -> Path:
"""Save a voice recording to the voice directory."""
from datetime import datetime, timezone
voice_dir = self.config.root_folder / ".folderbot" / "voice"
voice_dir.mkdir(parents=True, exist_ok=True)
timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
file_path = voice_dir / f"{timestamp}.ogg"
file_path.write_bytes(file_bytes)
logger.info(f"Saved voice recording to {file_path}")
return file_path
async def _start_processing(self, user_id: int) -> None:
"""Start processing accumulated messages for a user."""
# Collect all pending messages and images
messages = self._pending_messages.pop(user_id, [])
images = self._pending_images.pop(user_id, [])
update = self._pending_updates.pop(user_id, None)
if not messages or not update or not update.message:
logger.info(f"[{user_id}] _start_processing: no messages to process")
return
# Track messages being processed (for restoration if cancelled)
self._processing_messages[user_id] = messages
# Combine multiple messages into one
combined_message = "\n".join(messages)
logger.info(f"[{user_id}] Starting processing with {len(messages)} message(s)")
# Start processing task (can be cancelled if new message arrives)
task = asyncio.create_task(
self._process_message(
user_id, combined_message, update, images=images or None
)
)
self._processing_tasks[user_id] = task
logger.info(f"[{user_id}] Processing task created")
async def _process_message(
self,
user_id: int,
user_message: str,
update: Update,
images: list[dict[str, str]] | None = None,
) -> None:
"""Process a message and send response."""
if not update.message:
return
logger.info(f"[{user_id}] _process_message started")
chat_id = update.message.chat_id # type: ignore[union-attr]
notifier = StatusNotifier(
chat_id=chat_id,
bot=self._application.bot, # type: ignore[union-attr]
)
await notifier.start()
try:
if user_id in self._cancelled_users:
logger.info(f"[{user_id}] Cancelled before API call, aborting")
return
history = self.session_manager.get_history(user_id)
context = self.llm_client.tools.create_context(user_id)
context.services["session"] = self.session_manager
logger.info(f"[{user_id}] Calling LLM API...")
async def on_ask_user(request: AskUserRequest) -> str:
return await self._handle_ask_user(user_id, chat_id, request)
response, tools_used, topic, usage = await self.llm_client.chat(
user_message,
context,
history,
on_tool_use=notifier.update,
on_ask_user=on_ask_user,
chat_id=chat_id,
images=images,
)
self.session_manager.record_token_usage(
user_id=user_id,
input_tokens=usage.input_tokens,
output_tokens=usage.output_tokens,
model=self.config.model,
topic=topic,
)
display_response = response
if tools_used:
seen: set[str] = set()
unique_tools: list[str] = []
for t in tools_used:
if t not in seen:
seen.add(t)
unique_tools.append(t)
display_response += f"\n\n<i>Tools: {', '.join(unique_tools)}</i>"
self.llm_client.tools.activity_logger.log_message(
direction="user",
content=user_message,
user_id=user_id,
)
self.llm_client.tools.activity_logger.log_message(
direction="assistant",
content=response,
user_id=user_id,
tools_used=tools_used if tools_used else None,
)
logger.info(
f"[{user_id}] LLM API returned, response length: {len(response)}"
)
if asyncio.current_task() and asyncio.current_task().cancelled(): # type: ignore[union-attr]
logger.info(f"[{user_id}] Task cancelled after API call, aborting")
return
if user_id in self._cancelled_users:
logger.info(f"[{user_id}] Cancelled flag set after API call, aborting")
return
self.session_manager.add_message(user_id, "user", user_message, topic=topic)
self.session_manager.add_message(
user_id, "assistant", response, topic=topic
)
if user_id in self._cancelled_users:
logger.info(f"[{user_id}] Cancelled before send, aborting")
return
logger.info(f"[{user_id}] Sending response to Telegram...")
if len(display_response) > 4096:
for i in range(0, len(display_response), 4096):
if user_id in self._cancelled_users:
logger.info(
f"[{user_id}] Cancelled during split send, aborting"
)
return
await self._reply_text(
update.message, display_response[i : i + 4096]
)
else:
await self._reply_text(update.message, display_response)
logger.info(f"[{user_id}] Response sent successfully")
self._processing_messages.pop(user_id, None)
except asyncio.CancelledError:
logger.info(f"[{user_id}] CancelledError caught, response cancelled")
except Exception as e:
logger.exception("Error handling message")
await self._reply_text(update.message, f"Error: {e}")
finally:
await notifier.stop()
[docs]
async def wait_for_processing(self, user_id: int) -> None:
"""Wait for any pending processing to complete. For testing."""
if user_id in self._processing_tasks:
task = self._processing_tasks[user_id]
if not task.done():
try:
await task
except asyncio.CancelledError:
pass
ASK_USER_TIMEOUT_SECONDS = 120
async def _handle_ask_user(
self, user_id: int, chat_id: int, request: AskUserRequest
) -> str:
"""Handle an ask_user tool call from the agent loop.
Sends the appropriate Telegram UI and waits for the user's response.
Returns the user's answer as a string.
Raises asyncio.TimeoutError if the user doesn't respond in time.
"""
if not self._application:
raise RuntimeError("Application not initialized")
bot = self._application.bot
loop = asyncio.get_running_loop()
future: asyncio.Future[str] = loop.create_future()
self._ask_user_futures[user_id] = future
self._ask_user_input_types[user_id] = request.input_type
try:
if request.input_type == "choice":
options = request.options
self._ask_user_options[user_id] = options
buttons = [
[InlineKeyboardButton(opt, callback_data=f"ask:{user_id}:{i}")]
for i, opt in enumerate(options)
]
markup = InlineKeyboardMarkup(buttons)
await self._send_text(
bot, chat_id, request.question, reply_markup=markup
)
elif request.input_type == "confirm":
options = request.options if request.options else ["Yes", "No"]
self._ask_user_options[user_id] = options
buttons = [
[
InlineKeyboardButton(opt, callback_data=f"ask:{user_id}:{i}")
for i, opt in enumerate(options)
]
]
markup = InlineKeyboardMarkup(buttons)
await self._send_text(
bot, chat_id, request.question, reply_markup=markup
)
elif request.input_type == "text":
await self._send_text(bot, chat_id, request.question)
elif request.input_type == "location":
button = KeyboardButton("Share Location", request_location=True)
location_markup = ReplyKeyboardMarkup(
[[button]], one_time_keyboard=True, resize_keyboard=True
)
await self._send_text(
bot, chat_id, request.question, reply_markup=location_markup
)
return await asyncio.wait_for(future, timeout=self.ASK_USER_TIMEOUT_SECONDS)
except asyncio.TimeoutError:
await self._send_text(bot, chat_id, "(Question timed out)")
raise
finally:
self._ask_user_futures.pop(user_id, None)
self._ask_user_input_types.pop(user_id, None)
self._ask_user_options.pop(user_id, None)
async def _handle_callback_query(
self, update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
"""Handle inline keyboard button presses for ask_user."""
query = update.callback_query
if not query or not query.data:
return
await query.answer()
# Parse callback data: "ask:<user_id>:<index>"
parts = query.data.split(":", 2)
if len(parts) != 3 or parts[0] != "ask":
return
try:
user_id = int(parts[1])
option_index = int(parts[2])
except ValueError:
return
# Look up the option text from the stored mapping
options = self._ask_user_options.get(user_id, [])
if option_index < len(options):
answer = options[option_index]
else:
answer = str(option_index)
# Resolve the pending future
future = self._ask_user_futures.get(user_id)
msg = query.message if isinstance(query.message, Message) else None
if future and not future.done():
future.set_result(answer)
# Edit the original message to show the selection
if msg:
try:
selected_text = f"{msg.text}\n\n<b>Selected: {answer}</b>"
await msg.edit_text(selected_text, parse_mode="HTML")
except Exception:
pass
else:
if msg:
try:
await msg.edit_text("(No pending question)")
except Exception:
pass
async def _handle_location(
self, update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
"""Handle location messages for ask_user location type."""
if (
not update.effective_user
or not update.message
or not update.message.location
):
return
user_id = update.effective_user.id
location = update.message.location
future = self._ask_user_futures.get(user_id)
if (
future
and not future.done()
and self._ask_user_input_types.get(user_id) == "location"
):
future.set_result(f"lat={location.latitude}, lon={location.longitude}")
await update.message.reply_text(
"Location received.",
reply_markup=ReplyKeyboardRemove(),
)
async def _send_upload_document(
self, chat_id: int, file_path: Path, filename: str
) -> None:
"""Send an uploaded document back to a Telegram chat."""
if not self._application:
raise RuntimeError("Application not initialized")
bot = self._application.bot
with open(file_path, "rb") as f:
await bot.send_document(chat_id=chat_id, document=f, filename=filename)
async def _send_scheduler_message(self, chat_id: int, text: str) -> None:
"""Send a message from the scheduler to a Telegram chat."""
if not self._application:
logger.warning("Cannot send scheduler message: application not set")
return
bot = self._application.bot
if len(text) > 4096:
for i in range(0, len(text), 4096):
await self._send_text(bot, chat_id, text[i : i + 4096])
else:
await self._send_text(bot, chat_id, text)
async def _summarize_for_scheduler(self, prompt: str, chat_id: int) -> str:
"""Call Claude to summarize task results."""
# Create a minimal context for summarization
context = self.llm_client.tools.create_context(user_id=0)
response, _, _topic, _ = await self.llm_client.chat(
user_message=prompt,
context=context,
history=[],
)
return response
async def _send_file_change_notification(self, message: str) -> None:
"""Send file change notification to users who have enabled them."""
if not self._application:
logger.warning("Cannot send file notification: application not set")
return
# Only notify users who have enabled file notifications
enabled_users = self.session_manager.get_users_with_file_notifications()
# Filter to allowed users only (in case DB has stale data)
users_to_notify = [
uid for uid in enabled_users if uid in self.config.allowed_user_ids
]
if not users_to_notify:
return
bot = self._application.bot
for user_id in users_to_notify:
try:
await self._send_text(bot, user_id, message)
except Exception as e:
logger.warning(f"[{user_id}] Could not send file notification: {e}")
[docs]
async def tasks_command(
self, update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
"""Handle /tasks command."""
if not update.effective_user or not update.message:
return
user_id = update.effective_user.id
if not self._is_authorized(user_id):
await self._reply_text(update.message, "Unauthorized.")
return
tasks = self._scheduler.list_tasks(user_id)
if not tasks:
await self._reply_text(update.message, "No scheduled tasks.")
return
lines = []
for t in tasks[:20]:
lines.append(f" {t.task_id} [{t.status.value}] {t.description}")
await self._reply_text(update.message, "Scheduled tasks:\n" + "\n".join(lines))
async def _post_init(self, application: Application) -> None: # type: ignore[type-arg]
"""Called after application is initialized."""
await self._scheduler.start()
logger.info("Task scheduler started")
await self._file_watcher.start()
# Send startup message to all allowed users
await self._send_startup_messages()
async def _send_startup_messages(self) -> None:
"""Send startup message to all allowed users with version update if applicable."""
if not self._application:
return
bot = self._application.bot
for user_id in self.config.allowed_user_ids:
try:
# Check for version update
last_version = self.session_manager.get_last_notified_version(user_id)
if last_version and last_version != __version__:
# Version changed - notify about update
message = f"Hi! Folderbot is back online.\n\nUpdated: v{last_version} → v{__version__}"
self.session_manager.set_last_notified_version(user_id, __version__)
logger.info(f"[{user_id}] Startup with version update notification")
elif last_version:
# Same version - just say hi
message = f"Hi! Folderbot is back online. (v{__version__})"
logger.info(f"[{user_id}] Startup message sent")
else:
# First time user - set version, skip message
# (they haven't used the bot yet, so don't spam them)
self.session_manager.set_last_notified_version(user_id, __version__)
logger.info(
f"[{user_id}] First time user, version set, no startup message"
)
continue
# Send to user (user_id == chat_id for private chats)
await self._send_text(
bot,
user_id,
message,
connect_timeout=10,
read_timeout=10,
)
except Exception as e:
# User may have blocked bot or never started it
logger.warning(f"[{user_id}] Could not send startup message: {e}")
async def _post_shutdown(self, application: Application) -> None: # type: ignore[type-arg]
"""Called during application shutdown."""
await self._file_watcher.stop()
await self._scheduler.shutdown()
logger.info("Task scheduler stopped")
[docs]
def run(self) -> None:
"""Run the bot."""
application = Application.builder().token(self.config.telegram_token).build()
self._application = application
# Scheduler lifecycle hooks
application.post_init = self._post_init
application.post_shutdown = self._post_shutdown
# Add handlers
application.add_handler(CommandHandler("start", self.start_command))
application.add_handler(CommandHandler("clear", self.clear_command))
application.add_handler(CommandHandler("new", self.new_command))
application.add_handler(CommandHandler("status", self.status_command))
application.add_handler(CommandHandler("files", self.files_command))
application.add_handler(CommandHandler("tasks", self.tasks_command))
application.add_handler(CallbackQueryHandler(self._handle_callback_query))
application.add_handler(MessageHandler(filters.LOCATION, self._handle_location))
application.add_handler(
MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message)
)
application.add_handler(
MessageHandler(filters.Document.ALL, self.handle_document)
)
application.add_handler(MessageHandler(filters.PHOTO, self.handle_photo))
application.add_handler(MessageHandler(filters.VOICE, self.handle_voice))
application.add_handler(MessageHandler(filters.AUDIO, self.handle_voice))
# Run the bot
logger.info("Starting bot...")
application.run_polling(allowed_updates=Update.ALL_TYPES)