Support concurrent heartbeat runs with maxConcurrentRuns policy
Add per-agent maxConcurrentRuns (1-10) controlling how many runs execute simultaneously. Implements agent-level start lock, optimistic claim-then-execute flow, atomic token accounting via SQL expressions, and proper status resolution when parallel runs finish. Updates UI config form, live run count display, and SSE invalidation to avoid unnecessary refetches on run event streams. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -20,11 +20,37 @@ import { parseObject, asBoolean, asNumber, appendWithCap, MAX_EXCERPT_BYTES } fr
|
||||
import { secretService } from "./secrets.js";
|
||||
|
||||
const MAX_LIVE_LOG_CHUNK_BYTES = 8 * 1024;
|
||||
const HEARTBEAT_MAX_CONCURRENT_RUNS_DEFAULT = 1;
|
||||
const HEARTBEAT_MAX_CONCURRENT_RUNS_MAX = 10;
|
||||
const startLocksByAgent = new Map<string, Promise<void>>();
|
||||
|
||||
function appendExcerpt(prev: string, chunk: string) {
|
||||
return appendWithCap(prev, chunk, MAX_EXCERPT_BYTES);
|
||||
}
|
||||
|
||||
function normalizeMaxConcurrentRuns(value: unknown) {
|
||||
const parsed = Math.floor(asNumber(value, HEARTBEAT_MAX_CONCURRENT_RUNS_DEFAULT));
|
||||
if (!Number.isFinite(parsed)) return HEARTBEAT_MAX_CONCURRENT_RUNS_DEFAULT;
|
||||
return Math.max(HEARTBEAT_MAX_CONCURRENT_RUNS_DEFAULT, Math.min(HEARTBEAT_MAX_CONCURRENT_RUNS_MAX, parsed));
|
||||
}
|
||||
|
||||
async function withAgentStartLock<T>(agentId: string, fn: () => Promise<T>) {
|
||||
const previous = startLocksByAgent.get(agentId) ?? Promise.resolve();
|
||||
const run = previous.then(fn);
|
||||
const marker = run.then(
|
||||
() => undefined,
|
||||
() => undefined,
|
||||
);
|
||||
startLocksByAgent.set(agentId, marker);
|
||||
try {
|
||||
return await run;
|
||||
} finally {
|
||||
if (startLocksByAgent.get(agentId) === marker) {
|
||||
startLocksByAgent.delete(agentId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
interface WakeupOptions {
|
||||
source?: "timer" | "assignment" | "on_demand" | "automation";
|
||||
triggerDetail?: "manual" | "ping" | "callback" | "system";
|
||||
@@ -410,9 +436,18 @@ export function heartbeatService(db: Db) {
|
||||
enabled: asBoolean(heartbeat.enabled, true),
|
||||
intervalSec: Math.max(0, asNumber(heartbeat.intervalSec, 0)),
|
||||
wakeOnDemand: asBoolean(heartbeat.wakeOnDemand ?? heartbeat.wakeOnAssignment ?? heartbeat.wakeOnOnDemand ?? heartbeat.wakeOnAutomation, true),
|
||||
maxConcurrentRuns: normalizeMaxConcurrentRuns(heartbeat.maxConcurrentRuns),
|
||||
};
|
||||
}
|
||||
|
||||
async function countRunningRunsForAgent(agentId: string) {
|
||||
const [{ count }] = await db
|
||||
.select({ count: sql<number>`count(*)` })
|
||||
.from(heartbeatRuns)
|
||||
.where(and(eq(heartbeatRuns.agentId, agentId), eq(heartbeatRuns.status, "running")));
|
||||
return Number(count ?? 0);
|
||||
}
|
||||
|
||||
async function finalizeAgentStatus(
|
||||
agentId: string,
|
||||
outcome: "succeeded" | "failed" | "cancelled" | "timed_out",
|
||||
@@ -424,8 +459,13 @@ export function heartbeatService(db: Db) {
|
||||
return;
|
||||
}
|
||||
|
||||
const runningCount = await countRunningRunsForAgent(agentId);
|
||||
const nextStatus =
|
||||
outcome === "succeeded" ? "idle" : outcome === "cancelled" ? "idle" : "error";
|
||||
runningCount > 0
|
||||
? "running"
|
||||
: outcome === "succeeded" || outcome === "cancelled"
|
||||
? "idle"
|
||||
: "error";
|
||||
|
||||
const updated = await db
|
||||
.update(agents)
|
||||
@@ -511,7 +551,7 @@ export function heartbeatService(db: Db) {
|
||||
result: AdapterExecutionResult,
|
||||
session: { legacySessionId: string | null },
|
||||
) {
|
||||
const existing = await ensureRuntimeState(agent);
|
||||
await ensureRuntimeState(agent);
|
||||
const usage = result.usage;
|
||||
const inputTokens = usage?.inputTokens ?? 0;
|
||||
const outputTokens = usage?.outputTokens ?? 0;
|
||||
@@ -526,10 +566,10 @@ export function heartbeatService(db: Db) {
|
||||
lastRunId: run.id,
|
||||
lastRunStatus: run.status,
|
||||
lastError: result.errorMessage ?? null,
|
||||
totalInputTokens: existing.totalInputTokens + inputTokens,
|
||||
totalOutputTokens: existing.totalOutputTokens + outputTokens,
|
||||
totalCachedInputTokens: existing.totalCachedInputTokens + cachedInputTokens,
|
||||
totalCostCents: existing.totalCostCents + additionalCostCents,
|
||||
totalInputTokens: sql`${agentRuntimeState.totalInputTokens} + ${inputTokens}`,
|
||||
totalOutputTokens: sql`${agentRuntimeState.totalOutputTokens} + ${outputTokens}`,
|
||||
totalCachedInputTokens: sql`${agentRuntimeState.totalCachedInputTokens} + ${cachedInputTokens}`,
|
||||
totalCostCents: sql`${agentRuntimeState.totalCostCents} + ${additionalCostCents}`,
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(agentRuntimeState.agentId, agent.id));
|
||||
@@ -557,34 +597,71 @@ export function heartbeatService(db: Db) {
|
||||
}
|
||||
|
||||
async function startNextQueuedRunForAgent(agentId: string) {
|
||||
const running = await db
|
||||
.select({ id: heartbeatRuns.id })
|
||||
.from(heartbeatRuns)
|
||||
.where(and(eq(heartbeatRuns.agentId, agentId), eq(heartbeatRuns.status, "running")))
|
||||
.limit(1)
|
||||
.then((rows) => rows[0] ?? null);
|
||||
if (running) return null;
|
||||
return withAgentStartLock(agentId, async () => {
|
||||
const agent = await getAgent(agentId);
|
||||
if (!agent) return [];
|
||||
const policy = parseHeartbeatPolicy(agent);
|
||||
const runningCount = await countRunningRunsForAgent(agentId);
|
||||
const availableSlots = Math.max(0, policy.maxConcurrentRuns - runningCount);
|
||||
if (availableSlots <= 0) return [];
|
||||
|
||||
const nextQueued = await db
|
||||
.select()
|
||||
.from(heartbeatRuns)
|
||||
.where(and(eq(heartbeatRuns.agentId, agentId), eq(heartbeatRuns.status, "queued")))
|
||||
.orderBy(asc(heartbeatRuns.createdAt))
|
||||
.limit(1)
|
||||
.then((rows) => rows[0] ?? null);
|
||||
if (!nextQueued) return null;
|
||||
const queuedRuns = await db
|
||||
.select()
|
||||
.from(heartbeatRuns)
|
||||
.where(and(eq(heartbeatRuns.agentId, agentId), eq(heartbeatRuns.status, "queued")))
|
||||
.orderBy(asc(heartbeatRuns.createdAt))
|
||||
.limit(availableSlots);
|
||||
if (queuedRuns.length === 0) return [];
|
||||
|
||||
void executeRun(nextQueued.id).catch((err) => {
|
||||
logger.error({ err, runId: nextQueued.id }, "queued heartbeat execution failed");
|
||||
for (const queuedRun of queuedRuns) {
|
||||
void executeRun(queuedRun.id).catch((err) => {
|
||||
logger.error({ err, runId: queuedRun.id }, "queued heartbeat execution failed");
|
||||
});
|
||||
}
|
||||
return queuedRuns;
|
||||
});
|
||||
return nextQueued;
|
||||
}
|
||||
|
||||
async function executeRun(runId: string) {
|
||||
const run = await getRun(runId);
|
||||
let run = await getRun(runId);
|
||||
if (!run) return;
|
||||
if (run.status !== "queued" && run.status !== "running") return;
|
||||
|
||||
if (run.status === "queued") {
|
||||
const claimedAt = new Date();
|
||||
const claimed = await db
|
||||
.update(heartbeatRuns)
|
||||
.set({
|
||||
status: "running",
|
||||
startedAt: run.startedAt ?? claimedAt,
|
||||
updatedAt: claimedAt,
|
||||
})
|
||||
.where(and(eq(heartbeatRuns.id, run.id), eq(heartbeatRuns.status, "queued")))
|
||||
.returning()
|
||||
.then((rows) => rows[0] ?? null);
|
||||
if (!claimed) {
|
||||
// Another worker has already claimed or finalized this run.
|
||||
return;
|
||||
}
|
||||
run = claimed;
|
||||
publishLiveEvent({
|
||||
companyId: run.companyId,
|
||||
type: "heartbeat.run.status",
|
||||
payload: {
|
||||
runId: run.id,
|
||||
agentId: run.agentId,
|
||||
status: run.status,
|
||||
invocationSource: run.invocationSource,
|
||||
triggerDetail: run.triggerDetail,
|
||||
error: run.error ?? null,
|
||||
errorCode: run.errorCode ?? null,
|
||||
startedAt: run.startedAt ? new Date(run.startedAt).toISOString() : null,
|
||||
finishedAt: run.finishedAt ? new Date(run.finishedAt).toISOString() : null,
|
||||
},
|
||||
});
|
||||
await setWakeupStatus(run.wakeupRequestId, "claimed", { claimedAt });
|
||||
}
|
||||
|
||||
const agent = await getAgent(run.agentId);
|
||||
if (!agent) {
|
||||
await setRunStatus(runId, "failed", {
|
||||
@@ -599,19 +676,6 @@ export function heartbeatService(db: Db) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (run.status === "queued") {
|
||||
const activeForAgent = await db
|
||||
.select()
|
||||
.from(heartbeatRuns)
|
||||
.where(and(eq(heartbeatRuns.agentId, run.agentId), inArray(heartbeatRuns.status, ["queued", "running"])))
|
||||
.orderBy(asc(heartbeatRuns.createdAt));
|
||||
const runningOther = activeForAgent.some((candidate) => candidate.status === "running" && candidate.id !== run.id);
|
||||
const first = activeForAgent[0] ?? null;
|
||||
if (runningOther || (first && first.id !== run.id)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
const runtime = await ensureRuntimeState(agent);
|
||||
const context = parseObject(run.contextSnapshot);
|
||||
const taskKey = deriveTaskKey(context, null);
|
||||
@@ -641,11 +705,18 @@ export function heartbeatService(db: Db) {
|
||||
let stderrExcerpt = "";
|
||||
|
||||
try {
|
||||
await setRunStatus(runId, "running", {
|
||||
startedAt: new Date(),
|
||||
sessionIdBefore: runtimeForAdapter.sessionDisplayId ?? runtimeForAdapter.sessionId,
|
||||
});
|
||||
await setWakeupStatus(run.wakeupRequestId, "claimed", { claimedAt: new Date() });
|
||||
const startedAt = run.startedAt ?? new Date();
|
||||
const runningWithSession = await db
|
||||
.update(heartbeatRuns)
|
||||
.set({
|
||||
startedAt,
|
||||
sessionIdBefore: runtimeForAdapter.sessionDisplayId ?? runtimeForAdapter.sessionId,
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(heartbeatRuns.id, run.id))
|
||||
.returning()
|
||||
.then((rows) => rows[0] ?? null);
|
||||
if (runningWithSession) run = runningWithSession;
|
||||
|
||||
const runningAgent = await db
|
||||
.update(agents)
|
||||
@@ -666,7 +737,7 @@ export function heartbeatService(db: Db) {
|
||||
});
|
||||
}
|
||||
|
||||
const currentRun = (await getRun(runId)) ?? run;
|
||||
const currentRun = run;
|
||||
await appendRunEvent(currentRun, seq++, {
|
||||
eventType: "lifecycle",
|
||||
stream: "system",
|
||||
|
||||
Reference in New Issue
Block a user