Add worktree-aware workspace runtime support
This commit is contained in:
962
server/src/services/workspace-runtime.ts
Normal file
962
server/src/services/workspace-runtime.ts
Normal file
@@ -0,0 +1,962 @@
|
||||
import { spawn, type ChildProcess } from "node:child_process";
|
||||
import fs from "node:fs/promises";
|
||||
import net from "node:net";
|
||||
import { createHash, randomUUID } from "node:crypto";
|
||||
import path from "node:path";
|
||||
import { setTimeout as delay } from "node:timers/promises";
|
||||
import type { AdapterRuntimeServiceReport } from "@paperclipai/adapter-utils";
|
||||
import type { Db } from "@paperclipai/db";
|
||||
import { workspaceRuntimeServices } from "@paperclipai/db";
|
||||
import { and, desc, eq, inArray } from "drizzle-orm";
|
||||
import { asNumber, asString, parseObject, renderTemplate } from "../adapters/utils.js";
|
||||
import { resolveHomeAwarePath } from "../home-paths.js";
|
||||
|
||||
export interface ExecutionWorkspaceInput {
|
||||
baseCwd: string;
|
||||
source: "project_primary" | "task_session" | "agent_home";
|
||||
projectId: string | null;
|
||||
workspaceId: string | null;
|
||||
repoUrl: string | null;
|
||||
repoRef: string | null;
|
||||
}
|
||||
|
||||
export interface ExecutionWorkspaceIssueRef {
|
||||
id: string;
|
||||
identifier: string | null;
|
||||
title: string | null;
|
||||
}
|
||||
|
||||
export interface ExecutionWorkspaceAgentRef {
|
||||
id: string;
|
||||
name: string;
|
||||
companyId: string;
|
||||
}
|
||||
|
||||
export interface RealizedExecutionWorkspace extends ExecutionWorkspaceInput {
|
||||
strategy: "project_primary" | "git_worktree";
|
||||
cwd: string;
|
||||
branchName: string | null;
|
||||
worktreePath: string | null;
|
||||
warnings: string[];
|
||||
created: boolean;
|
||||
}
|
||||
|
||||
export interface RuntimeServiceRef {
|
||||
id: string;
|
||||
companyId: string;
|
||||
projectId: string | null;
|
||||
projectWorkspaceId: string | null;
|
||||
issueId: string | null;
|
||||
serviceName: string;
|
||||
status: "starting" | "running" | "stopped" | "failed";
|
||||
lifecycle: "shared" | "ephemeral";
|
||||
scopeType: "project_workspace" | "execution_workspace" | "run" | "agent";
|
||||
scopeId: string | null;
|
||||
reuseKey: string | null;
|
||||
command: string | null;
|
||||
cwd: string | null;
|
||||
port: number | null;
|
||||
url: string | null;
|
||||
provider: "local_process" | "adapter_managed";
|
||||
providerRef: string | null;
|
||||
ownerAgentId: string | null;
|
||||
startedByRunId: string | null;
|
||||
lastUsedAt: string;
|
||||
startedAt: string;
|
||||
stoppedAt: string | null;
|
||||
stopPolicy: Record<string, unknown> | null;
|
||||
healthStatus: "unknown" | "healthy" | "unhealthy";
|
||||
reused: boolean;
|
||||
}
|
||||
|
||||
interface RuntimeServiceRecord extends RuntimeServiceRef {
|
||||
db?: Db;
|
||||
child: ChildProcess | null;
|
||||
leaseRunIds: Set<string>;
|
||||
idleTimer: ReturnType<typeof globalThis.setTimeout> | null;
|
||||
envFingerprint: string;
|
||||
}
|
||||
|
||||
const runtimeServicesById = new Map<string, RuntimeServiceRecord>();
|
||||
const runtimeServicesByReuseKey = new Map<string, string>();
|
||||
const runtimeServiceLeasesByRun = new Map<string, string[]>();
|
||||
|
||||
function stableStringify(value: unknown): string {
|
||||
if (Array.isArray(value)) {
|
||||
return `[${value.map((entry) => stableStringify(entry)).join(",")}]`;
|
||||
}
|
||||
if (value && typeof value === "object") {
|
||||
const rec = value as Record<string, unknown>;
|
||||
return `{${Object.keys(rec).sort().map((key) => `${JSON.stringify(key)}:${stableStringify(rec[key])}`).join(",")}}`;
|
||||
}
|
||||
return JSON.stringify(value);
|
||||
}
|
||||
|
||||
function stableRuntimeServiceId(input: {
|
||||
adapterType: string;
|
||||
runId: string;
|
||||
scopeType: RuntimeServiceRef["scopeType"];
|
||||
scopeId: string | null;
|
||||
serviceName: string;
|
||||
reportId: string | null;
|
||||
providerRef: string | null;
|
||||
reuseKey: string | null;
|
||||
}) {
|
||||
if (input.reportId) return input.reportId;
|
||||
const digest = createHash("sha256")
|
||||
.update(
|
||||
stableStringify({
|
||||
adapterType: input.adapterType,
|
||||
runId: input.runId,
|
||||
scopeType: input.scopeType,
|
||||
scopeId: input.scopeId,
|
||||
serviceName: input.serviceName,
|
||||
providerRef: input.providerRef,
|
||||
reuseKey: input.reuseKey,
|
||||
}),
|
||||
)
|
||||
.digest("hex")
|
||||
.slice(0, 32);
|
||||
return `${input.adapterType}-${digest}`;
|
||||
}
|
||||
|
||||
function toRuntimeServiceRef(record: RuntimeServiceRecord, overrides?: Partial<RuntimeServiceRef>): RuntimeServiceRef {
|
||||
return {
|
||||
id: record.id,
|
||||
companyId: record.companyId,
|
||||
projectId: record.projectId,
|
||||
projectWorkspaceId: record.projectWorkspaceId,
|
||||
issueId: record.issueId,
|
||||
serviceName: record.serviceName,
|
||||
status: record.status,
|
||||
lifecycle: record.lifecycle,
|
||||
scopeType: record.scopeType,
|
||||
scopeId: record.scopeId,
|
||||
reuseKey: record.reuseKey,
|
||||
command: record.command,
|
||||
cwd: record.cwd,
|
||||
port: record.port,
|
||||
url: record.url,
|
||||
provider: record.provider,
|
||||
providerRef: record.providerRef,
|
||||
ownerAgentId: record.ownerAgentId,
|
||||
startedByRunId: record.startedByRunId,
|
||||
lastUsedAt: record.lastUsedAt,
|
||||
startedAt: record.startedAt,
|
||||
stoppedAt: record.stoppedAt,
|
||||
stopPolicy: record.stopPolicy,
|
||||
healthStatus: record.healthStatus,
|
||||
reused: record.reused,
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
function sanitizeSlugPart(value: string | null | undefined, fallback: string): string {
|
||||
const raw = (value ?? "").trim().toLowerCase();
|
||||
const normalized = raw
|
||||
.replace(/[^a-z0-9/_-]+/g, "-")
|
||||
.replace(/-+/g, "-")
|
||||
.replace(/^[-/]+|[-/]+$/g, "");
|
||||
return normalized.length > 0 ? normalized : fallback;
|
||||
}
|
||||
|
||||
function renderWorkspaceTemplate(template: string, input: {
|
||||
issue: ExecutionWorkspaceIssueRef | null;
|
||||
agent: ExecutionWorkspaceAgentRef;
|
||||
projectId: string | null;
|
||||
repoRef: string | null;
|
||||
}) {
|
||||
const issueIdentifier = input.issue?.identifier ?? input.issue?.id ?? "issue";
|
||||
const slug = sanitizeSlugPart(input.issue?.title, sanitizeSlugPart(issueIdentifier, "issue"));
|
||||
return renderTemplate(template, {
|
||||
issue: {
|
||||
id: input.issue?.id ?? "",
|
||||
identifier: input.issue?.identifier ?? "",
|
||||
title: input.issue?.title ?? "",
|
||||
},
|
||||
agent: {
|
||||
id: input.agent.id,
|
||||
name: input.agent.name,
|
||||
},
|
||||
project: {
|
||||
id: input.projectId ?? "",
|
||||
},
|
||||
workspace: {
|
||||
repoRef: input.repoRef ?? "",
|
||||
},
|
||||
slug,
|
||||
});
|
||||
}
|
||||
|
||||
function sanitizeBranchName(value: string): string {
|
||||
return value
|
||||
.trim()
|
||||
.replace(/[^A-Za-z0-9._/-]+/g, "-")
|
||||
.replace(/-+/g, "-")
|
||||
.replace(/^[-/.]+|[-/.]+$/g, "")
|
||||
.slice(0, 120) || "paperclip-work";
|
||||
}
|
||||
|
||||
function isAbsolutePath(value: string) {
|
||||
return path.isAbsolute(value) || value.startsWith("~");
|
||||
}
|
||||
|
||||
function resolveConfiguredPath(value: string, baseDir: string): string {
|
||||
if (isAbsolutePath(value)) {
|
||||
return resolveHomeAwarePath(value);
|
||||
}
|
||||
return path.resolve(baseDir, value);
|
||||
}
|
||||
|
||||
async function runGit(args: string[], cwd: string): Promise<string> {
|
||||
const proc = await new Promise<{ stdout: string; stderr: string; code: number | null }>((resolve, reject) => {
|
||||
const child = spawn("git", args, {
|
||||
cwd,
|
||||
stdio: ["ignore", "pipe", "pipe"],
|
||||
env: process.env,
|
||||
});
|
||||
let stdout = "";
|
||||
let stderr = "";
|
||||
child.stdout?.on("data", (chunk) => {
|
||||
stdout += String(chunk);
|
||||
});
|
||||
child.stderr?.on("data", (chunk) => {
|
||||
stderr += String(chunk);
|
||||
});
|
||||
child.on("error", reject);
|
||||
child.on("close", (code) => resolve({ stdout, stderr, code }));
|
||||
});
|
||||
if (proc.code !== 0) {
|
||||
throw new Error(proc.stderr.trim() || proc.stdout.trim() || `git ${args.join(" ")} failed`);
|
||||
}
|
||||
return proc.stdout.trim();
|
||||
}
|
||||
|
||||
async function directoryExists(value: string) {
|
||||
return fs.stat(value).then((stats) => stats.isDirectory()).catch(() => false);
|
||||
}
|
||||
|
||||
export async function realizeExecutionWorkspace(input: {
|
||||
base: ExecutionWorkspaceInput;
|
||||
config: Record<string, unknown>;
|
||||
issue: ExecutionWorkspaceIssueRef | null;
|
||||
agent: ExecutionWorkspaceAgentRef;
|
||||
}): Promise<RealizedExecutionWorkspace> {
|
||||
const rawStrategy = parseObject(input.config.workspaceStrategy);
|
||||
const strategyType = asString(rawStrategy.type, "project_primary");
|
||||
if (strategyType !== "git_worktree") {
|
||||
return {
|
||||
...input.base,
|
||||
strategy: "project_primary",
|
||||
cwd: input.base.baseCwd,
|
||||
branchName: null,
|
||||
worktreePath: null,
|
||||
warnings: [],
|
||||
created: false,
|
||||
};
|
||||
}
|
||||
|
||||
const repoRoot = await runGit(["rev-parse", "--show-toplevel"], input.base.baseCwd);
|
||||
const branchTemplate = asString(rawStrategy.branchTemplate, "{{issue.identifier}}-{{slug}}");
|
||||
const renderedBranch = renderWorkspaceTemplate(branchTemplate, {
|
||||
issue: input.issue,
|
||||
agent: input.agent,
|
||||
projectId: input.base.projectId,
|
||||
repoRef: input.base.repoRef,
|
||||
});
|
||||
const branchName = sanitizeBranchName(renderedBranch);
|
||||
const configuredParentDir = asString(rawStrategy.worktreeParentDir, "");
|
||||
const worktreeParentDir = configuredParentDir
|
||||
? resolveConfiguredPath(configuredParentDir, repoRoot)
|
||||
: path.join(repoRoot, ".paperclip", "worktrees");
|
||||
const worktreePath = path.join(worktreeParentDir, branchName);
|
||||
const baseRef = asString(rawStrategy.baseRef, input.base.repoRef ?? "HEAD");
|
||||
|
||||
await fs.mkdir(worktreeParentDir, { recursive: true });
|
||||
|
||||
const existingWorktree = await directoryExists(worktreePath);
|
||||
if (existingWorktree) {
|
||||
const existingGitDir = await runGit(["rev-parse", "--git-dir"], worktreePath).catch(() => null);
|
||||
if (existingGitDir) {
|
||||
return {
|
||||
...input.base,
|
||||
strategy: "git_worktree",
|
||||
cwd: worktreePath,
|
||||
branchName,
|
||||
worktreePath,
|
||||
warnings: [],
|
||||
created: false,
|
||||
};
|
||||
}
|
||||
throw new Error(`Configured worktree path "${worktreePath}" already exists and is not a git worktree.`);
|
||||
}
|
||||
|
||||
await runGit(["worktree", "add", "-B", branchName, worktreePath, baseRef], repoRoot);
|
||||
|
||||
return {
|
||||
...input.base,
|
||||
strategy: "git_worktree",
|
||||
cwd: worktreePath,
|
||||
branchName,
|
||||
worktreePath,
|
||||
warnings: [],
|
||||
created: true,
|
||||
};
|
||||
}
|
||||
|
||||
async function allocatePort(): Promise<number> {
|
||||
return await new Promise<number>((resolve, reject) => {
|
||||
const server = net.createServer();
|
||||
server.listen(0, "127.0.0.1", () => {
|
||||
const address = server.address();
|
||||
server.close((err) => {
|
||||
if (err) {
|
||||
reject(err);
|
||||
return;
|
||||
}
|
||||
if (!address || typeof address === "string") {
|
||||
reject(new Error("Failed to allocate port"));
|
||||
return;
|
||||
}
|
||||
resolve(address.port);
|
||||
});
|
||||
});
|
||||
server.on("error", reject);
|
||||
});
|
||||
}
|
||||
|
||||
function buildTemplateData(input: {
|
||||
workspace: RealizedExecutionWorkspace;
|
||||
agent: ExecutionWorkspaceAgentRef;
|
||||
issue: ExecutionWorkspaceIssueRef | null;
|
||||
adapterEnv: Record<string, string>;
|
||||
port: number | null;
|
||||
}) {
|
||||
return {
|
||||
workspace: {
|
||||
cwd: input.workspace.cwd,
|
||||
branchName: input.workspace.branchName ?? "",
|
||||
worktreePath: input.workspace.worktreePath ?? "",
|
||||
repoUrl: input.workspace.repoUrl ?? "",
|
||||
repoRef: input.workspace.repoRef ?? "",
|
||||
env: input.adapterEnv,
|
||||
},
|
||||
issue: {
|
||||
id: input.issue?.id ?? "",
|
||||
identifier: input.issue?.identifier ?? "",
|
||||
title: input.issue?.title ?? "",
|
||||
},
|
||||
agent: {
|
||||
id: input.agent.id,
|
||||
name: input.agent.name,
|
||||
},
|
||||
port: input.port ?? "",
|
||||
};
|
||||
}
|
||||
|
||||
function resolveServiceScopeId(input: {
|
||||
service: Record<string, unknown>;
|
||||
workspace: RealizedExecutionWorkspace;
|
||||
issue: ExecutionWorkspaceIssueRef | null;
|
||||
runId: string;
|
||||
agent: ExecutionWorkspaceAgentRef;
|
||||
}): {
|
||||
scopeType: "project_workspace" | "execution_workspace" | "run" | "agent";
|
||||
scopeId: string | null;
|
||||
} {
|
||||
const scopeTypeRaw = asString(input.service.reuseScope, input.service.lifecycle === "shared" ? "project_workspace" : "run");
|
||||
const scopeType =
|
||||
scopeTypeRaw === "project_workspace" ||
|
||||
scopeTypeRaw === "execution_workspace" ||
|
||||
scopeTypeRaw === "agent"
|
||||
? scopeTypeRaw
|
||||
: "run";
|
||||
if (scopeType === "project_workspace") return { scopeType, scopeId: input.workspace.workspaceId ?? input.workspace.projectId };
|
||||
if (scopeType === "execution_workspace") return { scopeType, scopeId: input.workspace.cwd };
|
||||
if (scopeType === "agent") return { scopeType, scopeId: input.agent.id };
|
||||
return { scopeType: "run" as const, scopeId: input.runId };
|
||||
}
|
||||
|
||||
async function waitForReadiness(input: {
|
||||
service: Record<string, unknown>;
|
||||
url: string | null;
|
||||
}) {
|
||||
const readiness = parseObject(input.service.readiness);
|
||||
const readinessType = asString(readiness.type, "");
|
||||
if (readinessType !== "http" || !input.url) return;
|
||||
const timeoutSec = Math.max(1, asNumber(readiness.timeoutSec, 30));
|
||||
const intervalMs = Math.max(100, asNumber(readiness.intervalMs, 500));
|
||||
const deadline = Date.now() + timeoutSec * 1000;
|
||||
let lastError = "service did not become ready";
|
||||
while (Date.now() < deadline) {
|
||||
try {
|
||||
const response = await fetch(input.url);
|
||||
if (response.ok) return;
|
||||
lastError = `received HTTP ${response.status}`;
|
||||
} catch (err) {
|
||||
lastError = err instanceof Error ? err.message : String(err);
|
||||
}
|
||||
await delay(intervalMs);
|
||||
}
|
||||
throw new Error(`Readiness check failed for ${input.url}: ${lastError}`);
|
||||
}
|
||||
|
||||
function toPersistedWorkspaceRuntimeService(record: RuntimeServiceRecord): typeof workspaceRuntimeServices.$inferInsert {
|
||||
return {
|
||||
id: record.id,
|
||||
companyId: record.companyId,
|
||||
projectId: record.projectId,
|
||||
projectWorkspaceId: record.projectWorkspaceId,
|
||||
issueId: record.issueId,
|
||||
scopeType: record.scopeType,
|
||||
scopeId: record.scopeId,
|
||||
serviceName: record.serviceName,
|
||||
status: record.status,
|
||||
lifecycle: record.lifecycle,
|
||||
reuseKey: record.reuseKey,
|
||||
command: record.command,
|
||||
cwd: record.cwd,
|
||||
port: record.port,
|
||||
url: record.url,
|
||||
provider: record.provider,
|
||||
providerRef: record.providerRef,
|
||||
ownerAgentId: record.ownerAgentId,
|
||||
startedByRunId: record.startedByRunId,
|
||||
lastUsedAt: new Date(record.lastUsedAt),
|
||||
startedAt: new Date(record.startedAt),
|
||||
stoppedAt: record.stoppedAt ? new Date(record.stoppedAt) : null,
|
||||
stopPolicy: record.stopPolicy,
|
||||
healthStatus: record.healthStatus,
|
||||
updatedAt: new Date(),
|
||||
};
|
||||
}
|
||||
|
||||
async function persistRuntimeServiceRecord(db: Db | undefined, record: RuntimeServiceRecord) {
|
||||
if (!db) return;
|
||||
const values = toPersistedWorkspaceRuntimeService(record);
|
||||
await db
|
||||
.insert(workspaceRuntimeServices)
|
||||
.values(values)
|
||||
.onConflictDoUpdate({
|
||||
target: workspaceRuntimeServices.id,
|
||||
set: {
|
||||
projectId: values.projectId,
|
||||
projectWorkspaceId: values.projectWorkspaceId,
|
||||
issueId: values.issueId,
|
||||
scopeType: values.scopeType,
|
||||
scopeId: values.scopeId,
|
||||
serviceName: values.serviceName,
|
||||
status: values.status,
|
||||
lifecycle: values.lifecycle,
|
||||
reuseKey: values.reuseKey,
|
||||
command: values.command,
|
||||
cwd: values.cwd,
|
||||
port: values.port,
|
||||
url: values.url,
|
||||
provider: values.provider,
|
||||
providerRef: values.providerRef,
|
||||
ownerAgentId: values.ownerAgentId,
|
||||
startedByRunId: values.startedByRunId,
|
||||
lastUsedAt: values.lastUsedAt,
|
||||
startedAt: values.startedAt,
|
||||
stoppedAt: values.stoppedAt,
|
||||
stopPolicy: values.stopPolicy,
|
||||
healthStatus: values.healthStatus,
|
||||
updatedAt: values.updatedAt,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
function clearIdleTimer(record: RuntimeServiceRecord) {
|
||||
if (!record.idleTimer) return;
|
||||
clearTimeout(record.idleTimer);
|
||||
record.idleTimer = null;
|
||||
}
|
||||
|
||||
export function normalizeAdapterManagedRuntimeServices(input: {
|
||||
adapterType: string;
|
||||
runId: string;
|
||||
agent: ExecutionWorkspaceAgentRef;
|
||||
issue: ExecutionWorkspaceIssueRef | null;
|
||||
workspace: RealizedExecutionWorkspace;
|
||||
reports: AdapterRuntimeServiceReport[];
|
||||
now?: Date;
|
||||
}): RuntimeServiceRef[] {
|
||||
const nowIso = (input.now ?? new Date()).toISOString();
|
||||
return input.reports.map((report) => {
|
||||
const scopeType = report.scopeType ?? "run";
|
||||
const scopeId =
|
||||
report.scopeId ??
|
||||
(scopeType === "project_workspace"
|
||||
? input.workspace.workspaceId
|
||||
: scopeType === "execution_workspace"
|
||||
? input.workspace.cwd
|
||||
: scopeType === "agent"
|
||||
? input.agent.id
|
||||
: input.runId) ??
|
||||
null;
|
||||
const serviceName = asString(report.serviceName, "").trim() || "service";
|
||||
const status = report.status ?? "running";
|
||||
const lifecycle = report.lifecycle ?? "ephemeral";
|
||||
const healthStatus =
|
||||
report.healthStatus ??
|
||||
(status === "running" ? "healthy" : status === "failed" ? "unhealthy" : "unknown");
|
||||
return {
|
||||
id: stableRuntimeServiceId({
|
||||
adapterType: input.adapterType,
|
||||
runId: input.runId,
|
||||
scopeType,
|
||||
scopeId,
|
||||
serviceName,
|
||||
reportId: report.id ?? null,
|
||||
providerRef: report.providerRef ?? null,
|
||||
reuseKey: report.reuseKey ?? null,
|
||||
}),
|
||||
companyId: input.agent.companyId,
|
||||
projectId: report.projectId ?? input.workspace.projectId,
|
||||
projectWorkspaceId: report.projectWorkspaceId ?? input.workspace.workspaceId,
|
||||
issueId: report.issueId ?? input.issue?.id ?? null,
|
||||
serviceName,
|
||||
status,
|
||||
lifecycle,
|
||||
scopeType,
|
||||
scopeId,
|
||||
reuseKey: report.reuseKey ?? null,
|
||||
command: report.command ?? null,
|
||||
cwd: report.cwd ?? null,
|
||||
port: report.port ?? null,
|
||||
url: report.url ?? null,
|
||||
provider: "adapter_managed",
|
||||
providerRef: report.providerRef ?? null,
|
||||
ownerAgentId: report.ownerAgentId ?? input.agent.id,
|
||||
startedByRunId: input.runId,
|
||||
lastUsedAt: nowIso,
|
||||
startedAt: nowIso,
|
||||
stoppedAt: status === "running" || status === "starting" ? null : nowIso,
|
||||
stopPolicy: report.stopPolicy ?? null,
|
||||
healthStatus,
|
||||
reused: false,
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
async function startLocalRuntimeService(input: {
|
||||
db?: Db;
|
||||
runId: string;
|
||||
agent: ExecutionWorkspaceAgentRef;
|
||||
issue: ExecutionWorkspaceIssueRef | null;
|
||||
workspace: RealizedExecutionWorkspace;
|
||||
adapterEnv: Record<string, string>;
|
||||
service: Record<string, unknown>;
|
||||
onLog?: (stream: "stdout" | "stderr", chunk: string) => Promise<void>;
|
||||
reuseKey: string | null;
|
||||
scopeType: "project_workspace" | "execution_workspace" | "run" | "agent";
|
||||
scopeId: string | null;
|
||||
}): Promise<RuntimeServiceRecord> {
|
||||
const serviceName = asString(input.service.name, "service");
|
||||
const lifecycle = asString(input.service.lifecycle, "shared") === "ephemeral" ? "ephemeral" : "shared";
|
||||
const command = asString(input.service.command, "");
|
||||
if (!command) throw new Error(`Runtime service "${serviceName}" is missing command`);
|
||||
const serviceCwdTemplate = asString(input.service.cwd, ".");
|
||||
const portConfig = parseObject(input.service.port);
|
||||
const port = asString(portConfig.type, "") === "auto" ? await allocatePort() : null;
|
||||
const envConfig = parseObject(input.service.env);
|
||||
const templateData = buildTemplateData({
|
||||
workspace: input.workspace,
|
||||
agent: input.agent,
|
||||
issue: input.issue,
|
||||
adapterEnv: input.adapterEnv,
|
||||
port,
|
||||
});
|
||||
const serviceCwd = resolveConfiguredPath(renderTemplate(serviceCwdTemplate, templateData), input.workspace.cwd);
|
||||
const env: Record<string, string> = { ...process.env, ...input.adapterEnv } as Record<string, string>;
|
||||
for (const [key, value] of Object.entries(envConfig)) {
|
||||
if (typeof value === "string") {
|
||||
env[key] = renderTemplate(value, templateData);
|
||||
}
|
||||
}
|
||||
if (port) {
|
||||
const portEnvKey = asString(portConfig.envKey, "PORT");
|
||||
env[portEnvKey] = String(port);
|
||||
}
|
||||
const shell = process.env.SHELL?.trim() || "/bin/sh";
|
||||
const child = spawn(shell, ["-lc", command], {
|
||||
cwd: serviceCwd,
|
||||
env,
|
||||
detached: false,
|
||||
stdio: ["ignore", "pipe", "pipe"],
|
||||
});
|
||||
let stderrExcerpt = "";
|
||||
let stdoutExcerpt = "";
|
||||
child.stdout?.on("data", async (chunk) => {
|
||||
const text = String(chunk);
|
||||
stdoutExcerpt = (stdoutExcerpt + text).slice(-4096);
|
||||
if (input.onLog) await input.onLog("stdout", `[service:${serviceName}] ${text}`);
|
||||
});
|
||||
child.stderr?.on("data", async (chunk) => {
|
||||
const text = String(chunk);
|
||||
stderrExcerpt = (stderrExcerpt + text).slice(-4096);
|
||||
if (input.onLog) await input.onLog("stderr", `[service:${serviceName}] ${text}`);
|
||||
});
|
||||
|
||||
const expose = parseObject(input.service.expose);
|
||||
const readiness = parseObject(input.service.readiness);
|
||||
const urlTemplate =
|
||||
asString(expose.urlTemplate, "") ||
|
||||
asString(readiness.urlTemplate, "");
|
||||
const url = urlTemplate ? renderTemplate(urlTemplate, templateData) : null;
|
||||
|
||||
try {
|
||||
await waitForReadiness({ service: input.service, url });
|
||||
} catch (err) {
|
||||
child.kill("SIGTERM");
|
||||
throw new Error(
|
||||
`Failed to start runtime service "${serviceName}": ${err instanceof Error ? err.message : String(err)}${stderrExcerpt ? ` | stderr: ${stderrExcerpt.trim()}` : ""}`,
|
||||
);
|
||||
}
|
||||
|
||||
const envFingerprint = createHash("sha256").update(stableStringify(envConfig)).digest("hex");
|
||||
return {
|
||||
id: randomUUID(),
|
||||
companyId: input.agent.companyId,
|
||||
projectId: input.workspace.projectId,
|
||||
projectWorkspaceId: input.workspace.workspaceId,
|
||||
issueId: input.issue?.id ?? null,
|
||||
serviceName,
|
||||
status: "running",
|
||||
lifecycle,
|
||||
scopeType: input.scopeType,
|
||||
scopeId: input.scopeId,
|
||||
reuseKey: input.reuseKey,
|
||||
command,
|
||||
cwd: serviceCwd,
|
||||
port,
|
||||
url,
|
||||
provider: "local_process",
|
||||
providerRef: child.pid ? String(child.pid) : null,
|
||||
ownerAgentId: input.agent.id,
|
||||
startedByRunId: input.runId,
|
||||
lastUsedAt: new Date().toISOString(),
|
||||
startedAt: new Date().toISOString(),
|
||||
stoppedAt: null,
|
||||
stopPolicy: parseObject(input.service.stopPolicy),
|
||||
healthStatus: "healthy",
|
||||
reused: false,
|
||||
db: input.db,
|
||||
child,
|
||||
leaseRunIds: new Set([input.runId]),
|
||||
idleTimer: null,
|
||||
envFingerprint,
|
||||
};
|
||||
}
|
||||
|
||||
function scheduleIdleStop(record: RuntimeServiceRecord) {
|
||||
clearIdleTimer(record);
|
||||
const stopType = asString(record.stopPolicy?.type, "manual");
|
||||
if (stopType !== "idle_timeout") return;
|
||||
const idleSeconds = Math.max(1, asNumber(record.stopPolicy?.idleSeconds, 1800));
|
||||
record.idleTimer = setTimeout(() => {
|
||||
stopRuntimeService(record.id).catch(() => undefined);
|
||||
}, idleSeconds * 1000);
|
||||
}
|
||||
|
||||
async function stopRuntimeService(serviceId: string) {
|
||||
const record = runtimeServicesById.get(serviceId);
|
||||
if (!record) return;
|
||||
clearIdleTimer(record);
|
||||
record.status = "stopped";
|
||||
record.lastUsedAt = new Date().toISOString();
|
||||
record.stoppedAt = new Date().toISOString();
|
||||
if (record.child && !record.child.killed) {
|
||||
record.child.kill("SIGTERM");
|
||||
}
|
||||
runtimeServicesById.delete(serviceId);
|
||||
if (record.reuseKey) {
|
||||
runtimeServicesByReuseKey.delete(record.reuseKey);
|
||||
}
|
||||
await persistRuntimeServiceRecord(record.db, record);
|
||||
}
|
||||
|
||||
function registerRuntimeService(db: Db | undefined, record: RuntimeServiceRecord) {
|
||||
record.db = db;
|
||||
runtimeServicesById.set(record.id, record);
|
||||
if (record.reuseKey) {
|
||||
runtimeServicesByReuseKey.set(record.reuseKey, record.id);
|
||||
}
|
||||
|
||||
record.child?.on("exit", (code, signal) => {
|
||||
const current = runtimeServicesById.get(record.id);
|
||||
if (!current) return;
|
||||
clearIdleTimer(current);
|
||||
current.status = code === 0 || signal === "SIGTERM" ? "stopped" : "failed";
|
||||
current.healthStatus = current.status === "failed" ? "unhealthy" : "unknown";
|
||||
current.lastUsedAt = new Date().toISOString();
|
||||
current.stoppedAt = new Date().toISOString();
|
||||
runtimeServicesById.delete(current.id);
|
||||
if (current.reuseKey && runtimeServicesByReuseKey.get(current.reuseKey) === current.id) {
|
||||
runtimeServicesByReuseKey.delete(current.reuseKey);
|
||||
}
|
||||
void persistRuntimeServiceRecord(db, current);
|
||||
});
|
||||
}
|
||||
|
||||
export async function ensureRuntimeServicesForRun(input: {
|
||||
db?: Db;
|
||||
runId: string;
|
||||
agent: ExecutionWorkspaceAgentRef;
|
||||
issue: ExecutionWorkspaceIssueRef | null;
|
||||
workspace: RealizedExecutionWorkspace;
|
||||
config: Record<string, unknown>;
|
||||
adapterEnv: Record<string, string>;
|
||||
onLog?: (stream: "stdout" | "stderr", chunk: string) => Promise<void>;
|
||||
}): Promise<RuntimeServiceRef[]> {
|
||||
const runtime = parseObject(input.config.workspaceRuntime);
|
||||
const rawServices = Array.isArray(runtime.services)
|
||||
? runtime.services.filter((entry): entry is Record<string, unknown> => typeof entry === "object" && entry !== null)
|
||||
: [];
|
||||
const acquiredServiceIds: string[] = [];
|
||||
const refs: RuntimeServiceRef[] = [];
|
||||
runtimeServiceLeasesByRun.set(input.runId, acquiredServiceIds);
|
||||
|
||||
try {
|
||||
for (const service of rawServices) {
|
||||
const lifecycle = asString(service.lifecycle, "shared") === "ephemeral" ? "ephemeral" : "shared";
|
||||
const { scopeType, scopeId } = resolveServiceScopeId({
|
||||
service,
|
||||
workspace: input.workspace,
|
||||
issue: input.issue,
|
||||
runId: input.runId,
|
||||
agent: input.agent,
|
||||
});
|
||||
const envConfig = parseObject(service.env);
|
||||
const envFingerprint = createHash("sha256").update(stableStringify(envConfig)).digest("hex");
|
||||
const serviceName = asString(service.name, "service");
|
||||
const reuseKey =
|
||||
lifecycle === "shared"
|
||||
? [scopeType, scopeId ?? "", serviceName, envFingerprint].join(":")
|
||||
: null;
|
||||
|
||||
if (reuseKey) {
|
||||
const existingId = runtimeServicesByReuseKey.get(reuseKey);
|
||||
const existing = existingId ? runtimeServicesById.get(existingId) : null;
|
||||
if (existing && existing.status === "running") {
|
||||
existing.leaseRunIds.add(input.runId);
|
||||
existing.lastUsedAt = new Date().toISOString();
|
||||
existing.stoppedAt = null;
|
||||
clearIdleTimer(existing);
|
||||
await persistRuntimeServiceRecord(input.db, existing);
|
||||
acquiredServiceIds.push(existing.id);
|
||||
refs.push(toRuntimeServiceRef(existing, { reused: true }));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
const record = await startLocalRuntimeService({
|
||||
db: input.db,
|
||||
runId: input.runId,
|
||||
agent: input.agent,
|
||||
issue: input.issue,
|
||||
workspace: input.workspace,
|
||||
adapterEnv: input.adapterEnv,
|
||||
service,
|
||||
onLog: input.onLog,
|
||||
reuseKey,
|
||||
scopeType,
|
||||
scopeId,
|
||||
});
|
||||
registerRuntimeService(input.db, record);
|
||||
await persistRuntimeServiceRecord(input.db, record);
|
||||
acquiredServiceIds.push(record.id);
|
||||
refs.push(toRuntimeServiceRef(record));
|
||||
}
|
||||
} catch (err) {
|
||||
await releaseRuntimeServicesForRun(input.runId);
|
||||
throw err;
|
||||
}
|
||||
|
||||
return refs;
|
||||
}
|
||||
|
||||
export async function releaseRuntimeServicesForRun(runId: string) {
|
||||
const acquired = runtimeServiceLeasesByRun.get(runId) ?? [];
|
||||
runtimeServiceLeasesByRun.delete(runId);
|
||||
for (const serviceId of acquired) {
|
||||
const record = runtimeServicesById.get(serviceId);
|
||||
if (!record) continue;
|
||||
record.leaseRunIds.delete(runId);
|
||||
record.lastUsedAt = new Date().toISOString();
|
||||
const stopType = asString(record.stopPolicy?.type, record.lifecycle === "ephemeral" ? "on_run_finish" : "manual");
|
||||
await persistRuntimeServiceRecord(record.db, record);
|
||||
if (record.leaseRunIds.size === 0) {
|
||||
if (record.lifecycle === "ephemeral" || stopType === "on_run_finish") {
|
||||
await stopRuntimeService(serviceId);
|
||||
continue;
|
||||
}
|
||||
scheduleIdleStop(record);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export async function listWorkspaceRuntimeServicesForProjectWorkspaces(
|
||||
db: Db,
|
||||
companyId: string,
|
||||
projectWorkspaceIds: string[],
|
||||
) {
|
||||
if (projectWorkspaceIds.length === 0) return new Map<string, typeof workspaceRuntimeServices.$inferSelect[]>();
|
||||
const rows = await db
|
||||
.select()
|
||||
.from(workspaceRuntimeServices)
|
||||
.where(
|
||||
and(
|
||||
eq(workspaceRuntimeServices.companyId, companyId),
|
||||
inArray(workspaceRuntimeServices.projectWorkspaceId, projectWorkspaceIds),
|
||||
),
|
||||
)
|
||||
.orderBy(desc(workspaceRuntimeServices.updatedAt), desc(workspaceRuntimeServices.createdAt));
|
||||
|
||||
const grouped = new Map<string, typeof workspaceRuntimeServices.$inferSelect[]>();
|
||||
for (const row of rows) {
|
||||
if (!row.projectWorkspaceId) continue;
|
||||
const existing = grouped.get(row.projectWorkspaceId);
|
||||
if (existing) existing.push(row);
|
||||
else grouped.set(row.projectWorkspaceId, [row]);
|
||||
}
|
||||
return grouped;
|
||||
}
|
||||
|
||||
export async function reconcilePersistedRuntimeServicesOnStartup(db: Db) {
|
||||
const staleRows = await db
|
||||
.select({ id: workspaceRuntimeServices.id })
|
||||
.from(workspaceRuntimeServices)
|
||||
.where(
|
||||
and(
|
||||
eq(workspaceRuntimeServices.provider, "local_process"),
|
||||
inArray(workspaceRuntimeServices.status, ["starting", "running"]),
|
||||
),
|
||||
);
|
||||
|
||||
if (staleRows.length === 0) return { reconciled: 0 };
|
||||
|
||||
const now = new Date();
|
||||
await db
|
||||
.update(workspaceRuntimeServices)
|
||||
.set({
|
||||
status: "stopped",
|
||||
healthStatus: "unknown",
|
||||
stoppedAt: now,
|
||||
lastUsedAt: now,
|
||||
updatedAt: now,
|
||||
})
|
||||
.where(
|
||||
and(
|
||||
eq(workspaceRuntimeServices.provider, "local_process"),
|
||||
inArray(workspaceRuntimeServices.status, ["starting", "running"]),
|
||||
),
|
||||
);
|
||||
|
||||
return { reconciled: staleRows.length };
|
||||
}
|
||||
|
||||
export async function persistAdapterManagedRuntimeServices(input: {
|
||||
db: Db;
|
||||
adapterType: string;
|
||||
runId: string;
|
||||
agent: ExecutionWorkspaceAgentRef;
|
||||
issue: ExecutionWorkspaceIssueRef | null;
|
||||
workspace: RealizedExecutionWorkspace;
|
||||
reports: AdapterRuntimeServiceReport[];
|
||||
}) {
|
||||
const refs = normalizeAdapterManagedRuntimeServices(input);
|
||||
if (refs.length === 0) return refs;
|
||||
|
||||
const existingRows = await input.db
|
||||
.select()
|
||||
.from(workspaceRuntimeServices)
|
||||
.where(inArray(workspaceRuntimeServices.id, refs.map((ref) => ref.id)));
|
||||
const existingById = new Map(existingRows.map((row) => [row.id, row]));
|
||||
|
||||
for (const ref of refs) {
|
||||
const existing = existingById.get(ref.id);
|
||||
const startedAt = existing?.startedAt ?? new Date(ref.startedAt);
|
||||
const createdAt = existing?.createdAt ?? new Date();
|
||||
await input.db
|
||||
.insert(workspaceRuntimeServices)
|
||||
.values({
|
||||
id: ref.id,
|
||||
companyId: ref.companyId,
|
||||
projectId: ref.projectId,
|
||||
projectWorkspaceId: ref.projectWorkspaceId,
|
||||
issueId: ref.issueId,
|
||||
scopeType: ref.scopeType,
|
||||
scopeId: ref.scopeId,
|
||||
serviceName: ref.serviceName,
|
||||
status: ref.status,
|
||||
lifecycle: ref.lifecycle,
|
||||
reuseKey: ref.reuseKey,
|
||||
command: ref.command,
|
||||
cwd: ref.cwd,
|
||||
port: ref.port,
|
||||
url: ref.url,
|
||||
provider: ref.provider,
|
||||
providerRef: ref.providerRef,
|
||||
ownerAgentId: ref.ownerAgentId,
|
||||
startedByRunId: ref.startedByRunId,
|
||||
lastUsedAt: new Date(ref.lastUsedAt),
|
||||
startedAt,
|
||||
stoppedAt: ref.stoppedAt ? new Date(ref.stoppedAt) : null,
|
||||
stopPolicy: ref.stopPolicy,
|
||||
healthStatus: ref.healthStatus,
|
||||
createdAt,
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.onConflictDoUpdate({
|
||||
target: workspaceRuntimeServices.id,
|
||||
set: {
|
||||
projectId: ref.projectId,
|
||||
projectWorkspaceId: ref.projectWorkspaceId,
|
||||
issueId: ref.issueId,
|
||||
scopeType: ref.scopeType,
|
||||
scopeId: ref.scopeId,
|
||||
serviceName: ref.serviceName,
|
||||
status: ref.status,
|
||||
lifecycle: ref.lifecycle,
|
||||
reuseKey: ref.reuseKey,
|
||||
command: ref.command,
|
||||
cwd: ref.cwd,
|
||||
port: ref.port,
|
||||
url: ref.url,
|
||||
provider: ref.provider,
|
||||
providerRef: ref.providerRef,
|
||||
ownerAgentId: ref.ownerAgentId,
|
||||
startedByRunId: ref.startedByRunId,
|
||||
lastUsedAt: new Date(ref.lastUsedAt),
|
||||
startedAt,
|
||||
stoppedAt: ref.stoppedAt ? new Date(ref.stoppedAt) : null,
|
||||
stopPolicy: ref.stopPolicy,
|
||||
healthStatus: ref.healthStatus,
|
||||
updatedAt: new Date(),
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
return refs;
|
||||
}
|
||||
|
||||
export function buildWorkspaceReadyComment(input: {
|
||||
workspace: RealizedExecutionWorkspace;
|
||||
runtimeServices: RuntimeServiceRef[];
|
||||
}) {
|
||||
const lines = ["## Workspace Ready", ""];
|
||||
lines.push(`- Strategy: \`${input.workspace.strategy}\``);
|
||||
if (input.workspace.branchName) lines.push(`- Branch: \`${input.workspace.branchName}\``);
|
||||
lines.push(`- CWD: \`${input.workspace.cwd}\``);
|
||||
if (input.workspace.worktreePath && input.workspace.worktreePath !== input.workspace.cwd) {
|
||||
lines.push(`- Worktree: \`${input.workspace.worktreePath}\``);
|
||||
}
|
||||
for (const service of input.runtimeServices) {
|
||||
const detail = service.url ? `${service.serviceName}: ${service.url}` : `${service.serviceName}: running`;
|
||||
const suffix = service.reused ? " (reused)" : "";
|
||||
lines.push(`- Service: ${detail}${suffix}`);
|
||||
}
|
||||
return lines.join("\n");
|
||||
}
|
||||
Reference in New Issue
Block a user