From b327687c92f8611c7807aa24fa7524c0cf5b7123 Mon Sep 17 00:00:00 2001 From: Forgotten Date: Fri, 20 Feb 2026 10:32:17 -0600 Subject: [PATCH] feat: comment-triggered wakeups, coalescing improvements, and failed run badges Enhance heartbeat wakeup to propagate wakeCommentId, queue follow-up runs for comment wakes on already-running agents, and merge coalesced context snapshots. Add failed run count to sidebar badges and expose usage/result JSON in activity service. Co-Authored-By: Claude Opus 4.6 --- server/src/services/activity.ts | 2 + server/src/services/heartbeat.ts | 70 +++++++++++++++++++++++++-- server/src/services/sidebar-badges.ts | 28 +++++++++-- 3 files changed, 91 insertions(+), 9 deletions(-) diff --git a/server/src/services/activity.ts b/server/src/services/activity.ts index 2d4d7067..1fd6e3ff 100644 --- a/server/src/services/activity.ts +++ b/server/src/services/activity.ts @@ -70,6 +70,8 @@ export function activityService(db: Db) { finishedAt: heartbeatRuns.finishedAt, createdAt: heartbeatRuns.createdAt, invocationSource: heartbeatRuns.invocationSource, + usageJson: heartbeatRuns.usageJson, + resultJson: heartbeatRuns.resultJson, }) .from(activityLog) .innerJoin(heartbeatRuns, eq(activityLog.runId, heartbeatRuns.id)) diff --git a/server/src/services/heartbeat.ts b/server/src/services/heartbeat.ts index c8a120d1..e28ca94f 100644 --- a/server/src/services/heartbeat.ts +++ b/server/src/services/heartbeat.ts @@ -55,6 +55,35 @@ function deriveTaskKey( ); } +function deriveCommentId( + contextSnapshot: Record | null | undefined, + payload: Record | null | undefined, +) { + return ( + readNonEmptyString(contextSnapshot?.wakeCommentId) ?? + readNonEmptyString(contextSnapshot?.commentId) ?? + readNonEmptyString(payload?.commentId) ?? + null + ); +} + +function mergeCoalescedContextSnapshot( + existingRaw: unknown, + incoming: Record, +) { + const existing = parseObject(existingRaw); + const merged: Record = { + ...existing, + ...incoming, + }; + const commentId = deriveCommentId(incoming, null); + if (commentId) { + merged.commentId = commentId; + merged.wakeCommentId = commentId; + } + return merged; +} + function runTaskKey(run: typeof heartbeatRuns.$inferSelect) { return deriveTaskKey(run.contextSnapshot as Record | null, null); } @@ -914,7 +943,9 @@ export function heartbeatService(db: Db) { const reason = opts.reason ?? null; const payload = opts.payload ?? null; const issueIdFromPayload = readNonEmptyString(payload?.["issueId"]); + const commentIdFromPayload = readNonEmptyString(payload?.["commentId"]); const taskKey = deriveTaskKey(contextSnapshot, payload); + const wakeCommentId = deriveCommentId(contextSnapshot, payload); if (!readNonEmptyString(contextSnapshot["wakeReason"]) && reason) { contextSnapshot.wakeReason = reason; @@ -928,6 +959,12 @@ export function heartbeatService(db: Db) { if (!readNonEmptyString(contextSnapshot["taskKey"]) && taskKey) { contextSnapshot.taskKey = taskKey; } + if (!readNonEmptyString(contextSnapshot["commentId"]) && commentIdFromPayload) { + contextSnapshot.commentId = commentIdFromPayload; + } + if (!readNonEmptyString(contextSnapshot["wakeCommentId"]) && wakeCommentId) { + contextSnapshot.wakeCommentId = wakeCommentId; + } if (!readNonEmptyString(contextSnapshot["wakeSource"])) { contextSnapshot.wakeSource = source; } @@ -978,11 +1015,34 @@ export function heartbeatService(db: Db) { .where(and(eq(heartbeatRuns.agentId, agentId), inArray(heartbeatRuns.status, ["queued", "running"]))) .orderBy(desc(heartbeatRuns.createdAt)); - const sameScopeRun = activeRuns.find((candidate) => - isSameTaskScope(runTaskKey(candidate), taskKey), + const sameScopeQueuedRun = activeRuns.find( + (candidate) => candidate.status === "queued" && isSameTaskScope(runTaskKey(candidate), taskKey), ); + const sameScopeRunningRun = activeRuns.find( + (candidate) => candidate.status === "running" && isSameTaskScope(runTaskKey(candidate), taskKey), + ); + const shouldQueueFollowupForCommentWake = + Boolean(wakeCommentId) && Boolean(sameScopeRunningRun) && !sameScopeQueuedRun; + + const coalescedTargetRun = + sameScopeQueuedRun ?? + (shouldQueueFollowupForCommentWake ? null : sameScopeRunningRun ?? null); + + if (coalescedTargetRun) { + const mergedContextSnapshot = mergeCoalescedContextSnapshot( + coalescedTargetRun.contextSnapshot, + contextSnapshot, + ); + const mergedRun = await db + .update(heartbeatRuns) + .set({ + contextSnapshot: mergedContextSnapshot, + updatedAt: new Date(), + }) + .where(eq(heartbeatRuns.id, coalescedTargetRun.id)) + .returning() + .then((rows) => rows[0] ?? coalescedTargetRun); - if (sameScopeRun) { await db.insert(agentWakeupRequests).values({ companyId: agent.companyId, agentId, @@ -995,10 +1055,10 @@ export function heartbeatService(db: Db) { requestedByActorType: opts.requestedByActorType ?? null, requestedByActorId: opts.requestedByActorId ?? null, idempotencyKey: opts.idempotencyKey ?? null, - runId: sameScopeRun.id, + runId: mergedRun.id, finishedAt: new Date(), }); - return sameScopeRun; + return mergedRun; } const wakeupRequest = await db diff --git a/server/src/services/sidebar-badges.ts b/server/src/services/sidebar-badges.ts index dc0c9391..ce55764c 100644 --- a/server/src/services/sidebar-badges.ts +++ b/server/src/services/sidebar-badges.ts @@ -1,9 +1,10 @@ -import { and, eq, inArray, sql } from "drizzle-orm"; +import { and, desc, eq, inArray, not, sql } from "drizzle-orm"; import type { Db } from "@paperclip/db"; -import { approvals } from "@paperclip/db"; +import { agents, approvals, heartbeatRuns } from "@paperclip/db"; import type { SidebarBadges } from "@paperclip/shared"; const ACTIONABLE_APPROVAL_STATUSES = ["pending", "revision_requested"]; +const FAILED_HEARTBEAT_STATUSES = ["failed", "timed_out"]; export function sidebarBadgeService(db: Db) { return { @@ -19,10 +20,29 @@ export function sidebarBadgeService(db: Db) { ) .then((rows) => Number(rows[0]?.count ?? 0)); + const latestRunByAgent = await db + .selectDistinctOn([heartbeatRuns.agentId], { + runStatus: heartbeatRuns.status, + }) + .from(heartbeatRuns) + .innerJoin(agents, eq(heartbeatRuns.agentId, agents.id)) + .where( + and( + eq(heartbeatRuns.companyId, companyId), + eq(agents.companyId, companyId), + not(eq(agents.status, "terminated")), + ), + ) + .orderBy(heartbeatRuns.agentId, desc(heartbeatRuns.createdAt)); + + const failedRuns = latestRunByAgent.filter((row) => + FAILED_HEARTBEAT_STATUSES.includes(row.runStatus), + ).length; + return { - // Inbox currently mirrors actionable approvals; expand as inbox categories grow. - inbox: actionableApprovals, + inbox: actionableApprovals + failedRuns, approvals: actionableApprovals, + failedRuns, }; }, };