Implement issue execution lock with deferred wake promotion
Add per-issue execution lock (executionRunId, executionAgentNameKey, executionLockedAt) to prevent concurrent runs on the same issue. Same-name wakes are coalesced into the active run; different-name wakes are deferred and promoted when the lock holder finishes. Includes checkout/release run ownership enforcement, agent run ID propagation from JWT claims, wakeup deduplication across assignee and mention wakes, and claimQueuedRun extraction for reuse. Adds two DB migrations for checkoutRunId and execution lock columns. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2
packages/db/src/migrations/0012_perpetual_ser_duncan.sql
Normal file
2
packages/db/src/migrations/0012_perpetual_ser_duncan.sql
Normal file
@@ -0,0 +1,2 @@
|
||||
ALTER TABLE "issues" ADD COLUMN "checkout_run_id" uuid;--> statement-breakpoint
|
||||
ALTER TABLE "issues" ADD CONSTRAINT "issues_checkout_run_id_heartbeat_runs_id_fk" FOREIGN KEY ("checkout_run_id") REFERENCES "public"."heartbeat_runs"("id") ON DELETE set null ON UPDATE no action;
|
||||
4
packages/db/src/migrations/0013_dashing_wasp.sql
Normal file
4
packages/db/src/migrations/0013_dashing_wasp.sql
Normal file
@@ -0,0 +1,4 @@
|
||||
ALTER TABLE "issues" ADD COLUMN "execution_run_id" uuid;--> statement-breakpoint
|
||||
ALTER TABLE "issues" ADD COLUMN "execution_agent_name_key" text;--> statement-breakpoint
|
||||
ALTER TABLE "issues" ADD COLUMN "execution_locked_at" timestamp with time zone;--> statement-breakpoint
|
||||
ALTER TABLE "issues" ADD CONSTRAINT "issues_execution_run_id_heartbeat_runs_id_fk" FOREIGN KEY ("execution_run_id") REFERENCES "public"."heartbeat_runs"("id") ON DELETE set null ON UPDATE no action;
|
||||
4087
packages/db/src/migrations/meta/0012_snapshot.json
Normal file
4087
packages/db/src/migrations/meta/0012_snapshot.json
Normal file
File diff suppressed because it is too large
Load Diff
4118
packages/db/src/migrations/meta/0013_snapshot.json
Normal file
4118
packages/db/src/migrations/meta/0013_snapshot.json
Normal file
File diff suppressed because it is too large
Load Diff
@@ -85,6 +85,20 @@
|
||||
"when": 1771616419708,
|
||||
"tag": "0011_windy_corsair",
|
||||
"breakpoints": true
|
||||
},
|
||||
{
|
||||
"idx": 12,
|
||||
"version": "7",
|
||||
"when": 1771619674673,
|
||||
"tag": "0012_perpetual_ser_duncan",
|
||||
"breakpoints": true
|
||||
},
|
||||
{
|
||||
"idx": 13,
|
||||
"version": "7",
|
||||
"when": 1771623691139,
|
||||
"tag": "0013_dashing_wasp",
|
||||
"breakpoints": true
|
||||
}
|
||||
]
|
||||
}
|
||||
@@ -12,6 +12,7 @@ import { agents } from "./agents.js";
|
||||
import { projects } from "./projects.js";
|
||||
import { goals } from "./goals.js";
|
||||
import { companies } from "./companies.js";
|
||||
import { heartbeatRuns } from "./heartbeat_runs.js";
|
||||
|
||||
export const issues = pgTable(
|
||||
"issues",
|
||||
@@ -26,6 +27,10 @@ export const issues = pgTable(
|
||||
status: text("status").notNull().default("backlog"),
|
||||
priority: text("priority").notNull().default("medium"),
|
||||
assigneeAgentId: uuid("assignee_agent_id").references(() => agents.id),
|
||||
checkoutRunId: uuid("checkout_run_id").references(() => heartbeatRuns.id, { onDelete: "set null" }),
|
||||
executionRunId: uuid("execution_run_id").references(() => heartbeatRuns.id, { onDelete: "set null" }),
|
||||
executionAgentNameKey: text("execution_agent_name_key"),
|
||||
executionLockedAt: timestamp("execution_locked_at", { withTimezone: true }),
|
||||
createdByAgentId: uuid("created_by_agent_id").references(() => agents.id),
|
||||
createdByUserId: text("created_by_user_id"),
|
||||
issueNumber: integer("issue_number"),
|
||||
|
||||
@@ -95,6 +95,7 @@ export type WakeupTriggerDetail = (typeof WAKEUP_TRIGGER_DETAILS)[number];
|
||||
|
||||
export const WAKEUP_REQUEST_STATUSES = [
|
||||
"queued",
|
||||
"deferred_issue_execution",
|
||||
"claimed",
|
||||
"coalesced",
|
||||
"skipped",
|
||||
|
||||
@@ -41,6 +41,10 @@ export interface Issue {
|
||||
status: IssueStatus;
|
||||
priority: IssuePriority;
|
||||
assigneeAgentId: string | null;
|
||||
checkoutRunId: string | null;
|
||||
executionRunId: string | null;
|
||||
executionAgentNameKey: string | null;
|
||||
executionLockedAt: Date | null;
|
||||
createdByAgentId: string | null;
|
||||
createdByUserId: string | null;
|
||||
issueNumber: number | null;
|
||||
|
||||
@@ -63,7 +63,7 @@ export function actorMiddleware(db: Db): RequestHandler {
|
||||
agentId: claims.sub,
|
||||
companyId: claims.company_id,
|
||||
keyId: undefined,
|
||||
runId: runIdHeader || undefined,
|
||||
runId: runIdHeader || claims.run_id || undefined,
|
||||
};
|
||||
next();
|
||||
return;
|
||||
|
||||
@@ -1063,23 +1063,25 @@ export function agentRoutes(db: Db) {
|
||||
}
|
||||
assertCompanyAccess(req, issue.companyId);
|
||||
|
||||
if (!issue.assigneeAgentId || issue.status !== "in_progress") {
|
||||
res.json(null);
|
||||
return;
|
||||
let run = issue.executionRunId ? await heartbeat.getRun(issue.executionRunId) : null;
|
||||
if (run && run.status !== "queued" && run.status !== "running") {
|
||||
run = null;
|
||||
}
|
||||
|
||||
const agent = await svc.getById(issue.assigneeAgentId);
|
||||
if (!agent) {
|
||||
res.json(null);
|
||||
return;
|
||||
if (!run && issue.assigneeAgentId && issue.status === "in_progress") {
|
||||
run = await heartbeat.getActiveRunForAgent(issue.assigneeAgentId);
|
||||
}
|
||||
|
||||
const run = await heartbeat.getActiveRunForAgent(issue.assigneeAgentId);
|
||||
if (!run) {
|
||||
res.json(null);
|
||||
return;
|
||||
}
|
||||
|
||||
const agent = await svc.getById(run.agentId);
|
||||
if (!agent) {
|
||||
res.json(null);
|
||||
return;
|
||||
}
|
||||
|
||||
res.json({
|
||||
...run,
|
||||
agentId: agent.id,
|
||||
|
||||
@@ -78,6 +78,34 @@ export function issueRoutes(db: Db, storage: StorageService) {
|
||||
return false;
|
||||
}
|
||||
|
||||
function requireAgentRunId(req: Request, res: Response) {
|
||||
if (req.actor.type !== "agent") return null;
|
||||
const runId = req.actor.runId?.trim();
|
||||
if (runId) return runId;
|
||||
res.status(401).json({ error: "Agent run id required" });
|
||||
return null;
|
||||
}
|
||||
|
||||
async function assertAgentRunCheckoutOwnership(
|
||||
req: Request,
|
||||
res: Response,
|
||||
issue: { id: string; status: string; assigneeAgentId: string | null },
|
||||
) {
|
||||
if (req.actor.type !== "agent") return true;
|
||||
const actorAgentId = req.actor.agentId;
|
||||
if (!actorAgentId) {
|
||||
res.status(403).json({ error: "Agent authentication required" });
|
||||
return false;
|
||||
}
|
||||
if (issue.status !== "in_progress" || issue.assigneeAgentId !== actorAgentId) {
|
||||
return true;
|
||||
}
|
||||
const runId = requireAgentRunId(req, res);
|
||||
if (!runId) return false;
|
||||
await svc.assertCheckoutOwner(issue.id, actorAgentId, runId);
|
||||
return true;
|
||||
}
|
||||
|
||||
router.get("/companies/:companyId/issues", async (req, res) => {
|
||||
const companyId = req.params.companyId as string;
|
||||
assertCompanyAccess(req, companyId);
|
||||
@@ -225,6 +253,7 @@ export function issueRoutes(db: Db, storage: StorageService) {
|
||||
return;
|
||||
}
|
||||
assertCompanyAccess(req, existing.companyId);
|
||||
if (!(await assertAgentRunCheckoutOwnership(req, res, existing))) return;
|
||||
|
||||
const { comment: commentBody, hiddenAt: hiddenAtRaw, ...updateFields } = req.body;
|
||||
if (hiddenAtRaw !== undefined) {
|
||||
@@ -276,35 +305,17 @@ export function issueRoutes(db: Db, storage: StorageService) {
|
||||
details: { commentId: comment.id },
|
||||
});
|
||||
|
||||
// @-mention wakeups
|
||||
svc.findMentionedAgents(issue.companyId, commentBody).then((ids) => {
|
||||
for (const mentionedId of ids) {
|
||||
heartbeat.wakeup(mentionedId, {
|
||||
source: "automation",
|
||||
triggerDetail: "system",
|
||||
reason: "issue_comment_mentioned",
|
||||
payload: { issueId: id, commentId: comment!.id },
|
||||
requestedByActorType: actor.actorType,
|
||||
requestedByActorId: actor.actorId,
|
||||
contextSnapshot: {
|
||||
issueId: id,
|
||||
taskId: id,
|
||||
commentId: comment!.id,
|
||||
wakeCommentId: comment!.id,
|
||||
wakeReason: "issue_comment_mentioned",
|
||||
source: "comment.mention",
|
||||
},
|
||||
}).catch((err) => logger.warn({ err, agentId: mentionedId }, "failed to wake mentioned agent"));
|
||||
}
|
||||
}).catch((err) => logger.warn({ err, issueId: id }, "failed to resolve @-mentions"));
|
||||
}
|
||||
|
||||
const assigneeChanged =
|
||||
req.body.assigneeAgentId !== undefined && req.body.assigneeAgentId !== existing.assigneeAgentId;
|
||||
|
||||
if (assigneeChanged && issue.assigneeAgentId) {
|
||||
void heartbeat
|
||||
.wakeup(issue.assigneeAgentId, {
|
||||
// Merge all wakeups from this update into one enqueue per agent to avoid duplicate runs.
|
||||
void (async () => {
|
||||
const wakeups = new Map<string, Parameters<typeof heartbeat.wakeup>[1]>();
|
||||
|
||||
if (assigneeChanged && issue.assigneeAgentId) {
|
||||
wakeups.set(issue.assigneeAgentId, {
|
||||
source: "assignment",
|
||||
triggerDetail: "system",
|
||||
reason: "issue_assigned",
|
||||
@@ -312,9 +323,44 @@ export function issueRoutes(db: Db, storage: StorageService) {
|
||||
requestedByActorType: actor.actorType,
|
||||
requestedByActorId: actor.actorId,
|
||||
contextSnapshot: { issueId: issue.id, source: "issue.update" },
|
||||
})
|
||||
.catch((err) => logger.warn({ err, issueId: issue.id }, "failed to wake assignee on issue update"));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
if (commentBody && comment) {
|
||||
let mentionedIds: string[] = [];
|
||||
try {
|
||||
mentionedIds = await svc.findMentionedAgents(issue.companyId, commentBody);
|
||||
} catch (err) {
|
||||
logger.warn({ err, issueId: id }, "failed to resolve @-mentions");
|
||||
}
|
||||
|
||||
for (const mentionedId of mentionedIds) {
|
||||
if (wakeups.has(mentionedId)) continue;
|
||||
wakeups.set(mentionedId, {
|
||||
source: "automation",
|
||||
triggerDetail: "system",
|
||||
reason: "issue_comment_mentioned",
|
||||
payload: { issueId: id, commentId: comment.id },
|
||||
requestedByActorType: actor.actorType,
|
||||
requestedByActorId: actor.actorId,
|
||||
contextSnapshot: {
|
||||
issueId: id,
|
||||
taskId: id,
|
||||
commentId: comment.id,
|
||||
wakeCommentId: comment.id,
|
||||
wakeReason: "issue_comment_mentioned",
|
||||
source: "comment.mention",
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
for (const [agentId, wakeup] of wakeups.entries()) {
|
||||
heartbeat
|
||||
.wakeup(agentId, wakeup)
|
||||
.catch((err) => logger.warn({ err, issueId: issue.id, agentId }, "failed to wake agent on issue update"));
|
||||
}
|
||||
})();
|
||||
|
||||
res.json({ ...issue, comment });
|
||||
});
|
||||
@@ -372,7 +418,9 @@ export function issueRoutes(db: Db, storage: StorageService) {
|
||||
return;
|
||||
}
|
||||
|
||||
const updated = await svc.checkout(id, req.body.agentId, req.body.expectedStatuses);
|
||||
const checkoutRunId = requireAgentRunId(req, res);
|
||||
if (req.actor.type === "agent" && !checkoutRunId) return;
|
||||
const updated = await svc.checkout(id, req.body.agentId, req.body.expectedStatuses, checkoutRunId);
|
||||
const actor = getActorInfo(req);
|
||||
|
||||
await logActivity(db, {
|
||||
@@ -410,8 +458,14 @@ export function issueRoutes(db: Db, storage: StorageService) {
|
||||
return;
|
||||
}
|
||||
assertCompanyAccess(req, existing.companyId);
|
||||
const actorRunId = requireAgentRunId(req, res);
|
||||
if (req.actor.type === "agent" && !actorRunId) return;
|
||||
|
||||
const released = await svc.release(id, req.actor.type === "agent" ? req.actor.agentId : undefined);
|
||||
const released = await svc.release(
|
||||
id,
|
||||
req.actor.type === "agent" ? req.actor.agentId : undefined,
|
||||
actorRunId,
|
||||
);
|
||||
if (!released) {
|
||||
res.status(404).json({ error: "Issue not found" });
|
||||
return;
|
||||
@@ -452,6 +506,7 @@ export function issueRoutes(db: Db, storage: StorageService) {
|
||||
return;
|
||||
}
|
||||
assertCompanyAccess(req, issue.companyId);
|
||||
if (!(await assertAgentRunCheckoutOwnership(req, res, issue))) return;
|
||||
|
||||
const actor = getActorInfo(req);
|
||||
const reopenRequested = req.body.reopen === true;
|
||||
@@ -505,10 +560,66 @@ export function issueRoutes(db: Db, storage: StorageService) {
|
||||
details: { commentId: comment.id },
|
||||
});
|
||||
|
||||
// @-mention wakeups
|
||||
svc.findMentionedAgents(issue.companyId, req.body.body).then((ids) => {
|
||||
for (const mentionedId of ids) {
|
||||
heartbeat.wakeup(mentionedId, {
|
||||
// Merge all wakeups from this comment into one enqueue per agent to avoid duplicate runs.
|
||||
void (async () => {
|
||||
const wakeups = new Map<string, Parameters<typeof heartbeat.wakeup>[1]>();
|
||||
const assigneeId = currentIssue.assigneeAgentId;
|
||||
if (assigneeId) {
|
||||
if (reopened) {
|
||||
wakeups.set(assigneeId, {
|
||||
source: "automation",
|
||||
triggerDetail: "system",
|
||||
reason: "issue_reopened_via_comment",
|
||||
payload: {
|
||||
issueId: currentIssue.id,
|
||||
commentId: comment.id,
|
||||
reopenedFrom: reopenFromStatus,
|
||||
mutation: "comment",
|
||||
},
|
||||
requestedByActorType: actor.actorType,
|
||||
requestedByActorId: actor.actorId,
|
||||
contextSnapshot: {
|
||||
issueId: currentIssue.id,
|
||||
taskId: currentIssue.id,
|
||||
commentId: comment.id,
|
||||
source: "issue.comment.reopen",
|
||||
wakeReason: "issue_reopened_via_comment",
|
||||
reopenedFrom: reopenFromStatus,
|
||||
},
|
||||
});
|
||||
} else {
|
||||
wakeups.set(assigneeId, {
|
||||
source: "automation",
|
||||
triggerDetail: "system",
|
||||
reason: "issue_commented",
|
||||
payload: {
|
||||
issueId: currentIssue.id,
|
||||
commentId: comment.id,
|
||||
mutation: "comment",
|
||||
},
|
||||
requestedByActorType: actor.actorType,
|
||||
requestedByActorId: actor.actorId,
|
||||
contextSnapshot: {
|
||||
issueId: currentIssue.id,
|
||||
taskId: currentIssue.id,
|
||||
commentId: comment.id,
|
||||
source: "issue.comment",
|
||||
wakeReason: "issue_commented",
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
let mentionedIds: string[] = [];
|
||||
try {
|
||||
mentionedIds = await svc.findMentionedAgents(issue.companyId, req.body.body);
|
||||
} catch (err) {
|
||||
logger.warn({ err, issueId: id }, "failed to resolve @-mentions");
|
||||
}
|
||||
|
||||
for (const mentionedId of mentionedIds) {
|
||||
if (wakeups.has(mentionedId)) continue;
|
||||
wakeups.set(mentionedId, {
|
||||
source: "automation",
|
||||
triggerDetail: "system",
|
||||
reason: "issue_comment_mentioned",
|
||||
@@ -523,57 +634,15 @@ export function issueRoutes(db: Db, storage: StorageService) {
|
||||
wakeReason: "issue_comment_mentioned",
|
||||
source: "comment.mention",
|
||||
},
|
||||
}).catch((err) => logger.warn({ err, agentId: mentionedId }, "failed to wake mentioned agent"));
|
||||
});
|
||||
}
|
||||
}).catch((err) => logger.warn({ err, issueId: id }, "failed to resolve @-mentions"));
|
||||
|
||||
if (reopened && currentIssue.assigneeAgentId) {
|
||||
void heartbeat
|
||||
.wakeup(currentIssue.assigneeAgentId, {
|
||||
source: "automation",
|
||||
triggerDetail: "system",
|
||||
reason: "issue_reopened_via_comment",
|
||||
payload: {
|
||||
issueId: currentIssue.id,
|
||||
commentId: comment.id,
|
||||
reopenedFrom: reopenFromStatus,
|
||||
mutation: "comment",
|
||||
},
|
||||
requestedByActorType: actor.actorType,
|
||||
requestedByActorId: actor.actorId,
|
||||
contextSnapshot: {
|
||||
issueId: currentIssue.id,
|
||||
taskId: currentIssue.id,
|
||||
commentId: comment.id,
|
||||
source: "issue.comment.reopen",
|
||||
wakeReason: "issue_reopened_via_comment",
|
||||
reopenedFrom: reopenFromStatus,
|
||||
},
|
||||
})
|
||||
.catch((err) => logger.warn({ err, issueId: currentIssue.id }, "failed to wake assignee on issue reopen comment"));
|
||||
} else if (currentIssue.assigneeAgentId) {
|
||||
void heartbeat
|
||||
.wakeup(currentIssue.assigneeAgentId, {
|
||||
source: "automation",
|
||||
triggerDetail: "system",
|
||||
reason: "issue_commented",
|
||||
payload: {
|
||||
issueId: currentIssue.id,
|
||||
commentId: comment.id,
|
||||
mutation: "comment",
|
||||
},
|
||||
requestedByActorType: actor.actorType,
|
||||
requestedByActorId: actor.actorId,
|
||||
contextSnapshot: {
|
||||
issueId: currentIssue.id,
|
||||
taskId: currentIssue.id,
|
||||
commentId: comment.id,
|
||||
source: "issue.comment",
|
||||
wakeReason: "issue_commented",
|
||||
},
|
||||
})
|
||||
.catch((err) => logger.warn({ err, issueId: currentIssue.id }, "failed to wake assignee on issue comment"));
|
||||
}
|
||||
for (const [agentId, wakeup] of wakeups.entries()) {
|
||||
heartbeat
|
||||
.wakeup(agentId, wakeup)
|
||||
.catch((err) => logger.warn({ err, issueId: currentIssue.id, agentId }, "failed to wake agent on issue comment"));
|
||||
}
|
||||
})();
|
||||
|
||||
res.status(201).json(comment);
|
||||
});
|
||||
|
||||
@@ -8,6 +8,7 @@ import {
|
||||
heartbeatRunEvents,
|
||||
heartbeatRuns,
|
||||
costEvents,
|
||||
issues,
|
||||
} from "@paperclip/db";
|
||||
import { conflict, notFound } from "../errors.js";
|
||||
import { logger } from "../middleware/logger.js";
|
||||
@@ -22,6 +23,7 @@ import { secretService } from "./secrets.js";
|
||||
const MAX_LIVE_LOG_CHUNK_BYTES = 8 * 1024;
|
||||
const HEARTBEAT_MAX_CONCURRENT_RUNS_DEFAULT = 1;
|
||||
const HEARTBEAT_MAX_CONCURRENT_RUNS_MAX = 10;
|
||||
const DEFERRED_WAKE_CONTEXT_KEY = "_paperclipWakeContext";
|
||||
const startLocksByAgent = new Map<string, Promise<void>>();
|
||||
|
||||
function appendExcerpt(prev: string, chunk: string) {
|
||||
@@ -93,6 +95,53 @@ function deriveCommentId(
|
||||
);
|
||||
}
|
||||
|
||||
function enrichWakeContextSnapshot(input: {
|
||||
contextSnapshot: Record<string, unknown>;
|
||||
reason: string | null;
|
||||
source: WakeupOptions["source"];
|
||||
triggerDetail: WakeupOptions["triggerDetail"] | null;
|
||||
payload: Record<string, unknown> | null;
|
||||
}) {
|
||||
const { contextSnapshot, reason, source, triggerDetail, payload } = input;
|
||||
const issueIdFromPayload = readNonEmptyString(payload?.["issueId"]);
|
||||
const commentIdFromPayload = readNonEmptyString(payload?.["commentId"]);
|
||||
const taskKey = deriveTaskKey(contextSnapshot, payload);
|
||||
const wakeCommentId = deriveCommentId(contextSnapshot, payload);
|
||||
|
||||
if (!readNonEmptyString(contextSnapshot["wakeReason"]) && reason) {
|
||||
contextSnapshot.wakeReason = reason;
|
||||
}
|
||||
if (!readNonEmptyString(contextSnapshot["issueId"]) && issueIdFromPayload) {
|
||||
contextSnapshot.issueId = issueIdFromPayload;
|
||||
}
|
||||
if (!readNonEmptyString(contextSnapshot["taskId"]) && issueIdFromPayload) {
|
||||
contextSnapshot.taskId = issueIdFromPayload;
|
||||
}
|
||||
if (!readNonEmptyString(contextSnapshot["taskKey"]) && taskKey) {
|
||||
contextSnapshot.taskKey = taskKey;
|
||||
}
|
||||
if (!readNonEmptyString(contextSnapshot["commentId"]) && commentIdFromPayload) {
|
||||
contextSnapshot.commentId = commentIdFromPayload;
|
||||
}
|
||||
if (!readNonEmptyString(contextSnapshot["wakeCommentId"]) && wakeCommentId) {
|
||||
contextSnapshot.wakeCommentId = wakeCommentId;
|
||||
}
|
||||
if (!readNonEmptyString(contextSnapshot["wakeSource"]) && source) {
|
||||
contextSnapshot.wakeSource = source;
|
||||
}
|
||||
if (!readNonEmptyString(contextSnapshot["wakeTriggerDetail"]) && triggerDetail) {
|
||||
contextSnapshot.wakeTriggerDetail = triggerDetail;
|
||||
}
|
||||
|
||||
return {
|
||||
contextSnapshot,
|
||||
issueIdFromPayload,
|
||||
commentIdFromPayload,
|
||||
taskKey,
|
||||
wakeCommentId,
|
||||
};
|
||||
}
|
||||
|
||||
function mergeCoalescedContextSnapshot(
|
||||
existingRaw: unknown,
|
||||
incoming: Record<string, unknown>,
|
||||
@@ -123,6 +172,12 @@ function truncateDisplayId(value: string | null | undefined, max = 128) {
|
||||
return value.length > max ? value.slice(0, max) : value;
|
||||
}
|
||||
|
||||
function normalizeAgentNameKey(value: string | null | undefined) {
|
||||
if (typeof value !== "string") return null;
|
||||
const normalized = value.trim().toLowerCase();
|
||||
return normalized.length > 0 ? normalized : null;
|
||||
}
|
||||
|
||||
const defaultSessionCodec: AdapterSessionCodec = {
|
||||
deserialize(raw: unknown) {
|
||||
const asObj = parseObject(raw);
|
||||
@@ -255,6 +310,32 @@ export function heartbeatService(db: Db) {
|
||||
.then((rows) => rows[0] ?? null);
|
||||
}
|
||||
|
||||
async function resolveSessionBeforeForWakeup(
|
||||
agent: typeof agents.$inferSelect,
|
||||
taskKey: string | null,
|
||||
) {
|
||||
if (taskKey) {
|
||||
const codec = getAdapterSessionCodec(agent.adapterType);
|
||||
const existingTaskSession = await getTaskSession(
|
||||
agent.companyId,
|
||||
agent.id,
|
||||
agent.adapterType,
|
||||
taskKey,
|
||||
);
|
||||
const parsedParams = normalizeSessionParams(
|
||||
codec.deserialize(existingTaskSession?.sessionParamsJson ?? null),
|
||||
);
|
||||
return truncateDisplayId(
|
||||
existingTaskSession?.sessionDisplayId ??
|
||||
(codec.getDisplayId ? codec.getDisplayId(parsedParams) : null) ??
|
||||
readNonEmptyString(parsedParams?.sessionId),
|
||||
);
|
||||
}
|
||||
|
||||
const runtimeForRun = await getRuntimeState(agent.id);
|
||||
return runtimeForRun?.sessionId ?? null;
|
||||
}
|
||||
|
||||
async function upsertTaskSession(input: {
|
||||
companyId: string;
|
||||
agentId: string;
|
||||
@@ -448,6 +529,41 @@ export function heartbeatService(db: Db) {
|
||||
return Number(count ?? 0);
|
||||
}
|
||||
|
||||
async function claimQueuedRun(run: typeof heartbeatRuns.$inferSelect) {
|
||||
if (run.status !== "queued") return run;
|
||||
const claimedAt = new Date();
|
||||
const claimed = await db
|
||||
.update(heartbeatRuns)
|
||||
.set({
|
||||
status: "running",
|
||||
startedAt: run.startedAt ?? claimedAt,
|
||||
updatedAt: claimedAt,
|
||||
})
|
||||
.where(and(eq(heartbeatRuns.id, run.id), eq(heartbeatRuns.status, "queued")))
|
||||
.returning()
|
||||
.then((rows) => rows[0] ?? null);
|
||||
if (!claimed) return null;
|
||||
|
||||
publishLiveEvent({
|
||||
companyId: claimed.companyId,
|
||||
type: "heartbeat.run.status",
|
||||
payload: {
|
||||
runId: claimed.id,
|
||||
agentId: claimed.agentId,
|
||||
status: claimed.status,
|
||||
invocationSource: claimed.invocationSource,
|
||||
triggerDetail: claimed.triggerDetail,
|
||||
error: claimed.error ?? null,
|
||||
errorCode: claimed.errorCode ?? null,
|
||||
startedAt: claimed.startedAt ? new Date(claimed.startedAt).toISOString() : null,
|
||||
finishedAt: claimed.finishedAt ? new Date(claimed.finishedAt).toISOString() : null,
|
||||
},
|
||||
});
|
||||
|
||||
await setWakeupStatus(claimed.wakeupRequestId, "claimed", { claimedAt });
|
||||
return claimed;
|
||||
}
|
||||
|
||||
async function finalizeAgentStatus(
|
||||
agentId: string,
|
||||
outcome: "succeeded" | "failed" | "cancelled" | "timed_out",
|
||||
@@ -532,6 +648,7 @@ export function heartbeatService(db: Db) {
|
||||
level: "error",
|
||||
message: "Process lost -- server may have restarted",
|
||||
});
|
||||
await releaseIssueExecutionAndPromote(updatedRun);
|
||||
}
|
||||
await finalizeAgentStatus(run.agentId, "failed");
|
||||
await startNextQueuedRunForAgent(run.agentId);
|
||||
@@ -613,12 +730,19 @@ export function heartbeatService(db: Db) {
|
||||
.limit(availableSlots);
|
||||
if (queuedRuns.length === 0) return [];
|
||||
|
||||
const claimedRuns: Array<typeof heartbeatRuns.$inferSelect> = [];
|
||||
for (const queuedRun of queuedRuns) {
|
||||
void executeRun(queuedRun.id).catch((err) => {
|
||||
logger.error({ err, runId: queuedRun.id }, "queued heartbeat execution failed");
|
||||
const claimed = await claimQueuedRun(queuedRun);
|
||||
if (claimed) claimedRuns.push(claimed);
|
||||
}
|
||||
if (claimedRuns.length === 0) return [];
|
||||
|
||||
for (const claimedRun of claimedRuns) {
|
||||
void executeRun(claimedRun.id).catch((err) => {
|
||||
logger.error({ err, runId: claimedRun.id }, "queued heartbeat execution failed");
|
||||
});
|
||||
}
|
||||
return queuedRuns;
|
||||
return claimedRuns;
|
||||
});
|
||||
}
|
||||
|
||||
@@ -628,38 +752,12 @@ export function heartbeatService(db: Db) {
|
||||
if (run.status !== "queued" && run.status !== "running") return;
|
||||
|
||||
if (run.status === "queued") {
|
||||
const claimedAt = new Date();
|
||||
const claimed = await db
|
||||
.update(heartbeatRuns)
|
||||
.set({
|
||||
status: "running",
|
||||
startedAt: run.startedAt ?? claimedAt,
|
||||
updatedAt: claimedAt,
|
||||
})
|
||||
.where(and(eq(heartbeatRuns.id, run.id), eq(heartbeatRuns.status, "queued")))
|
||||
.returning()
|
||||
.then((rows) => rows[0] ?? null);
|
||||
const claimed = await claimQueuedRun(run);
|
||||
if (!claimed) {
|
||||
// Another worker has already claimed or finalized this run.
|
||||
return;
|
||||
}
|
||||
run = claimed;
|
||||
publishLiveEvent({
|
||||
companyId: run.companyId,
|
||||
type: "heartbeat.run.status",
|
||||
payload: {
|
||||
runId: run.id,
|
||||
agentId: run.agentId,
|
||||
status: run.status,
|
||||
invocationSource: run.invocationSource,
|
||||
triggerDetail: run.triggerDetail,
|
||||
error: run.error ?? null,
|
||||
errorCode: run.errorCode ?? null,
|
||||
startedAt: run.startedAt ? new Date(run.startedAt).toISOString() : null,
|
||||
finishedAt: run.finishedAt ? new Date(run.finishedAt).toISOString() : null,
|
||||
},
|
||||
});
|
||||
await setWakeupStatus(run.wakeupRequestId, "claimed", { claimedAt });
|
||||
}
|
||||
|
||||
const agent = await getAgent(run.agentId);
|
||||
@@ -673,6 +771,8 @@ export function heartbeatService(db: Db) {
|
||||
finishedAt: new Date(),
|
||||
error: "Agent not found",
|
||||
});
|
||||
const failedRun = await getRun(runId);
|
||||
if (failedRun) await releaseIssueExecutionAndPromote(failedRun);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -686,14 +786,15 @@ export function heartbeatService(db: Db) {
|
||||
const previousSessionParams = normalizeSessionParams(
|
||||
sessionCodec.deserialize(taskSession?.sessionParamsJson ?? null),
|
||||
);
|
||||
const runtimeSessionFallback = taskKey ? null : runtime.sessionId;
|
||||
const previousSessionDisplayId = truncateDisplayId(
|
||||
taskSession?.sessionDisplayId ??
|
||||
(sessionCodec.getDisplayId ? sessionCodec.getDisplayId(previousSessionParams) : null) ??
|
||||
readNonEmptyString(previousSessionParams?.sessionId) ??
|
||||
runtime.sessionId,
|
||||
runtimeSessionFallback,
|
||||
);
|
||||
const runtimeForAdapter = {
|
||||
sessionId: readNonEmptyString(previousSessionParams?.sessionId) ?? runtime.sessionId,
|
||||
sessionId: readNonEmptyString(previousSessionParams?.sessionId) ?? runtimeSessionFallback,
|
||||
sessionParams: previousSessionParams,
|
||||
sessionDisplayId: previousSessionDisplayId,
|
||||
taskKey,
|
||||
@@ -915,6 +1016,7 @@ export function heartbeatService(db: Db) {
|
||||
exitCode: adapterResult.exitCode,
|
||||
},
|
||||
});
|
||||
await releaseIssueExecutionAndPromote(finalizedRun);
|
||||
}
|
||||
|
||||
if (finalizedRun) {
|
||||
@@ -977,6 +1079,7 @@ export function heartbeatService(db: Db) {
|
||||
level: "error",
|
||||
message,
|
||||
});
|
||||
await releaseIssueExecutionAndPromote(failedRun);
|
||||
|
||||
await updateRuntimeState(agent, failedRun, {
|
||||
exitCode: null,
|
||||
@@ -1007,41 +1110,177 @@ export function heartbeatService(db: Db) {
|
||||
}
|
||||
}
|
||||
|
||||
async function releaseIssueExecutionAndPromote(run: typeof heartbeatRuns.$inferSelect) {
|
||||
const promotedRun = await db.transaction(async (tx) => {
|
||||
await tx.execute(
|
||||
sql`select id from issues where company_id = ${run.companyId} and execution_run_id = ${run.id} for update`,
|
||||
);
|
||||
|
||||
const issue = await tx
|
||||
.select({
|
||||
id: issues.id,
|
||||
companyId: issues.companyId,
|
||||
})
|
||||
.from(issues)
|
||||
.where(and(eq(issues.companyId, run.companyId), eq(issues.executionRunId, run.id)))
|
||||
.then((rows) => rows[0] ?? null);
|
||||
|
||||
if (!issue) return;
|
||||
|
||||
await tx
|
||||
.update(issues)
|
||||
.set({
|
||||
executionRunId: null,
|
||||
executionAgentNameKey: null,
|
||||
executionLockedAt: null,
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(issues.id, issue.id));
|
||||
|
||||
while (true) {
|
||||
const deferred = await tx
|
||||
.select()
|
||||
.from(agentWakeupRequests)
|
||||
.where(
|
||||
and(
|
||||
eq(agentWakeupRequests.companyId, issue.companyId),
|
||||
eq(agentWakeupRequests.status, "deferred_issue_execution"),
|
||||
sql`${agentWakeupRequests.payload} ->> 'issueId' = ${issue.id}`,
|
||||
),
|
||||
)
|
||||
.orderBy(asc(agentWakeupRequests.requestedAt))
|
||||
.limit(1)
|
||||
.then((rows) => rows[0] ?? null);
|
||||
|
||||
if (!deferred) return null;
|
||||
|
||||
const deferredAgent = await tx
|
||||
.select()
|
||||
.from(agents)
|
||||
.where(eq(agents.id, deferred.agentId))
|
||||
.then((rows) => rows[0] ?? null);
|
||||
|
||||
if (
|
||||
!deferredAgent ||
|
||||
deferredAgent.companyId !== issue.companyId ||
|
||||
deferredAgent.status === "paused" ||
|
||||
deferredAgent.status === "terminated" ||
|
||||
deferredAgent.status === "pending_approval"
|
||||
) {
|
||||
await tx
|
||||
.update(agentWakeupRequests)
|
||||
.set({
|
||||
status: "failed",
|
||||
finishedAt: new Date(),
|
||||
error: "Deferred wake could not be promoted: agent is not invokable",
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(agentWakeupRequests.id, deferred.id));
|
||||
continue;
|
||||
}
|
||||
|
||||
const deferredPayload = parseObject(deferred.payload);
|
||||
const deferredContextSeed = parseObject(deferredPayload[DEFERRED_WAKE_CONTEXT_KEY]);
|
||||
const promotedContextSeed: Record<string, unknown> = { ...deferredContextSeed };
|
||||
const promotedReason = readNonEmptyString(deferred.reason) ?? "issue_execution_promoted";
|
||||
const promotedSource =
|
||||
(readNonEmptyString(deferred.source) as WakeupOptions["source"]) ?? "automation";
|
||||
const promotedTriggerDetail =
|
||||
(readNonEmptyString(deferred.triggerDetail) as WakeupOptions["triggerDetail"]) ?? null;
|
||||
const promotedPayload = deferredPayload;
|
||||
delete promotedPayload[DEFERRED_WAKE_CONTEXT_KEY];
|
||||
|
||||
const {
|
||||
contextSnapshot: promotedContextSnapshot,
|
||||
taskKey: promotedTaskKey,
|
||||
} = enrichWakeContextSnapshot({
|
||||
contextSnapshot: promotedContextSeed,
|
||||
reason: promotedReason,
|
||||
source: promotedSource,
|
||||
triggerDetail: promotedTriggerDetail,
|
||||
payload: promotedPayload,
|
||||
});
|
||||
|
||||
const sessionBefore = await resolveSessionBeforeForWakeup(deferredAgent, promotedTaskKey);
|
||||
const now = new Date();
|
||||
const newRun = await tx
|
||||
.insert(heartbeatRuns)
|
||||
.values({
|
||||
companyId: deferredAgent.companyId,
|
||||
agentId: deferredAgent.id,
|
||||
invocationSource: promotedSource,
|
||||
triggerDetail: promotedTriggerDetail,
|
||||
status: "queued",
|
||||
wakeupRequestId: deferred.id,
|
||||
contextSnapshot: promotedContextSnapshot,
|
||||
sessionIdBefore: sessionBefore,
|
||||
})
|
||||
.returning()
|
||||
.then((rows) => rows[0]);
|
||||
|
||||
await tx
|
||||
.update(agentWakeupRequests)
|
||||
.set({
|
||||
status: "queued",
|
||||
reason: "issue_execution_promoted",
|
||||
runId: newRun.id,
|
||||
claimedAt: null,
|
||||
finishedAt: null,
|
||||
error: null,
|
||||
updatedAt: now,
|
||||
})
|
||||
.where(eq(agentWakeupRequests.id, deferred.id));
|
||||
|
||||
await tx
|
||||
.update(issues)
|
||||
.set({
|
||||
executionRunId: newRun.id,
|
||||
executionAgentNameKey: normalizeAgentNameKey(deferredAgent.name),
|
||||
executionLockedAt: now,
|
||||
updatedAt: now,
|
||||
})
|
||||
.where(eq(issues.id, issue.id));
|
||||
|
||||
return newRun;
|
||||
}
|
||||
});
|
||||
|
||||
if (!promotedRun) return;
|
||||
|
||||
publishLiveEvent({
|
||||
companyId: promotedRun.companyId,
|
||||
type: "heartbeat.run.queued",
|
||||
payload: {
|
||||
runId: promotedRun.id,
|
||||
agentId: promotedRun.agentId,
|
||||
invocationSource: promotedRun.invocationSource,
|
||||
triggerDetail: promotedRun.triggerDetail,
|
||||
wakeupRequestId: promotedRun.wakeupRequestId,
|
||||
},
|
||||
});
|
||||
|
||||
await startNextQueuedRunForAgent(promotedRun.agentId);
|
||||
}
|
||||
|
||||
async function enqueueWakeup(agentId: string, opts: WakeupOptions = {}) {
|
||||
const source = opts.source ?? "on_demand";
|
||||
const triggerDetail = opts.triggerDetail ?? null;
|
||||
const contextSnapshot: Record<string, unknown> = { ...(opts.contextSnapshot ?? {}) };
|
||||
const reason = opts.reason ?? null;
|
||||
const payload = opts.payload ?? null;
|
||||
const issueIdFromPayload = readNonEmptyString(payload?.["issueId"]);
|
||||
const commentIdFromPayload = readNonEmptyString(payload?.["commentId"]);
|
||||
const taskKey = deriveTaskKey(contextSnapshot, payload);
|
||||
const wakeCommentId = deriveCommentId(contextSnapshot, payload);
|
||||
|
||||
if (!readNonEmptyString(contextSnapshot["wakeReason"]) && reason) {
|
||||
contextSnapshot.wakeReason = reason;
|
||||
}
|
||||
if (!readNonEmptyString(contextSnapshot["issueId"]) && issueIdFromPayload) {
|
||||
contextSnapshot.issueId = issueIdFromPayload;
|
||||
}
|
||||
if (!readNonEmptyString(contextSnapshot["taskId"]) && issueIdFromPayload) {
|
||||
contextSnapshot.taskId = issueIdFromPayload;
|
||||
}
|
||||
if (!readNonEmptyString(contextSnapshot["taskKey"]) && taskKey) {
|
||||
contextSnapshot.taskKey = taskKey;
|
||||
}
|
||||
if (!readNonEmptyString(contextSnapshot["commentId"]) && commentIdFromPayload) {
|
||||
contextSnapshot.commentId = commentIdFromPayload;
|
||||
}
|
||||
if (!readNonEmptyString(contextSnapshot["wakeCommentId"]) && wakeCommentId) {
|
||||
contextSnapshot.wakeCommentId = wakeCommentId;
|
||||
}
|
||||
if (!readNonEmptyString(contextSnapshot["wakeSource"])) {
|
||||
contextSnapshot.wakeSource = source;
|
||||
}
|
||||
if (!readNonEmptyString(contextSnapshot["wakeTriggerDetail"]) && triggerDetail) {
|
||||
contextSnapshot.wakeTriggerDetail = triggerDetail;
|
||||
}
|
||||
const {
|
||||
contextSnapshot: enrichedContextSnapshot,
|
||||
issueIdFromPayload,
|
||||
taskKey,
|
||||
wakeCommentId,
|
||||
} = enrichWakeContextSnapshot({
|
||||
contextSnapshot,
|
||||
reason,
|
||||
source,
|
||||
triggerDetail,
|
||||
payload,
|
||||
});
|
||||
const issueId = readNonEmptyString(enrichedContextSnapshot.issueId) ?? issueIdFromPayload;
|
||||
|
||||
const agent = await getAgent(agentId);
|
||||
if (!agent) throw notFound("Agent not found");
|
||||
@@ -1080,6 +1319,243 @@ export function heartbeatService(db: Db) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (issueId) {
|
||||
const agentNameKey = normalizeAgentNameKey(agent.name);
|
||||
const sessionBefore = await resolveSessionBeforeForWakeup(agent, taskKey);
|
||||
|
||||
const outcome = await db.transaction(async (tx) => {
|
||||
await tx.execute(
|
||||
sql`select id from issues where id = ${issueId} and company_id = ${agent.companyId} for update`,
|
||||
);
|
||||
|
||||
const issue = await tx
|
||||
.select({
|
||||
id: issues.id,
|
||||
companyId: issues.companyId,
|
||||
executionRunId: issues.executionRunId,
|
||||
executionAgentNameKey: issues.executionAgentNameKey,
|
||||
})
|
||||
.from(issues)
|
||||
.where(and(eq(issues.id, issueId), eq(issues.companyId, agent.companyId)))
|
||||
.then((rows) => rows[0] ?? null);
|
||||
|
||||
if (!issue) {
|
||||
await tx.insert(agentWakeupRequests).values({
|
||||
companyId: agent.companyId,
|
||||
agentId,
|
||||
source,
|
||||
triggerDetail,
|
||||
reason: "issue_execution_issue_not_found",
|
||||
payload,
|
||||
status: "skipped",
|
||||
requestedByActorType: opts.requestedByActorType ?? null,
|
||||
requestedByActorId: opts.requestedByActorId ?? null,
|
||||
idempotencyKey: opts.idempotencyKey ?? null,
|
||||
finishedAt: new Date(),
|
||||
});
|
||||
return { kind: "skipped" as const };
|
||||
}
|
||||
|
||||
let activeExecutionRun = issue.executionRunId
|
||||
? await tx
|
||||
.select()
|
||||
.from(heartbeatRuns)
|
||||
.where(eq(heartbeatRuns.id, issue.executionRunId))
|
||||
.then((rows) => rows[0] ?? null)
|
||||
: null;
|
||||
|
||||
if (activeExecutionRun && activeExecutionRun.status !== "queued" && activeExecutionRun.status !== "running") {
|
||||
activeExecutionRun = null;
|
||||
}
|
||||
|
||||
if (!activeExecutionRun && issue.executionRunId) {
|
||||
await tx
|
||||
.update(issues)
|
||||
.set({
|
||||
executionRunId: null,
|
||||
executionAgentNameKey: null,
|
||||
executionLockedAt: null,
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(issues.id, issue.id));
|
||||
}
|
||||
|
||||
if (!activeExecutionRun) {
|
||||
const legacyRun = await tx
|
||||
.select()
|
||||
.from(heartbeatRuns)
|
||||
.where(
|
||||
and(
|
||||
eq(heartbeatRuns.companyId, issue.companyId),
|
||||
inArray(heartbeatRuns.status, ["queued", "running"]),
|
||||
sql`${heartbeatRuns.contextSnapshot} ->> 'issueId' = ${issue.id}`,
|
||||
),
|
||||
)
|
||||
.orderBy(
|
||||
sql`case when ${heartbeatRuns.status} = 'running' then 0 else 1 end`,
|
||||
asc(heartbeatRuns.createdAt),
|
||||
)
|
||||
.limit(1)
|
||||
.then((rows) => rows[0] ?? null);
|
||||
|
||||
if (legacyRun) {
|
||||
activeExecutionRun = legacyRun;
|
||||
const legacyAgent = await tx
|
||||
.select({ name: agents.name })
|
||||
.from(agents)
|
||||
.where(eq(agents.id, legacyRun.agentId))
|
||||
.then((rows) => rows[0] ?? null);
|
||||
await tx
|
||||
.update(issues)
|
||||
.set({
|
||||
executionRunId: legacyRun.id,
|
||||
executionAgentNameKey: normalizeAgentNameKey(legacyAgent?.name),
|
||||
executionLockedAt: new Date(),
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(issues.id, issue.id));
|
||||
}
|
||||
}
|
||||
|
||||
if (activeExecutionRun) {
|
||||
const executionAgent = await tx
|
||||
.select({ name: agents.name })
|
||||
.from(agents)
|
||||
.where(eq(agents.id, activeExecutionRun.agentId))
|
||||
.then((rows) => rows[0] ?? null);
|
||||
const executionAgentNameKey =
|
||||
normalizeAgentNameKey(issue.executionAgentNameKey) ??
|
||||
normalizeAgentNameKey(executionAgent?.name);
|
||||
|
||||
if (executionAgentNameKey && executionAgentNameKey === agentNameKey) {
|
||||
const mergedContextSnapshot = mergeCoalescedContextSnapshot(
|
||||
activeExecutionRun.contextSnapshot,
|
||||
enrichedContextSnapshot,
|
||||
);
|
||||
const mergedRun = await tx
|
||||
.update(heartbeatRuns)
|
||||
.set({
|
||||
contextSnapshot: mergedContextSnapshot,
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(heartbeatRuns.id, activeExecutionRun.id))
|
||||
.returning()
|
||||
.then((rows) => rows[0] ?? activeExecutionRun);
|
||||
|
||||
await tx.insert(agentWakeupRequests).values({
|
||||
companyId: agent.companyId,
|
||||
agentId,
|
||||
source,
|
||||
triggerDetail,
|
||||
reason: "issue_execution_same_name",
|
||||
payload,
|
||||
status: "coalesced",
|
||||
coalescedCount: 1,
|
||||
requestedByActorType: opts.requestedByActorType ?? null,
|
||||
requestedByActorId: opts.requestedByActorId ?? null,
|
||||
idempotencyKey: opts.idempotencyKey ?? null,
|
||||
runId: mergedRun.id,
|
||||
finishedAt: new Date(),
|
||||
});
|
||||
|
||||
return { kind: "coalesced" as const, run: mergedRun };
|
||||
}
|
||||
|
||||
const deferredPayload = {
|
||||
...(payload ?? {}),
|
||||
issueId,
|
||||
[DEFERRED_WAKE_CONTEXT_KEY]: enrichedContextSnapshot,
|
||||
};
|
||||
|
||||
await tx.insert(agentWakeupRequests).values({
|
||||
companyId: agent.companyId,
|
||||
agentId,
|
||||
source,
|
||||
triggerDetail,
|
||||
reason: "issue_execution_deferred",
|
||||
payload: deferredPayload,
|
||||
status: "deferred_issue_execution",
|
||||
requestedByActorType: opts.requestedByActorType ?? null,
|
||||
requestedByActorId: opts.requestedByActorId ?? null,
|
||||
idempotencyKey: opts.idempotencyKey ?? null,
|
||||
});
|
||||
|
||||
return { kind: "deferred" as const };
|
||||
}
|
||||
|
||||
const wakeupRequest = await tx
|
||||
.insert(agentWakeupRequests)
|
||||
.values({
|
||||
companyId: agent.companyId,
|
||||
agentId,
|
||||
source,
|
||||
triggerDetail,
|
||||
reason,
|
||||
payload,
|
||||
status: "queued",
|
||||
requestedByActorType: opts.requestedByActorType ?? null,
|
||||
requestedByActorId: opts.requestedByActorId ?? null,
|
||||
idempotencyKey: opts.idempotencyKey ?? null,
|
||||
})
|
||||
.returning()
|
||||
.then((rows) => rows[0]);
|
||||
|
||||
const newRun = await tx
|
||||
.insert(heartbeatRuns)
|
||||
.values({
|
||||
companyId: agent.companyId,
|
||||
agentId,
|
||||
invocationSource: source,
|
||||
triggerDetail,
|
||||
status: "queued",
|
||||
wakeupRequestId: wakeupRequest.id,
|
||||
contextSnapshot: enrichedContextSnapshot,
|
||||
sessionIdBefore: sessionBefore,
|
||||
})
|
||||
.returning()
|
||||
.then((rows) => rows[0]);
|
||||
|
||||
await tx
|
||||
.update(agentWakeupRequests)
|
||||
.set({
|
||||
runId: newRun.id,
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(agentWakeupRequests.id, wakeupRequest.id));
|
||||
|
||||
await tx
|
||||
.update(issues)
|
||||
.set({
|
||||
executionRunId: newRun.id,
|
||||
executionAgentNameKey: agentNameKey,
|
||||
executionLockedAt: new Date(),
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(issues.id, issue.id));
|
||||
|
||||
return { kind: "queued" as const, run: newRun };
|
||||
});
|
||||
|
||||
if (outcome.kind === "deferred" || outcome.kind === "skipped") return null;
|
||||
if (outcome.kind === "coalesced") return outcome.run;
|
||||
|
||||
const newRun = outcome.run;
|
||||
publishLiveEvent({
|
||||
companyId: newRun.companyId,
|
||||
type: "heartbeat.run.queued",
|
||||
payload: {
|
||||
runId: newRun.id,
|
||||
agentId: newRun.agentId,
|
||||
invocationSource: newRun.invocationSource,
|
||||
triggerDetail: newRun.triggerDetail,
|
||||
wakeupRequestId: newRun.wakeupRequestId,
|
||||
},
|
||||
});
|
||||
|
||||
await startNextQueuedRunForAgent(agent.id);
|
||||
return newRun;
|
||||
}
|
||||
|
||||
const activeRuns = await db
|
||||
.select()
|
||||
.from(heartbeatRuns)
|
||||
@@ -1149,27 +1625,7 @@ export function heartbeatService(db: Db) {
|
||||
.returning()
|
||||
.then((rows) => rows[0]);
|
||||
|
||||
let sessionBefore: string | null = null;
|
||||
if (taskKey) {
|
||||
const codec = getAdapterSessionCodec(agent.adapterType);
|
||||
const existingTaskSession = await getTaskSession(
|
||||
agent.companyId,
|
||||
agent.id,
|
||||
agent.adapterType,
|
||||
taskKey,
|
||||
);
|
||||
const parsedParams = normalizeSessionParams(
|
||||
codec.deserialize(existingTaskSession?.sessionParamsJson ?? null),
|
||||
);
|
||||
sessionBefore = truncateDisplayId(
|
||||
existingTaskSession?.sessionDisplayId ??
|
||||
(codec.getDisplayId ? codec.getDisplayId(parsedParams) : null) ??
|
||||
readNonEmptyString(parsedParams?.sessionId),
|
||||
);
|
||||
} else {
|
||||
const runtimeForRun = await getRuntimeState(agent.id);
|
||||
sessionBefore = runtimeForRun?.sessionId ?? null;
|
||||
}
|
||||
const sessionBefore = await resolveSessionBeforeForWakeup(agent, taskKey);
|
||||
|
||||
const newRun = await db
|
||||
.insert(heartbeatRuns)
|
||||
@@ -1180,7 +1636,7 @@ export function heartbeatService(db: Db) {
|
||||
triggerDetail,
|
||||
status: "queued",
|
||||
wakeupRequestId: wakeupRequest.id,
|
||||
contextSnapshot,
|
||||
contextSnapshot: enrichedContextSnapshot,
|
||||
sessionIdBefore: sessionBefore,
|
||||
})
|
||||
.returning()
|
||||
@@ -1412,6 +1868,7 @@ export function heartbeatService(db: Db) {
|
||||
level: "warn",
|
||||
message: "run cancelled",
|
||||
});
|
||||
await releaseIssueExecutionAndPromote(cancelled);
|
||||
}
|
||||
|
||||
runningProcesses.delete(run.id);
|
||||
@@ -1443,6 +1900,7 @@ export function heartbeatService(db: Db) {
|
||||
running.child.kill("SIGTERM");
|
||||
runningProcesses.delete(run.id);
|
||||
}
|
||||
await releaseIssueExecutionAndPromote(run);
|
||||
}
|
||||
|
||||
return runs.length;
|
||||
|
||||
@@ -45,6 +45,11 @@ export interface IssueFilters {
|
||||
projectId?: string;
|
||||
}
|
||||
|
||||
function sameRunLock(checkoutRunId: string | null, actorRunId: string | null) {
|
||||
if (actorRunId) return checkoutRunId === actorRunId;
|
||||
return checkoutRunId == null;
|
||||
}
|
||||
|
||||
export function issueService(db: Db) {
|
||||
async function assertAssignableAgent(companyId: string, agentId: string) {
|
||||
const assignee = await db
|
||||
@@ -161,6 +166,12 @@ export function issueService(db: Db) {
|
||||
if (data.status && data.status !== "cancelled") {
|
||||
patch.cancelledAt = null;
|
||||
}
|
||||
if (data.status && data.status !== "in_progress") {
|
||||
patch.checkoutRunId = null;
|
||||
}
|
||||
if (data.assigneeAgentId !== undefined && data.assigneeAgentId !== existing.assigneeAgentId) {
|
||||
patch.checkoutRunId = null;
|
||||
}
|
||||
|
||||
return db
|
||||
.update(issues)
|
||||
@@ -192,7 +203,7 @@ export function issueService(db: Db) {
|
||||
return removedIssue;
|
||||
}),
|
||||
|
||||
checkout: async (id: string, agentId: string, expectedStatuses: string[]) => {
|
||||
checkout: async (id: string, agentId: string, expectedStatuses: string[], checkoutRunId: string | null) => {
|
||||
const issueCompany = await db
|
||||
.select({ companyId: issues.companyId })
|
||||
.from(issues)
|
||||
@@ -202,10 +213,21 @@ export function issueService(db: Db) {
|
||||
await assertAssignableAgent(issueCompany.companyId, agentId);
|
||||
|
||||
const now = new Date();
|
||||
const sameRunAssigneeCondition = checkoutRunId
|
||||
? and(
|
||||
eq(issues.assigneeAgentId, agentId),
|
||||
or(isNull(issues.checkoutRunId), eq(issues.checkoutRunId, checkoutRunId)),
|
||||
)
|
||||
: and(eq(issues.assigneeAgentId, agentId), isNull(issues.checkoutRunId));
|
||||
const executionLockCondition = checkoutRunId
|
||||
? or(isNull(issues.executionRunId), eq(issues.executionRunId, checkoutRunId))
|
||||
: isNull(issues.executionRunId);
|
||||
const updated = await db
|
||||
.update(issues)
|
||||
.set({
|
||||
assigneeAgentId: agentId,
|
||||
checkoutRunId,
|
||||
executionRunId: checkoutRunId,
|
||||
status: "in_progress",
|
||||
startedAt: now,
|
||||
updatedAt: now,
|
||||
@@ -214,7 +236,8 @@ export function issueService(db: Db) {
|
||||
and(
|
||||
eq(issues.id, id),
|
||||
inArray(issues.status, expectedStatuses),
|
||||
or(isNull(issues.assigneeAgentId), eq(issues.assigneeAgentId, agentId)),
|
||||
or(isNull(issues.assigneeAgentId), sameRunAssigneeCondition),
|
||||
executionLockCondition,
|
||||
),
|
||||
)
|
||||
.returning()
|
||||
@@ -227,6 +250,8 @@ export function issueService(db: Db) {
|
||||
id: issues.id,
|
||||
status: issues.status,
|
||||
assigneeAgentId: issues.assigneeAgentId,
|
||||
checkoutRunId: issues.checkoutRunId,
|
||||
executionRunId: issues.executionRunId,
|
||||
})
|
||||
.from(issues)
|
||||
.where(eq(issues.id, id))
|
||||
@@ -234,8 +259,40 @@ export function issueService(db: Db) {
|
||||
|
||||
if (!current) throw notFound("Issue not found");
|
||||
|
||||
// If this agent already owns it and it's in_progress, return it (no self-409)
|
||||
if (current.assigneeAgentId === agentId && current.status === "in_progress") {
|
||||
if (
|
||||
current.assigneeAgentId === agentId &&
|
||||
current.status === "in_progress" &&
|
||||
current.checkoutRunId == null &&
|
||||
(current.executionRunId == null || current.executionRunId === checkoutRunId) &&
|
||||
checkoutRunId
|
||||
) {
|
||||
const adopted = await db
|
||||
.update(issues)
|
||||
.set({
|
||||
checkoutRunId,
|
||||
executionRunId: checkoutRunId,
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(
|
||||
and(
|
||||
eq(issues.id, id),
|
||||
eq(issues.status, "in_progress"),
|
||||
eq(issues.assigneeAgentId, agentId),
|
||||
isNull(issues.checkoutRunId),
|
||||
or(isNull(issues.executionRunId), eq(issues.executionRunId, checkoutRunId)),
|
||||
),
|
||||
)
|
||||
.returning()
|
||||
.then((rows) => rows[0] ?? null);
|
||||
if (adopted) return adopted;
|
||||
}
|
||||
|
||||
// If this run already owns it and it's in_progress, return it (no self-409)
|
||||
if (
|
||||
current.assigneeAgentId === agentId &&
|
||||
current.status === "in_progress" &&
|
||||
sameRunLock(current.checkoutRunId, checkoutRunId)
|
||||
) {
|
||||
return db.select().from(issues).where(eq(issues.id, id)).then((rows) => rows[0]!);
|
||||
}
|
||||
|
||||
@@ -243,10 +300,44 @@ export function issueService(db: Db) {
|
||||
issueId: current.id,
|
||||
status: current.status,
|
||||
assigneeAgentId: current.assigneeAgentId,
|
||||
checkoutRunId: current.checkoutRunId,
|
||||
executionRunId: current.executionRunId,
|
||||
});
|
||||
},
|
||||
|
||||
release: async (id: string, actorAgentId?: string) => {
|
||||
assertCheckoutOwner: async (id: string, actorAgentId: string, actorRunId: string | null) => {
|
||||
const current = await db
|
||||
.select({
|
||||
id: issues.id,
|
||||
status: issues.status,
|
||||
assigneeAgentId: issues.assigneeAgentId,
|
||||
checkoutRunId: issues.checkoutRunId,
|
||||
})
|
||||
.from(issues)
|
||||
.where(eq(issues.id, id))
|
||||
.then((rows) => rows[0] ?? null);
|
||||
|
||||
if (!current) throw notFound("Issue not found");
|
||||
|
||||
if (
|
||||
current.status === "in_progress" &&
|
||||
current.assigneeAgentId === actorAgentId &&
|
||||
sameRunLock(current.checkoutRunId, actorRunId)
|
||||
) {
|
||||
return current;
|
||||
}
|
||||
|
||||
throw conflict("Issue run ownership conflict", {
|
||||
issueId: current.id,
|
||||
status: current.status,
|
||||
assigneeAgentId: current.assigneeAgentId,
|
||||
checkoutRunId: current.checkoutRunId,
|
||||
actorAgentId,
|
||||
actorRunId,
|
||||
});
|
||||
},
|
||||
|
||||
release: async (id: string, actorAgentId?: string, actorRunId?: string | null) => {
|
||||
const existing = await db
|
||||
.select()
|
||||
.from(issues)
|
||||
@@ -257,12 +348,27 @@ export function issueService(db: Db) {
|
||||
if (actorAgentId && existing.assigneeAgentId && existing.assigneeAgentId !== actorAgentId) {
|
||||
throw conflict("Only assignee can release issue");
|
||||
}
|
||||
if (
|
||||
actorAgentId &&
|
||||
existing.status === "in_progress" &&
|
||||
existing.assigneeAgentId === actorAgentId &&
|
||||
existing.checkoutRunId &&
|
||||
!sameRunLock(existing.checkoutRunId, actorRunId ?? null)
|
||||
) {
|
||||
throw conflict("Only checkout run can release issue", {
|
||||
issueId: existing.id,
|
||||
assigneeAgentId: existing.assigneeAgentId,
|
||||
checkoutRunId: existing.checkoutRunId,
|
||||
actorRunId: actorRunId ?? null,
|
||||
});
|
||||
}
|
||||
|
||||
return db
|
||||
.update(issues)
|
||||
.set({
|
||||
status: "todo",
|
||||
assigneeAgentId: null,
|
||||
checkoutRunId: null,
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(issues.id, id))
|
||||
|
||||
Reference in New Issue
Block a user