Implement agent runtime services and WebSocket realtime

Expand heartbeat service with full run executor, wakeup coordinator,
and adapter lifecycle. Add run-log-store for pluggable log persistence.
Add live-events service and WebSocket handler for realtime updates.
Expand agent and issue routes with runtime operations. Add ws dependency.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Forgotten
2026-02-17 12:24:43 -06:00
parent 2583bf4c43
commit c9c75bbc0a
11 changed files with 1746 additions and 156 deletions

View File

@@ -16,6 +16,7 @@
"express": "^5.1.0",
"pino": "^9.6.0",
"pino-http": "^10.4.0",
"ws": "^8.19.0",
"zod": "^3.24.2"
},
"devDependencies": {

View File

@@ -2,6 +2,8 @@ export interface Config {
port: number;
databaseUrl: string | undefined;
serveUi: boolean;
heartbeatSchedulerEnabled: boolean;
heartbeatSchedulerIntervalMs: number;
}
export function loadConfig(): Config {
@@ -9,5 +11,7 @@ export function loadConfig(): Config {
port: Number(process.env.PORT) || 3100,
databaseUrl: process.env.DATABASE_URL,
serveUi: process.env.SERVE_UI === "true",
heartbeatSchedulerEnabled: process.env.HEARTBEAT_SCHEDULER_ENABLED !== "false",
heartbeatSchedulerIntervalMs: Math.max(10000, Number(process.env.HEARTBEAT_SCHEDULER_INTERVAL_MS) || 30000),
};
}

View File

@@ -1,7 +1,11 @@
import { createServer } from "node:http";
import { resolve } from "node:path";
import { createDb, createPgliteDb } from "@paperclip/db";
import { createApp } from "./app.js";
import { loadConfig } from "./config.js";
import { logger } from "./middleware/logger.js";
import { setupLiveEventsWebSocketServer } from "./realtime/live-events-ws.js";
import { heartbeatService } from "./services/index.js";
const config = loadConfig();
@@ -9,13 +13,33 @@ let db;
if (config.databaseUrl) {
db = createDb(config.databaseUrl);
} else {
logger.info("No DATABASE_URL set — using embedded PGlite (./data/pglite)");
db = await createPgliteDb("./data/pglite");
const dataDir = resolve("./data/pglite");
logger.info(`No DATABASE_URL set — using embedded PGlite (${dataDir})`);
db = await createPgliteDb(dataDir);
logger.info("PGlite ready, schema pushed");
}
const app = createApp(db as any, { serveUi: config.serveUi });
const server = createServer(app);
app.listen(config.port, () => {
setupLiveEventsWebSocketServer(server, db as any);
if (config.heartbeatSchedulerEnabled) {
const heartbeat = heartbeatService(db as any);
setInterval(() => {
void heartbeat
.tickTimers(new Date())
.then((result) => {
if (result.enqueued > 0) {
logger.info({ ...result }, "heartbeat timer tick enqueued runs");
}
})
.catch((err) => {
logger.error({ err }, "heartbeat timer tick failed");
});
}, config.heartbeatSchedulerIntervalMs);
}
server.listen(config.port, () => {
logger.info(`Server listening on :${config.port}`);
});

View File

@@ -0,0 +1,177 @@
import { createHash } from "node:crypto";
import type { IncomingMessage, Server as HttpServer } from "node:http";
import type { Duplex } from "node:stream";
import { and, eq, isNull } from "drizzle-orm";
import type { Db } from "@paperclip/db";
import { agentApiKeys } from "@paperclip/db";
import { WebSocket, WebSocketServer } from "ws";
import { logger } from "../middleware/logger.js";
import { subscribeCompanyLiveEvents } from "../services/live-events.js";
/** Authorized identity attached to an accepted WebSocket upgrade. */
interface UpgradeContext {
  companyId: string;
  // "board" for the tokenless browser UI context, "agent" for API-key clients.
  actorType: "board" | "agent";
  // The agent id for agent actors; the literal "board" for the board context.
  actorId: string;
}
/** IncomingMessage used to carry the upgrade context from 'upgrade' to 'connection'. */
interface IncomingMessageWithContext extends IncomingMessage {
  paperclipUpgradeContext?: UpgradeContext;
}
/** Hex-encoded SHA-256 digest of a bearer token, as compared against `agentApiKeys.keyHash`. */
function hashToken(token: string) {
  const digest = createHash("sha256");
  digest.update(token);
  return digest.digest("hex");
}
/**
 * Write a minimal HTTP error response to a socket that requested an upgrade,
 * then close it. CR/LF are stripped from the message so it cannot inject
 * headers into the handcrafted response.
 */
function rejectUpgrade(socket: Duplex, statusLine: string, message: string) {
  const safe = message.replace(/[\r\n]+/g, " ").trim();
  const response =
    `HTTP/1.1 ${statusLine}\r\n` +
    "Connection: close\r\n" +
    "Content-Type: text/plain\r\n" +
    `Content-Length: ${Buffer.byteLength(safe)}\r\n` +
    "\r\n" +
    safe;
  // end() flushes the buffered response before closing the socket;
  // write() immediately followed by destroy() can discard the response
  // bytes if the socket buffer has not drained yet.
  socket.end(response);
}
/**
 * Extract the company id from a live-events WebSocket path
 * (/api/companies/:id/events/ws). Returns null when the path does not match
 * or the id segment is not valid percent-encoding.
 */
function parseCompanyId(pathname: string) {
  const match = /^\/api\/companies\/([^/]+)\/events\/ws$/.exec(pathname);
  const encoded = match?.[1];
  if (encoded === undefined) return null;
  try {
    return decodeURIComponent(encoded);
  } catch {
    return null;
  }
}
/**
 * Pull the token out of an Authorization header value.
 * Accepts a case-insensitive "Bearer " scheme; returns null for a missing
 * header, any other scheme, or an empty token. Multi-value headers use the
 * first entry.
 */
function parseBearerToken(rawAuth: string | string[] | undefined) {
  const header = Array.isArray(rawAuth) ? rawAuth[0] : rawAuth;
  if (!header) return null;
  const scheme = "bearer ";
  if (!header.toLowerCase().startsWith(scheme)) return null;
  const token = header.slice(scheme.length).trim();
  return token === "" ? null : token;
}
/**
 * Resolve the actor context for a WebSocket upgrade request.
 *
 * A bearer token (Authorization header takes precedence over the `token`
 * query parameter) is looked up against non-revoked agent API keys; a match
 * scoped to `companyId` yields an agent context and bumps `lastUsedAt`.
 * No token at all is treated as the browser board context (V1 behavior).
 * Returns null when a token is present but invalid for this company.
 */
async function authorizeUpgrade(
  db: Db,
  req: IncomingMessage,
  companyId: string,
  url: URL,
): Promise<UpgradeContext | null> {
  const headerToken = parseBearerToken(req.headers.authorization);
  const queryToken = url.searchParams.get("token")?.trim() ?? "";
  const token = headerToken ?? (queryToken === "" ? null : queryToken);
  if (!token) {
    // Browser board context has no bearer token in V1.
    return { companyId, actorType: "board", actorId: "board" };
  }
  const rows = await db
    .select()
    .from(agentApiKeys)
    .where(and(eq(agentApiKeys.keyHash, hashToken(token)), isNull(agentApiKeys.revokedAt)));
  const key = rows[0] ?? null;
  if (!key || key.companyId !== companyId) {
    return null;
  }
  await db
    .update(agentApiKeys)
    .set({ lastUsedAt: new Date() })
    .where(eq(agentApiKeys.id, key.id));
  return { companyId, actorType: "agent", actorId: key.agentId };
}
/**
 * Attach the live-events WebSocket endpoint to an HTTP server.
 *
 * Handles `upgrade` requests on /api/companies/:companyId/events/ws,
 * authorizes them via {@link authorizeUpgrade}, and fans company live
 * events out to each connected client. Dead connections are reaped with a
 * 30s ping/pong liveness sweep.
 *
 * @returns the WebSocketServer so callers can close it on shutdown
 *          (closing it also stops the liveness sweep).
 */
export function setupLiveEventsWebSocketServer(server: HttpServer, db: Db) {
  const wss = new WebSocketServer({ noServer: true });
  // Per-client state: the live-events unsubscribe hook and the pong flag.
  const cleanupByClient = new Map<WebSocket, () => void>();
  const aliveByClient = new Map<WebSocket, boolean>();
  const pingInterval = setInterval(() => {
    for (const socket of wss.clients) {
      // No pong since the previous sweep — assume the peer is gone.
      if (!aliveByClient.get(socket)) {
        socket.terminate();
        continue;
      }
      aliveByClient.set(socket, false);
      socket.ping();
    }
  }, 30000);
  wss.on("connection", (socket, req) => {
    const context = (req as IncomingMessageWithContext).paperclipUpgradeContext;
    if (!context) {
      // 1008 = policy violation; connection arrived without upgrade context.
      socket.close(1008, "missing context");
      return;
    }
    const unsubscribe = subscribeCompanyLiveEvents(context.companyId, (event) => {
      if (socket.readyState !== WebSocket.OPEN) return;
      socket.send(JSON.stringify(event));
    });
    cleanupByClient.set(socket, unsubscribe);
    aliveByClient.set(socket, true);
    socket.on("pong", () => {
      aliveByClient.set(socket, true);
    });
    socket.on("close", () => {
      const cleanup = cleanupByClient.get(socket);
      if (cleanup) cleanup();
      cleanupByClient.delete(socket);
      aliveByClient.delete(socket);
    });
    socket.on("error", (err) => {
      logger.warn({ err, companyId: context.companyId }, "live websocket client error");
    });
  });
  wss.on("close", () => {
    clearInterval(pingInterval);
  });
  server.on("upgrade", (req, socket, head) => {
    // Attach an error handler to the raw socket for the duration of the
    // (async) authorization; without it, a socket error before the upgrade
    // completes is an unhandled 'error' event and crashes the process.
    const onPreUpgradeError = (err: Error) => {
      logger.warn({ err, path: req.url }, "socket error during websocket upgrade");
    };
    socket.on("error", onPreUpgradeError);
    if (!req.url) {
      rejectUpgrade(socket, "400 Bad Request", "missing url");
      return;
    }
    const url = new URL(req.url, "http://localhost");
    const companyId = parseCompanyId(url.pathname);
    if (!companyId) {
      // Not our endpoint — drop the connection without an HTTP response.
      socket.destroy();
      return;
    }
    void authorizeUpgrade(db, req, companyId, url)
      .then((context) => {
        if (!context) {
          rejectUpgrade(socket, "403 Forbidden", "forbidden");
          return;
        }
        const reqWithContext = req as IncomingMessageWithContext;
        reqWithContext.paperclipUpgradeContext = context;
        // ws takes over error handling once the upgrade is handed off.
        socket.removeListener("error", onPreUpgradeError);
        wss.handleUpgrade(req, socket, head, (ws) => {
          wss.emit("connection", ws, reqWithContext);
        });
      })
      .catch((err) => {
        logger.error({ err, path: req.url }, "failed websocket upgrade authorization");
        rejectUpgrade(socket, "500 Internal Server Error", "upgrade failed");
      });
  });
  return wss;
}

View File

@@ -3,6 +3,7 @@ import type { Db } from "@paperclip/db";
import {
createAgentKeySchema,
createAgentSchema,
wakeAgentSchema,
updateAgentSchema,
} from "@paperclip/shared";
import { validate } from "../middleware/validate.js";
@@ -39,6 +40,44 @@ export function agentRoutes(db: Db) {
res.json(agent);
});
router.get("/agents/:id/runtime-state", async (req, res) => {
assertBoard(req);
const id = req.params.id as string;
const agent = await svc.getById(id);
if (!agent) {
res.status(404).json({ error: "Agent not found" });
return;
}
assertCompanyAccess(req, agent.companyId);
const state = await heartbeat.getRuntimeState(id);
res.json(state);
});
router.post("/agents/:id/runtime-state/reset-session", async (req, res) => {
assertBoard(req);
const id = req.params.id as string;
const agent = await svc.getById(id);
if (!agent) {
res.status(404).json({ error: "Agent not found" });
return;
}
assertCompanyAccess(req, agent.companyId);
const state = await heartbeat.resetRuntimeSession(id);
await logActivity(db, {
companyId: agent.companyId,
actorType: "user",
actorId: req.actor.userId ?? "board",
action: "agent.runtime_session_reset",
entityType: "agent",
entityId: id,
});
res.json(state);
});
router.post("/companies/:companyId/agents", validate(createAgentSchema), async (req, res) => {
const companyId = req.params.companyId as string;
assertCompanyAccess(req, companyId);
@@ -192,6 +231,54 @@ export function agentRoutes(db: Db) {
res.status(201).json(key);
});
router.post("/agents/:id/wakeup", validate(wakeAgentSchema), async (req, res) => {
const id = req.params.id as string;
const agent = await svc.getById(id);
if (!agent) {
res.status(404).json({ error: "Agent not found" });
return;
}
assertCompanyAccess(req, agent.companyId);
if (req.actor.type === "agent" && req.actor.agentId !== id) {
res.status(403).json({ error: "Agent can only invoke itself" });
return;
}
const run = await heartbeat.wakeup(id, {
source: req.body.source,
triggerDetail: req.body.triggerDetail ?? "manual",
reason: req.body.reason ?? null,
payload: req.body.payload ?? null,
idempotencyKey: req.body.idempotencyKey ?? null,
requestedByActorType: req.actor.type === "agent" ? "agent" : "user",
requestedByActorId: req.actor.type === "agent" ? req.actor.agentId ?? null : req.actor.userId ?? null,
contextSnapshot: {
triggeredBy: req.actor.type,
actorId: req.actor.type === "agent" ? req.actor.agentId : req.actor.userId,
},
});
if (!run) {
res.status(202).json({ status: "skipped" });
return;
}
const actor = getActorInfo(req);
await logActivity(db, {
companyId: agent.companyId,
actorType: actor.actorType,
actorId: actor.actorId,
agentId: actor.agentId,
action: "heartbeat.invoked",
entityType: "heartbeat_run",
entityId: run.id,
details: { agentId: id },
});
res.status(202).json(run);
});
router.post("/agents/:id/heartbeat/invoke", async (req, res) => {
const id = req.params.id as string;
const agent = await svc.getById(id);
@@ -206,10 +293,24 @@ export function agentRoutes(db: Db) {
return;
}
const run = await heartbeat.invoke(id, "manual", {
triggeredBy: req.actor.type,
actorId: req.actor.type === "agent" ? req.actor.agentId : req.actor.userId,
});
const run = await heartbeat.invoke(
id,
"on_demand",
{
triggeredBy: req.actor.type,
actorId: req.actor.type === "agent" ? req.actor.agentId : req.actor.userId,
},
"manual",
{
actorType: req.actor.type === "agent" ? "agent" : "user",
actorId: req.actor.type === "agent" ? req.actor.agentId ?? null : req.actor.userId ?? null,
},
);
if (!run) {
res.status(202).json({ status: "skipped" });
return;
}
const actor = getActorInfo(req);
await logActivity(db, {
@@ -254,5 +355,39 @@ export function agentRoutes(db: Db) {
res.json(run);
});
router.get("/heartbeat-runs/:runId/events", async (req, res) => {
const runId = req.params.runId as string;
const run = await heartbeat.getRun(runId);
if (!run) {
res.status(404).json({ error: "Heartbeat run not found" });
return;
}
assertCompanyAccess(req, run.companyId);
const afterSeq = Number(req.query.afterSeq ?? 0);
const limit = Number(req.query.limit ?? 200);
const events = await heartbeat.listEvents(runId, Number.isFinite(afterSeq) ? afterSeq : 0, Number.isFinite(limit) ? limit : 200);
res.json(events);
});
router.get("/heartbeat-runs/:runId/log", async (req, res) => {
const runId = req.params.runId as string;
const run = await heartbeat.getRun(runId);
if (!run) {
res.status(404).json({ error: "Heartbeat run not found" });
return;
}
assertCompanyAccess(req, run.companyId);
const offset = Number(req.query.offset ?? 0);
const limitBytes = Number(req.query.limitBytes ?? 256000);
const result = await heartbeat.readLog(runId, {
offset: Number.isFinite(offset) ? offset : 0,
limitBytes: Number.isFinite(limitBytes) ? limitBytes : 256000,
});
res.json(result);
});
return router;
}

View File

@@ -7,12 +7,14 @@ import {
updateIssueSchema,
} from "@paperclip/shared";
import { validate } from "../middleware/validate.js";
import { issueService, logActivity } from "../services/index.js";
import { heartbeatService, issueService, logActivity } from "../services/index.js";
import { logger } from "../middleware/logger.js";
import { assertCompanyAccess, getActorInfo } from "./authz.js";
export function issueRoutes(db: Db) {
const router = Router();
const svc = issueService(db);
const heartbeat = heartbeatService(db);
router.get("/companies/:companyId/issues", async (req, res) => {
const companyId = req.params.companyId as string;
@@ -58,6 +60,20 @@ export function issueRoutes(db: Db) {
details: { title: issue.title },
});
if (issue.assigneeAgentId) {
void heartbeat
.wakeup(issue.assigneeAgentId, {
source: "assignment",
triggerDetail: "system",
reason: "issue_assigned",
payload: { issueId: issue.id, mutation: "create" },
requestedByActorType: actor.actorType,
requestedByActorId: actor.actorId,
contextSnapshot: { issueId: issue.id, source: "issue.create" },
})
.catch((err) => logger.warn({ err, issueId: issue.id }, "failed to wake assignee on issue create"));
}
res.status(201).json(issue);
});
@@ -88,6 +104,22 @@ export function issueRoutes(db: Db) {
details: req.body,
});
const assigneeChanged =
req.body.assigneeAgentId !== undefined && req.body.assigneeAgentId !== existing.assigneeAgentId;
if (assigneeChanged && issue.assigneeAgentId) {
void heartbeat
.wakeup(issue.assigneeAgentId, {
source: "assignment",
triggerDetail: "system",
reason: "issue_assigned",
payload: { issueId: issue.id, mutation: "update" },
requestedByActorType: actor.actorType,
requestedByActorId: actor.actorId,
contextSnapshot: { issueId: issue.id, source: "issue.update" },
})
.catch((err) => logger.warn({ err, issueId: issue.id }, "failed to wake assignee on issue update"));
}
res.json(issue);
});
@@ -148,6 +180,18 @@ export function issueRoutes(db: Db) {
details: { agentId: req.body.agentId },
});
void heartbeat
.wakeup(req.body.agentId, {
source: "assignment",
triggerDetail: "system",
reason: "issue_checked_out",
payload: { issueId: issue.id, mutation: "checkout" },
requestedByActorType: actor.actorType,
requestedByActorId: actor.actorId,
contextSnapshot: { issueId: issue.id, source: "issue.checkout" },
})
.catch((err) => logger.warn({ err, issueId: issue.id }, "failed to wake assignee on issue checkout"));
res.json(updated);
});

