The append method created a new WriteStream for every log chunk and resolved the promise on the end callback (data flushed) rather than the close event (fd released). Over many agent runs the leaked fds corrupted the fd table, causing child_process.spawn to fail with EBADF. Replace with fs.appendFile which properly opens, writes, and closes the fd before resolving. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
156 lines
4.8 KiB
TypeScript
156 lines
4.8 KiB
TypeScript
import { createReadStream, promises as fs } from "node:fs";
|
|
import path from "node:path";
|
|
import { createHash } from "node:crypto";
|
|
import { notFound } from "../errors.js";
|
|
|
|
export type RunLogStoreType = "local_file";
|
|
|
|
export interface RunLogHandle {
|
|
store: RunLogStoreType;
|
|
logRef: string;
|
|
}
|
|
|
|
export interface RunLogReadOptions {
|
|
offset?: number;
|
|
limitBytes?: number;
|
|
}
|
|
|
|
export interface RunLogReadResult {
|
|
content: string;
|
|
nextOffset?: number;
|
|
}
|
|
|
|
export interface RunLogFinalizeSummary {
|
|
bytes: number;
|
|
sha256?: string;
|
|
compressed: boolean;
|
|
}
|
|
|
|
export interface RunLogStore {
|
|
begin(input: { companyId: string; agentId: string; runId: string }): Promise<RunLogHandle>;
|
|
append(
|
|
handle: RunLogHandle,
|
|
event: { stream: "stdout" | "stderr" | "system"; chunk: string; ts: string },
|
|
): Promise<void>;
|
|
finalize(handle: RunLogHandle): Promise<RunLogFinalizeSummary>;
|
|
read(handle: RunLogHandle, opts?: RunLogReadOptions): Promise<RunLogReadResult>;
|
|
}
|
|
|
|
function safeSegments(...segments: string[]) {
|
|
return segments.map((segment) => segment.replace(/[^a-zA-Z0-9._-]/g, "_"));
|
|
}
|
|
|
|
function resolveWithin(basePath: string, relativePath: string) {
|
|
const resolved = path.resolve(basePath, relativePath);
|
|
const base = path.resolve(basePath) + path.sep;
|
|
if (!resolved.startsWith(base) && resolved !== path.resolve(basePath)) {
|
|
throw new Error("Invalid log path");
|
|
}
|
|
return resolved;
|
|
}
|
|
|
|
function createLocalFileRunLogStore(basePath: string): RunLogStore {
|
|
async function ensureDir(relativeDir: string) {
|
|
const dir = resolveWithin(basePath, relativeDir);
|
|
await fs.mkdir(dir, { recursive: true });
|
|
}
|
|
|
|
async function readFileRange(filePath: string, offset: number, limitBytes: number): Promise<RunLogReadResult> {
|
|
const stat = await fs.stat(filePath).catch(() => null);
|
|
if (!stat) throw notFound("Run log not found");
|
|
|
|
const start = Math.max(0, Math.min(offset, stat.size));
|
|
const end = Math.max(start, Math.min(start + limitBytes - 1, stat.size - 1));
|
|
|
|
if (start > end) {
|
|
return { content: "", nextOffset: start };
|
|
}
|
|
|
|
const chunks: Buffer[] = [];
|
|
await new Promise<void>((resolve, reject) => {
|
|
const stream = createReadStream(filePath, { start, end });
|
|
stream.on("data", (chunk) => {
|
|
chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
|
|
});
|
|
stream.on("error", reject);
|
|
stream.on("end", () => resolve());
|
|
});
|
|
|
|
const content = Buffer.concat(chunks).toString("utf8");
|
|
const nextOffset = end + 1 < stat.size ? end + 1 : undefined;
|
|
return { content, nextOffset };
|
|
}
|
|
|
|
async function sha256File(filePath: string): Promise<string> {
|
|
return new Promise<string>((resolve, reject) => {
|
|
const hash = createHash("sha256");
|
|
const stream = createReadStream(filePath);
|
|
stream.on("data", (chunk) => hash.update(chunk));
|
|
stream.on("error", reject);
|
|
stream.on("end", () => resolve(hash.digest("hex")));
|
|
});
|
|
}
|
|
|
|
return {
|
|
async begin(input) {
|
|
const [companyId, agentId] = safeSegments(input.companyId, input.agentId);
|
|
const runId = safeSegments(input.runId)[0]!;
|
|
const relDir = path.join(companyId, agentId);
|
|
const relPath = path.join(relDir, `${runId}.ndjson`);
|
|
await ensureDir(relDir);
|
|
|
|
const absPath = resolveWithin(basePath, relPath);
|
|
await fs.writeFile(absPath, "", "utf8");
|
|
|
|
return { store: "local_file", logRef: relPath };
|
|
},
|
|
|
|
async append(handle, event) {
|
|
if (handle.store !== "local_file") return;
|
|
const absPath = resolveWithin(basePath, handle.logRef);
|
|
const line = JSON.stringify({
|
|
ts: event.ts,
|
|
stream: event.stream,
|
|
chunk: event.chunk,
|
|
});
|
|
await fs.appendFile(absPath, `${line}\n`, "utf8");
|
|
},
|
|
|
|
async finalize(handle) {
|
|
if (handle.store !== "local_file") {
|
|
return { bytes: 0, compressed: false };
|
|
}
|
|
const absPath = resolveWithin(basePath, handle.logRef);
|
|
const stat = await fs.stat(absPath).catch(() => null);
|
|
if (!stat) throw notFound("Run log not found");
|
|
|
|
const hash = await sha256File(absPath);
|
|
return {
|
|
bytes: stat.size,
|
|
sha256: hash,
|
|
compressed: false,
|
|
};
|
|
},
|
|
|
|
async read(handle, opts) {
|
|
if (handle.store !== "local_file") {
|
|
throw notFound("Run log not found");
|
|
}
|
|
const absPath = resolveWithin(basePath, handle.logRef);
|
|
const offset = opts?.offset ?? 0;
|
|
const limitBytes = opts?.limitBytes ?? 256_000;
|
|
return readFileRange(absPath, offset, limitBytes);
|
|
},
|
|
};
|
|
}
|
|
|
|
let cachedStore: RunLogStore | null = null;
|
|
|
|
export function getRunLogStore() {
|
|
if (cachedStore) return cachedStore;
|
|
const basePath = process.env.RUN_LOG_BASE_PATH ?? path.resolve(process.cwd(), "data/run-logs");
|
|
cachedStore = createLocalFileRunLogStore(basePath);
|
|
return cachedStore;
|
|
}
|
|
|