feat(pi-local): fix bugs, add RPC mode, improve cost tracking and output handling

Major improvements to the Pi local adapter:

Bug Fixes (Greptile-identified):
- Fix string interpolation in models.ts error message (was showing literal ${detail})
- Fix tool matching in parse.ts to use toolCallId instead of toolName for correct
  multi-call handling and result assignment
- Fix dead code in execute.ts by tracking instructionsReadFailed flag

Feature Improvements:
- Switch from print mode (-p) to RPC mode (--mode rpc) to prevent agent from
  exiting prematurely and ensure proper lifecycle completion
- Add stdin command sending via JSON-RPC format for prompt delivery
- Add line buffering in execute.ts to handle partial JSON chunks correctly
- Filter RPC protocol messages (response, extension_ui_request, etc.) from transcript

Cost Tracking:
- Extract cost and usage data from turn_end assistant messages
- Support both Pi format (input/output/cacheRead/cost.total) and generic format
- Add tests for cost extraction and accumulation across multiple turns

All tests pass (12/12), typecheck clean, server builds successfully.
This commit is contained in:
Richard Anaya
2026-03-07 07:23:44 -08:00
parent 6077ae6064
commit a6b5f12daf
5 changed files with 206 additions and 20 deletions

View File

@@ -246,6 +246,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
const instructionsFileDir = instructionsFilePath ? `${path.dirname(instructionsFilePath)}/` : "";
let systemPromptExtension = "";
let instructionsReadFailed = false;
if (resolvedInstructionsFilePath) {
try {
const instructionsContents = await fs.readFile(resolvedInstructionsFilePath, "utf8");
@@ -259,6 +260,7 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
`[paperclip] Loaded agent instructions file: ${resolvedInstructionsFilePath}\n`,
);
} catch (err) {
instructionsReadFailed = true;
const reason = err instanceof Error ? err.message : String(err);
await onLog(
"stderr",
@@ -294,19 +296,22 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
const commandNotes = (() => {
if (!resolvedInstructionsFilePath) return [] as string[];
if (systemPromptExtension.length > 0) {
if (instructionsReadFailed) {
return [
`Loaded agent instructions from ${resolvedInstructionsFilePath}`,
`Appended instructions + path directive to system prompt (relative references from ${instructionsFileDir}).`,
`Configured instructionsFilePath ${resolvedInstructionsFilePath}, but file could not be read; continuing without injected instructions.`,
];
}
return [
`Configured instructionsFilePath ${resolvedInstructionsFilePath}, but file could not be read; continuing without injected instructions.`,
`Loaded agent instructions from ${resolvedInstructionsFilePath}`,
`Appended instructions + path directive to system prompt (relative references from ${instructionsFileDir}).`,
];
})();
const buildArgs = (sessionFile: string): string[] => {
const args: string[] = ["-p", userPrompt];
const args: string[] = [];
// Use RPC mode for proper lifecycle management (waits for agent completion)
args.push("--mode", "rpc");
// Use --append-system-prompt to extend Pi's default system prompt
args.push("--append-system-prompt", renderedSystemPromptExtension);
@@ -315,7 +320,6 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
if (modelId) args.push("--model", modelId);
if (thinking) args.push("--thinking", thinking);
args.push("--mode", "json");
args.push("--tools", "read,bash,edit,write,grep,find,ls");
args.push("--session", sessionFile);
@@ -324,6 +328,15 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
return args;
};
const buildRpcStdin = (): string => {
// Send the prompt as an RPC command
const promptCommand = {
type: "prompt",
message: userPrompt,
};
return JSON.stringify(promptCommand) + "\n";
};
const runAttempt = async (sessionFile: string) => {
const args = buildArgs(sessionFile);
if (onMeta) {
@@ -339,13 +352,43 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
});
}
// Buffer stdout by lines to handle partial JSON chunks
let stdoutBuffer = "";
const bufferedOnLog = async (stream: "stdout" | "stderr", chunk: string) => {
if (stream === "stderr") {
// Pass stderr through immediately (not JSONL)
await onLog(stream, chunk);
return;
}
// Buffer stdout and emit only complete lines
stdoutBuffer += chunk;
const lines = stdoutBuffer.split("\n");
// Keep the last (potentially incomplete) line in the buffer
stdoutBuffer = lines.pop() || "";
// Emit complete lines
for (const line of lines) {
if (line) {
await onLog(stream, line + "\n");
}
}
};
const proc = await runChildProcess(runId, command, args, {
cwd,
env: runtimeEnv,
timeoutSec,
graceSec,
onLog,
onLog: bufferedOnLog,
stdin: buildRpcStdin(),
});
// Flush any remaining buffer content
if (stdoutBuffer) {
await onLog("stdout", stdoutBuffer);
}
return {
proc,
rawStderr: proc.stderr,

View File

@@ -128,7 +128,7 @@ export async function discoverPiModels(input: {
}
if ((result.exitCode ?? 1) !== 0) {
const detail = firstNonEmptyLine(result.stderr) || firstNonEmptyLine(result.stdout);
throw new Error(detail ? "`pi --list-models` failed: ${detail}" : "`pi --list-models` failed.");
throw new Error(detail ? `\`pi --list-models\` failed: ${detail}` : "`pi --list-models` failed.");
}
return sortModels(dedupeModels(parseModelsOutput(result.stdout)));

View File

@@ -100,6 +100,115 @@ describe("parsePiJsonl", () => {
expect(parsed.toolCalls[0].isError).toBe(true);
expect(parsed.toolCalls[0].result).toBe("File not found");
});
it("extracts usage and cost from turn_end events", () => {
const stdout = [
JSON.stringify({
type: "turn_end",
message: {
role: "assistant",
content: "Response with usage",
usage: {
input: 100,
output: 50,
cacheRead: 20,
totalTokens: 170,
cost: {
input: 0.001,
output: 0.0015,
cacheRead: 0.0001,
cacheWrite: 0,
total: 0.0026,
},
},
},
toolResults: [],
}),
].join("\n");
const parsed = parsePiJsonl(stdout);
expect(parsed.usage.inputTokens).toBe(100);
expect(parsed.usage.outputTokens).toBe(50);
expect(parsed.usage.cachedInputTokens).toBe(20);
expect(parsed.usage.costUsd).toBeCloseTo(0.0026, 4);
});
it("accumulates usage from multiple turns", () => {
const stdout = [
JSON.stringify({
type: "turn_end",
message: {
role: "assistant",
content: "First response",
usage: {
input: 50,
output: 25,
cacheRead: 0,
cost: { total: 0.001 },
},
},
}),
JSON.stringify({
type: "turn_end",
message: {
role: "assistant",
content: "Second response",
usage: {
input: 30,
output: 20,
cacheRead: 10,
cost: { total: 0.0015 },
},
},
}),
].join("\n");
const parsed = parsePiJsonl(stdout);
expect(parsed.usage.inputTokens).toBe(80);
expect(parsed.usage.outputTokens).toBe(45);
expect(parsed.usage.cachedInputTokens).toBe(10);
expect(parsed.usage.costUsd).toBeCloseTo(0.0025, 4);
});
it("handles standalone usage events with Pi format", () => {
const stdout = [
JSON.stringify({
type: "usage",
usage: {
input: 200,
output: 100,
cacheRead: 50,
cost: { total: 0.005 },
},
}),
].join("\n");
const parsed = parsePiJsonl(stdout);
expect(parsed.usage.inputTokens).toBe(200);
expect(parsed.usage.outputTokens).toBe(100);
expect(parsed.usage.cachedInputTokens).toBe(50);
expect(parsed.usage.costUsd).toBe(0.005);
});
it("handles standalone usage events with generic format", () => {
const stdout = [
JSON.stringify({
type: "usage",
usage: {
inputTokens: 150,
outputTokens: 75,
cachedInputTokens: 25,
costUsd: 0.003,
},
}),
].join("\n");
const parsed = parsePiJsonl(stdout);
expect(parsed.usage.inputTokens).toBe(150);
expect(parsed.usage.outputTokens).toBe(75);
expect(parsed.usage.cachedInputTokens).toBe(25);
expect(parsed.usage.costUsd).toBe(0.003);
});
});
describe("isPiUnknownSessionError", () => {

View File

@@ -11,7 +11,7 @@ interface ParsedPiOutput {
costUsd: number;
};
finalMessage: string | null;
toolCalls: Array<{ toolName: string; args: unknown; result: string | null; isError: boolean }>;
toolCalls: Array<{ toolCallId: string; toolName: string; args: unknown; result: string | null; isError: boolean }>;
}
function asRecord(value: unknown): Record<string, unknown> | null {
@@ -43,7 +43,7 @@ export function parsePiJsonl(stdout: string): ParsedPiOutput {
toolCalls: [],
};
let currentToolCall: { toolName: string; args: unknown } | null = null;
let currentToolCall: { toolCallId: string; toolName: string; args: unknown } | null = null;
for (const rawLine of stdout.split(/\r?\n/)) {
const line = rawLine.trim();
@@ -54,6 +54,11 @@ export function parsePiJsonl(stdout: string): ParsedPiOutput {
const eventType = asString(event.type, "");
// RPC protocol messages - skip these (internal implementation detail)
if (eventType === "response" || eventType === "extension_ui_request" || eventType === "extension_ui_response" || eventType === "extension_error") {
continue;
}
// Agent lifecycle
if (eventType === "agent_start") {
continue;
@@ -85,6 +90,20 @@ export function parsePiJsonl(stdout: string): ParsedPiOutput {
result.finalMessage = text;
result.messages.push(text);
}
// Extract usage and cost from assistant message
const usage = asRecord(message.usage);
if (usage) {
result.usage.inputTokens += asNumber(usage.input, 0);
result.usage.outputTokens += asNumber(usage.output, 0);
result.usage.cachedInputTokens += asNumber(usage.cacheRead, 0);
// Pi stores cost in usage.cost.total (and broken down in usage.cost.input, etc.)
const cost = asRecord(usage.cost);
if (cost) {
result.usage.costUsd += asNumber(cost.total, 0);
}
}
}
// Tool results are in toolResults array
@@ -95,8 +114,8 @@ export function parsePiJsonl(stdout: string): ParsedPiOutput {
const content = tr.content;
const isError = tr.isError === true;
// Find matching tool call
const existingCall = result.toolCalls.find((tc) => tc.toolName === toolCallId);
// Find matching tool call by toolCallId
const existingCall = result.toolCalls.find((tc) => tc.toolCallId === toolCallId);
if (existingCall) {
existingCall.result = typeof content === "string" ? content : JSON.stringify(content);
existingCall.isError = isError;
@@ -128,10 +147,12 @@ export function parsePiJsonl(stdout: string): ParsedPiOutput {
// Tool execution
if (eventType === "tool_execution_start") {
const toolCallId = asString(event.toolCallId, "");
const toolName = asString(event.toolName, "");
const args = event.args;
currentToolCall = { toolName, args };
currentToolCall = { toolCallId, toolName, args };
result.toolCalls.push({
toolCallId,
toolName,
args,
result: null,
@@ -146,8 +167,8 @@ export function parsePiJsonl(stdout: string): ParsedPiOutput {
const toolResult = event.result;
const isError = event.isError === true;
// Find the tool call
const existingCall = result.toolCalls.find((tc) => tc.toolName === toolName);
// Find the tool call by toolCallId (not toolName, to handle multiple calls to same tool)
const existingCall = result.toolCalls.find((tc) => tc.toolCallId === toolCallId);
if (existingCall) {
existingCall.result = typeof toolResult === "string" ? toolResult : JSON.stringify(toolResult);
existingCall.isError = isError;
@@ -156,14 +177,22 @@ export function parsePiJsonl(stdout: string): ParsedPiOutput {
continue;
}
// Usage tracking if available in the event
// Usage tracking if available in the event (fallback for standalone usage events)
if (eventType === "usage" || event.usage) {
const usage = asRecord(event.usage);
if (usage) {
result.usage.inputTokens += asNumber(usage.inputTokens, 0);
result.usage.outputTokens += asNumber(usage.outputTokens, 0);
result.usage.cachedInputTokens += asNumber(usage.cachedInputTokens, 0);
result.usage.costUsd += asNumber(usage.costUsd, 0);
// Support both Pi format (input/output/cacheRead) and generic format (inputTokens/outputTokens/cachedInputTokens)
result.usage.inputTokens += asNumber(usage.inputTokens ?? usage.input, 0);
result.usage.outputTokens += asNumber(usage.outputTokens ?? usage.output, 0);
result.usage.cachedInputTokens += asNumber(usage.cachedInputTokens ?? usage.cacheRead, 0);
// Cost may be in usage.costUsd (direct) or usage.cost.total (Pi format)
const cost = asRecord(usage.cost);
if (cost) {
result.usage.costUsd += asNumber(cost.total ?? usage.costUsd, 0);
} else {
result.usage.costUsd += asNumber(usage.costUsd, 0);
}
}
}
}

View File

@@ -34,6 +34,11 @@ export function parsePiStdoutLine(line: string, ts: string): TranscriptEntry[] {
const type = asString(parsed.type);
// RPC protocol messages - filter these out (internal implementation detail)
if (type === "response" || type === "extension_ui_request" || type === "extension_ui_response" || type === "extension_error") {
return [];
}
// Agent lifecycle
if (type === "agent_start") {
return [{ kind: "system", ts, text: "Pi agent started" }];