diff --git a/server/src/services/heartbeat.ts b/server/src/services/heartbeat.ts index 9d4f8782..c2ff7e89 100644 --- a/server/src/services/heartbeat.ts +++ b/server/src/services/heartbeat.ts @@ -1,4 +1,6 @@ import { spawn, type ChildProcess } from "node:child_process"; +import { constants as fsConstants, promises as fs } from "node:fs"; +import path from "node:path"; import { and, asc, desc, eq, gt, inArray, sql } from "drizzle-orm"; import type { Db } from "@paperclip/db"; import { @@ -47,6 +49,16 @@ interface AdapterExecutionResult { summary?: string | null; } +interface AdapterInvocationMeta { + adapterType: string; + command: string; + cwd?: string; + commandArgs?: string[]; + env?: Record; + prompt?: string; + context?: Record; +} + interface WakeupOptions { source?: "timer" | "assignment" | "on_demand" | "automation"; triggerDetail?: "manual" | "ping" | "callback" | "system"; @@ -62,6 +74,7 @@ const runningProcesses = new Map(); const MAX_CAPTURE_BYTES = 4 * 1024 * 1024; const MAX_EXCERPT_BYTES = 32 * 1024; const MAX_LIVE_LOG_CHUNK_BYTES = 8 * 1024; +const SENSITIVE_ENV_KEY = /(key|token|secret|password|passwd|authorization|cookie)/i; function parseObject(value: unknown): Record { if (typeof value !== "object" || value === null || Array.isArray(value)) { @@ -175,6 +188,85 @@ function parseCodexJsonl(stdout: string) { }; } +function parseClaudeStreamJson(stdout: string) { + let sessionId: string | null = null; + let model = ""; + let finalResult: Record | null = null; + const assistantTexts: string[] = []; + + for (const rawLine of stdout.split(/\r?\n/)) { + const line = rawLine.trim(); + if (!line) continue; + const event = parseJson(line); + if (!event) continue; + + const type = asString(event.type, ""); + if (type === "system" && asString(event.subtype, "") === "init") { + sessionId = asString(event.session_id, sessionId ?? "") || sessionId; + model = asString(event.model, model); + continue; + } + + if (type === "assistant") { + sessionId = asString(event.session_id, sessionId ?? "") || sessionId; + const message = parseObject(event.message); + const content = Array.isArray(message.content) ? message.content : []; + for (const entry of content) { + if (typeof entry !== "object" || entry === null || Array.isArray(entry)) continue; + const block = entry as Record; + if (asString(block.type, "") === "text") { + const text = asString(block.text, ""); + if (text) assistantTexts.push(text); + } + } + continue; + } + + if (type === "result") { + finalResult = event; + sessionId = asString(event.session_id, sessionId ?? "") || sessionId; + } + } + + if (!finalResult) { + return { + sessionId, + model, + costUsd: null as number | null, + usage: null as UsageSummary | null, + summary: assistantTexts.join("\n\n").trim(), + resultJson: null as Record | null, + }; + } + + const usageObj = parseObject(finalResult.usage); + const usage: UsageSummary = { + inputTokens: asNumber(usageObj.input_tokens, 0), + cachedInputTokens: asNumber(usageObj.cache_read_input_tokens, 0), + outputTokens: asNumber(usageObj.output_tokens, 0), + }; + const costRaw = finalResult.total_cost_usd; + const costUsd = typeof costRaw === "number" && Number.isFinite(costRaw) ? costRaw : null; + const summary = asString(finalResult.result, assistantTexts.join("\n\n")).trim(); + + return { + sessionId, + model, + costUsd, + usage, + summary, + resultJson: finalResult, + }; +} + +function redactEnvForLogs(env: Record): Record { + const redacted: Record = {}; + for (const [key, value] of Object.entries(env)) { + redacted[key] = SENSITIVE_ENV_KEY.test(key) ? "***REDACTED***" : value; + } + return redacted; +} + async function runChildProcess( runId: string, command: string, @@ -188,9 +280,10 @@ async function runChildProcess( }, ): Promise { return new Promise((resolve, reject) => { + const mergedEnv = ensurePathInEnv({ ...process.env, ...opts.env }); const child = spawn(command, args, { cwd: opts.cwd, - env: { ...process.env, ...opts.env }, + env: mergedEnv, shell: false, stdio: ["ignore", "pipe", "pipe"], }); @@ -231,7 +324,13 @@ async function runChildProcess( child.on("error", (err) => { clearTimeout(timeout); runningProcesses.delete(runId); - reject(err); + const errno = (err as NodeJS.ErrnoException).code; + const pathValue = mergedEnv.PATH ?? mergedEnv.Path ?? ""; + const msg = + errno === "ENOENT" + ? `Failed to start command "${command}" in "${opts.cwd}". Verify adapter command, working directory, and PATH (${pathValue}).` + : `Failed to start command "${command}" in "${opts.cwd}": ${err.message}`; + reject(new Error(msg)); }); child.on("close", (code, signal) => { @@ -260,6 +359,70 @@ function buildPaperclipEnv(agent: { id: string; companyId: string }): Record 0) return env; + if (typeof env.Path === "string" && env.Path.length > 0) return env; + return { ...env, PATH: defaultPathForPlatform() }; +} + +async function ensureAbsoluteDirectory(cwd: string) { + if (!path.isAbsolute(cwd)) { + throw new Error(`Working directory must be an absolute path: "${cwd}"`); + } + + let stats; + try { + stats = await fs.stat(cwd); + } catch { + throw new Error(`Working directory does not exist: "${cwd}"`); + } + + if (!stats.isDirectory()) { + throw new Error(`Working directory is not a directory: "${cwd}"`); + } +} + +async function ensureCommandResolvable(command: string, cwd: string, env: NodeJS.ProcessEnv) { + const hasPathSeparator = command.includes("/") || command.includes("\\"); + if (hasPathSeparator) { + const absolute = path.isAbsolute(command) ? command : path.resolve(cwd, command); + try { + await fs.access(absolute, fsConstants.X_OK); + } catch { + throw new Error(`Command is not executable: "${command}" (resolved: "${absolute}")`); + } + return; + } + + const pathValue = env.PATH ?? env.Path ?? ""; + const delimiter = process.platform === "win32" ? ";" : ":"; + const dirs = pathValue.split(delimiter).filter(Boolean); + const windowsExt = process.platform === "win32" + ? (env.PATHEXT ?? ".EXE;.CMD;.BAT;.COM").split(";") + : [""]; + + for (const dir of dirs) { + for (const ext of windowsExt) { + const candidate = path.join(dir, process.platform === "win32" ? `${command}${ext}` : command); + try { + await fs.access(candidate, fsConstants.X_OK); + return; + } catch { + // continue scanning PATH + } + } + } + + throw new Error(`Command not found in PATH: "${command}"`); +} + export function heartbeatService(db: Db) { const runLogStore = getRunLogStore(); @@ -543,6 +706,7 @@ export function heartbeatService(db: Db) { agent: typeof agents.$inferSelect, config: Record, onLog: (stream: "stdout" | "stderr", chunk: string) => Promise, + onMeta?: (meta: AdapterInvocationMeta) => Promise, ): Promise { const command = asString(config.command, ""); if (!command) throw new Error("Process adapter missing command"); @@ -558,6 +722,16 @@ export function heartbeatService(db: Db) { const timeoutSec = asNumber(config.timeoutSec, 900); const graceSec = asNumber(config.graceSec, 15); + if (onMeta) { + await onMeta({ + adapterType: "process", + command, + cwd, + commandArgs: args, + env: redactEnvForLogs(env), + }); + } + const proc = await runChildProcess(runId, command, args, { cwd, env, @@ -606,26 +780,35 @@ export function heartbeatService(db: Db) { config: Record, context: Record, onLog: (stream: "stdout" | "stderr", chunk: string) => Promise, + onMeta?: (meta: AdapterInvocationMeta) => Promise, ): Promise { const promptTemplate = asString( config.promptTemplate, "You are agent {{agent.id}} ({{agent.name}}). Continue your Paperclip work.", ); const bootstrapTemplate = asString(config.bootstrapPromptTemplate, promptTemplate); + const command = asString(config.command, "claude"); const model = asString(config.model, ""); const maxTurns = asNumber(config.maxTurnsPerRun, 0); const dangerouslySkipPermissions = asBoolean(config.dangerouslySkipPermissions, false); const cwd = asString(config.cwd, process.cwd()); + await ensureAbsoluteDirectory(cwd); const envConfig = parseObject(config.env); const env: Record = { ...buildPaperclipEnv(agent) }; for (const [k, v] of Object.entries(envConfig)) { if (typeof v === "string") env[k] = v; } + const runtimeEnv = ensurePathInEnv({ ...process.env, ...env }); + await ensureCommandResolvable(command, cwd, runtimeEnv); const timeoutSec = asNumber(config.timeoutSec, 1800); const graceSec = asNumber(config.graceSec, 20); - const extraArgs = asStringArray(config.extraArgs); + const extraArgs = (() => { + const fromExtraArgs = asStringArray(config.extraArgs); + if (fromExtraArgs.length > 0) return fromExtraArgs; + return asStringArray(config.args); + })(); const sessionId = runtime.sessionId; const template = sessionId ? promptTemplate : bootstrapTemplate; @@ -636,14 +819,26 @@ export function heartbeatService(db: Db) { context, }); - const args = ["--print", prompt, "--output-format", "json"]; + const args = ["--print", prompt, "--output-format", "stream-json", "--verbose"]; if (sessionId) args.push("--resume", sessionId); if (dangerouslySkipPermissions) args.push("--dangerously-skip-permissions"); if (model) args.push("--model", model); if (maxTurns > 0) args.push("--max-turns", String(maxTurns)); if (extraArgs.length > 0) args.push(...extraArgs); - const proc = await runChildProcess(runId, "claude", args, { + if (onMeta) { + await onMeta({ + adapterType: "claude_local", + command, + cwd, + commandArgs: args.map((value, idx) => (idx === 1 ? `` : value)), + env: redactEnvForLogs(env), + prompt, + context, + }); + } + + const proc = await runChildProcess(runId, command, args, { cwd, env, timeoutSec, @@ -660,7 +855,8 @@ export function heartbeatService(db: Db) { }; } - const parsed = parseJson(proc.stdout); + const parsedStream = parseClaudeStreamJson(proc.stdout); + const parsed = parsedStream.resultJson ?? parseJson(proc.stdout); if (!parsed) { return { exitCode: proc.exitCode, @@ -677,12 +873,16 @@ export function heartbeatService(db: Db) { }; } - const usageObj = parseObject(parsed.usage); - const usage: UsageSummary = { - inputTokens: asNumber(usageObj.input_tokens, 0), - cachedInputTokens: asNumber(usageObj.cache_read_input_tokens, 0), - outputTokens: asNumber(usageObj.output_tokens, 0), - }; + const usage = + parsedStream.usage ?? + (() => { + const usageObj = parseObject(parsed.usage); + return { + inputTokens: asNumber(usageObj.input_tokens, 0), + cachedInputTokens: asNumber(usageObj.cache_read_input_tokens, 0), + outputTokens: asNumber(usageObj.output_tokens, 0), + }; + })(); return { exitCode: proc.exitCode, @@ -690,12 +890,14 @@ export function heartbeatService(db: Db) { timedOut: false, errorMessage: (proc.exitCode ?? 0) === 0 ? null : `Claude exited with code ${proc.exitCode ?? -1}`, usage, - sessionId: asString(parsed.session_id, runtime.sessionId ?? "") || runtime.sessionId, + sessionId: + parsedStream.sessionId ?? + (asString(parsed.session_id, runtime.sessionId ?? "") || runtime.sessionId), provider: "anthropic", - model: asString(parsed.model, model), - costUsd: asNumber(parsed.total_cost_usd, 0), + model: parsedStream.model || asString(parsed.model, model), + costUsd: parsedStream.costUsd ?? asNumber(parsed.total_cost_usd, 0), resultJson: parsed, - summary: asString(parsed.result, ""), + summary: parsedStream.summary || asString(parsed.result, ""), }; } @@ -706,26 +908,35 @@ export function heartbeatService(db: Db) { config: Record, context: Record, onLog: (stream: "stdout" | "stderr", chunk: string) => Promise, + onMeta?: (meta: AdapterInvocationMeta) => Promise, ): Promise { const promptTemplate = asString( config.promptTemplate, "You are agent {{agent.id}} ({{agent.name}}). Continue your Paperclip work.", ); const bootstrapTemplate = asString(config.bootstrapPromptTemplate, promptTemplate); + const command = asString(config.command, "codex"); const model = asString(config.model, ""); const search = asBoolean(config.search, false); const bypass = asBoolean(config.dangerouslyBypassApprovalsAndSandbox, false); const cwd = asString(config.cwd, process.cwd()); + await ensureAbsoluteDirectory(cwd); const envConfig = parseObject(config.env); const env: Record = { ...buildPaperclipEnv(agent) }; for (const [k, v] of Object.entries(envConfig)) { if (typeof v === "string") env[k] = v; } + const runtimeEnv = ensurePathInEnv({ ...process.env, ...env }); + await ensureCommandResolvable(command, cwd, runtimeEnv); const timeoutSec = asNumber(config.timeoutSec, 1800); const graceSec = asNumber(config.graceSec, 20); - const extraArgs = asStringArray(config.extraArgs); + const extraArgs = (() => { + const fromExtraArgs = asStringArray(config.extraArgs); + if (fromExtraArgs.length > 0) return fromExtraArgs; + return asStringArray(config.args); + })(); const sessionId = runtime.sessionId; const template = sessionId ? promptTemplate : bootstrapTemplate; @@ -744,7 +955,23 @@ export function heartbeatService(db: Db) { if (sessionId) args.push("resume", sessionId, prompt); else args.push(prompt); - const proc = await runChildProcess(runId, "codex", args, { + if (onMeta) { + await onMeta({ + adapterType: "codex_local", + command, + cwd, + commandArgs: args.map((value, idx) => { + if (!sessionId && idx === args.length - 1) return ``; + if (sessionId && idx === args.length - 1) return ``; + return value; + }), + env: redactEnvForLogs(env), + prompt, + context, + }); + } + + const proc = await runChildProcess(runId, command, args, { cwd, env, timeoutSec, @@ -888,16 +1115,25 @@ export function heartbeatService(db: Db) { const config = parseObject(agent.adapterConfig); const context = (run.contextSnapshot ?? {}) as Record; + const onAdapterMeta = async (meta: AdapterInvocationMeta) => { + await appendRunEvent(currentRun, seq++, { + eventType: "adapter.invoke", + stream: "system", + level: "info", + message: "adapter invocation", + payload: meta as Record, + }); + }; let adapterResult: AdapterExecutionResult; if (agent.adapterType === "http") { adapterResult = await executeHttpRun(run.id, agent.id, config, context); } else if (agent.adapterType === "claude_local") { - adapterResult = await executeClaudeLocalRun(run.id, agent, runtime, config, context, onLog); + adapterResult = await executeClaudeLocalRun(run.id, agent, runtime, config, context, onLog, onAdapterMeta); } else if (agent.adapterType === "codex_local") { - adapterResult = await executeCodexLocalRun(run.id, agent, runtime, config, context, onLog); + adapterResult = await executeCodexLocalRun(run.id, agent, runtime, config, context, onLog, onAdapterMeta); } else { - adapterResult = await executeProcessRun(run.id, agent, config, onLog); + adapterResult = await executeProcessRun(run.id, agent, config, onLog, onAdapterMeta); } let outcome: "succeeded" | "failed" | "cancelled" | "timed_out"; @@ -926,6 +1162,14 @@ export function heartbeatService(db: Db) { ? "timed_out" : "failed"; + const usageJson = + adapterResult.usage || adapterResult.costUsd != null + ? ({ + ...(adapterResult.usage ?? {}), + ...(adapterResult.costUsd != null ? { costUsd: adapterResult.costUsd } : {}), + } as Record) + : null; + await setRunStatus(run.id, status, { finishedAt: new Date(), error: @@ -942,7 +1186,7 @@ export function heartbeatService(db: Db) { : null, exitCode: adapterResult.exitCode, signal: adapterResult.signal, - usageJson: (adapterResult.usage ?? null) as Record | null, + usageJson, resultJson: adapterResult.resultJson ?? null, sessionIdAfter: adapterResult.sessionId ?? runtime.sessionId, stdoutExcerpt,