View File

@@ -1,5 +1,6 @@
import type { Db } from "@paperclip/db";
import { activityLog } from "@paperclip/db";
import { publishLiveEvent } from "./live-events.js";
export interface LogActivityInput {
companyId: string;
@@ -23,4 +24,18 @@ export async function logActivity(db: Db, input: LogActivityInput) {
agentId: input.agentId ?? null,
details: input.details ?? null,
});
publishLiveEvent({
companyId: input.companyId,
type: "activity.logged",
payload: {
actorType: input.actorType,
actorId: input.actorId,
action: input.action,
entityType: input.entityType,
entityId: input.entityId,
agentId: input.agentId ?? null,
details: input.details ?? null,
},
});
}

File diff suppressed because it is too large Load Diff

View File

@@ -9,3 +9,4 @@ export { costService } from "./costs.js";
export { heartbeatService } from "./heartbeat.js";
export { dashboardService } from "./dashboard.js";
export { logActivity, type LogActivityInput } from "./activity-log.js";
export { publishLiveEvent, subscribeCompanyLiveEvents } from "./live-events.js";

View File

@@ -0,0 +1,40 @@
import { EventEmitter } from "node:events";
import type { LiveEvent, LiveEventType } from "@paperclip/shared";
type LiveEventPayload = Record<string, unknown>;
type LiveEventListener = (event: LiveEvent) => void;
// One process-wide emitter; each company id is its own event channel.
const emitter = new EventEmitter();
// Unlimited listeners: every open WebSocket adds one subscription.
emitter.setMaxListeners(0);
// Monotonic per-process id. Events are in-memory only, so it restarts at
// zero with the process.
let nextEventId = 0;
/**
 * Stamp an id and timestamp onto the input, then emit it to every
 * subscriber of the company's channel.
 *
 * @returns the fully-formed LiveEvent that was delivered
 */
