Files
paperclip/server/src/services/plugin-job-scheduler.ts
2026-03-13 16:22:34 -05:00

753 lines
21 KiB
TypeScript

/**
* PluginJobScheduler — tick-based scheduler for plugin scheduled jobs.
*
* The scheduler is the central coordinator for all plugin cron jobs. It
* periodically ticks (default every 30 seconds), queries the `plugin_jobs`
* table for jobs whose `nextRunAt` has passed, dispatches `runJob` RPC calls
* to the appropriate worker processes, records each execution in the
* `plugin_job_runs` table, and advances the scheduling pointer.
*
* ## Responsibilities
*
* 1. **Tick loop** — A `setInterval`-based loop fires every `tickIntervalMs`
* (default 30s). Each tick scans for due jobs and dispatches them.
*
* 2. **Cron parsing & next-run calculation** — Uses the lightweight built-in
* cron parser ({@link parseCron}, {@link nextCronTick}) to compute the
* `nextRunAt` timestamp after each run or when a new job is registered.
*
* 3. **Overlap prevention** — Before dispatching a job, the scheduler checks
* for an existing `running` run for the same job. If one exists, the job
* is skipped for that tick.
*
* 4. **Job run recording** — Every execution creates a `plugin_job_runs` row:
* `queued` → `running` → `succeeded` | `failed`. Duration and error are
* captured.
*
* 5. **Lifecycle integration** — The scheduler exposes `registerPlugin()` and
* `unregisterPlugin()` so the host lifecycle manager can wire up job
* scheduling when plugins start/stop. On registration, the scheduler
* computes `nextRunAt` for all active jobs that don't already have one.
*
* @see PLUGIN_SPEC.md §17 — Scheduled Jobs
* @see ./plugin-job-store.ts — Persistence layer
* @see ./cron.ts — Cron parsing utilities
*/
import { and, eq, lte, or } from "drizzle-orm";
import type { Db } from "@paperclipai/db";
import { pluginJobs, pluginJobRuns } from "@paperclipai/db";
import type { PluginJobStore } from "./plugin-job-store.js";
import type { PluginWorkerManager } from "./plugin-worker-manager.js";
import { parseCron, nextCronTick, validateCron } from "./cron.js";
import { logger } from "../middleware/logger.js";
// ---------------------------------------------------------------------------
// Constants
// ---------------------------------------------------------------------------
/** Default interval between scheduler ticks (30 seconds). */
const DEFAULT_TICK_INTERVAL_MS = 30_000;
/** Default timeout for a runJob RPC call (5 minutes). */
const DEFAULT_JOB_TIMEOUT_MS = 5 * 60 * 1_000;
/** Maximum number of concurrent job executions across all plugins. */
const DEFAULT_MAX_CONCURRENT_JOBS = 10;
// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------
/**
* Options for creating a PluginJobScheduler.
*/
export interface PluginJobSchedulerOptions {
/** Drizzle database instance. */
db: Db;
/** Persistence layer for jobs and runs. */
jobStore: PluginJobStore;
/** Worker process manager for RPC calls. */
workerManager: PluginWorkerManager;
/** Interval between scheduler ticks in ms (default: 30s). */
tickIntervalMs?: number;
/** Timeout for individual job RPC calls in ms (default: 5min). */
jobTimeoutMs?: number;
/** Maximum number of concurrent job executions (default: 10). */
maxConcurrentJobs?: number;
}
/**
* Result of a manual job trigger.
*/
export interface TriggerJobResult {
/** The created run ID. */
runId: string;
/** The job ID that was triggered. */
jobId: string;
}
/**
* Diagnostic information about the scheduler.
*/
export interface SchedulerDiagnostics {
/** Whether the tick loop is running. */
running: boolean;
/** Number of jobs currently executing. */
activeJobCount: number;
/** Set of job IDs currently in-flight. */
activeJobIds: string[];
/** Total number of ticks executed since start. */
tickCount: number;
/** Timestamp of the last tick (ISO 8601). */
lastTickAt: string | null;
}
// ---------------------------------------------------------------------------
// Scheduler
// ---------------------------------------------------------------------------
/**
* The public interface of the job scheduler.
*/
export interface PluginJobScheduler {
/**
* Start the scheduler tick loop.
*
* Safe to call multiple times — subsequent calls are no-ops.
*/
start(): void;
/**
* Stop the scheduler tick loop.
*
* In-flight job runs are NOT cancelled — they are allowed to finish
* naturally. The tick loop simply stops firing.
*/
stop(): void;
/**
* Register a plugin with the scheduler.
*
* Computes `nextRunAt` for all active jobs that are missing it. This is
* typically called after a plugin's worker process starts and
* `syncJobDeclarations()` has been called.
*
* @param pluginId - UUID of the plugin
*/
registerPlugin(pluginId: string): Promise<void>;
/**
* Unregister a plugin from the scheduler.
*
* Cancels any in-flight runs for the plugin and removes tracking state.
*
* @param pluginId - UUID of the plugin
*/
unregisterPlugin(pluginId: string): Promise<void>;
/**
* Manually trigger a specific job (outside of the cron schedule).
*
* Creates a run with `trigger: "manual"` and dispatches immediately,
* respecting the overlap prevention check.
*
* @param jobId - UUID of the job to trigger
* @param trigger - What triggered this run (default: "manual")
* @returns The created run info
* @throws {Error} if the job is not found, not active, or already running
*/
triggerJob(jobId: string, trigger?: "manual" | "retry"): Promise<TriggerJobResult>;
/**
* Run a single scheduler tick immediately (for testing).
*
* @internal
*/
tick(): Promise<void>;
/**
* Get diagnostic information about the scheduler state.
*/
diagnostics(): SchedulerDiagnostics;
}
// ---------------------------------------------------------------------------
// Implementation
// ---------------------------------------------------------------------------
/**
* Create a new PluginJobScheduler.
*
* @example
* ```ts
* const scheduler = createPluginJobScheduler({
* db,
* jobStore,
* workerManager,
* });
*
* // Start the tick loop
* scheduler.start();
*
* // When a plugin comes online, register it
* await scheduler.registerPlugin(pluginId);
*
* // Manually trigger a job
* const { runId } = await scheduler.triggerJob(jobId);
*
* // On server shutdown
* scheduler.stop();
* ```
*/
export function createPluginJobScheduler(
options: PluginJobSchedulerOptions,
): PluginJobScheduler {
const {
db,
jobStore,
workerManager,
tickIntervalMs = DEFAULT_TICK_INTERVAL_MS,
jobTimeoutMs = DEFAULT_JOB_TIMEOUT_MS,
maxConcurrentJobs = DEFAULT_MAX_CONCURRENT_JOBS,
} = options;
const log = logger.child({ service: "plugin-job-scheduler" });
// -----------------------------------------------------------------------
// State
// -----------------------------------------------------------------------
/** Timer handle for the tick loop. */
let tickTimer: ReturnType<typeof setInterval> | null = null;
/** Whether the scheduler is running. */
let running = false;
/** Set of job IDs currently being executed (for overlap prevention). */
const activeJobs = new Set<string>();
/** Total number of ticks since start. */
let tickCount = 0;
/** Timestamp of the last tick. */
let lastTickAt: Date | null = null;
/** Guard against concurrent tick execution. */
let tickInProgress = false;
// -----------------------------------------------------------------------
// Core: tick
// -----------------------------------------------------------------------
/**
* A single scheduler tick. Queries for due jobs and dispatches them.
*/
async function tick(): Promise<void> {
// Prevent overlapping ticks (in case a tick takes longer than the interval)
if (tickInProgress) {
log.debug("skipping tick — previous tick still in progress");
return;
}
tickInProgress = true;
tickCount++;
lastTickAt = new Date();
try {
const now = new Date();
// Query for jobs whose nextRunAt has passed and are active.
// We include jobs with null nextRunAt since they may have just been
// registered and need their first run calculated.
const dueJobs = await db
.select()
.from(pluginJobs)
.where(
and(
eq(pluginJobs.status, "active"),
lte(pluginJobs.nextRunAt, now),
),
);
if (dueJobs.length === 0) {
return;
}
log.debug({ count: dueJobs.length }, "found due jobs");
// Dispatch each due job (respecting concurrency limits)
const dispatches: Promise<void>[] = [];
for (const job of dueJobs) {
// Concurrency limit
if (activeJobs.size >= maxConcurrentJobs) {
log.warn(
{ maxConcurrentJobs, activeJobCount: activeJobs.size },
"max concurrent jobs reached, deferring remaining jobs",
);
break;
}
// Overlap prevention: skip if this job is already running
if (activeJobs.has(job.id)) {
log.debug(
{ jobId: job.id, jobKey: job.jobKey, pluginId: job.pluginId },
"skipping job — already running (overlap prevention)",
);
continue;
}
// Check if the worker is available
if (!workerManager.isRunning(job.pluginId)) {
log.debug(
{ jobId: job.id, pluginId: job.pluginId },
"skipping job — worker not running",
);
continue;
}
// Validate cron expression before dispatching
if (!job.schedule) {
log.warn(
{ jobId: job.id, jobKey: job.jobKey },
"skipping job — no schedule defined",
);
continue;
}
dispatches.push(dispatchJob(job));
}
if (dispatches.length > 0) {
await Promise.allSettled(dispatches);
}
} catch (err) {
log.error(
{ err: err instanceof Error ? err.message : String(err) },
"scheduler tick error",
);
} finally {
tickInProgress = false;
}
}
// -----------------------------------------------------------------------
// Core: dispatch a single job
// -----------------------------------------------------------------------
/**
* Dispatch a single job run — create the run record, call the worker,
* record the result, and advance the schedule pointer.
*/
async function dispatchJob(
job: typeof pluginJobs.$inferSelect,
): Promise<void> {
const { id: jobId, pluginId, jobKey, schedule } = job;
const jobLog = log.child({ jobId, pluginId, jobKey });
// Mark as active (overlap prevention)
activeJobs.add(jobId);
let runId: string | undefined;
const startedAt = Date.now();
try {
// 1. Create run record
const run = await jobStore.createRun({
jobId,
pluginId,
trigger: "schedule",
});
runId = run.id;
jobLog.info({ runId }, "dispatching scheduled job");
// 2. Mark run as running
await jobStore.markRunning(runId);
// 3. Call worker via RPC
await workerManager.call(
pluginId,
"runJob",
{
job: {
jobKey,
runId,
trigger: "schedule" as const,
scheduledAt: (job.nextRunAt ?? new Date()).toISOString(),
},
},
jobTimeoutMs,
);
// 4. Mark run as succeeded
const durationMs = Date.now() - startedAt;
await jobStore.completeRun(runId, {
status: "succeeded",
durationMs,
});
jobLog.info({ runId, durationMs }, "job completed successfully");
} catch (err) {
const durationMs = Date.now() - startedAt;
const errorMessage = err instanceof Error ? err.message : String(err);
jobLog.error(
{ runId, durationMs, err: errorMessage },
"job execution failed",
);
// Record the failure
if (runId) {
try {
await jobStore.completeRun(runId, {
status: "failed",
error: errorMessage,
durationMs,
});
} catch (completeErr) {
jobLog.error(
{
runId,
err: completeErr instanceof Error ? completeErr.message : String(completeErr),
},
"failed to record job failure",
);
}
}
} finally {
// Remove from active set
activeJobs.delete(jobId);
// 5. Always advance the schedule pointer (even on failure)
try {
await advanceSchedulePointer(job);
} catch (err) {
jobLog.error(
{ err: err instanceof Error ? err.message : String(err) },
"failed to advance schedule pointer",
);
}
}
}
// -----------------------------------------------------------------------
// Core: manual trigger
// -----------------------------------------------------------------------
async function triggerJob(
jobId: string,
trigger: "manual" | "retry" = "manual",
): Promise<TriggerJobResult> {
const job = await jobStore.getJobById(jobId);
if (!job) {
throw new Error(`Job not found: ${jobId}`);
}
if (job.status !== "active") {
throw new Error(
`Job "${job.jobKey}" is not active (status: ${job.status})`,
);
}
// Overlap prevention
if (activeJobs.has(jobId)) {
throw new Error(
`Job "${job.jobKey}" is already running — cannot trigger while in progress`,
);
}
// Also check DB for running runs (defensive — covers multi-instance)
const existingRuns = await db
.select()
.from(pluginJobRuns)
.where(
and(
eq(pluginJobRuns.jobId, jobId),
eq(pluginJobRuns.status, "running"),
),
);
if (existingRuns.length > 0) {
throw new Error(
`Job "${job.jobKey}" already has a running execution — cannot trigger while in progress`,
);
}
// Check worker availability
if (!workerManager.isRunning(job.pluginId)) {
throw new Error(
`Worker for plugin "${job.pluginId}" is not running — cannot trigger job`,
);
}
// Create the run and dispatch (non-blocking)
const run = await jobStore.createRun({
jobId,
pluginId: job.pluginId,
trigger,
});
// Dispatch in background — don't block the caller
void dispatchManualRun(job, run.id, trigger);
return { runId: run.id, jobId };
}
/**
* Dispatch a manually triggered job run.
*/
async function dispatchManualRun(
job: typeof pluginJobs.$inferSelect,
runId: string,
trigger: "manual" | "retry",
): Promise<void> {
const { id: jobId, pluginId, jobKey } = job;
const jobLog = log.child({ jobId, pluginId, jobKey, runId, trigger });
activeJobs.add(jobId);
const startedAt = Date.now();
try {
await jobStore.markRunning(runId);
await workerManager.call(
pluginId,
"runJob",
{
job: {
jobKey,
runId,
trigger,
scheduledAt: new Date().toISOString(),
},
},
jobTimeoutMs,
);
const durationMs = Date.now() - startedAt;
await jobStore.completeRun(runId, {
status: "succeeded",
durationMs,
});
jobLog.info({ durationMs }, "manual job completed successfully");
} catch (err) {
const durationMs = Date.now() - startedAt;
const errorMessage = err instanceof Error ? err.message : String(err);
jobLog.error({ durationMs, err: errorMessage }, "manual job failed");
try {
await jobStore.completeRun(runId, {
status: "failed",
error: errorMessage,
durationMs,
});
} catch (completeErr) {
jobLog.error(
{
err: completeErr instanceof Error ? completeErr.message : String(completeErr),
},
"failed to record manual job failure",
);
}
} finally {
activeJobs.delete(jobId);
}
}
// -----------------------------------------------------------------------
// Schedule pointer management
// -----------------------------------------------------------------------
/**
* Advance the `lastRunAt` and `nextRunAt` timestamps on a job after a run.
*/
async function advanceSchedulePointer(
job: typeof pluginJobs.$inferSelect,
): Promise<void> {
const now = new Date();
let nextRunAt: Date | null = null;
if (job.schedule) {
const validationError = validateCron(job.schedule);
if (validationError) {
log.warn(
{ jobId: job.id, schedule: job.schedule, error: validationError },
"invalid cron schedule — cannot compute next run",
);
} else {
const cron = parseCron(job.schedule);
nextRunAt = nextCronTick(cron, now);
}
}
await jobStore.updateRunTimestamps(job.id, now, nextRunAt);
}
/**
* Ensure all active jobs for a plugin have a `nextRunAt` value.
* Called when a plugin is registered with the scheduler.
*/
async function ensureNextRunTimestamps(pluginId: string): Promise<void> {
const jobs = await jobStore.listJobs(pluginId, "active");
for (const job of jobs) {
// Skip jobs that already have a valid nextRunAt in the future
if (job.nextRunAt && job.nextRunAt.getTime() > Date.now()) {
continue;
}
// Skip jobs without a schedule
if (!job.schedule) {
continue;
}
const validationError = validateCron(job.schedule);
if (validationError) {
log.warn(
{ jobId: job.id, jobKey: job.jobKey, schedule: job.schedule, error: validationError },
"skipping job with invalid cron schedule",
);
continue;
}
const cron = parseCron(job.schedule);
const nextRunAt = nextCronTick(cron, new Date());
if (nextRunAt) {
await jobStore.updateRunTimestamps(
job.id,
job.lastRunAt ?? new Date(0),
nextRunAt,
);
log.debug(
{ jobId: job.id, jobKey: job.jobKey, nextRunAt: nextRunAt.toISOString() },
"computed nextRunAt for job",
);
}
}
}
// -----------------------------------------------------------------------
// Plugin registration
// -----------------------------------------------------------------------
async function registerPlugin(pluginId: string): Promise<void> {
log.info({ pluginId }, "registering plugin with job scheduler");
await ensureNextRunTimestamps(pluginId);
}
async function unregisterPlugin(pluginId: string): Promise<void> {
log.info({ pluginId }, "unregistering plugin from job scheduler");
// Cancel any in-flight run records for this plugin that are still
// queued or running. Active jobs in-memory will finish naturally.
try {
const runningRuns = await db
.select()
.from(pluginJobRuns)
.where(
and(
eq(pluginJobRuns.pluginId, pluginId),
or(
eq(pluginJobRuns.status, "running"),
eq(pluginJobRuns.status, "queued"),
),
),
);
for (const run of runningRuns) {
await jobStore.completeRun(run.id, {
status: "cancelled",
error: "Plugin unregistered",
durationMs: run.startedAt
? Date.now() - run.startedAt.getTime()
: null,
});
}
} catch (err) {
log.error(
{
pluginId,
err: err instanceof Error ? err.message : String(err),
},
"error cancelling in-flight runs during unregister",
);
}
// Remove any active tracking for jobs owned by this plugin
const jobs = await jobStore.listJobs(pluginId);
for (const job of jobs) {
activeJobs.delete(job.id);
}
}
// -----------------------------------------------------------------------
// Lifecycle: start / stop
// -----------------------------------------------------------------------
function start(): void {
if (running) {
log.debug("scheduler already running");
return;
}
running = true;
tickTimer = setInterval(() => {
void tick();
}, tickIntervalMs);
log.info(
{ tickIntervalMs, maxConcurrentJobs },
"plugin job scheduler started",
);
}
function stop(): void {
// Always clear the timer defensively, even if `running` is already false,
// to prevent leaked interval timers.
if (tickTimer !== null) {
clearInterval(tickTimer);
tickTimer = null;
}
if (!running) return;
running = false;
log.info(
{ activeJobCount: activeJobs.size },
"plugin job scheduler stopped",
);
}
// -----------------------------------------------------------------------
// Diagnostics
// -----------------------------------------------------------------------
function diagnostics(): SchedulerDiagnostics {
return {
running,
activeJobCount: activeJobs.size,
activeJobIds: [...activeJobs],
tickCount,
lastTickAt: lastTickAt?.toISOString() ?? null,
};
}
// -----------------------------------------------------------------------
// Public API
// -----------------------------------------------------------------------
return {
start,
stop,
registerPlugin,
unregisterPlugin,
triggerJob,
tick,
diagnostics,
};
}