Source code for folderbot.tools.scheduler_tools

"""Scheduler tools for scheduling and managing background tasks.

These tools are registered on the central folder_bot instance and executed
asynchronously through SchedulerServices.
"""

from __future__ import annotations

import uuid
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING, Any

from pydantic import BaseModel, Field

from .base import ToolResult
from .registry import folder_bot, get_service

if TYPE_CHECKING:
    from ..bot import BotContext
    from ..scheduler.models import Schedule
    from ..scheduler.scheduler import TaskScheduler


class TaskStepInput(BaseModel, frozen=True):
    """A single tool call within a task."""

    # Identifies which tool this step invokes.
    tool_name: str = Field(
        description=(
            "Name of the tool to call "
            "(e.g., 'search_files', 'read_file', or any custom tool)"
        )
    )

    # Parameters forwarded to that tool call.
    tool_input: dict[str, Any] = Field(
        description=(
            "Input parameters for the tool, "
            "matching the tool's input schema"
        )
    )
class ScheduleTaskRequest(BaseModel, frozen=True):
    """Request for scheduling a new task."""

    # --- What to run -------------------------------------------------------
    description: str = Field(
        description="Human-readable description of what this task does and why"
    )
    steps: list[TaskStepInput] = Field(
        description=(
            "List of tool calls to execute each iteration. "
            "Usually one step, but can be multiple for multi-step tasks."
        )
    )

    # --- When to run -------------------------------------------------------
    # Kept as a plain string; the scheduler handler converts it to the
    # ScheduleType enum and reports unknown values as a tool error.
    schedule_type: str = Field(
        description=(
            "Type of schedule: "
            "'once' (run once after delay), "
            "'repeating' (run at intervals), "
            "'cron' (cron expression), "
            "'time_limited' (run until time expires)"
        )
    )
    delay_seconds: int = Field(
        default=0,
        description=(
            "For 'once': seconds to wait before executing. "
            "For 'repeating': initial delay before first execution."
        ),
    )
    interval_seconds: int = Field(
        default=0,
        description=(
            "For 'repeating': seconds between executions. "
            "For 'time_limited': seconds between iterations."
        ),
    )
    cron_expression: str = Field(
        default="",
        description=(
            "For 'cron': standard cron expression "
            "(e.g., '0 9 * * *' for daily at 9am)"
        ),
    )
    duration_seconds: int = Field(
        default=0,
        description="For 'time_limited': maximum total duration in seconds",
    )
    max_iterations: int = Field(
        default=0,
        description=(
            "Maximum number of iterations. "
            "0 = unlimited (within schedule constraints)"
        ),
    )

    # --- Reporting ---------------------------------------------------------
    summarize_on_complete: bool = Field(
        default=True,
        description="Whether to generate a Claude summary when the task completes",
    )
    progress_interval: int = Field(
        default=1,
        description=(
            "Send progress update every N iterations. "
            "Set higher for fast-running tasks."
        ),
    )
class ListTasksRequest(BaseModel, frozen=True):
    """Request for listing scheduled tasks."""

    # An empty string means "no filter"; the value is handed to the
    # scheduler's list_tasks unchanged.
    status_filter: str = Field(
        default="",
        description=(
            "Filter by status: 'pending', 'running', 'completed', 'cancelled', 'failed', "
            "'active' (pending+running), 'done' (completed+cancelled+failed). "
            "Empty for all."
        ),
    )
class CancelTaskRequest(BaseModel, frozen=True):
    """Request for cancelling a scheduled task."""

    # Identifier of the task that should be cancelled.
    task_id: str = Field(
        description="ID of the task to cancel",
    )
class GetTaskResultsRequest(BaseModel, frozen=True):
    """Request for getting results of a task."""

    # Identifier of the task whose results are requested.
    task_id: str = Field(
        description="ID of the task to get results for",
    )

    # Size of the trailing window of results to report; defaults to 10.
    last_n: int = Field(
        default=10,
        description="Number of most recent results to return",
    )
