diff --git a/server/src/services/heartbeat.ts b/server/src/services/heartbeat.ts index e28ca94f..0bf6f5c2 100644 --- a/server/src/services/heartbeat.ts +++ b/server/src/services/heartbeat.ts @@ -20,11 +20,37 @@ import { parseObject, asBoolean, asNumber, appendWithCap, MAX_EXCERPT_BYTES } fr import { secretService } from "./secrets.js"; const MAX_LIVE_LOG_CHUNK_BYTES = 8 * 1024; +const HEARTBEAT_MAX_CONCURRENT_RUNS_DEFAULT = 1; +const HEARTBEAT_MAX_CONCURRENT_RUNS_MAX = 10; +const startLocksByAgent = new Map>(); function appendExcerpt(prev: string, chunk: string) { return appendWithCap(prev, chunk, MAX_EXCERPT_BYTES); } +function normalizeMaxConcurrentRuns(value: unknown) { + const parsed = Math.floor(asNumber(value, HEARTBEAT_MAX_CONCURRENT_RUNS_DEFAULT)); + if (!Number.isFinite(parsed)) return HEARTBEAT_MAX_CONCURRENT_RUNS_DEFAULT; + return Math.max(HEARTBEAT_MAX_CONCURRENT_RUNS_DEFAULT, Math.min(HEARTBEAT_MAX_CONCURRENT_RUNS_MAX, parsed)); +} + +async function withAgentStartLock(agentId: string, fn: () => Promise) { + const previous = startLocksByAgent.get(agentId) ?? Promise.resolve(); + const run = previous.then(fn); + const marker = run.then( + () => undefined, + () => undefined, + ); + startLocksByAgent.set(agentId, marker); + try { + return await run; + } finally { + if (startLocksByAgent.get(agentId) === marker) { + startLocksByAgent.delete(agentId); + } + } +} + interface WakeupOptions { source?: "timer" | "assignment" | "on_demand" | "automation"; triggerDetail?: "manual" | "ping" | "callback" | "system"; @@ -410,9 +436,18 @@ export function heartbeatService(db: Db) { enabled: asBoolean(heartbeat.enabled, true), intervalSec: Math.max(0, asNumber(heartbeat.intervalSec, 0)), wakeOnDemand: asBoolean(heartbeat.wakeOnDemand ?? heartbeat.wakeOnAssignment ?? heartbeat.wakeOnOnDemand ?? heartbeat.wakeOnAutomation, true), + maxConcurrentRuns: normalizeMaxConcurrentRuns(heartbeat.maxConcurrentRuns), }; } + async function countRunningRunsForAgent(agentId: string) { + const [{ count }] = await db + .select({ count: sql`count(*)` }) + .from(heartbeatRuns) + .where(and(eq(heartbeatRuns.agentId, agentId), eq(heartbeatRuns.status, "running"))); + return Number(count ?? 0); + } + async function finalizeAgentStatus( agentId: string, outcome: "succeeded" | "failed" | "cancelled" | "timed_out", @@ -424,8 +459,13 @@ export function heartbeatService(db: Db) { return; } + const runningCount = await countRunningRunsForAgent(agentId); const nextStatus = - outcome === "succeeded" ? "idle" : outcome === "cancelled" ? "idle" : "error"; + runningCount > 0 + ? "running" + : outcome === "succeeded" || outcome === "cancelled" + ? "idle" + : "error"; const updated = await db .update(agents) @@ -511,7 +551,7 @@ export function heartbeatService(db: Db) { result: AdapterExecutionResult, session: { legacySessionId: string | null }, ) { - const existing = await ensureRuntimeState(agent); + await ensureRuntimeState(agent); const usage = result.usage; const inputTokens = usage?.inputTokens ?? 0; const outputTokens = usage?.outputTokens ?? 0; @@ -526,10 +566,10 @@ export function heartbeatService(db: Db) { lastRunId: run.id, lastRunStatus: run.status, lastError: result.errorMessage ?? null, - totalInputTokens: existing.totalInputTokens + inputTokens, - totalOutputTokens: existing.totalOutputTokens + outputTokens, - totalCachedInputTokens: existing.totalCachedInputTokens + cachedInputTokens, - totalCostCents: existing.totalCostCents + additionalCostCents, + totalInputTokens: sql`${agentRuntimeState.totalInputTokens} + ${inputTokens}`, + totalOutputTokens: sql`${agentRuntimeState.totalOutputTokens} + ${outputTokens}`, + totalCachedInputTokens: sql`${agentRuntimeState.totalCachedInputTokens} + ${cachedInputTokens}`, + totalCostCents: sql`${agentRuntimeState.totalCostCents} + ${additionalCostCents}`, updatedAt: new Date(), }) .where(eq(agentRuntimeState.agentId, agent.id)); @@ -557,34 +597,71 @@ export function heartbeatService(db: Db) { } async function startNextQueuedRunForAgent(agentId: string) { - const running = await db - .select({ id: heartbeatRuns.id }) - .from(heartbeatRuns) - .where(and(eq(heartbeatRuns.agentId, agentId), eq(heartbeatRuns.status, "running"))) - .limit(1) - .then((rows) => rows[0] ?? null); - if (running) return null; + return withAgentStartLock(agentId, async () => { + const agent = await getAgent(agentId); + if (!agent) return []; + const policy = parseHeartbeatPolicy(agent); + const runningCount = await countRunningRunsForAgent(agentId); + const availableSlots = Math.max(0, policy.maxConcurrentRuns - runningCount); + if (availableSlots <= 0) return []; - const nextQueued = await db - .select() - .from(heartbeatRuns) - .where(and(eq(heartbeatRuns.agentId, agentId), eq(heartbeatRuns.status, "queued"))) - .orderBy(asc(heartbeatRuns.createdAt)) - .limit(1) - .then((rows) => rows[0] ?? null); - if (!nextQueued) return null; + const queuedRuns = await db + .select() + .from(heartbeatRuns) + .where(and(eq(heartbeatRuns.agentId, agentId), eq(heartbeatRuns.status, "queued"))) + .orderBy(asc(heartbeatRuns.createdAt)) + .limit(availableSlots); + if (queuedRuns.length === 0) return []; - void executeRun(nextQueued.id).catch((err) => { - logger.error({ err, runId: nextQueued.id }, "queued heartbeat execution failed"); + for (const queuedRun of queuedRuns) { + void executeRun(queuedRun.id).catch((err) => { + logger.error({ err, runId: queuedRun.id }, "queued heartbeat execution failed"); + }); + } + return queuedRuns; }); - return nextQueued; } async function executeRun(runId: string) { - const run = await getRun(runId); + let run = await getRun(runId); if (!run) return; if (run.status !== "queued" && run.status !== "running") return; + if (run.status === "queued") { + const claimedAt = new Date(); + const claimed = await db + .update(heartbeatRuns) + .set({ + status: "running", + startedAt: run.startedAt ?? claimedAt, + updatedAt: claimedAt, + }) + .where(and(eq(heartbeatRuns.id, run.id), eq(heartbeatRuns.status, "queued"))) + .returning() + .then((rows) => rows[0] ?? null); + if (!claimed) { + // Another worker has already claimed or finalized this run. + return; + } + run = claimed; + publishLiveEvent({ + companyId: run.companyId, + type: "heartbeat.run.status", + payload: { + runId: run.id, + agentId: run.agentId, + status: run.status, + invocationSource: run.invocationSource, + triggerDetail: run.triggerDetail, + error: run.error ?? null, + errorCode: run.errorCode ?? null, + startedAt: run.startedAt ? new Date(run.startedAt).toISOString() : null, + finishedAt: run.finishedAt ? new Date(run.finishedAt).toISOString() : null, + }, + }); + await setWakeupStatus(run.wakeupRequestId, "claimed", { claimedAt }); + } + const agent = await getAgent(run.agentId); if (!agent) { await setRunStatus(runId, "failed", { @@ -599,19 +676,6 @@ export function heartbeatService(db: Db) { return; } - if (run.status === "queued") { - const activeForAgent = await db - .select() - .from(heartbeatRuns) - .where(and(eq(heartbeatRuns.agentId, run.agentId), inArray(heartbeatRuns.status, ["queued", "running"]))) - .orderBy(asc(heartbeatRuns.createdAt)); - const runningOther = activeForAgent.some((candidate) => candidate.status === "running" && candidate.id !== run.id); - const first = activeForAgent[0] ?? null; - if (runningOther || (first && first.id !== run.id)) { - return; - } - } - const runtime = await ensureRuntimeState(agent); const context = parseObject(run.contextSnapshot); const taskKey = deriveTaskKey(context, null); @@ -641,11 +705,18 @@ export function heartbeatService(db: Db) { let stderrExcerpt = ""; try { - await setRunStatus(runId, "running", { - startedAt: new Date(), - sessionIdBefore: runtimeForAdapter.sessionDisplayId ?? runtimeForAdapter.sessionId, - }); - await setWakeupStatus(run.wakeupRequestId, "claimed", { claimedAt: new Date() }); + const startedAt = run.startedAt ?? new Date(); + const runningWithSession = await db + .update(heartbeatRuns) + .set({ + startedAt, + sessionIdBefore: runtimeForAdapter.sessionDisplayId ?? runtimeForAdapter.sessionId, + updatedAt: new Date(), + }) + .where(eq(heartbeatRuns.id, run.id)) + .returning() + .then((rows) => rows[0] ?? null); + if (runningWithSession) run = runningWithSession; const runningAgent = await db .update(agents) @@ -666,7 +737,7 @@ export function heartbeatService(db: Db) { }); } - const currentRun = (await getRun(runId)) ?? run; + const currentRun = run; await appendRunEvent(currentRun, seq++, { eventType: "lifecycle", stream: "system", diff --git a/ui/src/components/ActiveAgentsPanel.tsx b/ui/src/components/ActiveAgentsPanel.tsx index 11a1a9f8..d13ab088 100644 --- a/ui/src/components/ActiveAgentsPanel.tsx +++ b/ui/src/components/ActiveAgentsPanel.tsx @@ -155,7 +155,6 @@ export function ActiveAgentsPanel({ companyId }: ActiveAgentsPanelProps) { const { data: liveRuns } = useQuery({ queryKey: queryKeys.liveRuns(companyId), queryFn: () => heartbeatsApi.liveRunsForCompany(companyId), - refetchInterval: 5000, }); const runs = liveRuns ?? []; diff --git a/ui/src/components/NewAgentDialog.tsx b/ui/src/components/NewAgentDialog.tsx index caae770e..82f7cd9b 100644 --- a/ui/src/components/NewAgentDialog.tsx +++ b/ui/src/components/NewAgentDialog.tsx @@ -112,6 +112,7 @@ export function NewAgentDialog() { intervalSec: configValues.intervalSec, wakeOnDemand: true, cooldownSec: 10, + maxConcurrentRuns: 1, }, }, budgetMonthlyCents: 0, diff --git a/ui/src/components/OnboardingWizard.tsx b/ui/src/components/OnboardingWizard.tsx index bd02396d..e8cbad69 100644 --- a/ui/src/components/OnboardingWizard.tsx +++ b/ui/src/components/OnboardingWizard.tsx @@ -154,6 +154,7 @@ export function OnboardingWizard() { intervalSec: 300, wakeOnDemand: true, cooldownSec: 10, + maxConcurrentRuns: 1, }, }, }); diff --git a/ui/src/components/agent-config-primitives.tsx b/ui/src/components/agent-config-primitives.tsx index e4949ffb..c5e78f3b 100644 --- a/ui/src/components/agent-config-primitives.tsx +++ b/ui/src/components/agent-config-primitives.tsx @@ -36,6 +36,7 @@ export const help: Record = { graceSec: "Seconds to wait after sending interrupt before force-killing the process.", wakeOnDemand: "Allow this agent to be woken by assignments, API calls, UI actions, or automated systems.", cooldownSec: "Minimum seconds between consecutive heartbeat runs.", + maxConcurrentRuns: "Maximum number of heartbeat runs that can execute simultaneously for this agent.", budgetMonthlyCents: "Monthly spending limit in cents. 0 means no limit.", }; diff --git a/ui/src/context/LiveUpdatesProvider.tsx b/ui/src/context/LiveUpdatesProvider.tsx index cd2124b2..18a00757 100644 --- a/ui/src/context/LiveUpdatesProvider.tsx +++ b/ui/src/context/LiveUpdatesProvider.tsx @@ -13,6 +13,7 @@ function invalidateHeartbeatQueries( companyId: string, payload: Record, ) { + queryClient.invalidateQueries({ queryKey: queryKeys.liveRuns(companyId) }); queryClient.invalidateQueries({ queryKey: queryKeys.heartbeats(companyId) }); queryClient.invalidateQueries({ queryKey: queryKeys.agents.list(companyId) }); queryClient.invalidateQueries({ queryKey: queryKeys.dashboard(companyId) }); @@ -100,11 +101,15 @@ function handleLiveEvent( return; } - if (event.type === "heartbeat.run.queued" || event.type === "heartbeat.run.status" || event.type === "heartbeat.run.event") { + if (event.type === "heartbeat.run.queued" || event.type === "heartbeat.run.status") { invalidateHeartbeatQueries(queryClient, expectedCompanyId, payload); return; } + if (event.type === "heartbeat.run.event") { + return; + } + if (event.type === "agent.status") { queryClient.invalidateQueries({ queryKey: queryKeys.agents.list(expectedCompanyId) }); queryClient.invalidateQueries({ queryKey: queryKeys.dashboard(expectedCompanyId) }); diff --git a/ui/src/pages/AgentDetail.tsx b/ui/src/pages/AgentDetail.tsx index 73c96e87..711d3157 100644 --- a/ui/src/pages/AgentDetail.tsx +++ b/ui/src/pages/AgentDetail.tsx @@ -510,7 +510,14 @@ export function AgentDetail() { const hb = (agent.runtimeConfig as Record).heartbeat as Record; if (!hb.enabled) return Disabled; const sec = Number(hb.intervalSec) || 300; - return Every {sec >= 60 ? `${Math.round(sec / 60)} min` : `${sec}s`}; + const maxConcurrentRuns = Math.max(1, Math.floor(Number(hb.maxConcurrentRuns) || 1)); + const intervalLabel = sec >= 60 ? `${Math.round(sec / 60)} min` : `${sec}s`; + return ( + + Every {intervalLabel} + {maxConcurrentRuns > 1 ? ` (max ${maxConcurrentRuns} concurrent)` : ""} + + ); })() : Not configured } diff --git a/ui/src/pages/Agents.tsx b/ui/src/pages/Agents.tsx index 9e08dd06..532f2ce4 100644 --- a/ui/src/pages/Agents.tsx +++ b/ui/src/pages/Agents.tsx @@ -90,13 +90,17 @@ export function Agents() { refetchInterval: 15_000, }); - // Map agentId -> first live run (running or queued) + // Map agentId -> first live run + live run count const liveRunByAgent = useMemo(() => { - const map = new Map(); + const map = new Map(); for (const r of runs ?? []) { - if ((r.status === "running" || r.status === "queued") && !map.has(r.agentId)) { - map.set(r.agentId, { runId: r.id }); + if (r.status !== "running" && r.status !== "queued") continue; + const existing = map.get(r.agentId); + if (existing) { + existing.liveCount += 1; + continue; } + map.set(r.agentId, { runId: r.id, liveCount: 1 }); } return map; }, [runs]); @@ -246,6 +250,7 @@ export function Agents() { ) : ( @@ -257,6 +262,7 @@ export function Agents() { )} @@ -319,7 +325,7 @@ function OrgTreeNode({ depth: number; navigate: (path: string) => void; agentMap: Map; - liveRunByAgent: Map; + liveRunByAgent: Map; }) { const agent = agentMap.get(node.id); @@ -358,6 +364,7 @@ function OrgTreeNode({ ) : ( @@ -369,6 +376,7 @@ function OrgTreeNode({ )} @@ -402,10 +410,12 @@ function OrgTreeNode({ function LiveRunIndicator({ agentId, runId, + liveCount, navigate, }: { agentId: string; runId: string; + liveCount: number; navigate: (path: string) => void; }) { return ( @@ -420,7 +430,9 @@ function LiveRunIndicator({ - Live + + Live{liveCount > 1 ? ` (${liveCount})` : ""} + ); }