Add worktree history merge command

Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
dotta
2026-03-20 15:02:24 -05:00
parent a46dc4634b
commit 0ec79d4295
3 changed files with 986 additions and 1 deletions

View File

@@ -17,13 +17,21 @@ import { execFileSync } from "node:child_process";
import { createServer } from "node:net";
import * as p from "@clack/prompts";
import pc from "picocolors";
import { eq } from "drizzle-orm";
import { and, eq, inArray, sql } from "drizzle-orm";
import {
applyPendingMigrations,
agents,
companies,
createDb,
ensurePostgresDatabase,
formatDatabaseBackupResult,
goals,
heartbeatRuns,
issueComments,
issueDocuments,
issues,
projectWorkspaces,
projects,
runDatabaseBackup,
runDatabaseRestore,
} from "@paperclipai/db";
@@ -48,6 +56,12 @@ import {
type WorktreeSeedMode,
type WorktreeLocalPaths,
} from "./worktree-lib.js";
import {
buildWorktreeMergePlan,
parseWorktreeMergeScopes,
type PlannedCommentInsert,
type PlannedIssueInsert,
} from "./worktree-merge-history-lib.js";
type WorktreeInitOptions = {
name?: string;
@@ -73,6 +87,14 @@ type WorktreeEnvOptions = {
json?: boolean;
};
type WorktreeMergeHistoryOptions = {
company?: string;
scope?: string;
apply?: boolean;
dry?: boolean;
yes?: boolean;
};
type EmbeddedPostgresInstance = {
initialise(): Promise<void>;
start(): Promise<void>;
@@ -1071,6 +1093,447 @@ export async function worktreeEnvCommand(opts: WorktreeEnvOptions): Promise<void
console.log(formatShellExports(out));
}
type ClosableDb = ReturnType<typeof createDb> & {
$client?: { end?: (opts?: { timeout?: number }) => Promise<void> };
};
type OpenDbHandle = {
db: ClosableDb;
stop: () => Promise<void>;
};
type ResolvedMergeCompany = {
id: string;
name: string;
issuePrefix: string;
};
function requirePathArgument(name: string, value: string | undefined): string {
const trimmed = nonEmpty(value);
if (!trimmed) {
throw new Error(`${name} is required.`);
}
return path.resolve(trimmed);
}
async function closeDb(db: ClosableDb): Promise<void> {
await db.$client?.end?.({ timeout: 5 }).catch(() => undefined);
}
async function openConfiguredDb(configPath: string): Promise<OpenDbHandle> {
const config = readConfig(configPath);
if (!config) {
throw new Error(`Config not found at ${configPath}.`);
}
const envEntries = readPaperclipEnvEntries(resolvePaperclipEnvFile(configPath));
let embeddedHandle: EmbeddedPostgresHandle | null = null;
try {
if (config.database.mode === "embedded-postgres") {
embeddedHandle = await ensureEmbeddedPostgres(
config.database.embeddedPostgresDataDir,
config.database.embeddedPostgresPort,
);
}
const connectionString = resolveSourceConnectionString(config, envEntries, embeddedHandle?.port);
const db = createDb(connectionString) as ClosableDb;
return {
db,
stop: async () => {
await closeDb(db);
if (embeddedHandle?.startedByThisProcess) {
await embeddedHandle.stop();
}
},
};
} catch (error) {
if (embeddedHandle?.startedByThisProcess) {
await embeddedHandle.stop().catch(() => undefined);
}
throw error;
}
}
async function resolveMergeCompany(input: {
sourceDb: ClosableDb;
targetDb: ClosableDb;
selector?: string;
}): Promise<ResolvedMergeCompany> {
const [sourceCompanies, targetCompanies] = await Promise.all([
input.sourceDb
.select({
id: companies.id,
name: companies.name,
issuePrefix: companies.issuePrefix,
})
.from(companies),
input.targetDb
.select({
id: companies.id,
name: companies.name,
issuePrefix: companies.issuePrefix,
})
.from(companies),
]);
const targetById = new Map(targetCompanies.map((company) => [company.id, company]));
const shared = sourceCompanies.filter((company) => targetById.has(company.id));
const selector = nonEmpty(input.selector);
if (selector) {
const matched = shared.find(
(company) => company.id === selector || company.issuePrefix.toLowerCase() === selector.toLowerCase(),
);
if (!matched) {
throw new Error(`Could not resolve company "${selector}" in both source and target databases.`);
}
return matched;
}
if (shared.length === 1) {
return shared[0];
}
if (shared.length === 0) {
throw new Error("Source and target databases do not share a company id. Pass --company explicitly once both sides match.");
}
const options = shared
.map((company) => `${company.issuePrefix} (${company.name})`)
.join(", ");
throw new Error(`Multiple shared companies found. Re-run with --company <id-or-prefix>. Options: ${options}`);
}
function renderMergePlan(plan: Awaited<ReturnType<typeof collectMergePlan>>["plan"], extras: {
sourcePath: string;
unsupportedRunCount: number;
unsupportedDocumentCount: number;
}): string {
const lines = [
`Mode: preview`,
`Source: ${extras.sourcePath}`,
`Company: ${plan.companyName} (${plan.issuePrefix})`,
"",
"Issues",
`- insert: ${plan.counts.issuesToInsert}`,
`- already present: ${plan.counts.issuesExisting}`,
`- shared/imported issues with drift: ${plan.counts.issueDrift}`,
];
const issueInserts = plan.issuePlans.filter((item): item is PlannedIssueInsert => item.action === "insert");
if (issueInserts.length > 0) {
lines.push("");
lines.push("Planned issue imports");
for (const issue of issueInserts) {
const adjustments = issue.adjustments.length > 0 ? ` [${issue.adjustments.join(", ")}]` : "";
lines.push(
`- ${issue.source.identifier ?? issue.source.id} -> ${issue.previewIdentifier} (${issue.targetStatus})${adjustments}`,
);
}
}
if (plan.scopes.includes("comments")) {
lines.push("");
lines.push("Comments");
lines.push(`- insert: ${plan.counts.commentsToInsert}`);
lines.push(`- already present: ${plan.counts.commentsExisting}`);
lines.push(`- skipped (missing parent): ${plan.counts.commentsMissingParent}`);
}
lines.push("");
lines.push("Adjustments");
lines.push(`- cleared assignee agents: ${plan.adjustments.clear_assignee_agent}`);
lines.push(`- cleared projects: ${plan.adjustments.clear_project}`);
lines.push(`- cleared project workspaces: ${plan.adjustments.clear_project_workspace}`);
lines.push(`- cleared goals: ${plan.adjustments.clear_goal}`);
lines.push(`- cleared comment author agents: ${plan.adjustments.clear_author_agent}`);
lines.push(`- coerced in_progress to todo: ${plan.adjustments.coerce_in_progress_to_todo}`);
lines.push("");
lines.push("Not imported in this phase");
lines.push(`- heartbeat runs: ${extras.unsupportedRunCount}`);
lines.push(`- issue documents: ${extras.unsupportedDocumentCount}`);
lines.push("");
lines.push("Identifiers shown above are provisional preview values. `--apply` reserves fresh issue numbers at write time.");
return lines.join("\n");
}
async function collectMergePlan(input: {
sourceDb: ClosableDb;
targetDb: ClosableDb;
company: ResolvedMergeCompany;
scopes: ReturnType<typeof parseWorktreeMergeScopes>;
}) {
const companyId = input.company.id;
const [targetCompanyRow, sourceIssuesRows, targetIssuesRows, sourceCommentsRows, targetCommentsRows, targetAgentsRows, targetProjectsRows, targetProjectWorkspaceRows, targetGoalsRows, runCountRows, documentCountRows] = await Promise.all([
input.targetDb
.select({
issueCounter: companies.issueCounter,
})
.from(companies)
.where(eq(companies.id, companyId))
.then((rows) => rows[0] ?? null),
input.sourceDb
.select()
.from(issues)
.where(eq(issues.companyId, companyId)),
input.targetDb
.select()
.from(issues)
.where(eq(issues.companyId, companyId)),
input.scopes.includes("comments")
? input.sourceDb
.select()
.from(issueComments)
.where(eq(issueComments.companyId, companyId))
: Promise.resolve([]),
input.scopes.includes("comments")
? input.targetDb
.select()
.from(issueComments)
.where(eq(issueComments.companyId, companyId))
: Promise.resolve([]),
input.targetDb
.select()
.from(agents)
.where(eq(agents.companyId, companyId)),
input.targetDb
.select()
.from(projects)
.where(eq(projects.companyId, companyId)),
input.targetDb
.select()
.from(projectWorkspaces)
.where(eq(projectWorkspaces.companyId, companyId)),
input.targetDb
.select()
.from(goals)
.where(eq(goals.companyId, companyId)),
input.sourceDb
.select({ count: sql<number>`count(*)::int` })
.from(heartbeatRuns)
.where(eq(heartbeatRuns.companyId, companyId)),
input.sourceDb
.select({ count: sql<number>`count(*)::int` })
.from(issueDocuments)
.innerJoin(issues, eq(issueDocuments.issueId, issues.id))
.where(eq(issues.companyId, companyId)),
]);
if (!targetCompanyRow) {
throw new Error(`Target company ${companyId} was not found.`);
}
const plan = buildWorktreeMergePlan({
companyId,
companyName: input.company.name,
issuePrefix: input.company.issuePrefix,
previewIssueCounterStart: targetCompanyRow.issueCounter,
scopes: input.scopes,
sourceIssues: sourceIssuesRows,
targetIssues: targetIssuesRows,
sourceComments: sourceCommentsRows,
targetComments: targetCommentsRows,
targetAgents: targetAgentsRows,
targetProjects: targetProjectsRows,
targetProjectWorkspaces: targetProjectWorkspaceRows,
targetGoals: targetGoalsRows,
});
return {
plan,
unsupportedRunCount: runCountRows[0]?.count ?? 0,
unsupportedDocumentCount: documentCountRows[0]?.count ?? 0,
};
}
async function applyMergePlan(input: {
targetDb: ClosableDb;
company: ResolvedMergeCompany;
plan: Awaited<ReturnType<typeof collectMergePlan>>["plan"];
}) {
const companyId = input.company.id;
return await input.targetDb.transaction(async (tx) => {
const issueCandidates = input.plan.issuePlans.filter(
(plan): plan is PlannedIssueInsert => plan.action === "insert",
);
const issueCandidateIds = issueCandidates.map((issue) => issue.source.id);
const existingIssueIds = issueCandidateIds.length > 0
? new Set(
(await tx
.select({ id: issues.id })
.from(issues)
.where(inArray(issues.id, issueCandidateIds)))
.map((row) => row.id),
)
: new Set<string>();
const issueInserts = issueCandidates.filter((issue) => !existingIssueIds.has(issue.source.id));
let nextIssueNumber = 0;
if (issueInserts.length > 0) {
const [companyRow] = await tx
.update(companies)
.set({ issueCounter: sql`${companies.issueCounter} + ${issueInserts.length}` })
.where(eq(companies.id, companyId))
.returning({ issueCounter: companies.issueCounter });
nextIssueNumber = companyRow.issueCounter - issueInserts.length + 1;
}
const insertedIssueIdentifiers = new Map<string, string>();
for (const issue of issueInserts) {
const issueNumber = nextIssueNumber;
nextIssueNumber += 1;
const identifier = `${input.company.issuePrefix}-${issueNumber}`;
insertedIssueIdentifiers.set(issue.source.id, identifier);
await tx.insert(issues).values({
id: issue.source.id,
companyId,
projectId: issue.targetProjectId,
projectWorkspaceId: issue.targetProjectWorkspaceId,
goalId: issue.targetGoalId,
parentId: issue.source.parentId,
title: issue.source.title,
description: issue.source.description,
status: issue.targetStatus,
priority: issue.source.priority,
assigneeAgentId: issue.targetAssigneeAgentId,
assigneeUserId: issue.source.assigneeUserId,
checkoutRunId: null,
executionRunId: null,
executionAgentNameKey: null,
executionLockedAt: null,
createdByAgentId: issue.targetCreatedByAgentId,
createdByUserId: issue.source.createdByUserId,
issueNumber,
identifier,
requestDepth: issue.source.requestDepth,
billingCode: issue.source.billingCode,
assigneeAdapterOverrides: issue.targetAssigneeAgentId ? issue.source.assigneeAdapterOverrides : null,
executionWorkspaceId: null,
executionWorkspacePreference: null,
executionWorkspaceSettings: null,
startedAt: issue.source.startedAt,
completedAt: issue.source.completedAt,
cancelledAt: issue.source.cancelledAt,
hiddenAt: issue.source.hiddenAt,
createdAt: issue.source.createdAt,
updatedAt: issue.source.updatedAt,
});
}
const commentCandidates = input.plan.commentPlans.filter(
(plan): plan is PlannedCommentInsert => plan.action === "insert",
);
const commentCandidateIds = commentCandidates.map((comment) => comment.source.id);
const existingCommentIds = commentCandidateIds.length > 0
? new Set(
(await tx
.select({ id: issueComments.id })
.from(issueComments)
.where(inArray(issueComments.id, commentCandidateIds)))
.map((row) => row.id),
)
: new Set<string>();
for (const comment of commentCandidates) {
if (existingCommentIds.has(comment.source.id)) continue;
const parentExists = await tx
.select({ id: issues.id })
.from(issues)
.where(and(eq(issues.id, comment.source.issueId), eq(issues.companyId, companyId)))
.then((rows) => rows[0] ?? null);
if (!parentExists) continue;
await tx.insert(issueComments).values({
id: comment.source.id,
companyId,
issueId: comment.source.issueId,
authorAgentId: comment.targetAuthorAgentId,
authorUserId: comment.source.authorUserId,
body: comment.source.body,
createdAt: comment.source.createdAt,
updatedAt: comment.source.updatedAt,
});
}
return {
insertedIssues: issueInserts.length,
insertedComments: commentCandidates.filter((comment) => !existingCommentIds.has(comment.source.id)).length,
insertedIssueIdentifiers,
};
});
}
export async function worktreeMergeHistoryCommand(sourceArg: string, opts: WorktreeMergeHistoryOptions): Promise<void> {
if (opts.apply && opts.dry) {
throw new Error("Use either --apply or --dry, not both.");
}
const sourceRoot = requirePathArgument("Source worktree path", sourceArg);
const sourceConfigPath = path.resolve(sourceRoot, ".paperclip", "config.json");
if (!existsSync(sourceConfigPath)) {
throw new Error(`Source worktree config not found at ${sourceConfigPath}.`);
}
const targetConfigPath = resolveConfigPath();
if (path.resolve(sourceConfigPath) === path.resolve(targetConfigPath)) {
throw new Error("Source and target Paperclip configs are the same. Point --source at a different worktree.");
}
const scopes = parseWorktreeMergeScopes(opts.scope);
const sourceHandle = await openConfiguredDb(sourceConfigPath);
const targetHandle = await openConfiguredDb(targetConfigPath);
try {
const company = await resolveMergeCompany({
sourceDb: sourceHandle.db,
targetDb: targetHandle.db,
selector: opts.company,
});
const collected = await collectMergePlan({
sourceDb: sourceHandle.db,
targetDb: targetHandle.db,
company,
scopes,
});
console.log(renderMergePlan(collected.plan, {
sourcePath: sourceRoot,
unsupportedRunCount: collected.unsupportedRunCount,
unsupportedDocumentCount: collected.unsupportedDocumentCount,
}));
if (!opts.apply) {
return;
}
const confirmed = opts.yes
? true
: await p.confirm({
message: `Import ${collected.plan.counts.issuesToInsert} issues and ${collected.plan.counts.commentsToInsert} comments from ${path.basename(sourceRoot)}?`,
initialValue: false,
});
if (p.isCancel(confirmed) || !confirmed) {
p.log.warn("Import cancelled.");
return;
}
const applied = await applyMergePlan({
targetDb: targetHandle.db,
company,
plan: collected.plan,
});
p.outro(
pc.green(
`Imported ${applied.insertedIssues} issues and ${applied.insertedComments} comments into ${company.issuePrefix}.`,
),
);
} finally {
await targetHandle.stop();
await sourceHandle.stop();
}
}
export function registerWorktreeCommands(program: Command): void {
const worktree = program.command("worktree").description("Worktree-local Paperclip instance helpers");
@@ -1114,6 +1577,17 @@ export function registerWorktreeCommands(program: Command): void {
.option("--json", "Print JSON instead of shell exports")
.action(worktreeEnvCommand);
program
.command("worktree:merge-history")
.description("Preview or import issue/comment history from another worktree into the current instance")
.argument("<source>", "Path to the source worktree root")
.option("--company <id-or-prefix>", "Company id or issue prefix to import")
.option("--scope <items>", "Comma-separated scopes to import (issues, comments)", "issues,comments")
.option("--apply", "Apply the import after previewing the plan", false)
.option("--dry", "Preview only and do not import anything", false)
.option("--yes", "Skip the interactive confirmation prompt when applying", false)
.action(worktreeMergeHistoryCommand);
program
.command("worktree:cleanup")
.description("Safely remove a worktree, its branch, and its isolated instance data")