Refactor monolithic heartbeat service, AgentConfigForm, and CLI heartbeat-run into a proper adapter registry pattern. Each adapter type (process, claude-local, codex-local, http) gets its own module with server-side execution logic, CLI invocation, and UI config form. Significantly reduces file sizes and enables adding new adapters without touching core code. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
859 lines
26 KiB
TypeScript
859 lines
26 KiB
TypeScript
import { and, asc, desc, eq, gt, inArray, sql } from "drizzle-orm";
|
|
import type { Db } from "@paperclip/db";
|
|
import {
|
|
agents,
|
|
agentRuntimeState,
|
|
agentWakeupRequests,
|
|
heartbeatRunEvents,
|
|
heartbeatRuns,
|
|
costEvents,
|
|
} from "@paperclip/db";
|
|
import { conflict, notFound } from "../errors.js";
|
|
import { logger } from "../middleware/logger.js";
|
|
import { publishLiveEvent } from "./live-events.js";
|
|
import { getRunLogStore, type RunLogHandle } from "./run-log-store.js";
|
|
import { getServerAdapter, runningProcesses } from "../adapters/index.js";
|
|
import type { AdapterExecutionResult, AdapterInvocationMeta } from "../adapters/index.js";
|
|
import { parseObject, asBoolean, asNumber, appendWithCap, MAX_EXCERPT_BYTES } from "../adapters/utils.js";
|
|
|
|
/** Largest log chunk (8 * 1024) forwarded in a single live log event; compared against `chunk.length`. */
const MAX_LIVE_LOG_CHUNK_BYTES = 8 * 1024;

/**
 * Adds `chunk` to the excerpt accumulated so far.
 *
 * Size limiting is delegated to `appendWithCap`, using `MAX_EXCERPT_BYTES`
 * as the cap.
 */
function appendExcerpt(prev: string, chunk: string) {
  const capped = appendWithCap(prev, chunk, MAX_EXCERPT_BYTES);
  return capped;
}
|
|
|
|
/**
 * Optional settings accepted when a wakeup is enqueued for an agent.
 * All fields may be omitted.
 */
