"""Scheduler tools for scheduling and managing background tasks.
These tools are registered on the central folder_bot instance and executed
asynchronously through SchedulerServices.
"""
from __future__ import annotations
import uuid
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING, Any
from pydantic import BaseModel, Field
from .base import ToolResult
from .registry import folder_bot, get_service
if TYPE_CHECKING:
from ..bot import BotContext
from ..scheduler.models import Schedule
from ..scheduler.scheduler import TaskScheduler
[docs]
class ScheduleTaskRequest(BaseModel, frozen=True):
    """Request for scheduling a new task."""

    # NOTE: the class docstring above is left untouched on purpose; pydantic
    # surfaces a model's docstring as the description of its JSON schema.

    # --- What to run -------------------------------------------------------
    description: str = Field(
        description="Human-readable description of what this task does and why",
    )
    # Each step is read for `tool_name` and `tool_input` when the plan is
    # built (see SchedulerServices.schedule_task).
    steps: list[TaskStepInput] = Field(
        description=(
            "List of tool calls to execute each iteration. "
            "Usually one step, but can be multiple for multi-step tasks."
        ),
    )

    # --- When to run -------------------------------------------------------
    # Kept as a plain string; it is converted to ScheduleType (and rejected
    # with an error result if unknown) by SchedulerServices.schedule_task.
    schedule_type: str = Field(
        description=(
            "Type of schedule: "
            "'once' (run once after delay), "
            "'repeating' (run at intervals), "
            "'cron' (cron expression), "
            "'time_limited' (run until time expires)"
        ),
    )
    delay_seconds: int = Field(
        default=0,
        description=(
            "For 'once': seconds to wait before executing. "
            "For 'repeating': initial delay before first execution."
        ),
    )
    interval_seconds: int = Field(
        default=0,
        description=(
            "For 'repeating': seconds between executions. "
            "For 'time_limited': seconds between iterations."
        ),
    )
    cron_expression: str = Field(
        default="",
        description=(
            "For 'cron': standard cron expression "
            "(e.g., '0 9 * * *' for daily at 9am)"
        ),
    )
    duration_seconds: int = Field(
        default=0,
        description="For 'time_limited': maximum total duration in seconds",
    )
    max_iterations: int = Field(
        default=0,
        description=(
            "Maximum number of iterations. "
            "0 = unlimited (within schedule constraints)"
        ),
    )

    # --- Reporting ---------------------------------------------------------
    summarize_on_complete: bool = Field(
        default=True,
        description="Whether to generate a Claude summary when the task completes",
    )
    progress_interval: int = Field(
        default=1,
        description=(
            "Send progress update every N iterations. "
            "Set higher for fast-running tasks."
        ),
    )
[docs]
class ListTasksRequest(BaseModel, frozen=True):
    """Request for listing scheduled tasks."""

    # Passed through unchanged to the scheduler's list_tasks(); the empty
    # default means "no filter".
    status_filter: str = Field(
        default="",
        description=(
            "Filter by status: "
            "'pending', 'running', 'completed', 'cancelled', 'failed', "
            "'active' (pending+running), "
            "'done' (completed+cancelled+failed). "
            "Empty for all."
        ),
    )
[docs]
class CancelTaskRequest(BaseModel, frozen=True):
    """Request for cancelling a scheduled task."""

    # Identifier as reported back when the task was scheduled or listed.
    task_id: str = Field(
        description="ID of the task to cancel",
    )
[docs]
class GetTaskResultsRequest(BaseModel, frozen=True):
    """Request for getting results of a task."""

    # Identifier as reported back when the task was scheduled or listed.
    task_id: str = Field(
        description="ID of the task to get results for",
    )
    # Size of the tail of the task's result history to report.
    last_n: int = Field(
        default=10,
        description="Number of most recent results to return",
    )