@dataclass
class SchedulerServices:
    """Services for scheduler tool execution.

    This class wraps the TaskScheduler and provides async handlers for
    scheduler tools. It's stored in BotContext.services["scheduler"].

    Every handler resolves the acting user from ``context.user_id`` and
    falls back to user id ``0`` when no context is supplied.
    """

    scheduler: "TaskScheduler"
    chat_id: int = 0

    # Upper bound on the number of tasks rendered by ``list_tasks``.
    # Unannotated on purpose so the dataclass does not treat it as a field.
    _MAX_LISTED_TASKS = 20

    async def schedule_task(
        self, request: ScheduleTaskRequest, context: BotContext | None
    ) -> ToolResult:
        """Schedule a new task.

        Args:
            request: Validated scheduling request (steps plus schedule options).
            context: Bot context of the caller, or ``None``.

        Returns:
            A ToolResult describing the created task, or an error ToolResult
            when ``schedule_type`` is unknown or ``cron_expression`` cannot be
            parsed.
        """
        # Imported lazily, as in the rest of this module, so the scheduler
        # models are only needed at call time.
        from ..scheduler.models import Schedule, ScheduleType, TaskPlan, TaskStep

        user_id = context.user_id if context else 0

        try:
            schedule_type = ScheduleType(request.schedule_type)
        except ValueError:
            return ToolResult(
                content=f"Invalid schedule_type: {request.schedule_type}. "
                f"Must be one of: once, repeating, cron, time_limited",
                is_error=True,
            )

        steps = tuple(
            TaskStep(tool_name=s.tool_name, tool_input=s.tool_input)
            for s in request.steps
        )

        schedule = Schedule(
            schedule_type=schedule_type,
            delay_seconds=request.delay_seconds,
            interval_seconds=request.interval_seconds,
            cron_expression=request.cron_expression,
            duration_seconds=request.duration_seconds,
            max_iterations=request.max_iterations,
        )

        # Compute next_run_at based on schedule type
        now = datetime.now(timezone.utc)
        next_run_at = ""
        deadline_at = ""

        if schedule_type in (ScheduleType.once, ScheduleType.repeating):
            # Both kinds start after the (possibly zero) initial delay.
            next_run_at = (now + timedelta(seconds=request.delay_seconds)).isoformat()
        elif schedule_type == ScheduleType.cron:
            try:
                # Use the same ``now`` as ``created_at`` so both timestamps
                # are derived from one clock reading.
                next_run_at = self._next_cron_time(
                    request.cron_expression, now
                ).isoformat()
            except (ValueError, KeyError) as exc:
                # A malformed (or empty) expression is a user error: report it
                # like an invalid schedule_type instead of raising out of the
                # tool handler.
                return ToolResult(
                    content=(
                        f"Invalid cron_expression: {request.cron_expression!r} "
                        f"({exc})"
                    ),
                    is_error=True,
                )
        elif schedule_type == ScheduleType.time_limited:
            next_run_at = now.isoformat()
            deadline_at = (
                now + timedelta(seconds=request.duration_seconds)
            ).isoformat()

        plan = TaskPlan(
            task_id=uuid.uuid4().hex[:12],
            chat_id=self.chat_id,
            user_id=user_id,
            description=request.description,
            steps=steps,
            schedule=schedule,
            created_at=now.isoformat(),
            next_run_at=next_run_at,
            deadline_at=deadline_at,
            summarize_on_complete=request.summarize_on_complete,
            progress_interval=request.progress_interval,
        )

        # The id reported to the user is the one returned by the scheduler.
        task_id = await self.scheduler.create_task(plan)

        schedule_desc = self._describe_schedule(schedule)
        return ToolResult(
            content=f"Task scheduled: {plan.description}\n"
            f"ID: {task_id}\n"
            f"Schedule: {schedule_desc}\n"
            f"Steps: {len(steps)} tool call(s) per iteration"
        )

    async def list_tasks(
        self, request: ListTasksRequest, context: BotContext | None
    ) -> ToolResult:
        """List scheduled tasks.

        Args:
            request: Listing request carrying the optional status filter.
            context: Bot context of the caller, or ``None``.

        Returns:
            A ToolResult with one entry per task (at most
            ``_MAX_LISTED_TASKS``), followed by a note when further tasks
            were omitted.
        """
        user_id = context.user_id if context else 0
        tasks = self.scheduler.list_tasks(user_id, request.status_filter)

        if not tasks:
            return ToolResult(content="No scheduled tasks found.")

        limit = self._MAX_LISTED_TASKS
        lines = []
        for t in tasks[:limit]:
            schedule_desc = self._describe_schedule(t.schedule)
            lines.append(
                f"- [{t.task_id}] {t.description}\n"
                f"  Status: {t.status.value} | Iterations: {t.current_iteration} | "
                f"Schedule: {schedule_desc}"
            )

        # Make the truncation visible instead of silently dropping tasks.
        omitted = len(tasks) - limit
        if omitted > 0:
            lines.append(f"... and {omitted} more task(s) not shown")

        return ToolResult(content="\n".join(lines))

    async def cancel_task(
        self, request: CancelTaskRequest, context: BotContext | None
    ) -> ToolResult:
        """Cancel a scheduled task.

        Args:
            request: Cancellation request carrying the task id.
            context: Bot context of the caller, or ``None``.

        Returns:
            A confirmation ToolResult when the scheduler reports success,
            otherwise an error ToolResult.
        """
        user_id = context.user_id if context else 0
        cancelled = await self.scheduler.cancel_task(request.task_id, user_id)

        if cancelled:
            return ToolResult(content=f"Task {request.task_id} cancelled.")
        return ToolResult(
            content=f"Task {request.task_id} not found or not owned by you.",
            is_error=True,
        )

    async def get_task_results(
        self, request: GetTaskResultsRequest, context: BotContext | None
    ) -> ToolResult:
        """Get results of a scheduled task.

        Args:
            request: Request carrying the task id and the number of most
                recent results wanted.
            context: Bot context of the caller, or ``None``.

        Returns:
            A ToolResult summarising the task and its most recent results,
            or an error ToolResult when the scheduler returns no plan.
        """
        user_id = context.user_id if context else 0
        plan = self.scheduler.get_task_results(request.task_id, user_id)

        if plan is None:
            return ToolResult(
                content=f"Task {request.task_id} not found or not owned by you.",
                is_error=True,
            )

        # ``seq[-0:]`` is the whole sequence and ``seq[-(-k):]`` skips the
        # first k items, so a zero or negative ``last_n`` must be handled
        # explicitly to mean "no results".
        last_n = max(request.last_n, 0)
        recent = plan.results[-last_n:] if last_n else []

        lines = [
            f"Task: {plan.description}",
            f"Status: {plan.status.value}",
            f"Iterations: {plan.current_iteration}",
            "",
            f"Last {last_n} results:",
        ]

        for r in recent:
            status = "ERROR" if r.is_error else "OK"
            # Keep each entry short: long tool output is cut at 300 characters.
            content_preview = (
                r.content[:300] + "..." if len(r.content) > 300 else r.content
            )
            lines.append(
                f"  [{r.iteration}] {r.tool_name} ({status}): {content_preview}"
            )

        return ToolResult(content="\n".join(lines))

    @staticmethod
    def _describe_schedule(schedule: "Schedule") -> str:
        """Human-readable schedule description."""
        from ..scheduler.models import ScheduleType

        st = schedule.schedule_type
        if st == ScheduleType.once:
            if schedule.delay_seconds > 0:
                return f"once after {schedule.delay_seconds}s delay"
            return "once immediately"
        if st == ScheduleType.repeating:
            parts = f"every {schedule.interval_seconds}s"
            if schedule.max_iterations > 0:
                parts += f" (max {schedule.max_iterations} iterations)"
            return parts
        if st == ScheduleType.cron:
            return f"cron: {schedule.cron_expression}"
        if st == ScheduleType.time_limited:
            return (
                f"for {schedule.duration_seconds}s, every {schedule.interval_seconds}s"
            )
        # Unknown schedule types fall back to the enum's string form.
        return str(st)

    @staticmethod
    def _next_cron_time(expression: str, base: datetime | None = None) -> datetime:
        """Calculate next run time from a cron expression.

        Args:
            expression: Cron expression to evaluate.
            base: Moment to search forward from; defaults to the current
                UTC time.

        Returns:
            The next datetime produced by croniter after ``base``.

        Raises:
            Whatever croniter raises for an expression it cannot parse;
            callers are expected to handle that.
        """
        from croniter import croniter

        if base is None:
            base = datetime.now(timezone.utc)
        cron = croniter(expression, base)
        return cron.get_next(datetime)  # type: ignore[return-value]