export function publishLiveEvent(input: {
  companyId: string;
  type: LiveEventType;
  payload?: LiveEventPayload;
}) {
  nextEventId += 1;
  const event: LiveEvent = {
    id: nextEventId,
    companyId: input.companyId,
    type: input.type,
    createdAt: new Date().toISOString(),
    payload: input.payload ?? {},
  };
  emitter.emit(input.companyId, event);
  return event;
}
/**
 * Subscribe to a company's live events.
 *
 * @returns an unsubscribe function that detaches the listener
 */
export function subscribeCompanyLiveEvents(companyId: string, listener: LiveEventListener) {
  emitter.on(companyId, listener);
  return () => emitter.off(companyId, listener);
}

View File

@@ -0,0 +1,159 @@
import { createReadStream, createWriteStream, promises as fs } from "node:fs";
import path from "node:path";
import { createHash } from "node:crypto";
import { notFound } from "../errors.js";
/** Backends available for persisting heartbeat-run logs. */
export type RunLogStoreType = "local_file";
/** Opaque reference to one run's log in a specific store backend. */
export interface RunLogHandle {
  store: RunLogStoreType;
  // Store-specific locator; for "local_file" this is a path relative to the base dir.
  logRef: string;
}
/** Byte-range options for reading a log page. */
export interface RunLogReadOptions {
  // Byte offset to start reading from (defaults to 0).
  offset?: number;
  // Maximum number of bytes to return (defaults to 256 000).
  limitBytes?: number;
}
/** One page of log content. */
export interface RunLogReadResult {
  content: string;
  // Byte offset of the next page; absent when the end of the log was reached.
  nextOffset?: number;
}
/** Metadata produced when a finished run's log is sealed. */
export interface RunLogFinalizeSummary {
  bytes: number;
  // Hex SHA-256 of the log contents, when the backend can compute one.
  sha256?: string;
  compressed: boolean;
}
/**
 * Pluggable persistence for heartbeat-run logs: begin a log for a run,
 * append stream events (written as NDJSON lines by the local backend),
 * finalize it when the run ends, and read it back in byte ranges.
 */
