From 0131cf34490cabaafeef3d9b1e30529cd3f9d8bb Mon Sep 17 00:00:00 2001 From: Forgotten Date: Fri, 20 Feb 2026 12:50:34 -0600 Subject: [PATCH] Support concurrent heartbeat runs with maxConcurrentRuns policy Add per-agent maxConcurrentRuns (1-10) controlling how many runs execute simultaneously. Implements agent-level start lock, optimistic claim-then-execute flow, atomic token accounting via SQL expressions, and proper status resolution when parallel runs finish. Updates UI config form, live run count display, and SSE invalidation to avoid unnecessary refetches on run event streams. Co-Authored-By: Claude Opus 4.6 --- server/src/services/heartbeat.ts | 159 +++++++++++++----- ui/src/components/ActiveAgentsPanel.tsx | 1 - ui/src/components/NewAgentDialog.tsx | 1 + ui/src/components/OnboardingWizard.tsx | 1 + ui/src/components/agent-config-primitives.tsx | 1 + ui/src/context/LiveUpdatesProvider.tsx | 7 +- ui/src/pages/AgentDetail.tsx | 9 +- ui/src/pages/Agents.tsx | 24 ++- 8 files changed, 150 insertions(+), 53 deletions(-) diff --git a/server/src/services/heartbeat.ts b/server/src/services/heartbeat.ts index e28ca94f..0bf6f5c2 100644 --- a/server/src/services/heartbeat.ts +++ b/server/src/services/heartbeat.ts @@ -20,11 +20,37 @@ import { parseObject, asBoolean, asNumber, appendWithCap, MAX_EXCERPT_BYTES } fr import { secretService } from "./secrets.js"; const MAX_LIVE_LOG_CHUNK_BYTES = 8 * 1024; +const HEARTBEAT_MAX_CONCURRENT_RUNS_DEFAULT = 1; +const HEARTBEAT_MAX_CONCURRENT_RUNS_MAX = 10; +const startLocksByAgent = new Map>(); function appendExcerpt(prev: string, chunk: string) { return appendWithCap(prev, chunk, MAX_EXCERPT_BYTES); } +function normalizeMaxConcurrentRuns(value: unknown) { + const parsed = Math.floor(asNumber(value, HEARTBEAT_MAX_CONCURRENT_RUNS_DEFAULT)); + if (!Number.isFinite(parsed)) return HEARTBEAT_MAX_CONCURRENT_RUNS_DEFAULT; + return Math.max(HEARTBEAT_MAX_CONCURRENT_RUNS_DEFAULT, Math.min(HEARTBEAT_MAX_CONCURRENT_RUNS_MAX, parsed)); +} + +async function withAgentStartLock(agentId: string, fn: () => Promise) { + const previous = startLocksByAgent.get(agentId) ?? Promise.resolve(); + const run = previous.then(fn); + const marker = run.then( + () => undefined, + () => undefined, + ); + startLocksByAgent.set(agentId, marker); + try { + return await run; + } finally { + if (startLocksByAgent.get(agentId) === marker) { + startLocksByAgent.delete(agentId); + } + } +} + interface WakeupOptions { source?: "timer" | "assignment" | "on_demand" | "automation"; triggerDetail?: "manual" | "ping" | "callback" | "system"; @@ -410,9 +436,18 @@ export function heartbeatService(db: Db) { enabled: asBoolean(heartbeat.enabled, true), intervalSec: Math.max(0, asNumber(heartbeat.intervalSec, 0)), wakeOnDemand: asBoolean(heartbeat.wakeOnDemand ?? heartbeat.wakeOnAssignment ?? heartbeat.wakeOnOnDemand ?? heartbeat.wakeOnAutomation, true), + maxConcurrentRuns: normalizeMaxConcurrentRuns(heartbeat.maxConcurrentRuns), }; } + async function countRunningRunsForAgent(agentId: string) { + const [{ count }] = await db + .select({ count: sql`count(*)` }) + .from(heartbeatRuns) + .where(and(eq(heartbeatRuns.agentId, agentId), eq(heartbeatRuns.status, "running"))); + return Number(count ?? 0); + } + async function finalizeAgentStatus( agentId: string, outcome: "succeeded" | "failed" | "cancelled" | "timed_out", @@ -424,8 +459,13 @@ export function heartbeatService(db: Db) { return; } + const runningCount = await countRunningRunsForAgent(agentId); const nextStatus = - outcome === "succeeded" ? "idle" : outcome === "cancelled" ? "idle" : "error"; + runningCount > 0 + ? "running" + : outcome === "succeeded" || outcome === "cancelled" + ? "idle" + : "error"; const updated = await db .update(agents) @@ -511,7 +551,7 @@ export function heartbeatService(db: Db) { result: AdapterExecutionResult, session: { legacySessionId: string | null }, ) { - const existing = await ensureRuntimeState(agent); + await ensureRuntimeState(agent); const usage = result.usage; const inputTokens = usage?.inputTokens ?? 0; const outputTokens = usage?.outputTokens ?? 0; @@ -526,10 +566,10 @@ export function heartbeatService(db: Db) { lastRunId: run.id, lastRunStatus: run.status, lastError: result.errorMessage ?? null, - totalInputTokens: existing.totalInputTokens + inputTokens, - totalOutputTokens: existing.totalOutputTokens + outputTokens, - totalCachedInputTokens: existing.totalCachedInputTokens + cachedInputTokens, - totalCostCents: existing.totalCostCents + additionalCostCents, + totalInputTokens: sql`${agentRuntimeState.totalInputTokens} + ${inputTokens}`, + totalOutputTokens: sql`${agentRuntimeState.totalOutputTokens} + ${outputTokens}`, + totalCachedInputTokens: sql`${agentRuntimeState.totalCachedInputTokens} + ${cachedInputTokens}`, + totalCostCents: sql`${agentRuntimeState.totalCostCents} + ${additionalCostCents}`, updatedAt: new Date(), }) .where(eq(agentRuntimeState.agentId, agent.id)); @@ -557,34 +597,71 @@ export function heartbeatService(db: Db) { } async function startNextQueuedRunForAgent(agentId: string) { - const running = await db - .select({ id: heartbeatRuns.id }) - .from(heartbeatRuns) - .where(and(eq(heartbeatRuns.agentId, agentId), eq(heartbeatRuns.status, "running"))) - .limit(1) - .then((rows) => rows[0] ?? null); - if (running) return null; + return withAgentStartLock(agentId, async () => { + const agent = await getAgent(agentId); + if (!agent) return []; + const policy = parseHeartbeatPolicy(agent); + const runningCount = await countRunningRunsForAgent(agentId); + const availableSlots = Math.max(0, policy.maxConcurrentRuns - runningCount); + if (availableSlots <= 0) return []; - const nextQueued = await db - .select() - .from(heartbeatRuns) - .where(and(eq(heartbeatRuns.agentId, agentId), eq(heartbeatRuns.status, "queued"))) - .orderBy(asc(heartbeatRuns.createdAt)) - .limit(1) - .then((rows) => rows[0] ?? null); - if (!nextQueued) return null; + const queuedRuns = await db + .select() + .from(heartbeatRuns) + .where(and(eq(heartbeatRuns.agentId, agentId), eq(heartbeatRuns.status, "queued"))) + .orderBy(asc(heartbeatRuns.createdAt)) + .limit(availableSlots); + if (queuedRuns.length === 0) return []; - void executeRun(nextQueued.id).catch((err) => { - logger.error({ err, runId: nextQueued.id }, "queued heartbeat execution failed"); + for (const queuedRun of queuedRuns) { + void executeRun(queuedRun.id).catch((err) => { + logger.error({ err, runId: queuedRun.id }, "queued heartbeat execution failed"); + }); + } + return queuedRuns; }); - return nextQueued; } async function executeRun(runId: string) { - const run = await getRun(runId); + let run = await getRun(runId); if (!run) return; if (run.status !== "queued" && run.status !== "running") return; + if (run.status === "queued") { + const claimedAt = new Date(); + const claimed = await db + .update(heartbeatRuns) + .set({ + status: "running", + startedAt: run.startedAt ?? claimedAt, + updatedAt: claimedAt, + }) + .where(and(eq(heartbeatRuns.id, run.id), eq(heartbeatRuns.status, "queued"))) + .returning() + .then((rows) => rows[0] ?? null); + if (!claimed) { + // Another worker has already claimed or finalized this run. + return; + } + run = claimed; + publishLiveEvent({ + companyId: run.companyId, + type: "heartbeat.run.status", + payload: { + runId: run.id, + agentId: run.agentId, + status: run.status, + invocationSource: run.invocationSource, + triggerDetail: run.triggerDetail, + error: run.error ?? null, + errorCode: run.errorCode ?? null, + startedAt: run.startedAt ? new Date(run.startedAt).toISOString() : null, + finishedAt: run.finishedAt ? new Date(run.finishedAt).toISOString() : null, + }, + }); + await setWakeupStatus(run.wakeupRequestId, "claimed", { claimedAt }); + } + const agent = await getAgent(run.agentId); if (!agent) { await setRunStatus(runId, "failed", { @@ -599,19 +676,6 @@ export function heartbeatService(db: Db) { return; } - if (run.status === "queued") { - const activeForAgent = await db - .select() - .from(heartbeatRuns) - .where(and(eq(heartbeatRuns.agentId, run.agentId), inArray(heartbeatRuns.status, ["queued", "running"]))) - .orderBy(asc(heartbeatRuns.createdAt)); - const runningOther = activeForAgent.some((candidate) => candidate.status === "running" && candidate.id !== run.id); - const first = activeForAgent[0] ?? null; - if (runningOther || (first && first.id !== run.id)) { - return; - } - } - const runtime = await ensureRuntimeState(agent); const context = parseObject(run.contextSnapshot); const taskKey = deriveTaskKey(context, null); @@ -641,11 +705,18 @@ export function heartbeatService(db: Db) { let stderrExcerpt = ""; try { - await setRunStatus(runId, "running", { - startedAt: new Date(), - sessionIdBefore: runtimeForAdapter.sessionDisplayId ?? runtimeForAdapter.sessionId, - }); - await setWakeupStatus(run.wakeupRequestId, "claimed", { claimedAt: new Date() }); + const startedAt = run.startedAt ?? new Date(); + const runningWithSession = await db + .update(heartbeatRuns) + .set({ + startedAt, + sessionIdBefore: runtimeForAdapter.sessionDisplayId ?? runtimeForAdapter.sessionId, + updatedAt: new Date(), + }) + .where(eq(heartbeatRuns.id, run.id)) + .returning() + .then((rows) => rows[0] ?? null); + if (runningWithSession) run = runningWithSession; const runningAgent = await db .update(agents) @@ -666,7 +737,7 @@ export function heartbeatService(db: Db) { }); } - const currentRun = (await getRun(runId)) ?? run; + const currentRun = run; await appendRunEvent(currentRun, seq++, { eventType: "lifecycle", stream: "system", diff --git a/ui/src/components/ActiveAgentsPanel.tsx b/ui/src/components/ActiveAgentsPanel.tsx index 11a1a9f8..d13ab088 100644 --- a/ui/src/components/ActiveAgentsPanel.tsx +++ b/ui/src/components/ActiveAgentsPanel.tsx @@ -155,7 +155,6 @@ export function ActiveAgentsPanel({ companyId }: ActiveAgentsPanelProps) { const { data: liveRuns } = useQuery({ queryKey: queryKeys.liveRuns(companyId), queryFn: () => heartbeatsApi.liveRunsForCompany(companyId), - refetchInterval: 5000, }); const runs = liveRuns ?? []; diff --git a/ui/src/components/NewAgentDialog.tsx b/ui/src/components/NewAgentDialog.tsx index caae770e..82f7cd9b 100644 --- a/ui/src/components/NewAgentDialog.tsx +++ b/ui/src/components/NewAgentDialog.tsx @@ -112,6 +112,7 @@ export function NewAgentDialog() { intervalSec: configValues.intervalSec, wakeOnDemand: true, cooldownSec: 10, + maxConcurrentRuns: 1, }, }, budgetMonthlyCents: 0, diff --git a/ui/src/components/OnboardingWizard.tsx b/ui/src/components/OnboardingWizard.tsx index bd02396d..e8cbad69 100644 --- a/ui/src/components/OnboardingWizard.tsx +++ b/ui/src/components/OnboardingWizard.tsx @@ -154,6 +154,7 @@ export function OnboardingWizard() { intervalSec: 300, wakeOnDemand: true, cooldownSec: 10, + maxConcurrentRuns: 1, }, }, }); diff --git a/ui/src/components/agent-config-primitives.tsx b/ui/src/components/agent-config-primitives.tsx index e4949ffb..c5e78f3b 100644 --- a/ui/src/components/agent-config-primitives.tsx +++ b/ui/src/components/agent-config-primitives.tsx @@ -36,6 +36,7 @@ export const help: Record = { graceSec: "Seconds to wait after sending interrupt before force-killing the process.", wakeOnDemand: "Allow this agent to be woken by assignments, API calls, UI actions, or automated systems.", cooldownSec: "Minimum seconds between consecutive heartbeat runs.", + maxConcurrentRuns: "Maximum number of heartbeat runs that can execute simultaneously for this agent.", budgetMonthlyCents: "Monthly spending limit in cents. 0 means no limit.", }; diff --git a/ui/src/context/LiveUpdatesProvider.tsx b/ui/src/context/LiveUpdatesProvider.tsx index cd2124b2..18a00757 100644 --- a/ui/src/context/LiveUpdatesProvider.tsx +++ b/ui/src/context/LiveUpdatesProvider.tsx @@ -13,6 +13,7 @@ function invalidateHeartbeatQueries( companyId: string, payload: Record, ) { + queryClient.invalidateQueries({ queryKey: queryKeys.liveRuns(companyId) }); queryClient.invalidateQueries({ queryKey: queryKeys.heartbeats(companyId) }); queryClient.invalidateQueries({ queryKey: queryKeys.agents.list(companyId) }); queryClient.invalidateQueries({ queryKey: queryKeys.dashboard(companyId) }); @@ -100,11 +101,15 @@ function handleLiveEvent( return; } - if (event.type === "heartbeat.run.queued" || event.type === "heartbeat.run.status" || event.type === "heartbeat.run.event") { + if (event.type === "heartbeat.run.queued" || event.type === "heartbeat.run.status") { invalidateHeartbeatQueries(queryClient, expectedCompanyId, payload); return; } + if (event.type === "heartbeat.run.event") { + return; + } + if (event.type === "agent.status") { queryClient.invalidateQueries({ queryKey: queryKeys.agents.list(expectedCompanyId) }); queryClient.invalidateQueries({ queryKey: queryKeys.dashboard(expectedCompanyId) }); diff --git a/ui/src/pages/AgentDetail.tsx b/ui/src/pages/AgentDetail.tsx index 73c96e87..711d3157 100644 --- a/ui/src/pages/AgentDetail.tsx +++ b/ui/src/pages/AgentDetail.tsx @@ -510,7 +510,14 @@ export function AgentDetail() { const hb = (agent.runtimeConfig as Record).heartbeat as Record; if (!hb.enabled) return Disabled; const sec = Number(hb.intervalSec) || 300; - return Every {sec >= 60 ? `${Math.round(sec / 60)} min` : `${sec}s`}; + const maxConcurrentRuns = Math.max(1, Math.floor(Number(hb.maxConcurrentRuns) || 1)); + const intervalLabel = sec >= 60 ? `${Math.round(sec / 60)} min` : `${sec}s`; + return ( + + Every {intervalLabel} + {maxConcurrentRuns > 1 ? ` (max ${maxConcurrentRuns} concurrent)` : ""} + + ); })() : Not configured } diff --git a/ui/src/pages/Agents.tsx b/ui/src/pages/Agents.tsx index 9e08dd06..532f2ce4 100644 --- a/ui/src/pages/Agents.tsx +++ b/ui/src/pages/Agents.tsx @@ -90,13 +90,17 @@ export function Agents() { refetchInterval: 15_000, }); - // Map agentId -> first live run (running or queued) + // Map agentId -> first live run + live run count const liveRunByAgent = useMemo(() => { - const map = new Map(); + const map = new Map(); for (const r of runs ?? []) { - if ((r.status === "running" || r.status === "queued") && !map.has(r.agentId)) { - map.set(r.agentId, { runId: r.id }); + if (r.status !== "running" && r.status !== "queued") continue; + const existing = map.get(r.agentId); + if (existing) { + existing.liveCount += 1; + continue; } + map.set(r.agentId, { runId: r.id, liveCount: 1 }); } return map; }, [runs]); @@ -246,6 +250,7 @@ export function Agents() { ) : ( @@ -257,6 +262,7 @@ export function Agents() { )} @@ -319,7 +325,7 @@ function OrgTreeNode({ depth: number; navigate: (path: string) => void; agentMap: Map; - liveRunByAgent: Map; + liveRunByAgent: Map; }) { const agent = agentMap.get(node.id); @@ -358,6 +364,7 @@ function OrgTreeNode({ ) : ( @@ -369,6 +376,7 @@ function OrgTreeNode({ )} @@ -402,10 +410,12 @@ function OrgTreeNode({ function LiveRunIndicator({ agentId, runId, + liveCount, navigate, }: { agentId: string; runId: string; + liveCount: number; navigate: (path: string) => void; }) { return ( @@ -420,7 +430,9 @@ function LiveRunIndicator({ - Live + + Live{liveCount > 1 ? ` (${liveCount})` : ""} + ); }