Files
paperclip/server/src/services/heartbeat.ts
Forgotten b327687c92 feat: comment-triggered wakeups, coalescing improvements, and failed run badges
Enhance heartbeat wakeup to propagate wakeCommentId, queue follow-up runs
for comment wakes on already-running agents, and merge coalesced context
snapshots. Add failed run count to sidebar badges and expose usage/result
JSON in activity service.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-20 10:32:17 -06:00

1396 lines
44 KiB
TypeScript

import { and, asc, desc, eq, gt, inArray, sql } from "drizzle-orm";
import type { Db } from "@paperclip/db";
import {
agents,
agentRuntimeState,
agentTaskSessions,
agentWakeupRequests,
heartbeatRunEvents,
heartbeatRuns,
costEvents,
} from "@paperclip/db";
import { conflict, notFound } from "../errors.js";
import { logger } from "../middleware/logger.js";
import { publishLiveEvent } from "./live-events.js";
import { getRunLogStore, type RunLogHandle } from "./run-log-store.js";
import { getServerAdapter, runningProcesses } from "../adapters/index.js";
import type { AdapterExecutionResult, AdapterInvocationMeta, AdapterSessionCodec } from "../adapters/index.js";
import { createLocalAgentJwt } from "../agent-auth-jwt.js";
import { parseObject, asBoolean, asNumber, appendWithCap, MAX_EXCERPT_BYTES } from "../adapters/utils.js";
import { secretService } from "./secrets.js";
const MAX_LIVE_LOG_CHUNK_BYTES = 8 * 1024;
function appendExcerpt(prev: string, chunk: string) {
return appendWithCap(prev, chunk, MAX_EXCERPT_BYTES);
}
interface WakeupOptions {
source?: "timer" | "assignment" | "on_demand" | "automation";
triggerDetail?: "manual" | "ping" | "callback" | "system";
reason?: string | null;
payload?: Record<string, unknown> | null;
idempotencyKey?: string | null;
requestedByActorType?: "user" | "agent" | "system";
requestedByActorId?: string | null;
contextSnapshot?: Record<string, unknown>;
}
function readNonEmptyString(value: unknown): string | null {
return typeof value === "string" && value.trim().length > 0 ? value : null;
}
function deriveTaskKey(
contextSnapshot: Record<string, unknown> | null | undefined,
payload: Record<string, unknown> | null | undefined,
) {
return (
readNonEmptyString(contextSnapshot?.taskKey) ??
readNonEmptyString(contextSnapshot?.taskId) ??
readNonEmptyString(contextSnapshot?.issueId) ??
readNonEmptyString(payload?.taskKey) ??
readNonEmptyString(payload?.taskId) ??
readNonEmptyString(payload?.issueId) ??
null
);
}
function deriveCommentId(
contextSnapshot: Record<string, unknown> | null | undefined,
payload: Record<string, unknown> | null | undefined,
) {
return (
readNonEmptyString(contextSnapshot?.wakeCommentId) ??
readNonEmptyString(contextSnapshot?.commentId) ??
readNonEmptyString(payload?.commentId) ??
null
);
}
function mergeCoalescedContextSnapshot(
existingRaw: unknown,
incoming: Record<string, unknown>,
) {
const existing = parseObject(existingRaw);
const merged: Record<string, unknown> = {
...existing,
...incoming,
};
const commentId = deriveCommentId(incoming, null);
if (commentId) {
merged.commentId = commentId;
merged.wakeCommentId = commentId;
}
return merged;
}
function runTaskKey(run: typeof heartbeatRuns.$inferSelect) {
return deriveTaskKey(run.contextSnapshot as Record<string, unknown> | null, null);
}
function isSameTaskScope(left: string | null, right: string | null) {
return (left ?? null) === (right ?? null);
}
function truncateDisplayId(value: string | null | undefined, max = 128) {
if (!value) return null;
return value.length > max ? value.slice(0, max) : value;
}
const defaultSessionCodec: AdapterSessionCodec = {
deserialize(raw: unknown) {
const asObj = parseObject(raw);
if (Object.keys(asObj).length > 0) return asObj;
const sessionId = readNonEmptyString((raw as Record<string, unknown> | null)?.sessionId);
if (sessionId) return { sessionId };
return null;
},
serialize(params: Record<string, unknown> | null) {
if (!params || Object.keys(params).length === 0) return null;
return params;
},
getDisplayId(params: Record<string, unknown> | null) {
return readNonEmptyString(params?.sessionId);
},
};
function getAdapterSessionCodec(adapterType: string) {
const adapter = getServerAdapter(adapterType);
return adapter.sessionCodec ?? defaultSessionCodec;
}
function normalizeSessionParams(params: Record<string, unknown> | null | undefined) {
if (!params) return null;
return Object.keys(params).length > 0 ? params : null;
}
function resolveNextSessionState(input: {
codec: AdapterSessionCodec;
adapterResult: AdapterExecutionResult;
previousParams: Record<string, unknown> | null;
previousDisplayId: string | null;
previousLegacySessionId: string | null;
}) {
const { codec, adapterResult, previousParams, previousDisplayId, previousLegacySessionId } = input;
if (adapterResult.clearSession) {
return {
params: null as Record<string, unknown> | null,
displayId: null as string | null,
legacySessionId: null as string | null,
};
}
const explicitParams = adapterResult.sessionParams;
const hasExplicitParams = adapterResult.sessionParams !== undefined;
const hasExplicitSessionId = adapterResult.sessionId !== undefined;
const explicitSessionId = readNonEmptyString(adapterResult.sessionId);
const hasExplicitDisplay = adapterResult.sessionDisplayId !== undefined;
const explicitDisplayId = readNonEmptyString(adapterResult.sessionDisplayId);
const shouldUsePrevious = !hasExplicitParams && !hasExplicitSessionId && !hasExplicitDisplay;
const candidateParams =
hasExplicitParams
? explicitParams
: hasExplicitSessionId
? (explicitSessionId ? { sessionId: explicitSessionId } : null)
: previousParams;
const serialized = normalizeSessionParams(codec.serialize(normalizeSessionParams(candidateParams) ?? null));
const deserialized = normalizeSessionParams(codec.deserialize(serialized));
const displayId = truncateDisplayId(
explicitDisplayId ??
(codec.getDisplayId ? codec.getDisplayId(deserialized) : null) ??
readNonEmptyString(deserialized?.sessionId) ??
(shouldUsePrevious ? previousDisplayId : null) ??
explicitSessionId ??
(shouldUsePrevious ? previousLegacySessionId : null),
);
const legacySessionId =
explicitSessionId ??
readNonEmptyString(deserialized?.sessionId) ??
displayId ??
(shouldUsePrevious ? previousLegacySessionId : null);
return {
params: serialized,
displayId,
legacySessionId,
};
}
export function heartbeatService(db: Db) {
const runLogStore = getRunLogStore();
const secretsSvc = secretService(db);
async function getAgent(agentId: string) {
return db
.select()
.from(agents)
.where(eq(agents.id, agentId))
.then((rows) => rows[0] ?? null);
}
async function getRun(runId: string) {
return db
.select()
.from(heartbeatRuns)
.where(eq(heartbeatRuns.id, runId))
.then((rows) => rows[0] ?? null);
}
async function getRuntimeState(agentId: string) {
return db
.select()
.from(agentRuntimeState)
.where(eq(agentRuntimeState.agentId, agentId))
.then((rows) => rows[0] ?? null);
}
async function getTaskSession(
companyId: string,
agentId: string,
adapterType: string,
taskKey: string,
) {
return db
.select()
.from(agentTaskSessions)
.where(
and(
eq(agentTaskSessions.companyId, companyId),
eq(agentTaskSessions.agentId, agentId),
eq(agentTaskSessions.adapterType, adapterType),
eq(agentTaskSessions.taskKey, taskKey),
),
)
.then((rows) => rows[0] ?? null);
}
async function upsertTaskSession(input: {
companyId: string;
agentId: string;
adapterType: string;
taskKey: string;
sessionParamsJson: Record<string, unknown> | null;
sessionDisplayId: string | null;
lastRunId: string | null;
lastError: string | null;
}) {
const existing = await getTaskSession(
input.companyId,
input.agentId,
input.adapterType,
input.taskKey,
);
if (existing) {
return db
.update(agentTaskSessions)
.set({
sessionParamsJson: input.sessionParamsJson,
sessionDisplayId: input.sessionDisplayId,
lastRunId: input.lastRunId,
lastError: input.lastError,
updatedAt: new Date(),
})
.where(eq(agentTaskSessions.id, existing.id))
.returning()
.then((rows) => rows[0] ?? null);
}
return db
.insert(agentTaskSessions)
.values({
companyId: input.companyId,
agentId: input.agentId,
adapterType: input.adapterType,
taskKey: input.taskKey,
sessionParamsJson: input.sessionParamsJson,
sessionDisplayId: input.sessionDisplayId,
lastRunId: input.lastRunId,
lastError: input.lastError,
})
.returning()
.then((rows) => rows[0] ?? null);
}
async function clearTaskSessions(
companyId: string,
agentId: string,
opts?: { taskKey?: string | null; adapterType?: string | null },
) {
const conditions = [
eq(agentTaskSessions.companyId, companyId),
eq(agentTaskSessions.agentId, agentId),
];
if (opts?.taskKey) {
conditions.push(eq(agentTaskSessions.taskKey, opts.taskKey));
}
if (opts?.adapterType) {
conditions.push(eq(agentTaskSessions.adapterType, opts.adapterType));
}
return db
.delete(agentTaskSessions)
.where(and(...conditions))
.returning()
.then((rows) => rows.length);
}
async function ensureRuntimeState(agent: typeof agents.$inferSelect) {
const existing = await getRuntimeState(agent.id);
if (existing) return existing;
return db
.insert(agentRuntimeState)
.values({
agentId: agent.id,
companyId: agent.companyId,
adapterType: agent.adapterType,
stateJson: {},
})
.returning()
.then((rows) => rows[0]);
}
async function setRunStatus(
runId: string,
status: string,
patch?: Partial<typeof heartbeatRuns.$inferInsert>,
) {
const updated = await db
.update(heartbeatRuns)
.set({ status, ...patch, updatedAt: new Date() })
.where(eq(heartbeatRuns.id, runId))
.returning()
.then((rows) => rows[0] ?? null);
if (updated) {
publishLiveEvent({
companyId: updated.companyId,
type: "heartbeat.run.status",
payload: {
runId: updated.id,
agentId: updated.agentId,
status: updated.status,
invocationSource: updated.invocationSource,
triggerDetail: updated.triggerDetail,
error: updated.error ?? null,
errorCode: updated.errorCode ?? null,
startedAt: updated.startedAt ? new Date(updated.startedAt).toISOString() : null,
finishedAt: updated.finishedAt ? new Date(updated.finishedAt).toISOString() : null,
},
});
}
return updated;
}
async function setWakeupStatus(
wakeupRequestId: string | null | undefined,
status: string,
patch?: Partial<typeof agentWakeupRequests.$inferInsert>,
) {
if (!wakeupRequestId) return;
await db
.update(agentWakeupRequests)
.set({ status, ...patch, updatedAt: new Date() })
.where(eq(agentWakeupRequests.id, wakeupRequestId));
}
async function appendRunEvent(
run: typeof heartbeatRuns.$inferSelect,
seq: number,
event: {
eventType: string;
stream?: "system" | "stdout" | "stderr";
level?: "info" | "warn" | "error";
color?: string;
message?: string;
payload?: Record<string, unknown>;
},
) {
await db.insert(heartbeatRunEvents).values({
companyId: run.companyId,
runId: run.id,
agentId: run.agentId,
seq,
eventType: event.eventType,
stream: event.stream,
level: event.level,
color: event.color,
message: event.message,
payload: event.payload,
});
publishLiveEvent({
companyId: run.companyId,
type: "heartbeat.run.event",
payload: {
runId: run.id,
agentId: run.agentId,
seq,
eventType: event.eventType,
stream: event.stream ?? null,
level: event.level ?? null,
color: event.color ?? null,
message: event.message ?? null,
payload: event.payload ?? null,
},
});
}
function parseHeartbeatPolicy(agent: typeof agents.$inferSelect) {
const runtimeConfig = parseObject(agent.runtimeConfig);
const heartbeat = parseObject(runtimeConfig.heartbeat);
return {
enabled: asBoolean(heartbeat.enabled, true),
intervalSec: Math.max(0, asNumber(heartbeat.intervalSec, 0)),
wakeOnDemand: asBoolean(heartbeat.wakeOnDemand ?? heartbeat.wakeOnAssignment ?? heartbeat.wakeOnOnDemand ?? heartbeat.wakeOnAutomation, true),
};
}
async function finalizeAgentStatus(
agentId: string,
outcome: "succeeded" | "failed" | "cancelled" | "timed_out",
) {
const existing = await getAgent(agentId);
if (!existing) return;
if (existing.status === "paused" || existing.status === "terminated") {
return;
}
const nextStatus =
outcome === "succeeded" ? "idle" : outcome === "cancelled" ? "idle" : "error";
const updated = await db
.update(agents)
.set({
status: nextStatus,
lastHeartbeatAt: new Date(),
updatedAt: new Date(),
})
.where(eq(agents.id, agentId))
.returning()
.then((rows) => rows[0] ?? null);
if (updated) {
publishLiveEvent({
companyId: updated.companyId,
type: "agent.status",
payload: {
agentId: updated.id,
status: updated.status,
lastHeartbeatAt: updated.lastHeartbeatAt
? new Date(updated.lastHeartbeatAt).toISOString()
: null,
outcome,
},
});
}
}
async function reapOrphanedRuns(opts?: { staleThresholdMs?: number }) {
const staleThresholdMs = opts?.staleThresholdMs ?? 0;
const now = new Date();
// Find all runs in "queued" or "running" state
const activeRuns = await db
.select()
.from(heartbeatRuns)
.where(inArray(heartbeatRuns.status, ["queued", "running"]));
const reaped: string[] = [];
for (const run of activeRuns) {
if (runningProcesses.has(run.id)) continue;
// Apply staleness threshold to avoid false positives
if (staleThresholdMs > 0) {
const refTime = run.updatedAt ? new Date(run.updatedAt).getTime() : 0;
if (now.getTime() - refTime < staleThresholdMs) continue;
}
await setRunStatus(run.id, "failed", {
error: "Process lost -- server may have restarted",
errorCode: "process_lost",
finishedAt: now,
});
await setWakeupStatus(run.wakeupRequestId, "failed", {
finishedAt: now,
error: "Process lost -- server may have restarted",
});
const updatedRun = await getRun(run.id);
if (updatedRun) {
await appendRunEvent(updatedRun, 1, {
eventType: "lifecycle",
stream: "system",
level: "error",
message: "Process lost -- server may have restarted",
});
}
await finalizeAgentStatus(run.agentId, "failed");
await startNextQueuedRunForAgent(run.agentId);
runningProcesses.delete(run.id);
reaped.push(run.id);
}
if (reaped.length > 0) {
logger.warn({ reapedCount: reaped.length, runIds: reaped }, "reaped orphaned heartbeat runs");
}
return { reaped: reaped.length, runIds: reaped };
}
async function updateRuntimeState(
agent: typeof agents.$inferSelect,
run: typeof heartbeatRuns.$inferSelect,
result: AdapterExecutionResult,
session: { legacySessionId: string | null },
) {
const existing = await ensureRuntimeState(agent);
const usage = result.usage;
const inputTokens = usage?.inputTokens ?? 0;
const outputTokens = usage?.outputTokens ?? 0;
const cachedInputTokens = usage?.cachedInputTokens ?? 0;
const additionalCostCents = Math.max(0, Math.round((result.costUsd ?? 0) * 100));
await db
.update(agentRuntimeState)
.set({
adapterType: agent.adapterType,
sessionId: session.legacySessionId,
lastRunId: run.id,
lastRunStatus: run.status,
lastError: result.errorMessage ?? null,
totalInputTokens: existing.totalInputTokens + inputTokens,
totalOutputTokens: existing.totalOutputTokens + outputTokens,
totalCachedInputTokens: existing.totalCachedInputTokens + cachedInputTokens,
totalCostCents: existing.totalCostCents + additionalCostCents,
updatedAt: new Date(),
})
.where(eq(agentRuntimeState.agentId, agent.id));
if (additionalCostCents > 0) {
await db.insert(costEvents).values({
companyId: agent.companyId,
agentId: agent.id,
provider: result.provider ?? "unknown",
model: result.model ?? "unknown",
inputTokens,
outputTokens,
costCents: additionalCostCents,
occurredAt: new Date(),
});
await db
.update(agents)
.set({
spentMonthlyCents: sql`${agents.spentMonthlyCents} + ${additionalCostCents}`,
updatedAt: new Date(),
})
.where(eq(agents.id, agent.id));
}
}
async function startNextQueuedRunForAgent(agentId: string) {
const running = await db
.select({ id: heartbeatRuns.id })
.from(heartbeatRuns)
.where(and(eq(heartbeatRuns.agentId, agentId), eq(heartbeatRuns.status, "running")))
.limit(1)
.then((rows) => rows[0] ?? null);
if (running) return null;
const nextQueued = await db
.select()
.from(heartbeatRuns)
.where(and(eq(heartbeatRuns.agentId, agentId), eq(heartbeatRuns.status, "queued")))
.orderBy(asc(heartbeatRuns.createdAt))
.limit(1)
.then((rows) => rows[0] ?? null);
if (!nextQueued) return null;
void executeRun(nextQueued.id).catch((err) => {
logger.error({ err, runId: nextQueued.id }, "queued heartbeat execution failed");
});
return nextQueued;
}
async function executeRun(runId: string) {
const run = await getRun(runId);
if (!run) return;
if (run.status !== "queued" && run.status !== "running") return;
const agent = await getAgent(run.agentId);
if (!agent) {
await setRunStatus(runId, "failed", {
error: "Agent not found",
errorCode: "agent_not_found",
finishedAt: new Date(),
});
await setWakeupStatus(run.wakeupRequestId, "failed", {
finishedAt: new Date(),
error: "Agent not found",
});
return;
}
if (run.status === "queued") {
const activeForAgent = await db
.select()
.from(heartbeatRuns)
.where(and(eq(heartbeatRuns.agentId, run.agentId), inArray(heartbeatRuns.status, ["queued", "running"])))
.orderBy(asc(heartbeatRuns.createdAt));
const runningOther = activeForAgent.some((candidate) => candidate.status === "running" && candidate.id !== run.id);
const first = activeForAgent[0] ?? null;
if (runningOther || (first && first.id !== run.id)) {
return;
}
}
const runtime = await ensureRuntimeState(agent);
const context = parseObject(run.contextSnapshot);
const taskKey = deriveTaskKey(context, null);
const sessionCodec = getAdapterSessionCodec(agent.adapterType);
const taskSession = taskKey
? await getTaskSession(agent.companyId, agent.id, agent.adapterType, taskKey)
: null;
const previousSessionParams = normalizeSessionParams(
sessionCodec.deserialize(taskSession?.sessionParamsJson ?? null),
);
const previousSessionDisplayId = truncateDisplayId(
taskSession?.sessionDisplayId ??
(sessionCodec.getDisplayId ? sessionCodec.getDisplayId(previousSessionParams) : null) ??
readNonEmptyString(previousSessionParams?.sessionId) ??
runtime.sessionId,
);
const runtimeForAdapter = {
sessionId: readNonEmptyString(previousSessionParams?.sessionId) ?? runtime.sessionId,
sessionParams: previousSessionParams,
sessionDisplayId: previousSessionDisplayId,
taskKey,
};
let seq = 1;
let handle: RunLogHandle | null = null;
let stdoutExcerpt = "";
let stderrExcerpt = "";
try {
await setRunStatus(runId, "running", {
startedAt: new Date(),
sessionIdBefore: runtimeForAdapter.sessionDisplayId ?? runtimeForAdapter.sessionId,
});
await setWakeupStatus(run.wakeupRequestId, "claimed", { claimedAt: new Date() });
const runningAgent = await db
.update(agents)
.set({ status: "running", updatedAt: new Date() })
.where(eq(agents.id, agent.id))
.returning()
.then((rows) => rows[0] ?? null);
if (runningAgent) {
publishLiveEvent({
companyId: runningAgent.companyId,
type: "agent.status",
payload: {
agentId: runningAgent.id,
status: runningAgent.status,
outcome: "running",
},
});
}
const currentRun = (await getRun(runId)) ?? run;
await appendRunEvent(currentRun, seq++, {
eventType: "lifecycle",
stream: "system",
level: "info",
message: "run started",
});
handle = await runLogStore.begin({
companyId: run.companyId,
agentId: run.agentId,
runId,
});
await db
.update(heartbeatRuns)
.set({
logStore: handle.store,
logRef: handle.logRef,
updatedAt: new Date(),
})
.where(eq(heartbeatRuns.id, runId));
const onLog = async (stream: "stdout" | "stderr", chunk: string) => {
if (stream === "stdout") stdoutExcerpt = appendExcerpt(stdoutExcerpt, chunk);
if (stream === "stderr") stderrExcerpt = appendExcerpt(stderrExcerpt, chunk);
if (handle) {
await runLogStore.append(handle, {
stream,
chunk,
ts: new Date().toISOString(),
});
}
const payloadChunk =
chunk.length > MAX_LIVE_LOG_CHUNK_BYTES
? chunk.slice(chunk.length - MAX_LIVE_LOG_CHUNK_BYTES)
: chunk;
publishLiveEvent({
companyId: run.companyId,
type: "heartbeat.run.log",
payload: {
runId: run.id,
agentId: run.agentId,
stream,
chunk: payloadChunk,
truncated: payloadChunk.length !== chunk.length,
},
});
};
const config = parseObject(agent.adapterConfig);
const resolvedConfig = await secretsSvc.resolveAdapterConfigForRuntime(
agent.companyId,
config,
);
const onAdapterMeta = async (meta: AdapterInvocationMeta) => {
await appendRunEvent(currentRun, seq++, {
eventType: "adapter.invoke",
stream: "system",
level: "info",
message: "adapter invocation",
payload: meta as unknown as Record<string, unknown>,
});
};
const adapter = getServerAdapter(agent.adapterType);
const authToken = adapter.supportsLocalAgentJwt
? createLocalAgentJwt(agent.id, agent.companyId, agent.adapterType, run.id)
: null;
if (adapter.supportsLocalAgentJwt && !authToken) {
logger.warn(
{
companyId: agent.companyId,
agentId: agent.id,
runId: run.id,
adapterType: agent.adapterType,
},
"local agent jwt secret missing or invalid; running without injected PAPERCLIP_API_KEY",
);
}
const adapterResult = await adapter.execute({
runId: run.id,
agent,
runtime: runtimeForAdapter,
config: resolvedConfig,
context,
onLog,
onMeta: onAdapterMeta,
authToken: authToken ?? undefined,
});
const nextSessionState = resolveNextSessionState({
codec: sessionCodec,
adapterResult,
previousParams: previousSessionParams,
previousDisplayId: runtimeForAdapter.sessionDisplayId,
previousLegacySessionId: runtimeForAdapter.sessionId,
});
let outcome: "succeeded" | "failed" | "cancelled" | "timed_out";
const latestRun = await getRun(run.id);
if (latestRun?.status === "cancelled") {
outcome = "cancelled";
} else if (adapterResult.timedOut) {
outcome = "timed_out";
} else if ((adapterResult.exitCode ?? 0) === 0 && !adapterResult.errorMessage) {
outcome = "succeeded";
} else {
outcome = "failed";
}
let logSummary: { bytes: number; sha256?: string; compressed: boolean } | null = null;
if (handle) {
logSummary = await runLogStore.finalize(handle);
}
const status =
outcome === "succeeded"
? "succeeded"
: outcome === "cancelled"
? "cancelled"
: outcome === "timed_out"
? "timed_out"
: "failed";
const usageJson =
adapterResult.usage || adapterResult.costUsd != null
? ({
...(adapterResult.usage ?? {}),
...(adapterResult.costUsd != null ? { costUsd: adapterResult.costUsd } : {}),
} as Record<string, unknown>)
: null;
await setRunStatus(run.id, status, {
finishedAt: new Date(),
error:
outcome === "succeeded"
? null
: adapterResult.errorMessage ?? (outcome === "timed_out" ? "Timed out" : "Adapter failed"),
errorCode:
outcome === "timed_out"
? "timeout"
: outcome === "cancelled"
? "cancelled"
: outcome === "failed"
? "adapter_failed"
: null,
exitCode: adapterResult.exitCode,
signal: adapterResult.signal,
usageJson,
resultJson: adapterResult.resultJson ?? null,
sessionIdAfter: nextSessionState.displayId ?? nextSessionState.legacySessionId,
stdoutExcerpt,
stderrExcerpt,
logBytes: logSummary?.bytes,
logSha256: logSummary?.sha256,
logCompressed: logSummary?.compressed ?? false,
});
await setWakeupStatus(run.wakeupRequestId, outcome === "succeeded" ? "completed" : status, {
finishedAt: new Date(),
error: adapterResult.errorMessage ?? null,
});
const finalizedRun = await getRun(run.id);
if (finalizedRun) {
await appendRunEvent(finalizedRun, seq++, {
eventType: "lifecycle",
stream: "system",
level: outcome === "succeeded" ? "info" : "error",
message: `run ${outcome}`,
payload: {
status,
exitCode: adapterResult.exitCode,
},
});
}
if (finalizedRun) {
await updateRuntimeState(agent, finalizedRun, adapterResult, {
legacySessionId: nextSessionState.legacySessionId,
});
if (taskKey) {
if (adapterResult.clearSession || (!nextSessionState.params && !nextSessionState.displayId)) {
await clearTaskSessions(agent.companyId, agent.id, {
taskKey,
adapterType: agent.adapterType,
});
} else {
await upsertTaskSession({
companyId: agent.companyId,
agentId: agent.id,
adapterType: agent.adapterType,
taskKey,
sessionParamsJson: nextSessionState.params,
sessionDisplayId: nextSessionState.displayId,
lastRunId: finalizedRun.id,
lastError: outcome === "succeeded" ? null : (adapterResult.errorMessage ?? "run_failed"),
});
}
}
}
await finalizeAgentStatus(agent.id, outcome);
} catch (err) {
const message = err instanceof Error ? err.message : "Unknown adapter failure";
logger.error({ err, runId }, "heartbeat execution failed");
let logSummary: { bytes: number; sha256?: string; compressed: boolean } | null = null;
if (handle) {
try {
logSummary = await runLogStore.finalize(handle);
} catch (finalizeErr) {
logger.warn({ err: finalizeErr, runId }, "failed to finalize run log after error");
}
}
const failedRun = await setRunStatus(run.id, "failed", {
error: message,
errorCode: "adapter_failed",
finishedAt: new Date(),
stdoutExcerpt,
stderrExcerpt,
logBytes: logSummary?.bytes,
logSha256: logSummary?.sha256,
logCompressed: logSummary?.compressed ?? false,
});
await setWakeupStatus(run.wakeupRequestId, "failed", {
finishedAt: new Date(),
error: message,
});
if (failedRun) {
await appendRunEvent(failedRun, seq++, {
eventType: "error",
stream: "system",
level: "error",
message,
});
await updateRuntimeState(agent, failedRun, {
exitCode: null,
signal: null,
timedOut: false,
errorMessage: message,
}, {
legacySessionId: runtimeForAdapter.sessionId,
});
if (taskKey && (previousSessionParams || previousSessionDisplayId || taskSession)) {
await upsertTaskSession({
companyId: agent.companyId,
agentId: agent.id,
adapterType: agent.adapterType,
taskKey,
sessionParamsJson: previousSessionParams,
sessionDisplayId: previousSessionDisplayId,
lastRunId: failedRun.id,
lastError: message,
});
}
}
await finalizeAgentStatus(agent.id, "failed");
} finally {
await startNextQueuedRunForAgent(agent.id);
}
}
async function enqueueWakeup(agentId: string, opts: WakeupOptions = {}) {
const source = opts.source ?? "on_demand";
const triggerDetail = opts.triggerDetail ?? null;
const contextSnapshot: Record<string, unknown> = { ...(opts.contextSnapshot ?? {}) };
const reason = opts.reason ?? null;
const payload = opts.payload ?? null;
const issueIdFromPayload = readNonEmptyString(payload?.["issueId"]);
const commentIdFromPayload = readNonEmptyString(payload?.["commentId"]);
const taskKey = deriveTaskKey(contextSnapshot, payload);
const wakeCommentId = deriveCommentId(contextSnapshot, payload);
if (!readNonEmptyString(contextSnapshot["wakeReason"]) && reason) {
contextSnapshot.wakeReason = reason;
}
if (!readNonEmptyString(contextSnapshot["issueId"]) && issueIdFromPayload) {
contextSnapshot.issueId = issueIdFromPayload;
}
if (!readNonEmptyString(contextSnapshot["taskId"]) && issueIdFromPayload) {
contextSnapshot.taskId = issueIdFromPayload;
}
if (!readNonEmptyString(contextSnapshot["taskKey"]) && taskKey) {
contextSnapshot.taskKey = taskKey;
}
if (!readNonEmptyString(contextSnapshot["commentId"]) && commentIdFromPayload) {
contextSnapshot.commentId = commentIdFromPayload;
}
if (!readNonEmptyString(contextSnapshot["wakeCommentId"]) && wakeCommentId) {
contextSnapshot.wakeCommentId = wakeCommentId;
}
if (!readNonEmptyString(contextSnapshot["wakeSource"])) {
contextSnapshot.wakeSource = source;
}
if (!readNonEmptyString(contextSnapshot["wakeTriggerDetail"]) && triggerDetail) {
contextSnapshot.wakeTriggerDetail = triggerDetail;
}
const agent = await getAgent(agentId);
if (!agent) throw notFound("Agent not found");
if (
agent.status === "paused" ||
agent.status === "terminated" ||
agent.status === "pending_approval"
) {
throw conflict("Agent is not invokable in its current state", { status: agent.status });
}
const policy = parseHeartbeatPolicy(agent);
const writeSkippedRequest = async (reason: string) => {
await db.insert(agentWakeupRequests).values({
companyId: agent.companyId,
agentId,
source,
triggerDetail,
reason,
payload,
status: "skipped",
requestedByActorType: opts.requestedByActorType ?? null,
requestedByActorId: opts.requestedByActorId ?? null,
idempotencyKey: opts.idempotencyKey ?? null,
finishedAt: new Date(),
});
};
if (source === "timer" && !policy.enabled) {
await writeSkippedRequest("heartbeat.disabled");
return null;
}
if (source !== "timer" && !policy.wakeOnDemand) {
await writeSkippedRequest("heartbeat.wakeOnDemand.disabled");
return null;
}
const activeRuns = await db
.select()
.from(heartbeatRuns)
.where(and(eq(heartbeatRuns.agentId, agentId), inArray(heartbeatRuns.status, ["queued", "running"])))
.orderBy(desc(heartbeatRuns.createdAt));
const sameScopeQueuedRun = activeRuns.find(
(candidate) => candidate.status === "queued" && isSameTaskScope(runTaskKey(candidate), taskKey),
);
const sameScopeRunningRun = activeRuns.find(
(candidate) => candidate.status === "running" && isSameTaskScope(runTaskKey(candidate), taskKey),
);
const shouldQueueFollowupForCommentWake =
Boolean(wakeCommentId) && Boolean(sameScopeRunningRun) && !sameScopeQueuedRun;
const coalescedTargetRun =
sameScopeQueuedRun ??
(shouldQueueFollowupForCommentWake ? null : sameScopeRunningRun ?? null);
if (coalescedTargetRun) {
const mergedContextSnapshot = mergeCoalescedContextSnapshot(
coalescedTargetRun.contextSnapshot,
contextSnapshot,
);
const mergedRun = await db
.update(heartbeatRuns)
.set({
contextSnapshot: mergedContextSnapshot,
updatedAt: new Date(),
})
.where(eq(heartbeatRuns.id, coalescedTargetRun.id))
.returning()
.then((rows) => rows[0] ?? coalescedTargetRun);
await db.insert(agentWakeupRequests).values({
companyId: agent.companyId,
agentId,
source,
triggerDetail,
reason,
payload,
status: "coalesced",
coalescedCount: 1,
requestedByActorType: opts.requestedByActorType ?? null,
requestedByActorId: opts.requestedByActorId ?? null,
idempotencyKey: opts.idempotencyKey ?? null,
runId: mergedRun.id,
finishedAt: new Date(),
});
return mergedRun;
}
const wakeupRequest = await db
.insert(agentWakeupRequests)
.values({
companyId: agent.companyId,
agentId,
source,
triggerDetail,
reason,
payload,
status: "queued",
requestedByActorType: opts.requestedByActorType ?? null,
requestedByActorId: opts.requestedByActorId ?? null,
idempotencyKey: opts.idempotencyKey ?? null,
})
.returning()
.then((rows) => rows[0]);
let sessionBefore: string | null = null;
if (taskKey) {
const codec = getAdapterSessionCodec(agent.adapterType);
const existingTaskSession = await getTaskSession(
agent.companyId,
agent.id,
agent.adapterType,
taskKey,
);
const parsedParams = normalizeSessionParams(
codec.deserialize(existingTaskSession?.sessionParamsJson ?? null),
);
sessionBefore = truncateDisplayId(
existingTaskSession?.sessionDisplayId ??
(codec.getDisplayId ? codec.getDisplayId(parsedParams) : null) ??
readNonEmptyString(parsedParams?.sessionId),
);
} else {
const runtimeForRun = await getRuntimeState(agent.id);
sessionBefore = runtimeForRun?.sessionId ?? null;
}
const newRun = await db
.insert(heartbeatRuns)
.values({
companyId: agent.companyId,
agentId,
invocationSource: source,
triggerDetail,
status: "queued",
wakeupRequestId: wakeupRequest.id,
contextSnapshot,
sessionIdBefore: sessionBefore,
})
.returning()
.then((rows) => rows[0]);
await db
.update(agentWakeupRequests)
.set({
runId: newRun.id,
updatedAt: new Date(),
})
.where(eq(agentWakeupRequests.id, wakeupRequest.id));
publishLiveEvent({
companyId: newRun.companyId,
type: "heartbeat.run.queued",
payload: {
runId: newRun.id,
agentId: newRun.agentId,
invocationSource: newRun.invocationSource,
triggerDetail: newRun.triggerDetail,
wakeupRequestId: newRun.wakeupRequestId,
},
});
await startNextQueuedRunForAgent(agent.id);
return newRun;
}
return {
list: (companyId: string, agentId?: string) => {
if (!agentId) {
return db
.select()
.from(heartbeatRuns)
.where(eq(heartbeatRuns.companyId, companyId))
.orderBy(desc(heartbeatRuns.createdAt));
}
return db
.select()
.from(heartbeatRuns)
.where(and(eq(heartbeatRuns.companyId, companyId), eq(heartbeatRuns.agentId, agentId)))
.orderBy(desc(heartbeatRuns.createdAt));
},
getRun,
getRuntimeState: async (agentId: string) => {
const state = await getRuntimeState(agentId);
const agent = await getAgent(agentId);
if (!agent) return null;
const ensured = state ?? (await ensureRuntimeState(agent));
const latestTaskSession = await db
.select()
.from(agentTaskSessions)
.where(and(eq(agentTaskSessions.companyId, agent.companyId), eq(agentTaskSessions.agentId, agent.id)))
.orderBy(desc(agentTaskSessions.updatedAt))
.limit(1)
.then((rows) => rows[0] ?? null);
return {
...ensured,
sessionDisplayId: latestTaskSession?.sessionDisplayId ?? ensured.sessionId,
sessionParamsJson: latestTaskSession?.sessionParamsJson ?? null,
};
},
listTaskSessions: async (agentId: string) => {
const agent = await getAgent(agentId);
if (!agent) throw notFound("Agent not found");
return db
.select()
.from(agentTaskSessions)
.where(and(eq(agentTaskSessions.companyId, agent.companyId), eq(agentTaskSessions.agentId, agentId)))
.orderBy(desc(agentTaskSessions.updatedAt), desc(agentTaskSessions.createdAt));
},
resetRuntimeSession: async (agentId: string, opts?: { taskKey?: string | null }) => {
const agent = await getAgent(agentId);
if (!agent) throw notFound("Agent not found");
await ensureRuntimeState(agent);
const taskKey = readNonEmptyString(opts?.taskKey);
const clearedTaskSessions = await clearTaskSessions(
agent.companyId,
agent.id,
taskKey ? { taskKey, adapterType: agent.adapterType } : undefined,
);
const runtimePatch: Partial<typeof agentRuntimeState.$inferInsert> = {
sessionId: null,
lastError: null,
updatedAt: new Date(),
};
if (!taskKey) {
runtimePatch.stateJson = {};
}
const updated = await db
.update(agentRuntimeState)
.set(runtimePatch)
.where(eq(agentRuntimeState.agentId, agentId))
.returning()
.then((rows) => rows[0] ?? null);
if (!updated) return null;
return {
...updated,
sessionDisplayId: null,
sessionParamsJson: null,
clearedTaskSessions,
};
},
listEvents: (runId: string, afterSeq = 0, limit = 200) =>
db
.select()
.from(heartbeatRunEvents)
.where(and(eq(heartbeatRunEvents.runId, runId), gt(heartbeatRunEvents.seq, afterSeq)))
.orderBy(asc(heartbeatRunEvents.seq))
.limit(Math.max(1, Math.min(limit, 1000))),
readLog: async (runId: string, opts?: { offset?: number; limitBytes?: number }) => {
const run = await getRun(runId);
if (!run) throw notFound("Heartbeat run not found");
if (!run.logStore || !run.logRef) throw notFound("Run log not found");
const result = await runLogStore.read(
{
store: run.logStore as "local_file",
logRef: run.logRef,
},
opts,
);
return {
runId,
store: run.logStore,
logRef: run.logRef,
...result,
};
},
invoke: async (
agentId: string,
source: "timer" | "assignment" | "on_demand" | "automation" = "on_demand",
contextSnapshot: Record<string, unknown> = {},
triggerDetail: "manual" | "ping" | "callback" | "system" = "manual",
actor?: { actorType?: "user" | "agent" | "system"; actorId?: string | null },
) =>
enqueueWakeup(agentId, {
source,
triggerDetail,
contextSnapshot,
requestedByActorType: actor?.actorType,
requestedByActorId: actor?.actorId ?? null,
}),
wakeup: enqueueWakeup,
reapOrphanedRuns,
tickTimers: async (now = new Date()) => {
const allAgents = await db.select().from(agents);
let checked = 0;
let enqueued = 0;
let skipped = 0;
for (const agent of allAgents) {
if (agent.status === "paused" || agent.status === "terminated") continue;
const policy = parseHeartbeatPolicy(agent);
if (!policy.enabled || policy.intervalSec <= 0) continue;
checked += 1;
const last = agent.lastHeartbeatAt ? new Date(agent.lastHeartbeatAt).getTime() : 0;
const elapsedMs = now.getTime() - last;
if (last && elapsedMs < policy.intervalSec * 1000) continue;
const run = await enqueueWakeup(agent.id, {
source: "timer",
triggerDetail: "system",
reason: "heartbeat_timer",
requestedByActorType: "system",
requestedByActorId: "heartbeat_scheduler",
contextSnapshot: {
source: "scheduler",
reason: "interval_elapsed",
now: now.toISOString(),
},
});
if (run) enqueued += 1;
else skipped += 1;
}
return { checked, enqueued, skipped };
},
cancelRun: async (runId: string) => {
const run = await getRun(runId);
if (!run) throw notFound("Heartbeat run not found");
if (run.status !== "running" && run.status !== "queued") return run;
const running = runningProcesses.get(run.id);
if (running) {
running.child.kill("SIGTERM");
const graceMs = Math.max(1, running.graceSec) * 1000;
setTimeout(() => {
if (!running.child.killed) {
running.child.kill("SIGKILL");
}
}, graceMs);
}
const cancelled = await setRunStatus(run.id, "cancelled", {
finishedAt: new Date(),
error: "Cancelled by control plane",
errorCode: "cancelled",
});
await setWakeupStatus(run.wakeupRequestId, "cancelled", {
finishedAt: new Date(),
error: "Cancelled by control plane",
});
if (cancelled) {
await appendRunEvent(cancelled, 1, {
eventType: "lifecycle",
stream: "system",
level: "warn",
message: "run cancelled",
});
}
runningProcesses.delete(run.id);
await finalizeAgentStatus(run.agentId, "cancelled");
await startNextQueuedRunForAgent(run.agentId);
return cancelled;
},
cancelActiveForAgent: async (agentId: string) => {
const runs = await db
.select()
.from(heartbeatRuns)
.where(and(eq(heartbeatRuns.agentId, agentId), inArray(heartbeatRuns.status, ["queued", "running"])));
for (const run of runs) {
await setRunStatus(run.id, "cancelled", {
finishedAt: new Date(),
error: "Cancelled due to agent pause",
errorCode: "cancelled",
});
await setWakeupStatus(run.wakeupRequestId, "cancelled", {
finishedAt: new Date(),
error: "Cancelled due to agent pause",
});
const running = runningProcesses.get(run.id);
if (running) {
running.child.kill("SIGTERM");
runningProcesses.delete(run.id);
}
}
return runs.length;
},
getActiveRunForAgent: async (agentId: string) => {
const [run] = await db
.select()
.from(heartbeatRuns)
.where(
and(
eq(heartbeatRuns.agentId, agentId),
eq(heartbeatRuns.status, "running"),
),
)
.orderBy(desc(heartbeatRuns.startedAt))
.limit(1);
return run ?? null;
},
};
}