export interface RunLogStore {
  begin(input: { companyId: string; agentId: string; runId: string }): Promise<RunLogHandle>;
  append(
    handle: RunLogHandle,
    event: { stream: "stdout" | "stderr" | "system"; chunk: string; ts: string },
  ): Promise<void>;
  finalize(handle: RunLogHandle): Promise<RunLogFinalizeSummary>;
  read(handle: RunLogHandle, opts?: RunLogReadOptions): Promise<RunLogReadResult>;
}
/**
 * Sanitize path segments for use as file/directory names: every character
 * outside [a-zA-Z0-9._-] becomes "_". Lossy — distinct inputs can map to
 * the same segment.
 */
function safeSegments(...segments: string[]) {
  const sanitize = (segment: string) => segment.replace(/[^a-zA-Z0-9._-]/g, "_");
  return segments.map(sanitize);
}
/**
 * Resolve `relativePath` against `basePath` and verify the result stays
 * inside it. Appending `path.sep` before the prefix check prevents
 * "/base2" from passing as inside "/base".
 *
 * @throws Error when the resolved path escapes the base directory
 */
function resolveWithin(basePath: string, relativePath: string) {
  const root = path.resolve(basePath);
  const candidate = path.resolve(root, relativePath);
  const inside = candidate === root || candidate.startsWith(root + path.sep);
  if (!inside) {
    throw new Error("Invalid log path");
  }
  return candidate;
}
/**
 * RunLogStore backed by the local filesystem.
 *
 * Logs are NDJSON files laid out as <base>/<companyId>/<agentId>/<runId>.ndjson,
 * with every path segment sanitized and resolved strictly inside `basePath`
 * to block traversal via hostile ids.
 */