[docs]
@dataclass
class SchedulerServices:
    """Services for scheduler tool execution.

    This class wraps the TaskScheduler and provides async handlers for
    scheduler tools. It's stored in BotContext.services["scheduler"].

    Attributes:
        scheduler: The TaskScheduler every handler delegates to.
        chat_id: Chat ID stamped onto each plan created through this instance.
    """

    scheduler: "TaskScheduler"
    chat_id: int = 0

    # Deliberately unannotated so @dataclass does not turn them into fields.
    _MAX_LISTED_TASKS = 20  # cap on tasks rendered by list_tasks()
    _PREVIEW_CHARS = 300  # cap on characters shown per result

    async def schedule_task(
        self, request: ScheduleTaskRequest, context: BotContext | None
    ) -> ToolResult:
        """Schedule a new task.

        Builds a TaskPlan from the request and hands it to the scheduler.

        Args:
            request: Description, steps and schedule parameters of the task.
            context: Bot context whose ``user_id`` becomes the plan owner;
                user ID 0 is used when no context is given.

        Returns:
            A ToolResult summarising the created task, or an error ToolResult
            when ``schedule_type`` is unknown or ``cron_expression`` cannot be
            parsed.
        """
        from ..scheduler.models import Schedule, ScheduleType, TaskPlan, TaskStep

        user_id = context.user_id if context else 0

        try:
            schedule_type = ScheduleType(request.schedule_type)
        except ValueError:
            return ToolResult(
                content=f"Invalid schedule_type: {request.schedule_type}. "
                f"Must be one of: once, repeating, cron, time_limited",
                is_error=True,
            )

        steps = tuple(
            TaskStep(tool_name=s.tool_name, tool_input=s.tool_input)
            for s in request.steps
        )
        schedule = Schedule(
            schedule_type=schedule_type,
            delay_seconds=request.delay_seconds,
            interval_seconds=request.interval_seconds,
            cron_expression=request.cron_expression,
            duration_seconds=request.duration_seconds,
            max_iterations=request.max_iterations,
        )

        # Compute next_run_at (and deadline_at for time-limited tasks).
        now = datetime.now(timezone.utc)
        next_run_at = ""
        deadline_at = ""
        if schedule_type in (ScheduleType.once, ScheduleType.repeating):
            # Both start after the (possibly zero) initial delay.
            next_run_at = (now + timedelta(seconds=request.delay_seconds)).isoformat()
        elif schedule_type == ScheduleType.cron:
            # An empty or malformed expression makes croniter raise; report it
            # the same way as an invalid schedule_type instead of crashing.
            try:
                next_run_at = self._next_cron_time(
                    request.cron_expression
                ).isoformat()
            except (ValueError, KeyError) as exc:
                return ToolResult(
                    content=(
                        f"Invalid cron_expression: "
                        f"{request.cron_expression!r} ({exc})"
                    ),
                    is_error=True,
                )
        elif schedule_type == ScheduleType.time_limited:
            next_run_at = now.isoformat()
            deadline_at = (
                now + timedelta(seconds=request.duration_seconds)
            ).isoformat()

        plan = TaskPlan(
            task_id=uuid.uuid4().hex[:12],
            chat_id=self.chat_id,
            user_id=user_id,
            description=request.description,
            steps=steps,
            schedule=schedule,
            created_at=now.isoformat(),
            next_run_at=next_run_at,
            deadline_at=deadline_at,
            summarize_on_complete=request.summarize_on_complete,
            progress_interval=request.progress_interval,
        )
        task_id = await self.scheduler.create_task(plan)

        schedule_desc = self._describe_schedule(schedule)
        return ToolResult(
            content=f"Task scheduled: {plan.description}\n"
            f"ID: {task_id}\n"
            f"Schedule: {schedule_desc}\n"
            f"Steps: {len(steps)} tool call(s) per iteration"
        )

    async def list_tasks(
        self, request: ListTasksRequest, context: BotContext | None
    ) -> ToolResult:
        """List scheduled tasks.

        Args:
            request: Carries the optional status filter.
            context: Bot context supplying the user ID (0 when absent).

        Returns:
            A ToolResult with one entry per task, limited to the first
            ``_MAX_LISTED_TASKS``; when more exist, a trailing line states how
            many were left out.
        """
        user_id = context.user_id if context else 0
        tasks = self.scheduler.list_tasks(user_id, request.status_filter)
        if not tasks:
            return ToolResult(content="No scheduled tasks found.")

        shown = tasks[: self._MAX_LISTED_TASKS]
        lines = [
            f"- [{t.task_id}] {t.description}\n"
            f"  Status: {t.status.value} | Iterations: {t.current_iteration} | "
            f"Schedule: {self._describe_schedule(t.schedule)}"
            for t in shown
        ]
        # Make truncation visible so the caller does not assume the list is
        # complete.
        hidden = len(tasks) - len(shown)
        if hidden > 0:
            lines.append(f"... and {hidden} more task(s) not shown")
        return ToolResult(content="\n".join(lines))

    async def cancel_task(
        self, request: CancelTaskRequest, context: BotContext | None
    ) -> ToolResult:
        """Cancel a scheduled task.

        Args:
            request: Carries the ID of the task to cancel.
            context: Bot context supplying the user ID (0 when absent).

        Returns:
            A confirmation ToolResult, or an error ToolResult when the
            scheduler reports that nothing was cancelled.
        """
        user_id = context.user_id if context else 0
        cancelled = await self.scheduler.cancel_task(request.task_id, user_id)
        if cancelled:
            return ToolResult(content=f"Task {request.task_id} cancelled.")
        return ToolResult(
            content=f"Task {request.task_id} not found or not owned by you.",
            is_error=True,
        )

    async def get_task_results(
        self, request: GetTaskResultsRequest, context: BotContext | None
    ) -> ToolResult:
        """Get results of a scheduled task.

        Args:
            request: Carries the task ID and how many recent results to show.
                A ``last_n`` of zero or less yields no result lines.
            context: Bot context supplying the user ID (0 when absent).

        Returns:
            A ToolResult with the task header and the most recent results, or
            an error ToolResult when the scheduler returns no plan.
        """
        user_id = context.user_id if context else 0
        plan = self.scheduler.get_task_results(request.task_id, user_id)
        if plan is None:
            return ToolResult(
                content=f"Task {request.task_id} not found or not owned by you.",
                is_error=True,
            )

        last_n = max(request.last_n, 0)
        lines = [
            f"Task: {plan.description}",
            f"Status: {plan.status.value}",
            f"Iterations: {plan.current_iteration}",
            "",
            f"Last {last_n} results:",
        ]
        for r in self._tail(plan.results, last_n):
            status = "ERROR" if r.is_error else "OK"
            lines.append(
                f"  [{r.iteration}] {r.tool_name} ({status}): "
                f"{self._preview(r.content)}"
            )
        return ToolResult(content="\n".join(lines))

    @staticmethod
    def _tail(results, last_n: int) -> list:
        """Return the last ``last_n`` items of ``results`` as a list.

        A plain ``results[-last_n:]`` returns everything for ``last_n == 0``
        and drops leading items for negative values, so non-positive counts
        are mapped to an empty list explicitly.
        """
        if last_n <= 0:
            return []
        return list(results[-last_n:])

    @staticmethod
    def _preview(content: str, limit: int = 300) -> str:
        """Return ``content`` cut to ``limit`` characters, with "..." if cut."""
        if len(content) > limit:
            return content[:limit] + "..."
        return content

    @staticmethod
    def _describe_schedule(schedule: "Schedule") -> str:
        """Human-readable schedule description."""
        from ..scheduler.models import ScheduleType

        st = schedule.schedule_type
        if st == ScheduleType.once:
            if schedule.delay_seconds > 0:
                return f"once after {schedule.delay_seconds}s delay"
            return "once immediately"
        if st == ScheduleType.repeating:
            parts = f"every {schedule.interval_seconds}s"
            if schedule.max_iterations > 0:
                parts += f" (max {schedule.max_iterations} iterations)"
            return parts
        if st == ScheduleType.cron:
            return f"cron: {schedule.cron_expression}"
        if st == ScheduleType.time_limited:
            return (
                f"for {schedule.duration_seconds}s, every {schedule.interval_seconds}s"
            )
        # Unknown schedule type: fall back to its string form.
        return str(st)

    @staticmethod
    def _next_cron_time(expression: str) -> datetime:
        """Calculate next run time from a cron expression.

        The search starts from the current UTC time. Whatever croniter raises
        for an expression it cannot parse propagates to the caller.
        """
        from croniter import croniter

        cron = croniter(expression, datetime.now(timezone.utc))
        return cron.get_next(datetime)  # type: ignore[return-value]