interface WakeupOptions {
  /** Origin of the wakeup; callers that omit it get "on_demand". */
  source?:
    | "timer"
    | "assignment"
    | "on_demand"
    | "automation";
  /** Finer-grained trigger label stored on the wakeup request and the run. */
  triggerDetail?:
    | "manual"
    | "ping"
    | "callback"
    | "system";
  /** Free-form reason recorded on the wakeup request. */
  reason?: string | null;
  /** Arbitrary payload recorded on the wakeup request. */
  payload?: Record<string, unknown> | null;
  /** Stored on the wakeup request as-is. */
  idempotencyKey?: string | null;
  /** Kind of actor that asked for the wakeup. */
  requestedByActorType?:
    | "user"
    | "agent"
    | "system";
  /** Identifier of the requesting actor. */
  requestedByActorId?: string | null;
  /** Context stored on the run and later handed to the adapter. */
  contextSnapshot?: Record<string, unknown>;
}
|
|
|
|
export function heartbeatService(db: Db) {
|
|
const runLogStore = getRunLogStore();
|
|
|
|
/** Fetches the agent row with the given id, or `null` when there is none. */
async function getAgent(agentId: string) {
  const [agentRow] = await db.select().from(agents).where(eq(agents.id, agentId));
  return agentRow ?? null;
}
|
|
|
|
/** Fetches the heartbeat run with the given id, or `null` when there is none. */
async function getRun(runId: string) {
  const [runRow] = await db.select().from(heartbeatRuns).where(eq(heartbeatRuns.id, runId));
  return runRow ?? null;
}
|
|
|
|
/** Fetches the runtime-state row belonging to an agent, or `null` when none exists yet. */
async function getRuntimeState(agentId: string) {
  const [stateRow] = await db
    .select()
    .from(agentRuntimeState)
    .where(eq(agentRuntimeState.agentId, agentId));
  return stateRow ?? null;
}
|
|
|
|
/**
 * Returns the agent's runtime-state row, inserting a fresh one (empty
 * `stateJson`) when the agent does not have one yet.
 */
async function ensureRuntimeState(agent: typeof agents.$inferSelect) {
  const current = await getRuntimeState(agent.id);
  if (current) return current;

  const [created] = await db
    .insert(agentRuntimeState)
    .values({
      agentId: agent.id,
      companyId: agent.companyId,
      adapterType: agent.adapterType,
      stateJson: {},
    })
    .returning();
  return created;
}
|
|
|
|
/**
 * Writes a new status (plus any extra columns in `patch`) to a heartbeat run
 * and broadcasts a "heartbeat.run.status" live event for the updated row.
 *
 * @returns the updated run row, or `null` when no row matched `runId`
 *          (in which case no event is published).
 */
async function setRunStatus(
  runId: string,
  status: string,
  patch?: Partial<typeof heartbeatRuns.$inferInsert>,
) {
  const [row] = await db
    .update(heartbeatRuns)
    .set({ status, ...patch, updatedAt: new Date() })
    .where(eq(heartbeatRuns.id, runId))
    .returning();
  const updated = row ?? null;

  if (!updated) return updated;

  const toIso = (value: Date | string | number) => new Date(value).toISOString();

  publishLiveEvent({
    companyId: updated.companyId,
    type: "heartbeat.run.status",
    payload: {
      runId: updated.id,
      agentId: updated.agentId,
      status: updated.status,
      invocationSource: updated.invocationSource,
      triggerDetail: updated.triggerDetail,
      error: updated.error ?? null,
      errorCode: updated.errorCode ?? null,
      startedAt: updated.startedAt ? toIso(updated.startedAt) : null,
      finishedAt: updated.finishedAt ? toIso(updated.finishedAt) : null,
    },
  });

  return updated;
}
|
|
|
|
/**
 * Updates the status (and any extra columns in `patch`) of a wakeup request.
 * Does nothing when no wakeup request id is supplied.
 */
async function setWakeupStatus(
  wakeupRequestId: string | null | undefined,
  status: string,
  patch?: Partial<typeof agentWakeupRequests.$inferInsert>,
) {
  if (wakeupRequestId) {
    const changes = { status, ...patch, updatedAt: new Date() };
    await db
      .update(agentWakeupRequests)
      .set(changes)
      .where(eq(agentWakeupRequests.id, wakeupRequestId));
  }
}
|
|
|
|
/**
 * Persists one event for a run under the given sequence number and then
 * broadcasts it as a "heartbeat.run.event" live event.
 *
 * Optional event fields are stored as given; in the live payload missing
 * fields are sent as `null`.
 */
async function appendRunEvent(
  run: typeof heartbeatRuns.$inferSelect,
  seq: number,
  event: {
    eventType: string;
    stream?: "system" | "stdout" | "stderr";
    level?: "info" | "warn" | "error";
    color?: string;
    message?: string;
    payload?: Record<string, unknown>;
  },
) {
  const { eventType, stream, level, color, message, payload } = event;
  const { companyId, id: runId, agentId } = run;

  await db.insert(heartbeatRunEvents).values({
    companyId,
    runId,
    agentId,
    seq,
    eventType,
    stream,
    level,
    color,
    message,
    payload,
  });

  publishLiveEvent({
    companyId,
    type: "heartbeat.run.event",
    payload: {
      runId,
      agentId,
      seq,
      eventType,
      stream: stream ?? null,
      level: level ?? null,
      color: color ?? null,
      message: message ?? null,
      payload: payload ?? null,
    },
  });
}
|
|
|
|
/**
 * Reads the `heartbeat` section of an agent's `runtimeConfig` into a policy
 * object. Every boolean flag falls back to `true`; `intervalSec` falls back
 * to 0 and is never negative.
 */
function parseHeartbeatPolicy(agent: typeof agents.$inferSelect) {
  const heartbeat = parseObject(parseObject(agent.runtimeConfig).heartbeat);

  const enabled = asBoolean(heartbeat.enabled, true);
  const intervalSec = Math.max(0, asNumber(heartbeat.intervalSec, 0));
  const wakeOnAssignment = asBoolean(heartbeat.wakeOnAssignment, true);
  const wakeOnOnDemand = asBoolean(heartbeat.wakeOnOnDemand, true);
  const wakeOnAutomation = asBoolean(heartbeat.wakeOnAutomation, true);

  return { enabled, intervalSec, wakeOnAssignment, wakeOnOnDemand, wakeOnAutomation };
}
|
|
|
|
/**
 * Moves an agent to its post-run status and announces it via an
 * "agent.status" live event.
 *
 * Succeeded and cancelled runs leave the agent "idle"; every other outcome
 * leaves it in "error". Agents that are missing, paused or terminated are
 * left untouched.
 */
async function finalizeAgentStatus(
  agentId: string,
  outcome: "succeeded" | "failed" | "cancelled" | "timed_out",
) {
  const agentRow = await getAgent(agentId);
  if (!agentRow || agentRow.status === "paused" || agentRow.status === "terminated") {
    return;
  }

  const endedCleanly = outcome === "succeeded" || outcome === "cancelled";
  const nextStatus = endedCleanly ? "idle" : "error";

  const [row] = await db
    .update(agents)
    .set({
      status: nextStatus,
      lastHeartbeatAt: new Date(),
      updatedAt: new Date(),
    })
    .where(eq(agents.id, agentId))
    .returning();
  const updated = row ?? null;

  if (!updated) return;

  const lastHeartbeatAt = updated.lastHeartbeatAt
    ? new Date(updated.lastHeartbeatAt).toISOString()
    : null;

  publishLiveEvent({
    companyId: updated.companyId,
    type: "agent.status",
    payload: {
      agentId: updated.id,
      status: updated.status,
      lastHeartbeatAt,
      outcome,
    },
  });
}
|
|
|
|
/**
 * Folds the result of a finished run into the agent's runtime state:
 * session id, last run id/status/error and the running token and cost totals.
 *
 * When the run cost rounds to at least one cent, a cost event is also
 * inserted and the agent's `spentMonthlyCents` is incremented.
 */
async function updateRuntimeState(
  agent: typeof agents.$inferSelect,
  run: typeof heartbeatRuns.$inferSelect,
  result: AdapterExecutionResult,
) {
  const existing = await ensureRuntimeState(agent);

  const tokens = {
    input: result.usage?.inputTokens ?? 0,
    output: result.usage?.outputTokens ?? 0,
    cachedInput: result.usage?.cachedInputTokens ?? 0,
  };
  // costUsd is in dollars; totals are tracked in whole, non-negative cents.
  const additionalCostCents = Math.max(0, Math.round((result.costUsd ?? 0) * 100));

  // An explicit clearSession wins; otherwise keep the previous id unless the adapter returned a new one.
  const nextSessionId = result.clearSession ? null : (result.sessionId ?? existing.sessionId);

  await db
    .update(agentRuntimeState)
    .set({
      adapterType: agent.adapterType,
      sessionId: nextSessionId,
      lastRunId: run.id,
      lastRunStatus: run.status,
      lastError: result.errorMessage ?? null,
      totalInputTokens: existing.totalInputTokens + tokens.input,
      totalOutputTokens: existing.totalOutputTokens + tokens.output,
      totalCachedInputTokens: existing.totalCachedInputTokens + tokens.cachedInput,
      totalCostCents: existing.totalCostCents + additionalCostCents,
      updatedAt: new Date(),
    })
    .where(eq(agentRuntimeState.agentId, agent.id));

  // Nothing billable: skip the cost event and the monthly spend bump.
  if (!(additionalCostCents > 0)) return;

  await db.insert(costEvents).values({
    companyId: agent.companyId,
    agentId: agent.id,
    provider: result.provider ?? "unknown",
    model: result.model ?? "unknown",
    inputTokens: tokens.input,
    outputTokens: tokens.output,
    costCents: additionalCostCents,
    occurredAt: new Date(),
  });

  await db
    .update(agents)
    .set({
      spentMonthlyCents: sql`${agents.spentMonthlyCents} + ${additionalCostCents}`,
      updatedAt: new Date(),
    })
    .where(eq(agents.id, agent.id));
}
|
|
|
|
/**
 * Runs a single heartbeat run through its adapter and records the outcome.
 *
 * Flow: load the run and its agent, mark both as running, open a run log,
 * invoke the server adapter registered for `agent.adapterType`, then persist
 * the final run status, wakeup status, closing lifecycle event, runtime state
 * and agent status. An exception anywhere after the run was claimed marks the
 * run and its wakeup request as "failed".
 *
 * Returns without side effects when the run does not exist or is not in the
 * "queued" / "running" state. When the agent is missing, the run and wakeup
 * request are marked failed with code "agent_not_found".
 *
 * @param runId id of the heartbeat run to execute
 */
async function executeRun(runId: string) {
  const run = await getRun(runId);
  if (!run) return;
  if (run.status !== "queued" && run.status !== "running") return;

  const agent = await getAgent(run.agentId);
  if (!agent) {
    await setRunStatus(runId, "failed", {
      error: "Agent not found",
      errorCode: "agent_not_found",
      finishedAt: new Date(),
    });
    await setWakeupStatus(run.wakeupRequestId, "failed", {
      finishedAt: new Date(),
      error: "Agent not found",
    });
    return;
  }

  const runtime = await ensureRuntimeState(agent);

  // Sequence counter for the events this execution appends to the run.
  let seq = 1;
  let handle: RunLogHandle | null = null;
  let stdoutExcerpt = "";
  let stderrExcerpt = "";
  // Kept outside the try block so the error path can reuse an already
  // finalized log summary instead of finalizing the same handle twice.
  let logSummary: { bytes: number; sha256?: string; compressed: boolean } | null = null;

  try {
    await setRunStatus(runId, "running", {
      startedAt: new Date(),
      sessionIdBefore: runtime.sessionId,
    });
    await setWakeupStatus(run.wakeupRequestId, "claimed", { claimedAt: new Date() });

    const runningAgent = await db
      .update(agents)
      .set({ status: "running", updatedAt: new Date() })
      .where(eq(agents.id, agent.id))
      .returning()
      .then((rows) => rows[0] ?? null);

    if (runningAgent) {
      publishLiveEvent({
        companyId: runningAgent.companyId,
        type: "agent.status",
        payload: {
          agentId: runningAgent.id,
          status: runningAgent.status,
          outcome: "running",
        },
      });
    }

    const currentRun = (await getRun(runId)) ?? run;
    await appendRunEvent(currentRun, seq++, {
      eventType: "lifecycle",
      stream: "system",
      level: "info",
      message: "run started",
    });

    handle = await runLogStore.begin({
      companyId: run.companyId,
      agentId: run.agentId,
      runId,
    });

    await db
      .update(heartbeatRuns)
      .set({
        logStore: handle.store,
        logRef: handle.logRef,
        updatedAt: new Date(),
      })
      .where(eq(heartbeatRuns.id, runId));

    // Receives adapter output: keeps capped excerpts, appends to the run log
    // and forwards a (possibly tail-truncated) copy as a live event.
    const onLog = async (stream: "stdout" | "stderr", chunk: string) => {
      if (stream === "stdout") stdoutExcerpt = appendExcerpt(stdoutExcerpt, chunk);
      if (stream === "stderr") stderrExcerpt = appendExcerpt(stderrExcerpt, chunk);

      if (handle) {
        await runLogStore.append(handle, {
          stream,
          chunk,
          ts: new Date().toISOString(),
        });
      }

      // Oversized chunks keep only their tail in the live event.
      const payloadChunk =
        chunk.length > MAX_LIVE_LOG_CHUNK_BYTES
          ? chunk.slice(chunk.length - MAX_LIVE_LOG_CHUNK_BYTES)
          : chunk;

      publishLiveEvent({
        companyId: run.companyId,
        type: "heartbeat.run.log",
        payload: {
          runId: run.id,
          agentId: run.agentId,
          stream,
          chunk: payloadChunk,
          truncated: payloadChunk.length !== chunk.length,
        },
      });
    };

    const config = parseObject(agent.adapterConfig);
    const context = (run.contextSnapshot ?? {}) as Record<string, unknown>;
    const onAdapterMeta = async (meta: AdapterInvocationMeta) => {
      await appendRunEvent(currentRun, seq++, {
        eventType: "adapter.invoke",
        stream: "system",
        level: "info",
        message: "adapter invocation",
        payload: meta as unknown as Record<string, unknown>,
      });
    };

    const adapter = getServerAdapter(agent.adapterType);
    const adapterResult = await adapter.execute({
      runId: run.id,
      agent,
      runtime,
      config,
      context,
      onLog,
      onMeta: onAdapterMeta,
    });

    // Re-read the run: it may have been cancelled while the adapter was running.
    let outcome: "succeeded" | "failed" | "cancelled" | "timed_out";
    const latestRun = await getRun(run.id);
    if (latestRun?.status === "cancelled") {
      outcome = "cancelled";
    } else if (adapterResult.timedOut) {
      outcome = "timed_out";
    } else if ((adapterResult.exitCode ?? 0) === 0 && !adapterResult.errorMessage) {
      outcome = "succeeded";
    } else {
      outcome = "failed";
    }

    if (handle) {
      logSummary = await runLogStore.finalize(handle);
    }

    // Run status strings are the same as the outcome names.
    const status = outcome;

    const usageJson =
      adapterResult.usage || adapterResult.costUsd != null
        ? ({
            ...(adapterResult.usage ?? {}),
            ...(adapterResult.costUsd != null ? { costUsd: adapterResult.costUsd } : {}),
          } as Record<string, unknown>)
        : null;

    // For a cancelled run keep the reason already stored on the run (written
    // by whoever cancelled it) rather than replacing it with the adapter's
    // message or the generic "Adapter failed".
    const runError =
      outcome === "succeeded"
        ? null
        : outcome === "cancelled"
          ? latestRun?.error ?? adapterResult.errorMessage ?? "Cancelled"
          : adapterResult.errorMessage ?? (outcome === "timed_out" ? "Timed out" : "Adapter failed");

    await setRunStatus(run.id, status, {
      finishedAt: new Date(),
      error: runError,
      errorCode:
        outcome === "timed_out"
          ? "timeout"
          : outcome === "cancelled"
            ? "cancelled"
            : outcome === "failed"
              ? "adapter_failed"
              : null,
      exitCode: adapterResult.exitCode,
      signal: adapterResult.signal,
      usageJson,
      resultJson: adapterResult.resultJson ?? null,
      // Mirror the session handling of updateRuntimeState: a cleared session
      // leaves no session id behind.
      sessionIdAfter: adapterResult.clearSession
        ? null
        : (adapterResult.sessionId ?? runtime.sessionId),
      stdoutExcerpt,
      stderrExcerpt,
      logBytes: logSummary?.bytes,
      logSha256: logSummary?.sha256,
      logCompressed: logSummary?.compressed ?? false,
    });

    await setWakeupStatus(run.wakeupRequestId, outcome === "succeeded" ? "completed" : status, {
      finishedAt: new Date(),
      error: adapterResult.errorMessage ?? null,
    });

    const finalizedRun = await getRun(run.id);
    if (finalizedRun) {
      await appendRunEvent(finalizedRun, seq++, {
        eventType: "lifecycle",
        stream: "system",
        level: outcome === "succeeded" ? "info" : "error",
        message: `run ${outcome}`,
        payload: {
          status,
          exitCode: adapterResult.exitCode,
        },
      });

      await updateRuntimeState(agent, finalizedRun, adapterResult);
    }

    await finalizeAgentStatus(agent.id, outcome);
  } catch (err) {
    const message = err instanceof Error ? err.message : "Unknown adapter failure";
    logger.error({ err, runId }, "heartbeat execution failed");

    // Only finalize here when the happy path did not already do so.
    if (handle && !logSummary) {
      try {
        logSummary = await runLogStore.finalize(handle);
      } catch (finalizeErr) {
        logger.warn({ err: finalizeErr, runId }, "failed to finalize run log after error");
      }
    }

    const failedRun = await setRunStatus(run.id, "failed", {
      error: message,
      errorCode: "adapter_failed",
      finishedAt: new Date(),
      stdoutExcerpt,
      stderrExcerpt,
      logBytes: logSummary?.bytes,
      logSha256: logSummary?.sha256,
      logCompressed: logSummary?.compressed ?? false,
    });
    await setWakeupStatus(run.wakeupRequestId, "failed", {
      finishedAt: new Date(),
      error: message,
    });

    if (failedRun) {
      await appendRunEvent(failedRun, seq++, {
        eventType: "error",
        stream: "system",
        level: "error",
        message,
      });

      await updateRuntimeState(agent, failedRun, {
        exitCode: null,
        signal: null,
        timedOut: false,
        errorMessage: message,
      });
    }

    await finalizeAgentStatus(agent.id, "failed");
  }
}
|
|
|
|
/**
 * Records a wakeup request for an agent and, unless it is skipped or
 * coalesced, creates a queued heartbeat run and starts executing it in the
 * background.
 *
 * - Throws `notFound` when the agent does not exist and `conflict` when it is
 *   paused or terminated.
 * - Returns `null` (after recording a "skipped" request) when the agent's
 *   heartbeat policy disables this kind of wakeup.
 * - Returns the already active run (after recording a "coalesced" request)
 *   when the agent has a queued or running run.
 * - Otherwise returns the newly created run.
 */
async function enqueueWakeup(agentId: string, opts: WakeupOptions = {}) {
  const source = opts.source ?? "on_demand";
  const triggerDetail = opts.triggerDetail ?? null;
  const contextSnapshot = opts.contextSnapshot ?? {};

  const agent = await getAgent(agentId);
  if (!agent) throw notFound("Agent not found");

  if (agent.status === "paused" || agent.status === "terminated") {
    throw conflict("Agent is not invokable in its current state", { status: agent.status });
  }

  const policy = parseHeartbeatPolicy(agent);

  // Work out whether the policy vetoes this wakeup, and why.
  let skipReason: string | null = null;
  switch (source) {
    case "timer":
      if (!policy.enabled) skipReason = "heartbeat.disabled";
      break;
    case "assignment":
      if (!policy.wakeOnAssignment) skipReason = "heartbeat.wakeOnAssignment.disabled";
      break;
    case "automation":
      if (!policy.wakeOnAutomation) skipReason = "heartbeat.wakeOnAutomation.disabled";
      break;
    case "on_demand":
      // Only "ping" style on-demand wakeups honour the wakeOnOnDemand flag.
      if (triggerDetail === "ping" && !policy.wakeOnOnDemand) {
        skipReason = "heartbeat.wakeOnOnDemand.disabled";
      }
      break;
  }

  if (skipReason !== null) {
    await db.insert(agentWakeupRequests).values({
      companyId: agent.companyId,
      agentId,
      source,
      triggerDetail,
      reason: skipReason,
      payload: opts.payload ?? null,
      status: "skipped",
      requestedByActorType: opts.requestedByActorType ?? null,
      requestedByActorId: opts.requestedByActorId ?? null,
      idempotencyKey: opts.idempotencyKey ?? null,
      finishedAt: new Date(),
    });
    return null;
  }

  // Most recent run of this agent that is still queued or running, if any.
  const [latestActive] = await db
    .select()
    .from(heartbeatRuns)
    .where(and(eq(heartbeatRuns.agentId, agentId), inArray(heartbeatRuns.status, ["queued", "running"])))
    .orderBy(desc(heartbeatRuns.createdAt));
  const activeRun = latestActive ?? null;

  if (activeRun) {
    // Fold this request into the active run instead of starting another one.
    await db.insert(agentWakeupRequests).values({
      companyId: agent.companyId,
      agentId,
      source,
      triggerDetail,
      reason: opts.reason ?? null,
      payload: opts.payload ?? null,
      status: "coalesced",
      coalescedCount: 1,
      requestedByActorType: opts.requestedByActorType ?? null,
      requestedByActorId: opts.requestedByActorId ?? null,
      idempotencyKey: opts.idempotencyKey ?? null,
      runId: activeRun.id,
      finishedAt: new Date(),
    });
    return activeRun;
  }

  const [wakeupRequest] = await db
    .insert(agentWakeupRequests)
    .values({
      companyId: agent.companyId,
      agentId,
      source,
      triggerDetail,
      reason: opts.reason ?? null,
      payload: opts.payload ?? null,
      status: "queued",
      requestedByActorType: opts.requestedByActorType ?? null,
      requestedByActorId: opts.requestedByActorId ?? null,
      idempotencyKey: opts.idempotencyKey ?? null,
    })
    .returning();

  const runtimeForRun = await getRuntimeState(agent.id);

  const [newRun] = await db
    .insert(heartbeatRuns)
    .values({
      companyId: agent.companyId,
      agentId,
      invocationSource: source,
      triggerDetail,
      status: "queued",
      wakeupRequestId: wakeupRequest.id,
      contextSnapshot,
      sessionIdBefore: runtimeForRun?.sessionId ?? null,
    })
    .returning();

  // Link the wakeup request back to the run it produced.
  await db
    .update(agentWakeupRequests)
    .set({
      runId: newRun.id,
      updatedAt: new Date(),
    })
    .where(eq(agentWakeupRequests.id, wakeupRequest.id));

  publishLiveEvent({
    companyId: newRun.companyId,
    type: "heartbeat.run.queued",
    payload: {
      runId: newRun.id,
      agentId: newRun.agentId,
      invocationSource: newRun.invocationSource,
      triggerDetail: newRun.triggerDetail,
      wakeupRequestId: newRun.wakeupRequestId,
    },
  });

  // Fire and forget: the caller gets the queued run back immediately.
  void executeRun(newRun.id).catch((err) => {
    logger.error({ err, runId: newRun.id }, "heartbeat execution failed");
  });

  return newRun;
}
|
|
|
|
return {
|
|
/** Lists a company's heartbeat runs, newest first, optionally narrowed to one agent. */
list: (companyId: string, agentId?: string) => {
  const inCompany = eq(heartbeatRuns.companyId, companyId);
  const scope = agentId ? and(inCompany, eq(heartbeatRuns.agentId, agentId)) : inCompany;

  return db.select().from(heartbeatRuns).where(scope).orderBy(desc(heartbeatRuns.createdAt));
},
|
|
|
|
getRun,
|
|
|
|
/**
 * Returns the agent's runtime state, creating it on first access.
 * Resolves to `null` when the agent itself does not exist.
 */
getRuntimeState: async (agentId: string) => {
  const existingState = await getRuntimeState(agentId);
  if (existingState) return existingState;

  const owner = await getAgent(agentId);
  return owner ? ensureRuntimeState(owner) : null;
},
|
|
|
|
/**
 * Clears the agent's session id, state JSON and last error.
 * Throws `notFound` when the agent does not exist; resolves to the updated
 * runtime-state row (or `null` if no row was updated).
 */
resetRuntimeSession: async (agentId: string) => {
  const agent = await getAgent(agentId);
  if (!agent) throw notFound("Agent not found");

  // Make sure there is a row to reset.
  await ensureRuntimeState(agent);

  const [resetRow] = await db
    .update(agentRuntimeState)
    .set({
      sessionId: null,
      stateJson: {},
      lastError: null,
      updatedAt: new Date(),
    })
    .where(eq(agentRuntimeState.agentId, agentId))
    .returning();
  return resetRow ?? null;
},
|
|
|
|
/**
 * Lists a run's events with `seq` greater than `afterSeq`, in ascending
 * `seq` order. The page size is clamped to the range 1..1000.
 */
listEvents: (runId: string, afterSeq = 0, limit = 200) => {
  const pageSize = Math.max(1, Math.min(limit, 1000));
  const newerEventsOfRun = and(
    eq(heartbeatRunEvents.runId, runId),
    gt(heartbeatRunEvents.seq, afterSeq),
  );

  return db
    .select()
    .from(heartbeatRunEvents)
    .where(newerEventsOfRun)
    .orderBy(asc(heartbeatRunEvents.seq))
    .limit(pageSize);
},
|
|
|
|
/**
 * Reads (part of) a run's stored log through the run log store.
 * Throws `notFound` when the run is missing or has no log reference.
 */
readLog: async (runId: string, opts?: { offset?: number; limitBytes?: number }) => {
  const run = await getRun(runId);
  if (!run) throw notFound("Heartbeat run not found");

  const { logStore, logRef } = run;
  if (!logStore || !logRef) throw notFound("Run log not found");

  const logContent = await runLogStore.read({ store: logStore as "local_file", logRef }, opts);

  return { runId, store: logStore, logRef, ...logContent };
},
|
|
|
|
/**
 * Positional-argument convenience wrapper around `enqueueWakeup`.
 * Defaults to an "on_demand" / "manual" wakeup with an empty context.
 */
invoke: async (
  agentId: string,
  source: "timer" | "assignment" | "on_demand" | "automation" = "on_demand",
  contextSnapshot: Record<string, unknown> = {},
  triggerDetail: "manual" | "ping" | "callback" | "system" = "manual",
  actor?: { actorType?: "user" | "agent" | "system"; actorId?: string | null },
) => {
  const wakeupOptions: WakeupOptions = {
    source,
    triggerDetail,
    contextSnapshot,
    requestedByActorType: actor?.actorType,
    requestedByActorId: actor?.actorId ?? null,
  };
  return enqueueWakeup(agentId, wakeupOptions);
},
|
|
|
|
wakeup: enqueueWakeup,
|
|
|
|
/**
 * Scheduler tick: enqueues a timer wakeup for every agent whose heartbeat
 * interval has elapsed.
 *
 * Agents that are paused or terminated, have heartbeats disabled, or have no
 * positive interval are ignored. An agent that has never had a heartbeat
 * (`lastHeartbeatAt` unset) is considered due immediately.
 *
 * A failure to enqueue one agent (for example a conflict because it was
 * paused after the list was loaded) is logged and counted as skipped, so the
 * remaining agents are still processed.
 *
 * @param now reference time for the interval check (defaults to the current time)
 * @returns counters: agents checked, wakeups that produced a run, and
 *          wakeups that were skipped or failed
 */
tickTimers: async (now = new Date()) => {
  const allAgents = await db.select().from(agents);
  const nowMs = now.getTime();
  let checked = 0;
  let enqueued = 0;
  let skipped = 0;

  for (const agent of allAgents) {
    if (agent.status === "paused" || agent.status === "terminated") continue;
    const policy = parseHeartbeatPolicy(agent);
    if (!policy.enabled || policy.intervalSec <= 0) continue;

    checked += 1;
    // 0 means "no previous heartbeat", which makes the agent due right away.
    const last = agent.lastHeartbeatAt ? new Date(agent.lastHeartbeatAt).getTime() : 0;
    const elapsedMs = nowMs - last;
    if (last && elapsedMs < policy.intervalSec * 1000) continue;

    try {
      const run = await enqueueWakeup(agent.id, {
        source: "timer",
        triggerDetail: "system",
        reason: "heartbeat_timer",
        requestedByActorType: "system",
        requestedByActorId: "heartbeat_scheduler",
        contextSnapshot: {
          source: "scheduler",
          reason: "interval_elapsed",
          now: now.toISOString(),
        },
      });
      if (run) enqueued += 1;
      else skipped += 1;
    } catch (err) {
      // Isolate per-agent failures so one bad agent cannot starve the rest.
      logger.error({ err, agentId: agent.id }, "heartbeat timer wakeup failed");
      skipped += 1;
    }
  }

  return { checked, enqueued, skipped };
},
|
|
|
|
/**
 * Cancels a queued or running heartbeat run.
 *
 * Sends SIGTERM to the run's tracked child process (if any) and escalates to
 * SIGKILL when the process has not exited after its grace period. The run and
 * its wakeup request are marked "cancelled", a lifecycle event is appended,
 * and the agent status is finalized.
 *
 * Throws `notFound` when the run does not exist. A run that is no longer
 * queued or running is returned unchanged.
 *
 * @returns the cancelled run row (or `null` if the status update matched no row)
 */
cancelRun: async (runId: string) => {
  const run = await getRun(runId);
  if (!run) throw notFound("Heartbeat run not found");
  if (run.status !== "running" && run.status !== "queued") return run;

  const running = runningProcesses.get(run.id);
  if (running) {
    const { child } = running;
    child.kill("SIGTERM");
    const graceMs = Math.max(1, running.graceSec) * 1000;
    setTimeout(() => {
      // `child.killed` only reports that a signal was delivered, so it is
      // already true after the SIGTERM above. Whether the process is still
      // alive is reflected by exitCode / signalCode both being null.
      if (child.exitCode === null && child.signalCode === null) {
        child.kill("SIGKILL");
      }
    }, graceMs);
  }

  const cancelled = await setRunStatus(run.id, "cancelled", {
    finishedAt: new Date(),
    error: "Cancelled by control plane",
    errorCode: "cancelled",
  });

  await setWakeupStatus(run.wakeupRequestId, "cancelled", {
    finishedAt: new Date(),
    error: "Cancelled by control plane",
  });

  if (cancelled) {
    // NOTE(review): seq is hard-coded to 1, the same value executeRun uses
    // for its "run started" event — confirm duplicate seq values per run are
    // acceptable for the events table and for listEvents cursors.
    await appendRunEvent(cancelled, 1, {
      eventType: "lifecycle",
      stream: "system",
      level: "warn",
      message: "run cancelled",
    });
  }

  runningProcesses.delete(run.id);
  await finalizeAgentStatus(run.agentId, "cancelled");
  return cancelled;
},
|
|
|
|
/**
 * Cancels every queued or running heartbeat run of an agent.
 *
 * Each run and its wakeup request are marked "cancelled" with the reason
 * "Cancelled due to agent pause". A tracked child process is sent SIGTERM
 * and, as in `cancelRun`, escalated to SIGKILL if it has not exited after
 * its grace period; it is removed from the process registry right away.
 *
 * @returns the number of runs that were cancelled
 */
cancelActiveForAgent: async (agentId: string) => {
  const runs = await db
    .select()
    .from(heartbeatRuns)
    .where(and(eq(heartbeatRuns.agentId, agentId), inArray(heartbeatRuns.status, ["queued", "running"])));

  for (const run of runs) {
    await setRunStatus(run.id, "cancelled", {
      finishedAt: new Date(),
      error: "Cancelled due to agent pause",
      errorCode: "cancelled",
    });

    await setWakeupStatus(run.wakeupRequestId, "cancelled", {
      finishedAt: new Date(),
      error: "Cancelled due to agent pause",
    });

    const running = runningProcesses.get(run.id);
    if (running) {
      const { child } = running;
      child.kill("SIGTERM");
      // The registry entry is dropped below, so the timer keeps its own
      // reference to the child and force-kills it if SIGTERM is ignored.
      const graceMs = Math.max(1, running.graceSec) * 1000;
      setTimeout(() => {
        // exitCode / signalCode stay null while the process is still alive.
        if (child.exitCode === null && child.signalCode === null) {
          child.kill("SIGKILL");
        }
      }, graceMs);
      runningProcesses.delete(run.id);
    }
  }

  return runs.length;
},
|
|
};
|
|
}
|