# Register scheduler tools on folder_bot
@folder_bot.tool(
    name="schedule_task",
    request_type=ScheduleTaskRequest,
    response_type=ToolResult,
    description=(
        "Schedule a task to be executed later or repeatedly. The task will run "
        "the specified tool(s) on the given schedule and report progress via "
        "Telegram messages. Use this for: delayed execution ('check this in 30 min'), "
        "repeating tasks ('check every hour'), cron tasks ('daily at 9am'), or "
        "time-limited bursts ('try for 5 minutes')."
    ),
)
async def schedule_task(
    request: ScheduleTaskRequest, _context: BotContext | None = None
) -> ToolResult:
    """Tool entry point: create a scheduled task.

    Looks up the "scheduler" service for the given context and delegates to
    its ``schedule_task`` handler; returns an error result when no such
    service is registered.
    """
    scheduler_services: SchedulerServices | None = get_service(_context, "scheduler")
    if scheduler_services is not None:
        return await scheduler_services.schedule_task(request, _context)
    return ToolResult(content="Scheduler not available", is_error=True)
@folder_bot.tool(
    name="list_tasks",
    request_type=ListTasksRequest,
    response_type=ToolResult,
    description=(
        "List all scheduled tasks, optionally filtered by status. Shows task ID, "
        "description, schedule, status, and iteration count."
    ),
)
async def list_tasks(
    request: ListTasksRequest, _context: BotContext | None = None
) -> ToolResult:
    """Tool entry point: list scheduled tasks.

    Looks up the "scheduler" service for the given context and delegates to
    its ``list_tasks`` handler; returns an error result when no such service
    is registered.
    """
    scheduler_services: SchedulerServices | None = get_service(_context, "scheduler")
    if scheduler_services is not None:
        return await scheduler_services.list_tasks(request, _context)
    return ToolResult(content="Scheduler not available", is_error=True)
