Implement task-scoped sessions, queued run chaining, and session reset API

Heartbeat service now resolves session state per-task using agentTaskSessions,
with resolveNextSessionState handling codec-based serialization and fallback
to legacy sessionId. Queued runs are chained — when a run finishes or is reaped,
the next queued run for the same agent starts automatically. Queued runs for
an agent with an already-running run wait instead of failing.

Add task-sessions list endpoint and extend reset-session to accept optional
taskKey for targeted session clearing. Block pending_approval agents from
API key auth. Update agent/company delete cascades to include task sessions.
Update spec docs with task-session architecture.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Forgotten
2026-02-19 14:02:17 -06:00
parent d56618e9fe
commit 2acf28a51a
6 changed files with 467 additions and 46 deletions

View File

@@ -6,6 +6,7 @@ import {
agentConfigRevisions,
agentApiKeys,
agentRuntimeState,
agentTaskSessions,
agentWakeupRequests,
heartbeatRunEvents,
heartbeatRuns,
@@ -302,6 +303,7 @@ export function agentService(db: Db) {
return db.transaction(async (tx) => {
await tx.update(agents).set({ reportsTo: null }).where(eq(agents.reportsTo, id));
await tx.delete(heartbeatRunEvents).where(eq(heartbeatRunEvents.agentId, id));
await tx.delete(agentTaskSessions).where(eq(agentTaskSessions.agentId, id));
await tx.delete(heartbeatRuns).where(eq(heartbeatRuns.agentId, id));
await tx.delete(agentWakeupRequests).where(eq(agentWakeupRequests.agentId, id));
await tx.delete(agentApiKeys).where(eq(agentApiKeys.agentId, id));

View File

@@ -5,6 +5,7 @@ import {
agents,
agentApiKeys,
agentRuntimeState,
agentTaskSessions,
agentWakeupRequests,
issues,
issueComments,
@@ -56,6 +57,7 @@ export function companyService(db: Db) {
db.transaction(async (tx) => {
// Delete from child tables in dependency order
await tx.delete(heartbeatRunEvents).where(eq(heartbeatRunEvents.companyId, id));
await tx.delete(agentTaskSessions).where(eq(agentTaskSessions.companyId, id));
await tx.delete(heartbeatRuns).where(eq(heartbeatRuns.companyId, id));
await tx.delete(agentWakeupRequests).where(eq(agentWakeupRequests.companyId, id));
await tx.delete(agentApiKeys).where(eq(agentApiKeys.companyId, id));

View File

@@ -3,6 +3,7 @@ import type { Db } from "@paperclip/db";
import {
agents,
agentRuntimeState,
agentTaskSessions,
agentWakeupRequests,
heartbeatRunEvents,
heartbeatRuns,
@@ -13,7 +14,7 @@ import { logger } from "../middleware/logger.js";
import { publishLiveEvent } from "./live-events.js";
import { getRunLogStore, type RunLogHandle } from "./run-log-store.js";
import { getServerAdapter, runningProcesses } from "../adapters/index.js";
import type { AdapterExecutionResult, AdapterInvocationMeta } from "../adapters/index.js";
import type { AdapterExecutionResult, AdapterInvocationMeta, AdapterSessionCodec } from "../adapters/index.js";
import { createLocalAgentJwt } from "../agent-auth-jwt.js";
import { parseObject, asBoolean, asNumber, appendWithCap, MAX_EXCERPT_BYTES } from "../adapters/utils.js";
@@ -38,6 +39,118 @@ function readNonEmptyString(value: unknown): string | null {
return typeof value === "string" && value.trim().length > 0 ? value : null;
}
function deriveTaskKey(
contextSnapshot: Record<string, unknown> | null | undefined,
payload: Record<string, unknown> | null | undefined,
) {
return (
readNonEmptyString(contextSnapshot?.taskKey) ??
readNonEmptyString(contextSnapshot?.taskId) ??
readNonEmptyString(contextSnapshot?.issueId) ??
readNonEmptyString(payload?.taskKey) ??
readNonEmptyString(payload?.taskId) ??
readNonEmptyString(payload?.issueId) ??
null
);
}
function runTaskKey(run: typeof heartbeatRuns.$inferSelect) {
return deriveTaskKey(run.contextSnapshot as Record<string, unknown> | null, null);
}
function isSameTaskScope(left: string | null, right: string | null) {
return (left ?? null) === (right ?? null);
}
function truncateDisplayId(value: string | null | undefined, max = 128) {
if (!value) return null;
return value.length > max ? value.slice(0, max) : value;
}
const defaultSessionCodec: AdapterSessionCodec = {
deserialize(raw: unknown) {
const asObj = parseObject(raw);
if (Object.keys(asObj).length > 0) return asObj;
const sessionId = readNonEmptyString((raw as Record<string, unknown> | null)?.sessionId);
if (sessionId) return { sessionId };
return null;
},
serialize(params: Record<string, unknown> | null) {
if (!params || Object.keys(params).length === 0) return null;
return params;
},
getDisplayId(params: Record<string, unknown> | null) {
return readNonEmptyString(params?.sessionId);
},
};
function getAdapterSessionCodec(adapterType: string) {
const adapter = getServerAdapter(adapterType);
return adapter.sessionCodec ?? defaultSessionCodec;
}
function normalizeSessionParams(params: Record<string, unknown> | null | undefined) {
if (!params) return null;
return Object.keys(params).length > 0 ? params : null;
}
function resolveNextSessionState(input: {
codec: AdapterSessionCodec;
adapterResult: AdapterExecutionResult;
previousParams: Record<string, unknown> | null;
previousDisplayId: string | null;
previousLegacySessionId: string | null;
}) {
const { codec, adapterResult, previousParams, previousDisplayId, previousLegacySessionId } = input;
if (adapterResult.clearSession) {
return {
params: null as Record<string, unknown> | null,
displayId: null as string | null,
legacySessionId: null as string | null,
};
}
const explicitParams = adapterResult.sessionParams;
const hasExplicitParams = adapterResult.sessionParams !== undefined;
const hasExplicitSessionId = adapterResult.sessionId !== undefined;
const explicitSessionId = readNonEmptyString(adapterResult.sessionId);
const hasExplicitDisplay = adapterResult.sessionDisplayId !== undefined;
const explicitDisplayId = readNonEmptyString(adapterResult.sessionDisplayId);
const shouldUsePrevious = !hasExplicitParams && !hasExplicitSessionId && !hasExplicitDisplay;
const candidateParams =
hasExplicitParams
? explicitParams
: hasExplicitSessionId
? (explicitSessionId ? { sessionId: explicitSessionId } : null)
: previousParams;
const serialized = normalizeSessionParams(codec.serialize(normalizeSessionParams(candidateParams) ?? null));
const deserialized = normalizeSessionParams(codec.deserialize(serialized));
const displayId = truncateDisplayId(
explicitDisplayId ??
(codec.getDisplayId ? codec.getDisplayId(deserialized) : null) ??
readNonEmptyString(deserialized?.sessionId) ??
(shouldUsePrevious ? previousDisplayId : null) ??
explicitSessionId ??
(shouldUsePrevious ? previousLegacySessionId : null),
);
const legacySessionId =
explicitSessionId ??
readNonEmptyString(deserialized?.sessionId) ??
displayId ??
(shouldUsePrevious ? previousLegacySessionId : null);
return {
params: serialized,
displayId,
legacySessionId,
};
}
export function heartbeatService(db: Db) {
const runLogStore = getRunLogStore();
@@ -65,6 +178,96 @@ export function heartbeatService(db: Db) {
.then((rows) => rows[0] ?? null);
}
async function getTaskSession(
companyId: string,
agentId: string,
adapterType: string,
taskKey: string,
) {
return db
.select()
.from(agentTaskSessions)
.where(
and(
eq(agentTaskSessions.companyId, companyId),
eq(agentTaskSessions.agentId, agentId),
eq(agentTaskSessions.adapterType, adapterType),
eq(agentTaskSessions.taskKey, taskKey),
),
)
.then((rows) => rows[0] ?? null);
}
async function upsertTaskSession(input: {
companyId: string;
agentId: string;
adapterType: string;
taskKey: string;
sessionParamsJson: Record<string, unknown> | null;
sessionDisplayId: string | null;
lastRunId: string | null;
lastError: string | null;
}) {
const existing = await getTaskSession(
input.companyId,
input.agentId,
input.adapterType,
input.taskKey,
);
if (existing) {
return db
.update(agentTaskSessions)
.set({
sessionParamsJson: input.sessionParamsJson,
sessionDisplayId: input.sessionDisplayId,
lastRunId: input.lastRunId,
lastError: input.lastError,
updatedAt: new Date(),
})
.where(eq(agentTaskSessions.id, existing.id))
.returning()
.then((rows) => rows[0] ?? null);
}
return db
.insert(agentTaskSessions)
.values({
companyId: input.companyId,
agentId: input.agentId,
adapterType: input.adapterType,
taskKey: input.taskKey,
sessionParamsJson: input.sessionParamsJson,
sessionDisplayId: input.sessionDisplayId,
lastRunId: input.lastRunId,
lastError: input.lastError,
})
.returning()
.then((rows) => rows[0] ?? null);
}
async function clearTaskSessions(
companyId: string,
agentId: string,
opts?: { taskKey?: string | null; adapterType?: string | null },
) {
const conditions = [
eq(agentTaskSessions.companyId, companyId),
eq(agentTaskSessions.agentId, agentId),
];
if (opts?.taskKey) {
conditions.push(eq(agentTaskSessions.taskKey, opts.taskKey));
}
if (opts?.adapterType) {
conditions.push(eq(agentTaskSessions.adapterType, opts.adapterType));
}
return db
.delete(agentTaskSessions)
.where(and(...conditions))
.returning()
.then((rows) => rows.length);
}
async function ensureRuntimeState(agent: typeof agents.$inferSelect) {
const existing = await getRuntimeState(agent.id);
if (existing) return existing;
@@ -260,6 +463,7 @@ export function heartbeatService(db: Db) {
});
}
await finalizeAgentStatus(run.agentId, "failed");
await startNextQueuedRunForAgent(run.agentId);
runningProcesses.delete(run.id);
reaped.push(run.id);
}
@@ -274,6 +478,7 @@ export function heartbeatService(db: Db) {
agent: typeof agents.$inferSelect,
run: typeof heartbeatRuns.$inferSelect,
result: AdapterExecutionResult,
session: { legacySessionId: string | null },
) {
const existing = await ensureRuntimeState(agent);
const usage = result.usage;
@@ -286,7 +491,7 @@ export function heartbeatService(db: Db) {
.update(agentRuntimeState)
.set({
adapterType: agent.adapterType,
sessionId: result.clearSession ? null : (result.sessionId ?? existing.sessionId),
sessionId: session.legacySessionId,
lastRunId: run.id,
lastRunStatus: run.status,
lastError: result.errorMessage ?? null,
@@ -320,6 +525,30 @@ export function heartbeatService(db: Db) {
}
}
async function startNextQueuedRunForAgent(agentId: string) {
const running = await db
.select({ id: heartbeatRuns.id })
.from(heartbeatRuns)
.where(and(eq(heartbeatRuns.agentId, agentId), eq(heartbeatRuns.status, "running")))
.limit(1)
.then((rows) => rows[0] ?? null);
if (running) return null;
const nextQueued = await db
.select()
.from(heartbeatRuns)
.where(and(eq(heartbeatRuns.agentId, agentId), eq(heartbeatRuns.status, "queued")))
.orderBy(asc(heartbeatRuns.createdAt))
.limit(1)
.then((rows) => rows[0] ?? null);
if (!nextQueued) return null;
void executeRun(nextQueued.id).catch((err) => {
logger.error({ err, runId: nextQueued.id }, "queued heartbeat execution failed");
});
return nextQueued;
}
async function executeRun(runId: string) {
const run = await getRun(runId);
if (!run) return;
@@ -339,7 +568,41 @@ export function heartbeatService(db: Db) {
return;
}
if (run.status === "queued") {
const activeForAgent = await db
.select()
.from(heartbeatRuns)
.where(and(eq(heartbeatRuns.agentId, run.agentId), inArray(heartbeatRuns.status, ["queued", "running"])))
.orderBy(asc(heartbeatRuns.createdAt));
const runningOther = activeForAgent.some((candidate) => candidate.status === "running" && candidate.id !== run.id);
const first = activeForAgent[0] ?? null;
if (runningOther || (first && first.id !== run.id)) {
return;
}
}
const runtime = await ensureRuntimeState(agent);
const context = parseObject(run.contextSnapshot);
const taskKey = deriveTaskKey(context, null);
const sessionCodec = getAdapterSessionCodec(agent.adapterType);
const taskSession = taskKey
? await getTaskSession(agent.companyId, agent.id, agent.adapterType, taskKey)
: null;
const previousSessionParams = normalizeSessionParams(
sessionCodec.deserialize(taskSession?.sessionParamsJson ?? null),
);
const previousSessionDisplayId = truncateDisplayId(
taskSession?.sessionDisplayId ??
(sessionCodec.getDisplayId ? sessionCodec.getDisplayId(previousSessionParams) : null) ??
readNonEmptyString(previousSessionParams?.sessionId) ??
runtime.sessionId,
);
const runtimeForAdapter = {
sessionId: readNonEmptyString(previousSessionParams?.sessionId) ?? runtime.sessionId,
sessionParams: previousSessionParams,
sessionDisplayId: previousSessionDisplayId,
taskKey,
};
let seq = 1;
let handle: RunLogHandle | null = null;
@@ -349,7 +612,7 @@ export function heartbeatService(db: Db) {
try {
await setRunStatus(runId, "running", {
startedAt: new Date(),
sessionIdBefore: runtime.sessionId,
sessionIdBefore: runtimeForAdapter.sessionDisplayId ?? runtimeForAdapter.sessionId,
});
await setWakeupStatus(run.wakeupRequestId, "claimed", { claimedAt: new Date() });
@@ -426,7 +689,6 @@ export function heartbeatService(db: Db) {
};
const config = parseObject(agent.adapterConfig);
const context = (run.contextSnapshot ?? {}) as Record<string, unknown>;
const onAdapterMeta = async (meta: AdapterInvocationMeta) => {
await appendRunEvent(currentRun, seq++, {
eventType: "adapter.invoke",
@@ -455,13 +717,20 @@ export function heartbeatService(db: Db) {
const adapterResult = await adapter.execute({
runId: run.id,
agent,
runtime,
runtime: runtimeForAdapter,
config,
context,
onLog,
onMeta: onAdapterMeta,
authToken: authToken ?? undefined,
});
const nextSessionState = resolveNextSessionState({
codec: sessionCodec,
adapterResult,
previousParams: previousSessionParams,
previousDisplayId: runtimeForAdapter.sessionDisplayId,
previousLegacySessionId: runtimeForAdapter.sessionId,
});
let outcome: "succeeded" | "failed" | "cancelled" | "timed_out";
const latestRun = await getRun(run.id);
@@ -515,7 +784,7 @@ export function heartbeatService(db: Db) {
signal: adapterResult.signal,
usageJson,
resultJson: adapterResult.resultJson ?? null,
sessionIdAfter: adapterResult.sessionId ?? runtime.sessionId,
sessionIdAfter: nextSessionState.displayId ?? nextSessionState.legacySessionId,
stdoutExcerpt,
stderrExcerpt,
logBytes: logSummary?.bytes,
@@ -543,7 +812,28 @@ export function heartbeatService(db: Db) {
}
if (finalizedRun) {
await updateRuntimeState(agent, finalizedRun, adapterResult);
await updateRuntimeState(agent, finalizedRun, adapterResult, {
legacySessionId: nextSessionState.legacySessionId,
});
if (taskKey) {
if (adapterResult.clearSession || (!nextSessionState.params && !nextSessionState.displayId)) {
await clearTaskSessions(agent.companyId, agent.id, {
taskKey,
adapterType: agent.adapterType,
});
} else {
await upsertTaskSession({
companyId: agent.companyId,
agentId: agent.id,
adapterType: agent.adapterType,
taskKey,
sessionParamsJson: nextSessionState.params,
sessionDisplayId: nextSessionState.displayId,
lastRunId: finalizedRun.id,
lastError: outcome === "succeeded" ? null : (adapterResult.errorMessage ?? "run_failed"),
});
}
}
}
await finalizeAgentStatus(agent.id, outcome);
} catch (err) {
@@ -587,10 +877,27 @@ export function heartbeatService(db: Db) {
signal: null,
timedOut: false,
errorMessage: message,
}, {
legacySessionId: runtimeForAdapter.sessionId,
});
if (taskKey && (previousSessionParams || previousSessionDisplayId || taskSession)) {
await upsertTaskSession({
companyId: agent.companyId,
agentId: agent.id,
adapterType: agent.adapterType,
taskKey,
sessionParamsJson: previousSessionParams,
sessionDisplayId: previousSessionDisplayId,
lastRunId: failedRun.id,
lastError: message,
});
}
}
await finalizeAgentStatus(agent.id, "failed");
} finally {
await startNextQueuedRunForAgent(agent.id);
}
}
@@ -601,6 +908,7 @@ export function heartbeatService(db: Db) {
const reason = opts.reason ?? null;
const payload = opts.payload ?? null;
const issueIdFromPayload = readNonEmptyString(payload?.["issueId"]);
const taskKey = deriveTaskKey(contextSnapshot, payload);
if (!readNonEmptyString(contextSnapshot["wakeReason"]) && reason) {
contextSnapshot.wakeReason = reason;
@@ -611,6 +919,9 @@ export function heartbeatService(db: Db) {
if (!readNonEmptyString(contextSnapshot["taskId"]) && issueIdFromPayload) {
contextSnapshot.taskId = issueIdFromPayload;
}
if (!readNonEmptyString(contextSnapshot["taskKey"]) && taskKey) {
contextSnapshot.taskKey = taskKey;
}
if (!readNonEmptyString(contextSnapshot["wakeSource"])) {
contextSnapshot.wakeSource = source;
}
@@ -655,14 +966,17 @@ export function heartbeatService(db: Db) {
return null;
}
const activeRun = await db
const activeRuns = await db
.select()
.from(heartbeatRuns)
.where(and(eq(heartbeatRuns.agentId, agentId), inArray(heartbeatRuns.status, ["queued", "running"])))
.orderBy(desc(heartbeatRuns.createdAt))
.then((rows) => rows[0] ?? null);
.orderBy(desc(heartbeatRuns.createdAt));
if (activeRun) {
const sameScopeRun = activeRuns.find((candidate) =>
isSameTaskScope(runTaskKey(candidate), taskKey),
);
if (sameScopeRun) {
await db.insert(agentWakeupRequests).values({
companyId: agent.companyId,
agentId,
@@ -675,10 +989,10 @@ export function heartbeatService(db: Db) {
requestedByActorType: opts.requestedByActorType ?? null,
requestedByActorId: opts.requestedByActorId ?? null,
idempotencyKey: opts.idempotencyKey ?? null,
runId: activeRun.id,
runId: sameScopeRun.id,
finishedAt: new Date(),
});
return activeRun;
return sameScopeRun;
}
const wakeupRequest = await db
@@ -698,7 +1012,27 @@ export function heartbeatService(db: Db) {
.returning()
.then((rows) => rows[0]);
const runtimeForRun = await getRuntimeState(agent.id);
let sessionBefore: string | null = null;
if (taskKey) {
const codec = getAdapterSessionCodec(agent.adapterType);
const existingTaskSession = await getTaskSession(
agent.companyId,
agent.id,
agent.adapterType,
taskKey,
);
const parsedParams = normalizeSessionParams(
codec.deserialize(existingTaskSession?.sessionParamsJson ?? null),
);
sessionBefore = truncateDisplayId(
existingTaskSession?.sessionDisplayId ??
(codec.getDisplayId ? codec.getDisplayId(parsedParams) : null) ??
readNonEmptyString(parsedParams?.sessionId),
);
} else {
const runtimeForRun = await getRuntimeState(agent.id);
sessionBefore = runtimeForRun?.sessionId ?? null;
}
const newRun = await db
.insert(heartbeatRuns)
@@ -710,7 +1044,7 @@ export function heartbeatService(db: Db) {
status: "queued",
wakeupRequestId: wakeupRequest.id,
contextSnapshot,
sessionIdBefore: runtimeForRun?.sessionId ?? null,
sessionIdBefore: sessionBefore,
})
.returning()
.then((rows) => rows[0]);
@@ -735,9 +1069,7 @@ export function heartbeatService(db: Db) {
},
});
void executeRun(newRun.id).catch((err) => {
logger.error({ err, runId: newRun.id }, "heartbeat execution failed");
});
await startNextQueuedRunForAgent(agent.id);
return newRun;
}
@@ -763,29 +1095,67 @@ export function heartbeatService(db: Db) {
getRuntimeState: async (agentId: string) => {
const state = await getRuntimeState(agentId);
if (state) return state;
const agent = await getAgent(agentId);
if (!agent) return null;
return ensureRuntimeState(agent);
const ensured = state ?? (await ensureRuntimeState(agent));
const latestTaskSession = await db
.select()
.from(agentTaskSessions)
.where(and(eq(agentTaskSessions.companyId, agent.companyId), eq(agentTaskSessions.agentId, agent.id)))
.orderBy(desc(agentTaskSessions.updatedAt))
.limit(1)
.then((rows) => rows[0] ?? null);
return {
...ensured,
sessionDisplayId: latestTaskSession?.sessionDisplayId ?? ensured.sessionId,
sessionParamsJson: latestTaskSession?.sessionParamsJson ?? null,
};
},
resetRuntimeSession: async (agentId: string) => {
listTaskSessions: async (agentId: string) => {
const agent = await getAgent(agentId);
if (!agent) throw notFound("Agent not found");
return db
.select()
.from(agentTaskSessions)
.where(and(eq(agentTaskSessions.companyId, agent.companyId), eq(agentTaskSessions.agentId, agentId)))
.orderBy(desc(agentTaskSessions.updatedAt), desc(agentTaskSessions.createdAt));
},
resetRuntimeSession: async (agentId: string, opts?: { taskKey?: string | null }) => {
const agent = await getAgent(agentId);
if (!agent) throw notFound("Agent not found");
await ensureRuntimeState(agent);
const taskKey = readNonEmptyString(opts?.taskKey);
const clearedTaskSessions = await clearTaskSessions(
agent.companyId,
agent.id,
taskKey ? { taskKey, adapterType: agent.adapterType } : undefined,
);
const runtimePatch: Partial<typeof agentRuntimeState.$inferInsert> = {
sessionId: null,
lastError: null,
updatedAt: new Date(),
};
if (!taskKey) {
runtimePatch.stateJson = {};
}
return db
const updated = await db
.update(agentRuntimeState)
.set({
sessionId: null,
stateJson: {},
lastError: null,
updatedAt: new Date(),
})
.set(runtimePatch)
.where(eq(agentRuntimeState.agentId, agentId))
.returning()
.then((rows) => rows[0] ?? null);
if (!updated) return null;
return {
...updated,
sessionDisplayId: null,
sessionParamsJson: null,
clearedTaskSessions,
};
},
listEvents: (runId: string, afterSeq = 0, limit = 200) =>
@@ -909,6 +1279,7 @@ export function heartbeatService(db: Db) {
runningProcesses.delete(run.id);
await finalizeAgentStatus(run.agentId, "cancelled");
await startNextQueuedRunForAgent(run.agentId);
return cancelled;
},