# Register scheduler tools on folder_bot
[docs]
@folder_bot.tool(
    name="schedule_task",
    request_type=ScheduleTaskRequest,
    response_type=ToolResult,
    description=(
        "Schedule a task to be executed later or repeatedly. The task will run "
        "the specified tool(s) on the given schedule and report progress via "
        "Telegram messages. Use this for: delayed execution ('check this in 30 min'), "
        "repeating tasks ('check every hour'), cron tasks ('daily at 9am'), or "
        "time-limited bursts ('try for 5 minutes')."
    ),
)
async def schedule_task(
    request: ScheduleTaskRequest, _context: BotContext | None = None
) -> ToolResult:
    """Tool entry point: forward a scheduling request to SchedulerServices.

    Args:
        request: The validated scheduling request.
        _context: Bot context used to look up the "scheduler" service.

    Returns:
        The result of ``SchedulerServices.schedule_task``, or an error
        ToolResult when no scheduler service is available.
    """
    scheduler_services: SchedulerServices | None = get_service(_context, "scheduler")
    if scheduler_services is not None:
        return await scheduler_services.schedule_task(request, _context)
    return ToolResult(content="Scheduler not available", is_error=True)
[docs]
@folder_bot.tool(
    name="list_tasks",
    request_type=ListTasksRequest,
    response_type=ToolResult,
    description=(
        "List all scheduled tasks, optionally filtered by status. Shows task ID, "
        "description, schedule, status, and iteration count."
    ),
)
async def list_tasks(
    request: ListTasksRequest, _context: BotContext | None = None
) -> ToolResult:
    """Tool entry point: forward a listing request to SchedulerServices.

    Args:
        request: The validated listing request (optional status filter).
        _context: Bot context used to look up the "scheduler" service.

    Returns:
        The result of ``SchedulerServices.list_tasks``, or an error
        ToolResult when no scheduler service is available.
    """
    scheduler_services: SchedulerServices | None = get_service(_context, "scheduler")
    if scheduler_services is not None:
        return await scheduler_services.list_tasks(request, _context)
    return ToolResult(content="Scheduler not available", is_error=True)