@folder_bot.tool(
    name="cancel_task",
    request_type=CancelTaskRequest,
    response_type=ToolResult,
    description="Cancel a running or pending scheduled task.",
)
async def cancel_task(
    request: CancelTaskRequest, _context: BotContext | None = None
) -> ToolResult:
    """Tool entry point: cancel a scheduled task.

    Looks up the "scheduler" service for the given context and delegates to
    its ``cancel_task`` handler; returns an error result when no such service
    is registered.
    """
    scheduler_services: SchedulerServices | None = get_service(_context, "scheduler")
    if scheduler_services is not None:
        return await scheduler_services.cancel_task(request, _context)
    return ToolResult(content="Scheduler not available", is_error=True)
@folder_bot.tool(
    name="get_task_results",
    request_type=GetTaskResultsRequest,
    response_type=ToolResult,
    description=(
        "Get the results of a scheduled task. Returns the most recent execution "
        "results. Useful for checking what a task has found so far."
    ),
)
async def get_task_results(
    request: GetTaskResultsRequest, _context: BotContext | None = None
) -> ToolResult:
    """Tool entry point: fetch the results of a scheduled task.

    Looks up the "scheduler" service for the given context and delegates to
    its ``get_task_results`` handler; returns an error result when no such
    service is registered.
    """
    scheduler_services: SchedulerServices | None = get_service(_context, "scheduler")
    if scheduler_services is not None:
        return await scheduler_services.get_task_results(request, _context)
    return ToolResult(content="Scheduler not available", is_error=True)