From 6dbbf1bbece488a02193c58c64d6f4a5a47ee2ed Mon Sep 17 00:00:00 2001 From: Forgotten Date: Tue, 17 Feb 2026 20:46:07 -0600 Subject: [PATCH] Add CLI heartbeat-run command for manual agent invocation Add heartbeat-run command that triggers a single agent heartbeat from the CLI. Register it in the CLI entrypoint alongside existing commands. Co-Authored-By: Claude Opus 4.6 --- cli/src/commands/heartbeat-run.ts | 178 ++++++++++++++++++++++++++++++ cli/src/index.ts | 17 +++ 2 files changed, 195 insertions(+) create mode 100644 cli/src/commands/heartbeat-run.ts diff --git a/cli/src/commands/heartbeat-run.ts b/cli/src/commands/heartbeat-run.ts new file mode 100644 index 00000000..c31e39da --- /dev/null +++ b/cli/src/commands/heartbeat-run.ts @@ -0,0 +1,178 @@ +import { resolve } from "node:path"; +import pc from "picocolors"; +import { createDb, createPgliteDb } from "@paperclip/db"; +import { heartbeatService, subscribeCompanyLiveEvents } from "../../server/src/services/index.js"; +import { agents } from "@paperclip/db"; +import { eq } from "drizzle-orm"; +import type { PaperclipConfig } from "../config/schema.js"; +import { readConfig } from "../config/store.js"; +import type { LiveEvent } from "@paperclip/shared"; + +const HEARTBEAT_SOURCES = ["timer", "assignment", "on_demand", "automation"] as const; +const HEARTBEAT_TRIGGERS = ["manual", "ping", "callback", "system"] as const; +const TERMINAL_STATUSES = new Set(["succeeded", "failed", "cancelled", "timed_out"]); +const POLL_INTERVAL_MS = 200; + +type HeartbeatSource = (typeof HEARTBEAT_SOURCES)[number]; +type HeartbeatTrigger = (typeof HEARTBEAT_TRIGGERS)[number]; + +interface HeartbeatRunOptions { + config?: string; + agentId: string; + source: string; + trigger: string; + timeoutMs: string; +} + +export async function heartbeatRun(opts: HeartbeatRunOptions): Promise { + const parsedTimeout = Number.parseInt(opts.timeoutMs, 10); + const timeoutMs = Number.isFinite(parsedTimeout) ? parsedTimeout : 0; + const source = HEARTBEAT_SOURCES.includes(opts.source as HeartbeatSource) + ? (opts.source as HeartbeatSource) + : "on_demand"; + const triggerDetail = HEARTBEAT_TRIGGERS.includes(opts.trigger as HeartbeatTrigger) + ? (opts.trigger as HeartbeatTrigger) + : "manual"; + + const config = readConfig(opts.config); + const db = await createHeartbeatDb(config); + + const [agent] = await db.select().from(agents).where(eq(agents.id, opts.agentId)); + if (!agent) { + console.error(pc.red(`Agent not found: ${opts.agentId}`)); + return; + } + + const heartbeat = heartbeatService(db); + let activeRunId: string | null = null; + const unsubscribe = subscribeCompanyLiveEvents(agent.companyId, (event: LiveEvent) => { + const payload = normalizePayload(event.payload); + const payloadRunId = typeof payload.runId === "string" ? payload.runId : null; + const payloadAgentId = typeof payload.agentId === "string" ? payload.agentId : null; + if (!payloadRunId || (payloadAgentId && payloadAgentId !== agent.id)) return; + + if (activeRunId === null) { + activeRunId = payloadRunId; + } else if (payloadRunId !== activeRunId) { + return; + } + + if (event.type === "heartbeat.run.status") { + const status = typeof payload.status === "string" ? payload.status : null; + if (status) { + console.log(pc.blue(`[status] ${status}`)); + } + } else if (event.type === "heartbeat.run.log") { + const stream = typeof payload.stream === "string" ? payload.stream : "system"; + const chunk = typeof payload.chunk === "string" ? payload.chunk : ""; + if (!chunk) return; + if (stream === "stdout") { + process.stdout.write(pc.green("[stdout] ") + chunk); + } else if (stream === "stderr") { + process.stdout.write(pc.red("[stderr] ") + chunk); + } else { + process.stdout.write(pc.yellow("[system] ") + chunk); + } + } else if (event.type === "heartbeat.run.event") { + if (typeof payload.message === "string") { + console.log(pc.gray(`[event] ${payload.eventType ?? "heartbeat.run.event"}: ${payload.message}`)); + } + } + }); + + const run = await heartbeat.invoke(opts.agentId, source, {}, triggerDetail, { + actorType: "user", + actorId: "paperclip cli", + }); + + if (!run) { + console.error(pc.red("Heartbeat was not queued.")); + return; + } + + console.log(pc.cyan(`Invoked heartbeat run ${run.id} for agent ${agent.name} (${agent.id})`)); + + activeRunId = run.id; + let finalStatus: string | null = null; + let finalError: string | null = null; + + const deadline = timeoutMs > 0 ? Date.now() + timeoutMs : null; + if (!activeRunId) { + console.error(pc.red("Failed to capture heartbeat run id")); + return; + } + + try { + while (true) { + const currentRun = await heartbeat.getRun(activeRunId); + if (!currentRun) { + console.error(pc.red("Heartbeat run disappeared")); + break; + } + + if (currentRun.status !== finalStatus && currentRun.status) { + finalStatus = currentRun.status; + const statusText = `Status: ${currentRun.status}`; + console.log(pc.blue(statusText)); + } + + if (TERMINAL_STATUSES.has(currentRun.status)) { + finalStatus = currentRun.status; + finalError = currentRun.error; + break; + } + + if (deadline && Date.now() >= deadline) { + finalError = `CLI timed out after ${timeoutMs}ms`; + finalStatus = "timed_out"; + console.error(pc.yellow(finalError)); + break; + } + + await sleep(POLL_INTERVAL_MS); + } + } finally { + unsubscribe(); + } + + if (finalStatus) { + const label = `Run ${activeRunId} completed with status ${finalStatus}`; + if (finalStatus === "succeeded") { + console.log(pc.green(label)); + return; + } + + console.log(pc.red(label)); + if (finalError) { + console.log(pc.red(`Error: ${finalError}`)); + } + process.exitCode = 1; + } else { + process.exitCode = 1; + console.log(pc.gray("Heartbeat stream ended without terminal status")); + } +} + +function normalizePayload(payload: unknown): Record { + return typeof payload === "object" && payload !== null ? (payload as Record) : {}; +} + +async function createHeartbeatDb(config: PaperclipConfig | null) { + if (process.env.DATABASE_URL) { + return createDb(process.env.DATABASE_URL); + } + + if (!config || config.database.mode === "pglite") { + return createPgliteDb(resolve(process.cwd(), config?.database.pgliteDataDir ?? "./data/pglite")); + } + + if (!config.database.connectionString) { + throw new Error("Postgres mode is configured but connectionString is missing"); + } + + return createDb(config.database.connectionString); +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} diff --git a/cli/src/index.ts b/cli/src/index.ts index d22b872a..0a3e17c4 100644 --- a/cli/src/index.ts +++ b/cli/src/index.ts @@ -3,6 +3,7 @@ import { Command } from "commander"; import { onboard } from "./commands/onboard.js"; import { doctor } from "./commands/doctor.js"; import { configure } from "./commands/configure.js"; +import { heartbeatRun } from "./commands/heartbeat-run.js"; const program = new Command(); @@ -33,4 +34,20 @@ program .option("-s, --section
", "Section to configure (llm, database, logging, server)") .action(configure); +const heartbeat = program.command("heartbeat").description("Heartbeat utilities"); + +heartbeat + .command("run") + .description("Run one agent heartbeat and stream live logs") + .requiredOption("-a, --agent-id ", "Agent ID to invoke") + .option("-c, --config ", "Path to config file") + .option( + "--source ", + "Invocation source (timer | assignment | on_demand | automation)", + "on_demand", + ) + .option("--trigger ", "Trigger detail (manual | ping | callback | system)", "manual") + .option("--timeout-ms ", "Max time to wait before giving up", "0") + .action(heartbeatRun); + program.parse();