[docs]
@folder_bot.tool(
    name="cancel_task",
    request_type=CancelTaskRequest,
    response_type=ToolResult,
    description="Cancel a running or pending scheduled task.",
)
async def cancel_task(
    request: CancelTaskRequest, _context: BotContext | None = None
) -> ToolResult:
    """Tool entry point: forward a cancellation request to SchedulerServices.

    Args:
        request: The validated cancellation request (task ID).
        _context: Bot context used to look up the "scheduler" service.

    Returns:
        The result of ``SchedulerServices.cancel_task``, or an error
        ToolResult when no scheduler service is available.
    """
    scheduler_services: SchedulerServices | None = get_service(_context, "scheduler")
    if scheduler_services is not None:
        return await scheduler_services.cancel_task(request, _context)
    return ToolResult(content="Scheduler not available", is_error=True)
[docs]
@folder_bot.tool(
    name="get_task_results",
    request_type=GetTaskResultsRequest,
    response_type=ToolResult,
    description=(
        "Get the results of a scheduled task. Returns the most recent execution "
        "results. Useful for checking what a task has found so far."
    ),
)
async def get_task_results(
    request: GetTaskResultsRequest, _context: BotContext | None = None
) -> ToolResult:
    """Tool entry point: forward a results request to SchedulerServices.

    Args:
        request: The validated results request (task ID and result count).
        _context: Bot context used to look up the "scheduler" service.

    Returns:
        The result of ``SchedulerServices.get_task_results``, or an error
        ToolResult when no scheduler service is available.
    """
    scheduler_services: SchedulerServices | None = get_service(_context, "scheduler")
    if scheduler_services is not None:
        return await scheduler_services.get_task_results(request, _context)
    return ToolResult(content="Scheduler not available", is_error=True)