From a6b5f12daf8f92ae7cdc996b234226834a7b5655 Mon Sep 17 00:00:00 2001 From: Richard Anaya Date: Sat, 7 Mar 2026 07:23:44 -0800 Subject: [PATCH] feat(pi-local): fix bugs, add RPC mode, improve cost tracking and output handling Major improvements to the Pi local adapter: Bug Fixes (Greptile-identified): - Fix string interpolation in models.ts error message (was showing literal ${detail}) - Fix tool matching in parse.ts to use toolCallId instead of toolName for correct multi-call handling and result assignment - Fix dead code in execute.ts by tracking instructionsReadFailed flag Feature Improvements: - Switch from print mode (-p) to RPC mode (--mode rpc) to prevent agent from exiting prematurely and ensure proper lifecycle completion - Add stdin command sending via JSON-RPC format for prompt delivery - Add line buffering in execute.ts to handle partial JSON chunks correctly - Filter RPC protocol messages (response, extension_ui_request, etc.) from transcript Cost Tracking: - Extract cost and usage data from turn_end assistant messages - Support both Pi format (input/output/cacheRead/cost.total) and generic format - Add tests for cost extraction and accumulation across multiple turns All tests pass (12/12), typecheck clean, server builds successfully. --- .../adapters/pi-local/src/server/execute.ts | 57 +++++++-- .../adapters/pi-local/src/server/models.ts | 2 +- .../pi-local/src/server/parse.test.ts | 109 ++++++++++++++++++ .../adapters/pi-local/src/server/parse.ts | 53 +++++++-- .../adapters/pi-local/src/ui/parse-stdout.ts | 5 + 5 files changed, 206 insertions(+), 20 deletions(-) diff --git a/packages/adapters/pi-local/src/server/execute.ts b/packages/adapters/pi-local/src/server/execute.ts index 3ea848f5..23cad28b 100644 --- a/packages/adapters/pi-local/src/server/execute.ts +++ b/packages/adapters/pi-local/src/server/execute.ts @@ -246,6 +246,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise { if (!resolvedInstructionsFilePath) return [] as string[]; - if (systemPromptExtension.length > 0) { + if (instructionsReadFailed) { return [ - `Loaded agent instructions from ${resolvedInstructionsFilePath}`, - `Appended instructions + path directive to system prompt (relative references from ${instructionsFileDir}).`, + `Configured instructionsFilePath ${resolvedInstructionsFilePath}, but file could not be read; continuing without injected instructions.`, ]; } return [ - `Configured instructionsFilePath ${resolvedInstructionsFilePath}, but file could not be read; continuing without injected instructions.`, + `Loaded agent instructions from ${resolvedInstructionsFilePath}`, + `Appended instructions + path directive to system prompt (relative references from ${instructionsFileDir}).`, ]; })(); const buildArgs = (sessionFile: string): string[] => { - const args: string[] = ["-p", userPrompt]; + const args: string[] = []; + + // Use RPC mode for proper lifecycle management (waits for agent completion) + args.push("--mode", "rpc"); // Use --append-system-prompt to extend Pi's default system prompt args.push("--append-system-prompt", renderedSystemPromptExtension); @@ -315,7 +320,6 @@ export async function execute(ctx: AdapterExecutionContext): Promise { + // Send the prompt as an RPC command + const promptCommand = { + type: "prompt", + message: userPrompt, + }; + return JSON.stringify(promptCommand) + "\n"; + }; + const runAttempt = async (sessionFile: string) => { const args = buildArgs(sessionFile); if (onMeta) { @@ -339,13 +352,43 @@ export async function execute(ctx: AdapterExecutionContext): Promise { + if (stream === "stderr") { + // Pass stderr through immediately (not JSONL) + await onLog(stream, chunk); + return; + } + + // Buffer stdout and emit only complete lines + stdoutBuffer += chunk; + const lines = stdoutBuffer.split("\n"); + // Keep the last (potentially incomplete) line in the buffer + stdoutBuffer = lines.pop() || ""; + + // Emit complete lines + for (const line of lines) { + if (line) { + await onLog(stream, line + "\n"); + } + } + }; + const proc = await runChildProcess(runId, command, args, { cwd, env: runtimeEnv, timeoutSec, graceSec, - onLog, + onLog: bufferedOnLog, + stdin: buildRpcStdin(), }); + + // Flush any remaining buffer content + if (stdoutBuffer) { + await onLog("stdout", stdoutBuffer); + } + return { proc, rawStderr: proc.stderr, diff --git a/packages/adapters/pi-local/src/server/models.ts b/packages/adapters/pi-local/src/server/models.ts index 4a85a457..3212312a 100644 --- a/packages/adapters/pi-local/src/server/models.ts +++ b/packages/adapters/pi-local/src/server/models.ts @@ -128,7 +128,7 @@ export async function discoverPiModels(input: { } if ((result.exitCode ?? 1) !== 0) { const detail = firstNonEmptyLine(result.stderr) || firstNonEmptyLine(result.stdout); - throw new Error(detail ? "`pi --list-models` failed: ${detail}" : "`pi --list-models` failed."); + throw new Error(detail ? `\`pi --list-models\` failed: ${detail}` : "`pi --list-models` failed."); } return sortModels(dedupeModels(parseModelsOutput(result.stdout))); diff --git a/packages/adapters/pi-local/src/server/parse.test.ts b/packages/adapters/pi-local/src/server/parse.test.ts index d1d695b3..6a3eef4d 100644 --- a/packages/adapters/pi-local/src/server/parse.test.ts +++ b/packages/adapters/pi-local/src/server/parse.test.ts @@ -100,6 +100,115 @@ describe("parsePiJsonl", () => { expect(parsed.toolCalls[0].isError).toBe(true); expect(parsed.toolCalls[0].result).toBe("File not found"); }); + + it("extracts usage and cost from turn_end events", () => { + const stdout = [ + JSON.stringify({ + type: "turn_end", + message: { + role: "assistant", + content: "Response with usage", + usage: { + input: 100, + output: 50, + cacheRead: 20, + totalTokens: 170, + cost: { + input: 0.001, + output: 0.0015, + cacheRead: 0.0001, + cacheWrite: 0, + total: 0.0026, + }, + }, + }, + toolResults: [], + }), + ].join("\n"); + + const parsed = parsePiJsonl(stdout); + expect(parsed.usage.inputTokens).toBe(100); + expect(parsed.usage.outputTokens).toBe(50); + expect(parsed.usage.cachedInputTokens).toBe(20); + expect(parsed.usage.costUsd).toBeCloseTo(0.0026, 4); + }); + + it("accumulates usage from multiple turns", () => { + const stdout = [ + JSON.stringify({ + type: "turn_end", + message: { + role: "assistant", + content: "First response", + usage: { + input: 50, + output: 25, + cacheRead: 0, + cost: { total: 0.001 }, + }, + }, + }), + JSON.stringify({ + type: "turn_end", + message: { + role: "assistant", + content: "Second response", + usage: { + input: 30, + output: 20, + cacheRead: 10, + cost: { total: 0.0015 }, + }, + }, + }), + ].join("\n"); + + const parsed = parsePiJsonl(stdout); + expect(parsed.usage.inputTokens).toBe(80); + expect(parsed.usage.outputTokens).toBe(45); + expect(parsed.usage.cachedInputTokens).toBe(10); + expect(parsed.usage.costUsd).toBeCloseTo(0.0025, 4); + }); + + it("handles standalone usage events with Pi format", () => { + const stdout = [ + JSON.stringify({ + type: "usage", + usage: { + input: 200, + output: 100, + cacheRead: 50, + cost: { total: 0.005 }, + }, + }), + ].join("\n"); + + const parsed = parsePiJsonl(stdout); + expect(parsed.usage.inputTokens).toBe(200); + expect(parsed.usage.outputTokens).toBe(100); + expect(parsed.usage.cachedInputTokens).toBe(50); + expect(parsed.usage.costUsd).toBe(0.005); + }); + + it("handles standalone usage events with generic format", () => { + const stdout = [ + JSON.stringify({ + type: "usage", + usage: { + inputTokens: 150, + outputTokens: 75, + cachedInputTokens: 25, + costUsd: 0.003, + }, + }), + ].join("\n"); + + const parsed = parsePiJsonl(stdout); + expect(parsed.usage.inputTokens).toBe(150); + expect(parsed.usage.outputTokens).toBe(75); + expect(parsed.usage.cachedInputTokens).toBe(25); + expect(parsed.usage.costUsd).toBe(0.003); + }); }); describe("isPiUnknownSessionError", () => { diff --git a/packages/adapters/pi-local/src/server/parse.ts b/packages/adapters/pi-local/src/server/parse.ts index 095e814e..3ba50d8b 100644 --- a/packages/adapters/pi-local/src/server/parse.ts +++ b/packages/adapters/pi-local/src/server/parse.ts @@ -11,7 +11,7 @@ interface ParsedPiOutput { costUsd: number; }; finalMessage: string | null; - toolCalls: Array<{ toolName: string; args: unknown; result: string | null; isError: boolean }>; + toolCalls: Array<{ toolCallId: string; toolName: string; args: unknown; result: string | null; isError: boolean }>; } function asRecord(value: unknown): Record | null { @@ -43,7 +43,7 @@ export function parsePiJsonl(stdout: string): ParsedPiOutput { toolCalls: [], }; - let currentToolCall: { toolName: string; args: unknown } | null = null; + let currentToolCall: { toolCallId: string; toolName: string; args: unknown } | null = null; for (const rawLine of stdout.split(/\r?\n/)) { const line = rawLine.trim(); @@ -54,6 +54,11 @@ export function parsePiJsonl(stdout: string): ParsedPiOutput { const eventType = asString(event.type, ""); + // RPC protocol messages - skip these (internal implementation detail) + if (eventType === "response" || eventType === "extension_ui_request" || eventType === "extension_ui_response" || eventType === "extension_error") { + continue; + } + // Agent lifecycle if (eventType === "agent_start") { continue; @@ -85,6 +90,20 @@ export function parsePiJsonl(stdout: string): ParsedPiOutput { result.finalMessage = text; result.messages.push(text); } + + // Extract usage and cost from assistant message + const usage = asRecord(message.usage); + if (usage) { + result.usage.inputTokens += asNumber(usage.input, 0); + result.usage.outputTokens += asNumber(usage.output, 0); + result.usage.cachedInputTokens += asNumber(usage.cacheRead, 0); + + // Pi stores cost in usage.cost.total (and broken down in usage.cost.input, etc.) + const cost = asRecord(usage.cost); + if (cost) { + result.usage.costUsd += asNumber(cost.total, 0); + } + } } // Tool results are in toolResults array @@ -95,8 +114,8 @@ export function parsePiJsonl(stdout: string): ParsedPiOutput { const content = tr.content; const isError = tr.isError === true; - // Find matching tool call - const existingCall = result.toolCalls.find((tc) => tc.toolName === toolCallId); + // Find matching tool call by toolCallId + const existingCall = result.toolCalls.find((tc) => tc.toolCallId === toolCallId); if (existingCall) { existingCall.result = typeof content === "string" ? content : JSON.stringify(content); existingCall.isError = isError; @@ -128,10 +147,12 @@ export function parsePiJsonl(stdout: string): ParsedPiOutput { // Tool execution if (eventType === "tool_execution_start") { + const toolCallId = asString(event.toolCallId, ""); const toolName = asString(event.toolName, ""); const args = event.args; - currentToolCall = { toolName, args }; + currentToolCall = { toolCallId, toolName, args }; result.toolCalls.push({ + toolCallId, toolName, args, result: null, @@ -146,8 +167,8 @@ export function parsePiJsonl(stdout: string): ParsedPiOutput { const toolResult = event.result; const isError = event.isError === true; - // Find the tool call - const existingCall = result.toolCalls.find((tc) => tc.toolName === toolName); + // Find the tool call by toolCallId (not toolName, to handle multiple calls to same tool) + const existingCall = result.toolCalls.find((tc) => tc.toolCallId === toolCallId); if (existingCall) { existingCall.result = typeof toolResult === "string" ? toolResult : JSON.stringify(toolResult); existingCall.isError = isError; @@ -156,14 +177,22 @@ export function parsePiJsonl(stdout: string): ParsedPiOutput { continue; } - // Usage tracking if available in the event + // Usage tracking if available in the event (fallback for standalone usage events) if (eventType === "usage" || event.usage) { const usage = asRecord(event.usage); if (usage) { - result.usage.inputTokens += asNumber(usage.inputTokens, 0); - result.usage.outputTokens += asNumber(usage.outputTokens, 0); - result.usage.cachedInputTokens += asNumber(usage.cachedInputTokens, 0); - result.usage.costUsd += asNumber(usage.costUsd, 0); + // Support both Pi format (input/output/cacheRead) and generic format (inputTokens/outputTokens/cachedInputTokens) + result.usage.inputTokens += asNumber(usage.inputTokens ?? usage.input, 0); + result.usage.outputTokens += asNumber(usage.outputTokens ?? usage.output, 0); + result.usage.cachedInputTokens += asNumber(usage.cachedInputTokens ?? usage.cacheRead, 0); + + // Cost may be in usage.costUsd (direct) or usage.cost.total (Pi format) + const cost = asRecord(usage.cost); + if (cost) { + result.usage.costUsd += asNumber(cost.total ?? usage.costUsd, 0); + } else { + result.usage.costUsd += asNumber(usage.costUsd, 0); + } } } } diff --git a/packages/adapters/pi-local/src/ui/parse-stdout.ts b/packages/adapters/pi-local/src/ui/parse-stdout.ts index 08e13290..b80fe5f1 100644 --- a/packages/adapters/pi-local/src/ui/parse-stdout.ts +++ b/packages/adapters/pi-local/src/ui/parse-stdout.ts @@ -34,6 +34,11 @@ export function parsePiStdoutLine(line: string, ts: string): TranscriptEntry[] { const type = asString(parsed.type); + // RPC protocol messages - filter these out (internal implementation detail) + if (type === "response" || type === "extension_ui_request" || type === "extension_ui_response" || type === "extension_error") { + return []; + } + // Agent lifecycle if (type === "agent_start") { return [{ kind: "system", ts, text: "Pi agent started" }];