diff --git a/doc/spec/agent-runs.md b/doc/spec/agent-runs.md index 71371597..c8acd4c6 100644 --- a/doc/spec/agent-runs.md +++ b/doc/spec/agent-runs.md @@ -423,7 +423,7 @@ This separation keeps adapter config runtime-agnostic while allowing the heartbe ## 9.1 New table: `agent_runtime_state` -One row per agent for resumable adapter state. +One row per agent for aggregate runtime counters and legacy compatibility. - `agent_id` uuid pk fk `agents.id` - `company_id` uuid fk not null @@ -441,6 +441,24 @@ One row per agent for resumable adapter state. Invariant: exactly one runtime state row per agent. +## 9.1.1 New table: `agent_task_sessions` + +One row per `(company_id, agent_id, adapter_type, task_key)` for resumable session state. + +- `id` uuid pk +- `company_id` uuid fk not null +- `agent_id` uuid fk not null +- `adapter_type` text not null +- `task_key` text not null +- `session_params_json` jsonb null (adapter-defined shape) +- `session_display_id` text null (for UI/debug) +- `last_run_id` uuid fk `heartbeat_runs.id` null +- `last_error` text null +- `created_at` timestamptz not null +- `updated_at` timestamptz not null + +Invariant: unique `(company_id, agent_id, adapter_type, task_key)`. + ## 9.2 New table: `agent_wakeup_requests` Queue + audit for wakeups. @@ -662,13 +680,15 @@ On server startup: - backward-compatible alias to wakeup API 3. `GET /agents/:agentId/runtime-state` - board-only debug view -4. `POST /agents/:agentId/runtime-state/reset-session` - - clears stored session ID -5. `GET /heartbeat-runs/:runId/events?afterSeq=:n` +4. `GET /agents/:agentId/task-sessions` + - board-only list of task-scoped adapter sessions +5. `POST /agents/:agentId/runtime-state/reset-session` + - clears all task sessions for the agent, or one when `taskKey` is provided +6. `GET /heartbeat-runs/:runId/events?afterSeq=:n` - fetch persisted lightweight timeline -6. `GET /heartbeat-runs/:runId/log` +7. `GET /heartbeat-runs/:runId/log` - reads full log stream via `RunLogStore` (or redirects/presigned URL for object store) -7. `GET /api/companies/:companyId/events/ws` +8. `GET /api/companies/:companyId/events/ws` - websocket stream ## 13.2 Mutation logging @@ -726,7 +746,7 @@ All wakeup/run state mutations must create `activity_log` entries: ## 15. Acceptance Criteria 1. Agent with `claude-local` or `codex-local` can run, exit, and persist run result. -2. Session ID is persisted and used for next run resume automatically. +2. Session parameters are persisted per task scope and reused automatically for same-task resumes. 3. Token usage is persisted per run and accumulated per agent runtime state. 4. Timer, assignment, on-demand, and automation wakeups all enqueue through one coordinator. 5. Pause/terminate interrupts running local process and prevents new wakeups. @@ -737,7 +757,6 @@ All wakeup/run state mutations must create `activity_log` entries: ## 16. Open Questions 1. Should timer default be `null` (off until enabled) or `300` seconds by default? -2. For invalid resume session errors, should default behavior be fail-fast or auto-reset-and-retry-once? -3. What should the default retention policy be for full log objects vs Postgres metadata? -4. Should agent API credentials be allowed in prompt templates by default, or require explicit opt-in toggle? -5. Should websocket be the only realtime channel, or should we also expose SSE for simpler clients? +2. What should the default retention policy be for full log objects vs Postgres metadata? +3. Should agent API credentials be allowed in prompt templates by default, or require explicit opt-in toggle? +4. Should websocket be the only realtime channel, or should we also expose SSE for simpler clients? diff --git a/doc/spec/agents-runtime.md b/doc/spec/agents-runtime.md index a12a902e..0dc7a63f 100644 --- a/doc/spec/agents-runtime.md +++ b/doc/spec/agents-runtime.md @@ -71,11 +71,13 @@ Templates support variables like `{{agent.id}}`, `{{agent.name}}`, and run conte ## 4. Session resume behavior -Paperclip stores session IDs for resumable adapters. +Paperclip stores resumable session state per `(agent, taskKey, adapterType)`. +`taskKey` is derived from wakeup context (`taskKey`, `taskId`, or `issueId`). -- Next heartbeat reuses the saved session automatically. -- This gives continuity across heartbeats. -- You can reset a session if context gets stale or confused. +- A heartbeat for the same task key reuses the previous session for that task. +- Different task keys for the same agent keep separate session state. +- If restore fails, adapters should retry once with a fresh session and continue. +- You can reset all sessions for an agent or reset one task session by task key. Use session reset when: diff --git a/server/src/routes/agents.ts b/server/src/routes/agents.ts index 246da678..50b51c62 100644 --- a/server/src/routes/agents.ts +++ b/server/src/routes/agents.ts @@ -6,6 +6,7 @@ import { createAgentKeySchema, createAgentHireSchema, createAgentSchema, + resetAgentSessionSchema, updateAgentPermissionsSchema, wakeAgentSchema, updateAgentSchema, @@ -357,7 +358,7 @@ export function agentRoutes(db: Db) { res.json(state); }); - router.post("/agents/:id/runtime-state/reset-session", async (req, res) => { + router.get("/agents/:id/task-sessions", async (req, res) => { assertBoard(req); const id = req.params.id as string; const agent = await svc.getById(id); @@ -367,7 +368,30 @@ export function agentRoutes(db: Db) { } assertCompanyAccess(req, agent.companyId); - const state = await heartbeat.resetRuntimeSession(id); + const sessions = await heartbeat.listTaskSessions(id); + res.json( + sessions.map((session) => ({ + ...session, + sessionParamsJson: redactEventPayload(session.sessionParamsJson ?? null), + })), + ); + }); + + router.post("/agents/:id/runtime-state/reset-session", validate(resetAgentSessionSchema), async (req, res) => { + assertBoard(req); + const id = req.params.id as string; + const agent = await svc.getById(id); + if (!agent) { + res.status(404).json({ error: "Agent not found" }); + return; + } + assertCompanyAccess(req, agent.companyId); + + const taskKey = + typeof req.body.taskKey === "string" && req.body.taskKey.trim().length > 0 + ? req.body.taskKey.trim() + : null; + const state = await heartbeat.resetRuntimeSession(id, { taskKey }); await logActivity(db, { companyId: agent.companyId, @@ -376,6 +400,7 @@ export function agentRoutes(db: Db) { action: "agent.runtime_session_reset", entityType: "agent", entityId: id, + details: { taskKey: taskKey ?? null }, }); res.json(state); diff --git a/server/src/services/agents.ts b/server/src/services/agents.ts index c4565768..ec32d3ed 100644 --- a/server/src/services/agents.ts +++ b/server/src/services/agents.ts @@ -6,6 +6,7 @@ import { agentConfigRevisions, agentApiKeys, agentRuntimeState, + agentTaskSessions, agentWakeupRequests, heartbeatRunEvents, heartbeatRuns, @@ -302,6 +303,7 @@ export function agentService(db: Db) { return db.transaction(async (tx) => { await tx.update(agents).set({ reportsTo: null }).where(eq(agents.reportsTo, id)); await tx.delete(heartbeatRunEvents).where(eq(heartbeatRunEvents.agentId, id)); + await tx.delete(agentTaskSessions).where(eq(agentTaskSessions.agentId, id)); await tx.delete(heartbeatRuns).where(eq(heartbeatRuns.agentId, id)); await tx.delete(agentWakeupRequests).where(eq(agentWakeupRequests.agentId, id)); await tx.delete(agentApiKeys).where(eq(agentApiKeys.agentId, id)); diff --git a/server/src/services/companies.ts b/server/src/services/companies.ts index 364e8171..3e271aba 100644 --- a/server/src/services/companies.ts +++ b/server/src/services/companies.ts @@ -5,6 +5,7 @@ import { agents, agentApiKeys, agentRuntimeState, + agentTaskSessions, agentWakeupRequests, issues, issueComments, @@ -56,6 +57,7 @@ export function companyService(db: Db) { db.transaction(async (tx) => { // Delete from child tables in dependency order await tx.delete(heartbeatRunEvents).where(eq(heartbeatRunEvents.companyId, id)); + await tx.delete(agentTaskSessions).where(eq(agentTaskSessions.companyId, id)); await tx.delete(heartbeatRuns).where(eq(heartbeatRuns.companyId, id)); await tx.delete(agentWakeupRequests).where(eq(agentWakeupRequests.companyId, id)); await tx.delete(agentApiKeys).where(eq(agentApiKeys.companyId, id)); diff --git a/server/src/services/heartbeat.ts b/server/src/services/heartbeat.ts index 45001c2c..2ded0a68 100644 --- a/server/src/services/heartbeat.ts +++ b/server/src/services/heartbeat.ts @@ -3,6 +3,7 @@ import type { Db } from "@paperclip/db"; import { agents, agentRuntimeState, + agentTaskSessions, agentWakeupRequests, heartbeatRunEvents, heartbeatRuns, @@ -13,7 +14,7 @@ import { logger } from "../middleware/logger.js"; import { publishLiveEvent } from "./live-events.js"; import { getRunLogStore, type RunLogHandle } from "./run-log-store.js"; import { getServerAdapter, runningProcesses } from "../adapters/index.js"; -import type { AdapterExecutionResult, AdapterInvocationMeta } from "../adapters/index.js"; +import type { AdapterExecutionResult, AdapterInvocationMeta, AdapterSessionCodec } from "../adapters/index.js"; import { createLocalAgentJwt } from "../agent-auth-jwt.js"; import { parseObject, asBoolean, asNumber, appendWithCap, MAX_EXCERPT_BYTES } from "../adapters/utils.js"; @@ -38,6 +39,118 @@ function readNonEmptyString(value: unknown): string | null { return typeof value === "string" && value.trim().length > 0 ? value : null; } +function deriveTaskKey( + contextSnapshot: Record | null | undefined, + payload: Record | null | undefined, +) { + return ( + readNonEmptyString(contextSnapshot?.taskKey) ?? + readNonEmptyString(contextSnapshot?.taskId) ?? + readNonEmptyString(contextSnapshot?.issueId) ?? + readNonEmptyString(payload?.taskKey) ?? + readNonEmptyString(payload?.taskId) ?? + readNonEmptyString(payload?.issueId) ?? + null + ); +} + +function runTaskKey(run: typeof heartbeatRuns.$inferSelect) { + return deriveTaskKey(run.contextSnapshot as Record | null, null); +} + +function isSameTaskScope(left: string | null, right: string | null) { + return (left ?? null) === (right ?? null); +} + +function truncateDisplayId(value: string | null | undefined, max = 128) { + if (!value) return null; + return value.length > max ? value.slice(0, max) : value; +} + +const defaultSessionCodec: AdapterSessionCodec = { + deserialize(raw: unknown) { + const asObj = parseObject(raw); + if (Object.keys(asObj).length > 0) return asObj; + const sessionId = readNonEmptyString((raw as Record | null)?.sessionId); + if (sessionId) return { sessionId }; + return null; + }, + serialize(params: Record | null) { + if (!params || Object.keys(params).length === 0) return null; + return params; + }, + getDisplayId(params: Record | null) { + return readNonEmptyString(params?.sessionId); + }, +}; + +function getAdapterSessionCodec(adapterType: string) { + const adapter = getServerAdapter(adapterType); + return adapter.sessionCodec ?? defaultSessionCodec; +} + +function normalizeSessionParams(params: Record | null | undefined) { + if (!params) return null; + return Object.keys(params).length > 0 ? params : null; +} + +function resolveNextSessionState(input: { + codec: AdapterSessionCodec; + adapterResult: AdapterExecutionResult; + previousParams: Record | null; + previousDisplayId: string | null; + previousLegacySessionId: string | null; +}) { + const { codec, adapterResult, previousParams, previousDisplayId, previousLegacySessionId } = input; + + if (adapterResult.clearSession) { + return { + params: null as Record | null, + displayId: null as string | null, + legacySessionId: null as string | null, + }; + } + + const explicitParams = adapterResult.sessionParams; + const hasExplicitParams = adapterResult.sessionParams !== undefined; + const hasExplicitSessionId = adapterResult.sessionId !== undefined; + const explicitSessionId = readNonEmptyString(adapterResult.sessionId); + const hasExplicitDisplay = adapterResult.sessionDisplayId !== undefined; + const explicitDisplayId = readNonEmptyString(adapterResult.sessionDisplayId); + const shouldUsePrevious = !hasExplicitParams && !hasExplicitSessionId && !hasExplicitDisplay; + + const candidateParams = + hasExplicitParams + ? explicitParams + : hasExplicitSessionId + ? (explicitSessionId ? { sessionId: explicitSessionId } : null) + : previousParams; + + const serialized = normalizeSessionParams(codec.serialize(normalizeSessionParams(candidateParams) ?? null)); + const deserialized = normalizeSessionParams(codec.deserialize(serialized)); + + const displayId = truncateDisplayId( + explicitDisplayId ?? + (codec.getDisplayId ? codec.getDisplayId(deserialized) : null) ?? + readNonEmptyString(deserialized?.sessionId) ?? + (shouldUsePrevious ? previousDisplayId : null) ?? + explicitSessionId ?? + (shouldUsePrevious ? previousLegacySessionId : null), + ); + + const legacySessionId = + explicitSessionId ?? + readNonEmptyString(deserialized?.sessionId) ?? + displayId ?? + (shouldUsePrevious ? previousLegacySessionId : null); + + return { + params: serialized, + displayId, + legacySessionId, + }; +} + export function heartbeatService(db: Db) { const runLogStore = getRunLogStore(); @@ -65,6 +178,96 @@ export function heartbeatService(db: Db) { .then((rows) => rows[0] ?? null); } + async function getTaskSession( + companyId: string, + agentId: string, + adapterType: string, + taskKey: string, + ) { + return db + .select() + .from(agentTaskSessions) + .where( + and( + eq(agentTaskSessions.companyId, companyId), + eq(agentTaskSessions.agentId, agentId), + eq(agentTaskSessions.adapterType, adapterType), + eq(agentTaskSessions.taskKey, taskKey), + ), + ) + .then((rows) => rows[0] ?? null); + } + + async function upsertTaskSession(input: { + companyId: string; + agentId: string; + adapterType: string; + taskKey: string; + sessionParamsJson: Record | null; + sessionDisplayId: string | null; + lastRunId: string | null; + lastError: string | null; + }) { + const existing = await getTaskSession( + input.companyId, + input.agentId, + input.adapterType, + input.taskKey, + ); + if (existing) { + return db + .update(agentTaskSessions) + .set({ + sessionParamsJson: input.sessionParamsJson, + sessionDisplayId: input.sessionDisplayId, + lastRunId: input.lastRunId, + lastError: input.lastError, + updatedAt: new Date(), + }) + .where(eq(agentTaskSessions.id, existing.id)) + .returning() + .then((rows) => rows[0] ?? null); + } + + return db + .insert(agentTaskSessions) + .values({ + companyId: input.companyId, + agentId: input.agentId, + adapterType: input.adapterType, + taskKey: input.taskKey, + sessionParamsJson: input.sessionParamsJson, + sessionDisplayId: input.sessionDisplayId, + lastRunId: input.lastRunId, + lastError: input.lastError, + }) + .returning() + .then((rows) => rows[0] ?? null); + } + + async function clearTaskSessions( + companyId: string, + agentId: string, + opts?: { taskKey?: string | null; adapterType?: string | null }, + ) { + const conditions = [ + eq(agentTaskSessions.companyId, companyId), + eq(agentTaskSessions.agentId, agentId), + ]; + if (opts?.taskKey) { + conditions.push(eq(agentTaskSessions.taskKey, opts.taskKey)); + } + if (opts?.adapterType) { + conditions.push(eq(agentTaskSessions.adapterType, opts.adapterType)); + } + + return db + .delete(agentTaskSessions) + .where(and(...conditions)) + .returning() + .then((rows) => rows.length); + } + async function ensureRuntimeState(agent: typeof agents.$inferSelect) { const existing = await getRuntimeState(agent.id); if (existing) return existing; @@ -260,6 +463,7 @@ export function heartbeatService(db: Db) { }); } await finalizeAgentStatus(run.agentId, "failed"); + await startNextQueuedRunForAgent(run.agentId); runningProcesses.delete(run.id); reaped.push(run.id); } @@ -274,6 +478,7 @@ export function heartbeatService(db: Db) { agent: typeof agents.$inferSelect, run: typeof heartbeatRuns.$inferSelect, result: AdapterExecutionResult, + session: { legacySessionId: string | null }, ) { const existing = await ensureRuntimeState(agent); const usage = result.usage; @@ -286,7 +491,7 @@ export function heartbeatService(db: Db) { .update(agentRuntimeState) .set({ adapterType: agent.adapterType, - sessionId: result.clearSession ? null : (result.sessionId ?? existing.sessionId), + sessionId: session.legacySessionId, lastRunId: run.id, lastRunStatus: run.status, lastError: result.errorMessage ?? null, @@ -320,6 +525,30 @@ export function heartbeatService(db: Db) { } } + async function startNextQueuedRunForAgent(agentId: string) { + const running = await db + .select({ id: heartbeatRuns.id }) + .from(heartbeatRuns) + .where(and(eq(heartbeatRuns.agentId, agentId), eq(heartbeatRuns.status, "running"))) + .limit(1) + .then((rows) => rows[0] ?? null); + if (running) return null; + + const nextQueued = await db + .select() + .from(heartbeatRuns) + .where(and(eq(heartbeatRuns.agentId, agentId), eq(heartbeatRuns.status, "queued"))) + .orderBy(asc(heartbeatRuns.createdAt)) + .limit(1) + .then((rows) => rows[0] ?? null); + if (!nextQueued) return null; + + void executeRun(nextQueued.id).catch((err) => { + logger.error({ err, runId: nextQueued.id }, "queued heartbeat execution failed"); + }); + return nextQueued; + } + async function executeRun(runId: string) { const run = await getRun(runId); if (!run) return; @@ -339,7 +568,41 @@ export function heartbeatService(db: Db) { return; } + if (run.status === "queued") { + const activeForAgent = await db + .select() + .from(heartbeatRuns) + .where(and(eq(heartbeatRuns.agentId, run.agentId), inArray(heartbeatRuns.status, ["queued", "running"]))) + .orderBy(asc(heartbeatRuns.createdAt)); + const runningOther = activeForAgent.some((candidate) => candidate.status === "running" && candidate.id !== run.id); + const first = activeForAgent[0] ?? null; + if (runningOther || (first && first.id !== run.id)) { + return; + } + } + const runtime = await ensureRuntimeState(agent); + const context = parseObject(run.contextSnapshot); + const taskKey = deriveTaskKey(context, null); + const sessionCodec = getAdapterSessionCodec(agent.adapterType); + const taskSession = taskKey + ? await getTaskSession(agent.companyId, agent.id, agent.adapterType, taskKey) + : null; + const previousSessionParams = normalizeSessionParams( + sessionCodec.deserialize(taskSession?.sessionParamsJson ?? null), + ); + const previousSessionDisplayId = truncateDisplayId( + taskSession?.sessionDisplayId ?? + (sessionCodec.getDisplayId ? sessionCodec.getDisplayId(previousSessionParams) : null) ?? + readNonEmptyString(previousSessionParams?.sessionId) ?? + runtime.sessionId, + ); + const runtimeForAdapter = { + sessionId: readNonEmptyString(previousSessionParams?.sessionId) ?? runtime.sessionId, + sessionParams: previousSessionParams, + sessionDisplayId: previousSessionDisplayId, + taskKey, + }; let seq = 1; let handle: RunLogHandle | null = null; @@ -349,7 +612,7 @@ export function heartbeatService(db: Db) { try { await setRunStatus(runId, "running", { startedAt: new Date(), - sessionIdBefore: runtime.sessionId, + sessionIdBefore: runtimeForAdapter.sessionDisplayId ?? runtimeForAdapter.sessionId, }); await setWakeupStatus(run.wakeupRequestId, "claimed", { claimedAt: new Date() }); @@ -426,7 +689,6 @@ export function heartbeatService(db: Db) { }; const config = parseObject(agent.adapterConfig); - const context = (run.contextSnapshot ?? {}) as Record; const onAdapterMeta = async (meta: AdapterInvocationMeta) => { await appendRunEvent(currentRun, seq++, { eventType: "adapter.invoke", @@ -455,13 +717,20 @@ export function heartbeatService(db: Db) { const adapterResult = await adapter.execute({ runId: run.id, agent, - runtime, + runtime: runtimeForAdapter, config, context, onLog, onMeta: onAdapterMeta, authToken: authToken ?? undefined, }); + const nextSessionState = resolveNextSessionState({ + codec: sessionCodec, + adapterResult, + previousParams: previousSessionParams, + previousDisplayId: runtimeForAdapter.sessionDisplayId, + previousLegacySessionId: runtimeForAdapter.sessionId, + }); let outcome: "succeeded" | "failed" | "cancelled" | "timed_out"; const latestRun = await getRun(run.id); @@ -515,7 +784,7 @@ export function heartbeatService(db: Db) { signal: adapterResult.signal, usageJson, resultJson: adapterResult.resultJson ?? null, - sessionIdAfter: adapterResult.sessionId ?? runtime.sessionId, + sessionIdAfter: nextSessionState.displayId ?? nextSessionState.legacySessionId, stdoutExcerpt, stderrExcerpt, logBytes: logSummary?.bytes, @@ -543,7 +812,28 @@ export function heartbeatService(db: Db) { } if (finalizedRun) { - await updateRuntimeState(agent, finalizedRun, adapterResult); + await updateRuntimeState(agent, finalizedRun, adapterResult, { + legacySessionId: nextSessionState.legacySessionId, + }); + if (taskKey) { + if (adapterResult.clearSession || (!nextSessionState.params && !nextSessionState.displayId)) { + await clearTaskSessions(agent.companyId, agent.id, { + taskKey, + adapterType: agent.adapterType, + }); + } else { + await upsertTaskSession({ + companyId: agent.companyId, + agentId: agent.id, + adapterType: agent.adapterType, + taskKey, + sessionParamsJson: nextSessionState.params, + sessionDisplayId: nextSessionState.displayId, + lastRunId: finalizedRun.id, + lastError: outcome === "succeeded" ? null : (adapterResult.errorMessage ?? "run_failed"), + }); + } + } } await finalizeAgentStatus(agent.id, outcome); } catch (err) { @@ -587,10 +877,27 @@ export function heartbeatService(db: Db) { signal: null, timedOut: false, errorMessage: message, + }, { + legacySessionId: runtimeForAdapter.sessionId, }); + + if (taskKey && (previousSessionParams || previousSessionDisplayId || taskSession)) { + await upsertTaskSession({ + companyId: agent.companyId, + agentId: agent.id, + adapterType: agent.adapterType, + taskKey, + sessionParamsJson: previousSessionParams, + sessionDisplayId: previousSessionDisplayId, + lastRunId: failedRun.id, + lastError: message, + }); + } } await finalizeAgentStatus(agent.id, "failed"); + } finally { + await startNextQueuedRunForAgent(agent.id); } } @@ -601,6 +908,7 @@ export function heartbeatService(db: Db) { const reason = opts.reason ?? null; const payload = opts.payload ?? null; const issueIdFromPayload = readNonEmptyString(payload?.["issueId"]); + const taskKey = deriveTaskKey(contextSnapshot, payload); if (!readNonEmptyString(contextSnapshot["wakeReason"]) && reason) { contextSnapshot.wakeReason = reason; @@ -611,6 +919,9 @@ export function heartbeatService(db: Db) { if (!readNonEmptyString(contextSnapshot["taskId"]) && issueIdFromPayload) { contextSnapshot.taskId = issueIdFromPayload; } + if (!readNonEmptyString(contextSnapshot["taskKey"]) && taskKey) { + contextSnapshot.taskKey = taskKey; + } if (!readNonEmptyString(contextSnapshot["wakeSource"])) { contextSnapshot.wakeSource = source; } @@ -655,14 +966,17 @@ export function heartbeatService(db: Db) { return null; } - const activeRun = await db + const activeRuns = await db .select() .from(heartbeatRuns) .where(and(eq(heartbeatRuns.agentId, agentId), inArray(heartbeatRuns.status, ["queued", "running"]))) - .orderBy(desc(heartbeatRuns.createdAt)) - .then((rows) => rows[0] ?? null); + .orderBy(desc(heartbeatRuns.createdAt)); - if (activeRun) { + const sameScopeRun = activeRuns.find((candidate) => + isSameTaskScope(runTaskKey(candidate), taskKey), + ); + + if (sameScopeRun) { await db.insert(agentWakeupRequests).values({ companyId: agent.companyId, agentId, @@ -675,10 +989,10 @@ export function heartbeatService(db: Db) { requestedByActorType: opts.requestedByActorType ?? null, requestedByActorId: opts.requestedByActorId ?? null, idempotencyKey: opts.idempotencyKey ?? null, - runId: activeRun.id, + runId: sameScopeRun.id, finishedAt: new Date(), }); - return activeRun; + return sameScopeRun; } const wakeupRequest = await db @@ -698,7 +1012,27 @@ export function heartbeatService(db: Db) { .returning() .then((rows) => rows[0]); - const runtimeForRun = await getRuntimeState(agent.id); + let sessionBefore: string | null = null; + if (taskKey) { + const codec = getAdapterSessionCodec(agent.adapterType); + const existingTaskSession = await getTaskSession( + agent.companyId, + agent.id, + agent.adapterType, + taskKey, + ); + const parsedParams = normalizeSessionParams( + codec.deserialize(existingTaskSession?.sessionParamsJson ?? null), + ); + sessionBefore = truncateDisplayId( + existingTaskSession?.sessionDisplayId ?? + (codec.getDisplayId ? codec.getDisplayId(parsedParams) : null) ?? + readNonEmptyString(parsedParams?.sessionId), + ); + } else { + const runtimeForRun = await getRuntimeState(agent.id); + sessionBefore = runtimeForRun?.sessionId ?? null; + } const newRun = await db .insert(heartbeatRuns) @@ -710,7 +1044,7 @@ export function heartbeatService(db: Db) { status: "queued", wakeupRequestId: wakeupRequest.id, contextSnapshot, - sessionIdBefore: runtimeForRun?.sessionId ?? null, + sessionIdBefore: sessionBefore, }) .returning() .then((rows) => rows[0]); @@ -735,9 +1069,7 @@ export function heartbeatService(db: Db) { }, }); - void executeRun(newRun.id).catch((err) => { - logger.error({ err, runId: newRun.id }, "heartbeat execution failed"); - }); + await startNextQueuedRunForAgent(agent.id); return newRun; } @@ -763,29 +1095,67 @@ export function heartbeatService(db: Db) { getRuntimeState: async (agentId: string) => { const state = await getRuntimeState(agentId); - if (state) return state; - const agent = await getAgent(agentId); if (!agent) return null; - return ensureRuntimeState(agent); + const ensured = state ?? (await ensureRuntimeState(agent)); + const latestTaskSession = await db + .select() + .from(agentTaskSessions) + .where(and(eq(agentTaskSessions.companyId, agent.companyId), eq(agentTaskSessions.agentId, agent.id))) + .orderBy(desc(agentTaskSessions.updatedAt)) + .limit(1) + .then((rows) => rows[0] ?? null); + return { + ...ensured, + sessionDisplayId: latestTaskSession?.sessionDisplayId ?? ensured.sessionId, + sessionParamsJson: latestTaskSession?.sessionParamsJson ?? null, + }; }, - resetRuntimeSession: async (agentId: string) => { + listTaskSessions: async (agentId: string) => { + const agent = await getAgent(agentId); + if (!agent) throw notFound("Agent not found"); + + return db + .select() + .from(agentTaskSessions) + .where(and(eq(agentTaskSessions.companyId, agent.companyId), eq(agentTaskSessions.agentId, agentId))) + .orderBy(desc(agentTaskSessions.updatedAt), desc(agentTaskSessions.createdAt)); + }, + + resetRuntimeSession: async (agentId: string, opts?: { taskKey?: string | null }) => { const agent = await getAgent(agentId); if (!agent) throw notFound("Agent not found"); await ensureRuntimeState(agent); + const taskKey = readNonEmptyString(opts?.taskKey); + const clearedTaskSessions = await clearTaskSessions( + agent.companyId, + agent.id, + taskKey ? { taskKey, adapterType: agent.adapterType } : undefined, + ); + const runtimePatch: Partial = { + sessionId: null, + lastError: null, + updatedAt: new Date(), + }; + if (!taskKey) { + runtimePatch.stateJson = {}; + } - return db + const updated = await db .update(agentRuntimeState) - .set({ - sessionId: null, - stateJson: {}, - lastError: null, - updatedAt: new Date(), - }) + .set(runtimePatch) .where(eq(agentRuntimeState.agentId, agentId)) .returning() .then((rows) => rows[0] ?? null); + + if (!updated) return null; + return { + ...updated, + sessionDisplayId: null, + sessionParamsJson: null, + clearedTaskSessions, + }; }, listEvents: (runId: string, afterSeq = 0, limit = 200) => @@ -909,6 +1279,7 @@ export function heartbeatService(db: Db) { runningProcesses.delete(run.id); await finalizeAgentStatus(run.agentId, "cancelled"); + await startNextQueuedRunForAgent(run.agentId); return cancelled; },