function createLocalFileRunLogStore(basePath: string): RunLogStore {
  // Create a directory (and any missing parents) under basePath.
  async function ensureDir(relativeDir: string) {
    const dir = resolveWithin(basePath, relativeDir);
    await fs.mkdir(dir, { recursive: true });
  }
  // Read up to `limitBytes` starting at byte `offset`. Offsets are byte
  // positions, so a page boundary can split a multi-byte UTF-8 character;
  // callers are assumed to tolerate that at page edges — TODO confirm.
  async function readFileRange(filePath: string, offset: number, limitBytes: number): Promise<RunLogReadResult> {
    const stat = await fs.stat(filePath).catch(() => null);
    if (!stat) throw notFound("Run log not found");
    // Clamp the range to the file; createReadStream's `end` is inclusive.
    const start = Math.max(0, Math.min(offset, stat.size));
    const end = Math.max(start, Math.min(start + limitBytes - 1, stat.size - 1));
    if (start > end) {
      return { content: "", nextOffset: start };
    }
    const chunks: Buffer[] = [];
    await new Promise<void>((resolve, reject) => {
      const stream = createReadStream(filePath, { start, end });
      stream.on("data", (chunk) => {
        chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
      });
      stream.on("error", reject);
      stream.on("end", () => resolve());
    });
    const content = Buffer.concat(chunks).toString("utf8");
    const nextOffset = end + 1 < stat.size ? end + 1 : undefined;
    return { content, nextOffset };
  }
  // Stream the file through SHA-256 without loading it all into memory.
  async function sha256File(filePath: string): Promise<string> {
    return new Promise<string>((resolve, reject) => {
      const hash = createHash("sha256");
      const stream = createReadStream(filePath);
      stream.on("data", (chunk) => hash.update(chunk));
      stream.on("error", reject);
      stream.on("end", () => resolve(hash.digest("hex")));
    });
  }
  return {
    // Create (or truncate) the log file for a run and return its handle.
    async begin(input) {
      const [companyId, agentId] = safeSegments(input.companyId, input.agentId);
      const runId = safeSegments(input.runId)[0]!;
      const relDir = path.join(companyId, agentId);
      const relPath = path.join(relDir, `${runId}.ndjson`);
      await ensureDir(relDir);
      const absPath = resolveWithin(basePath, relPath);
      // Truncate any leftover file from a previous run with the same id.
      await fs.writeFile(absPath, "", "utf8");
      return { store: "local_file", logRef: relPath };
    },
    // Append one stream event as an NDJSON line. Events for other store
    // types are ignored (this backend only owns "local_file" handles).
    async append(handle, event) {
      if (handle.store !== "local_file") return;
      const absPath = resolveWithin(basePath, handle.logRef);
      const line = JSON.stringify({
        ts: event.ts,
        stream: event.stream,
        chunk: event.chunk,
      });
      // fs.promises.appendFile opens the file in append mode per call —
      // the stdlib equivalent of the previous hand-rolled write stream.
      await fs.appendFile(absPath, `${line}\n`, "utf8");
    },
    // Seal the log: report its size and content hash (never compressed here).
    async finalize(handle) {
      if (handle.store !== "local_file") {
        return { bytes: 0, compressed: false };
      }
      const absPath = resolveWithin(basePath, handle.logRef);
      const stat = await fs.stat(absPath).catch(() => null);
      if (!stat) throw notFound("Run log not found");
      const hash = await sha256File(absPath);
      return {
        bytes: stat.size,
        sha256: hash,
        compressed: false,
      };
    },
    // Read one byte-range page of the log.
    async read(handle, opts) {
      if (handle.store !== "local_file") {
        throw notFound("Run log not found");
      }
      const absPath = resolveWithin(basePath, handle.logRef);
      const offset = opts?.offset ?? 0;
      const limitBytes = opts?.limitBytes ?? 256_000;
      return readFileRange(absPath, offset, limitBytes);
    },
  };
}
// Lazily-created singleton store for this process.
let cachedStore: RunLogStore | null = null;
/**
 * Process-wide RunLogStore, rooted at RUN_LOG_BASE_PATH (default:
 * data/run-logs under the current working directory).
 */
export function getRunLogStore() {
  if (!cachedStore) {
    const basePath = process.env.RUN_LOG_BASE_PATH ?? path.resolve(process.cwd(), "data/run-logs");
    cachedStore = createLocalFileRunLogStore(basePath);
  }
  return cachedStore;
}