From 3b01bbe18f0dece312f6aeed321e70f9f63be9e0 Mon Sep 17 00:00:00 2001 From: Julius Marminge Date: Wed, 1 Apr 2026 10:27:57 -0700 Subject: [PATCH 1/2] Move archived threads off the main read path Co-authored-by: codex --- .../Layers/CheckpointDiffQuery.test.ts | 6 + .../Layers/OrchestrationEngine.test.ts | 2 + .../Layers/ProjectionSnapshotQuery.test.ts | 259 +++++- .../Layers/ProjectionSnapshotQuery.ts | 878 +++++++++++++----- .../Services/ProjectionSnapshotQuery.ts | 18 + apps/server/src/server.test.ts | 29 + apps/server/src/serverRuntimeStartup.test.ts | 2 + apps/server/src/ws.ts | 22 + apps/web/src/components/ChatView.browser.tsx | 5 +- .../components/KeybindingsToast.browser.tsx | 5 +- .../components/settings/SettingsPanels.tsx | 194 ++-- apps/web/src/hooks/useThreadActions.ts | 143 ++- apps/web/src/lib/orchestrationReactQuery.ts | 23 + apps/web/src/routes/__root.tsx | 41 +- apps/web/src/wsNativeApi.test.ts | 2 + apps/web/src/wsNativeApi.ts | 2 + apps/web/src/wsRpcClient.ts | 10 + packages/contracts/src/ipc.ts | 3 + packages/contracts/src/orchestration.ts | 51 + packages/contracts/src/rpc.ts | 24 + 20 files changed, 1387 insertions(+), 332 deletions(-) create mode 100644 apps/web/src/lib/orchestrationReactQuery.ts diff --git a/apps/server/src/checkpointing/Layers/CheckpointDiffQuery.test.ts b/apps/server/src/checkpointing/Layers/CheckpointDiffQuery.test.ts index c66c529b9a..eea07a60ab 100644 --- a/apps/server/src/checkpointing/Layers/CheckpointDiffQuery.test.ts +++ b/apps/server/src/checkpointing/Layers/CheckpointDiffQuery.test.ts @@ -82,7 +82,10 @@ describe("CheckpointDiffQueryLive", () => { Layer.succeed(ProjectionSnapshotQuery, { getSnapshot: () => Effect.die("CheckpointDiffQuery should not request the full orchestration snapshot"), + getActiveSnapshot: () => + Effect.die("CheckpointDiffQuery should not request the active orchestration snapshot"), getCounts: () => Effect.succeed({ projectCount: 0, threadCount: 0 }), + listArchivedThreads: () => Effect.succeed([]), getActiveProjectByWorkspaceRoot: () => Effect.succeed(Option.none()), getFirstActiveThreadIdByProjectId: () => Effect.succeed(Option.none()), getThreadCheckpointContext: () => Effect.succeed(Option.some(threadCheckpointContext)), @@ -136,7 +139,10 @@ describe("CheckpointDiffQueryLive", () => { Layer.succeed(ProjectionSnapshotQuery, { getSnapshot: () => Effect.die("CheckpointDiffQuery should not request the full orchestration snapshot"), + getActiveSnapshot: () => + Effect.die("CheckpointDiffQuery should not request the active orchestration snapshot"), getCounts: () => Effect.succeed({ projectCount: 0, threadCount: 0 }), + listArchivedThreads: () => Effect.succeed([]), getActiveProjectByWorkspaceRoot: () => Effect.succeed(Option.none()), getFirstActiveThreadIdByProjectId: () => Effect.succeed(Option.none()), getThreadCheckpointContext: () => Effect.succeed(Option.none()), diff --git a/apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts b/apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts index 16bbd98d6b..5fa67d27c0 100644 --- a/apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts +++ b/apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts @@ -131,7 +131,9 @@ describe("OrchestrationEngine", () => { Layer.provide( Layer.succeed(ProjectionSnapshotQuery, { getSnapshot: () => Effect.succeed(projectionSnapshot), + getActiveSnapshot: () => Effect.succeed(projectionSnapshot), getCounts: () => Effect.succeed({ projectCount: 1, threadCount: 1 }), + listArchivedThreads: () => Effect.succeed([]), getActiveProjectByWorkspaceRoot: () => Effect.succeed(Option.none()), getFirstActiveThreadIdByProjectId: () => Effect.succeed(Option.none()), getThreadCheckpointContext: () => Effect.succeed(Option.none()), diff --git a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.test.ts b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.test.ts index c038bc9d2c..5e258e287e 100644 --- a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.test.ts +++ b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.test.ts @@ -431,6 +431,21 @@ projectionSnapshotLayer("ProjectionSnapshotQuery", (it) => { NULL, NULL ), + ( + 'thread-archived', + 'project-active', + 'Archived Thread', + '{"provider":"codex","model":"gpt-5-codex"}', + 'full-access', + 'default', + NULL, + '/tmp/worktree-archived', + NULL, + '2026-03-01T00:00:04.000Z', + '2026-03-01T00:00:04.500Z', + '2026-03-01T00:00:04.750Z', + NULL + ), ( 'thread-deleted', 'project-active', @@ -451,7 +466,7 @@ projectionSnapshotLayer("ProjectionSnapshotQuery", (it) => { const counts = yield* snapshotQuery.getCounts(); assert.deepEqual(counts, { projectCount: 2, - threadCount: 3, + threadCount: 4, }); const project = yield* snapshotQuery.getActiveProjectByWorkspaceRoot("/tmp/workspace"); @@ -470,9 +485,251 @@ projectionSnapshotLayer("ProjectionSnapshotQuery", (it) => { if (firstThreadId._tag === "Some") { assert.equal(firstThreadId.value, ThreadId.makeUnsafe("thread-first")); } + + const archivedThreads = yield* snapshotQuery.listArchivedThreads(); + assert.deepEqual(archivedThreads, [ + { + threadId: ThreadId.makeUnsafe("thread-archived"), + projectId: asProjectId("project-active"), + projectTitle: "Active Project", + workspaceRoot: "/tmp/workspace", + title: "Archived Thread", + worktreePath: "/tmp/worktree-archived", + createdAt: "2026-03-01T00:00:04.000Z", + updatedAt: "2026-03-01T00:00:04.500Z", + archivedAt: "2026-03-01T00:00:04.750Z", + }, + ]); }), ); + it.effect("hydrates an active snapshot without archived or deleted thread data", () => + Effect.gen(function* () { + const snapshotQuery = yield* ProjectionSnapshotQuery; + const sql = yield* SqlClient.SqlClient; + + yield* sql`DELETE FROM projection_projects`; + yield* sql`DELETE FROM projection_threads`; + yield* sql`DELETE FROM projection_thread_messages`; + yield* sql`DELETE FROM projection_thread_proposed_plans`; + yield* sql`DELETE FROM projection_thread_activities`; + yield* sql`DELETE FROM projection_thread_sessions`; + yield* sql`DELETE FROM projection_turns`; + yield* sql`DELETE FROM projection_state`; + + yield* sql` + INSERT INTO projection_projects ( + project_id, + title, + workspace_root, + default_model_selection_json, + scripts_json, + created_at, + updated_at, + deleted_at + ) + VALUES ( + 'project-active-only', + 'Active-only Project', + '/tmp/active-only', + NULL, + '[]', + '2026-03-03T00:00:00.000Z', + '2026-03-03T00:00:01.000Z', + NULL + ) + `; + + yield* sql` + INSERT INTO projection_threads ( + thread_id, + project_id, + title, + model_selection_json, + runtime_mode, + interaction_mode, + branch, + worktree_path, + latest_turn_id, + created_at, + updated_at, + archived_at, + deleted_at + ) + VALUES + ( + 'thread-active-only', + 'project-active-only', + 'Active Thread', + '{"provider":"codex","model":"gpt-5-codex"}', + 'full-access', + 'default', + NULL, + NULL, + 'turn-active-only', + '2026-03-03T00:00:02.000Z', + '2026-03-03T00:00:03.000Z', + NULL, + NULL + ), + ( + 'thread-archived-only', + 'project-active-only', + 'Archived Thread', + '{"provider":"codex","model":"gpt-5-codex"}', + 'full-access', + 'default', + NULL, + NULL, + 'turn-archived-only', + '2026-03-03T00:00:04.000Z', + '2026-03-03T00:00:05.000Z', + '2026-03-03T00:00:06.000Z', + NULL + ), + ( + 'thread-deleted-only', + 'project-active-only', + 'Deleted Thread', + '{"provider":"codex","model":"gpt-5-codex"}', + 'full-access', + 'default', + NULL, + NULL, + NULL, + '2026-03-03T00:00:07.000Z', + '2026-03-03T00:00:08.000Z', + NULL, + '2026-03-03T00:00:09.000Z' + ) + `; + + yield* sql` + INSERT INTO projection_thread_messages ( + message_id, + thread_id, + turn_id, + role, + text, + is_streaming, + created_at, + updated_at + ) + VALUES + ( + 'message-active-only', + 'thread-active-only', + 'turn-active-only', + 'assistant', + 'active message', + 0, + '2026-03-03T00:00:10.000Z', + '2026-03-03T00:00:11.000Z' + ), + ( + 'message-archived-only', + 'thread-archived-only', + 'turn-archived-only', + 'assistant', + 'archived message', + 0, + '2026-03-03T00:00:12.000Z', + '2026-03-03T00:00:13.000Z' + ) + `; + + yield* sql` + INSERT INTO projection_turns ( + thread_id, + turn_id, + pending_message_id, + source_proposed_plan_thread_id, + source_proposed_plan_id, + assistant_message_id, + state, + requested_at, + started_at, + completed_at, + checkpoint_turn_count, + checkpoint_ref, + checkpoint_status, + checkpoint_files_json + ) + VALUES + ( + 'thread-active-only', + 'turn-active-only', + NULL, + NULL, + NULL, + 'message-active-only', + 'completed', + '2026-03-03T00:00:14.000Z', + '2026-03-03T00:00:14.000Z', + '2026-03-03T00:00:14.000Z', + 1, + 'checkpoint-active-only', + 'ready', + '[]' + ), + ( + 'thread-archived-only', + 'turn-archived-only', + NULL, + NULL, + NULL, + 'message-archived-only', + 'completed', + '2026-03-03T00:00:15.000Z', + '2026-03-03T00:00:15.000Z', + '2026-03-03T00:00:15.000Z', + 1, + 'checkpoint-archived-only', + 'ready', + '[]' + ) + `; + + let sequence = 9; + for (const projector of Object.values(ORCHESTRATION_PROJECTOR_NAMES)) { + yield* sql` + INSERT INTO projection_state ( + projector, + last_applied_sequence, + updated_at + ) + VALUES ( + ${projector}, + ${sequence}, + '2026-03-03T00:00:16.000Z' + ) + `; + sequence += 1; + } + + const snapshot = yield* snapshotQuery.getActiveSnapshot(); + + assert.equal(snapshot.snapshotSequence, 9); + assert.equal(snapshot.projects.length, 1); + assert.deepEqual( + snapshot.threads.map((thread) => ({ + id: thread.id, + title: thread.title, + messageCount: thread.messages.length, + checkpointCount: thread.checkpoints.length, + })), + [ + { + id: ThreadId.makeUnsafe("thread-active-only"), + title: "Active Thread", + messageCount: 1, + checkpointCount: 1, + }, + ], + ); + }), + ); + it.effect("reads single-thread checkpoint context without hydrating unrelated threads", () => Effect.gen(function* () { const snapshotQuery = yield* ProjectionSnapshotQuery; diff --git a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts index da7c695674..03ae24fc19 100644 --- a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts +++ b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts @@ -3,6 +3,7 @@ import { IsoDateTime, MessageId, NonNegativeInt, + OrchestrationArchivedThreadSummary, OrchestrationCheckpointFile, OrchestrationProposedPlanId, OrchestrationReadModel, @@ -89,6 +90,7 @@ const ProjectionLatestTurnDbRowSchema = Schema.Struct({ sourceProposedPlanId: Schema.NullOr(OrchestrationProposedPlanId), }); const ProjectionStateDbRowSchema = ProjectionState; +const ProjectionArchivedThreadSummaryRowSchema = OrchestrationArchivedThreadSummary; const ProjectionCountsRowSchema = Schema.Struct({ projectCount: Schema.Number, threadCount: Schema.Number, @@ -112,6 +114,17 @@ const ProjectionThreadCheckpointContextThreadRowSchema = Schema.Struct({ workspaceRoot: Schema.String, worktreePath: Schema.NullOr(Schema.String), }); +type ProjectionProjectDbRow = Schema.Schema.Type; +type ProjectionThreadDbRow = Schema.Schema.Type; +type ProjectionThreadMessageDbRow = Schema.Schema.Type; +type ProjectionThreadProposedPlanDbRow = Schema.Schema.Type< + typeof ProjectionThreadProposedPlanDbRowSchema +>; +type ProjectionThreadActivityDbRow = Schema.Schema.Type; +type ProjectionThreadSessionDbRow = Schema.Schema.Type; +type ProjectionCheckpointDbRow = Schema.Schema.Type; +type ProjectionLatestTurnDbRow = Schema.Schema.Type; +type ProjectionStateDbRow = Schema.Schema.Type; const REQUIRED_SNAPSHOT_PROJECTORS = [ ORCHESTRATION_PROJECTOR_NAMES.projects, @@ -183,6 +196,26 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { `, }); + const listActiveProjectRows = SqlSchema.findAll({ + Request: Schema.Void, + Result: ProjectionProjectDbRowSchema, + execute: () => + sql` + SELECT + project_id AS "projectId", + title, + workspace_root AS "workspaceRoot", + default_model_selection_json AS "defaultModelSelection", + scripts_json AS "scripts", + created_at AS "createdAt", + updated_at AS "updatedAt", + deleted_at AS "deletedAt" + FROM projection_projects + WHERE deleted_at IS NULL + ORDER BY created_at ASC, project_id ASC + `, + }); + const listThreadRows = SqlSchema.findAll({ Request: Schema.Void, Result: ProjectionThreadDbRowSchema, @@ -207,6 +240,32 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { `, }); + const listActiveThreadRows = SqlSchema.findAll({ + Request: Schema.Void, + Result: ProjectionThreadDbRowSchema, + execute: () => + sql` + SELECT + thread_id AS "threadId", + project_id AS "projectId", + title, + model_selection_json AS "modelSelection", + runtime_mode AS "runtimeMode", + interaction_mode AS "interactionMode", + branch, + worktree_path AS "worktreePath", + latest_turn_id AS "latestTurnId", + created_at AS "createdAt", + updated_at AS "updatedAt", + archived_at AS "archivedAt", + deleted_at AS "deletedAt" + FROM projection_threads + WHERE deleted_at IS NULL + AND archived_at IS NULL + ORDER BY created_at ASC, thread_id ASC + `, + }); + const listThreadMessageRows = SqlSchema.findAll({ Request: Schema.Void, Result: ProjectionThreadMessageDbRowSchema, @@ -227,6 +286,32 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { `, }); + const listActiveThreadMessageRows = SqlSchema.findAll({ + Request: Schema.Void, + Result: ProjectionThreadMessageDbRowSchema, + execute: () => + sql` + SELECT + message_id AS "messageId", + thread_id AS "threadId", + turn_id AS "turnId", + role, + text, + attachments_json AS "attachments", + is_streaming AS "isStreaming", + created_at AS "createdAt", + updated_at AS "updatedAt" + FROM projection_thread_messages + WHERE thread_id IN ( + SELECT thread_id + FROM projection_threads + WHERE deleted_at IS NULL + AND archived_at IS NULL + ) + ORDER BY thread_id ASC, created_at ASC, message_id ASC + `, + }); + const listThreadProposedPlanRows = SqlSchema.findAll({ Request: Schema.Void, Result: ProjectionThreadProposedPlanDbRowSchema, @@ -246,6 +331,31 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { `, }); + const listActiveThreadProposedPlanRows = SqlSchema.findAll({ + Request: Schema.Void, + Result: ProjectionThreadProposedPlanDbRowSchema, + execute: () => + sql` + SELECT + plan_id AS "planId", + thread_id AS "threadId", + turn_id AS "turnId", + plan_markdown AS "planMarkdown", + implemented_at AS "implementedAt", + implementation_thread_id AS "implementationThreadId", + created_at AS "createdAt", + updated_at AS "updatedAt" + FROM projection_thread_proposed_plans + WHERE thread_id IN ( + SELECT thread_id + FROM projection_threads + WHERE deleted_at IS NULL + AND archived_at IS NULL + ) + ORDER BY thread_id ASC, created_at ASC, plan_id ASC + `, + }); + const listThreadActivityRows = SqlSchema.findAll({ Request: Schema.Void, Result: ProjectionThreadActivityDbRowSchema, @@ -271,6 +381,37 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { `, }); + const listActiveThreadActivityRows = SqlSchema.findAll({ + Request: Schema.Void, + Result: ProjectionThreadActivityDbRowSchema, + execute: () => + sql` + SELECT + activity_id AS "activityId", + thread_id AS "threadId", + turn_id AS "turnId", + tone, + kind, + summary, + payload_json AS "payload", + sequence, + created_at AS "createdAt" + FROM projection_thread_activities + WHERE thread_id IN ( + SELECT thread_id + FROM projection_threads + WHERE deleted_at IS NULL + AND archived_at IS NULL + ) + ORDER BY + thread_id ASC, + CASE WHEN sequence IS NULL THEN 0 ELSE 1 END ASC, + sequence ASC, + created_at ASC, + activity_id ASC + `, + }); + const listThreadSessionRows = SqlSchema.findAll({ Request: Schema.Void, Result: ProjectionThreadSessionDbRowSchema, @@ -291,6 +432,32 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { `, }); + const listActiveThreadSessionRows = SqlSchema.findAll({ + Request: Schema.Void, + Result: ProjectionThreadSessionDbRowSchema, + execute: () => + sql` + SELECT + thread_id AS "threadId", + status, + provider_name AS "providerName", + provider_session_id AS "providerSessionId", + provider_thread_id AS "providerThreadId", + runtime_mode AS "runtimeMode", + active_turn_id AS "activeTurnId", + last_error AS "lastError", + updated_at AS "updatedAt" + FROM projection_thread_sessions + WHERE thread_id IN ( + SELECT thread_id + FROM projection_threads + WHERE deleted_at IS NULL + AND archived_at IS NULL + ) + ORDER BY thread_id ASC + `, + }); + const listCheckpointRows = SqlSchema.findAll({ Request: Schema.Void, Result: ProjectionCheckpointDbRowSchema, @@ -311,6 +478,32 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { `, }); + const listActiveCheckpointRows = SqlSchema.findAll({ + Request: Schema.Void, + Result: ProjectionCheckpointDbRowSchema, + execute: () => + sql` + SELECT + thread_id AS "threadId", + turn_id AS "turnId", + checkpoint_turn_count AS "checkpointTurnCount", + checkpoint_ref AS "checkpointRef", + checkpoint_status AS "status", + checkpoint_files_json AS "files", + assistant_message_id AS "assistantMessageId", + completed_at AS "completedAt" + FROM projection_turns + WHERE checkpoint_turn_count IS NOT NULL + AND thread_id IN ( + SELECT thread_id + FROM projection_threads + WHERE deleted_at IS NULL + AND archived_at IS NULL + ) + ORDER BY thread_id ASC, checkpoint_turn_count ASC + `, + }); + const listLatestTurnRows = SqlSchema.findAll({ Request: Schema.Void, Result: ProjectionLatestTurnDbRowSchema, @@ -332,6 +525,33 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { `, }); + const listActiveLatestTurnRows = SqlSchema.findAll({ + Request: Schema.Void, + Result: ProjectionLatestTurnDbRowSchema, + execute: () => + sql` + SELECT + thread_id AS "threadId", + turn_id AS "turnId", + state, + requested_at AS "requestedAt", + started_at AS "startedAt", + completed_at AS "completedAt", + assistant_message_id AS "assistantMessageId", + source_proposed_plan_thread_id AS "sourceProposedPlanThreadId", + source_proposed_plan_id AS "sourceProposedPlanId" + FROM projection_turns + WHERE turn_id IS NOT NULL + AND thread_id IN ( + SELECT thread_id + FROM projection_threads + WHERE deleted_at IS NULL + AND archived_at IS NULL + ) + ORDER BY thread_id ASC, requested_at DESC, turn_id DESC + `, + }); + const listProjectionStateRows = SqlSchema.findAll({ Request: Schema.Void, Result: ProjectionStateDbRowSchema, @@ -345,6 +565,31 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { `, }); + const listArchivedThreadSummaryRows = SqlSchema.findAll({ + Request: Schema.Void, + Result: ProjectionArchivedThreadSummaryRowSchema, + execute: () => + sql` + SELECT + threads.thread_id AS "threadId", + threads.project_id AS "projectId", + projects.title AS "projectTitle", + projects.workspace_root AS "workspaceRoot", + threads.title, + threads.worktree_path AS "worktreePath", + threads.created_at AS "createdAt", + threads.updated_at AS "updatedAt", + threads.archived_at AS "archivedAt" + FROM projection_threads AS threads + INNER JOIN projection_projects AS projects + ON projects.project_id = threads.project_id + WHERE threads.deleted_at IS NULL + AND threads.archived_at IS NOT NULL + AND projects.deleted_at IS NULL + ORDER BY threads.archived_at DESC, threads.thread_id DESC + `, + }); + const readProjectionCounts = SqlSchema.findOne({ Request: Schema.Void, Result: ProjectionCountsRowSchema, @@ -388,6 +633,7 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { FROM projection_threads WHERE project_id = ${projectId} AND deleted_at IS NULL + AND archived_at IS NULL ORDER BY created_at ASC, thread_id ASC LIMIT 1 `, @@ -433,7 +679,223 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { `, }); - const getSnapshot: ProjectionSnapshotQueryShape["getSnapshot"] = () => + const buildSnapshot = (input: { + readonly operationPrefix: string; + readonly projectRows: ReadonlyArray; + readonly threadRows: ReadonlyArray; + readonly messageRows: ReadonlyArray; + readonly proposedPlanRows: ReadonlyArray; + readonly activityRows: ReadonlyArray; + readonly sessionRows: ReadonlyArray; + readonly checkpointRows: ReadonlyArray; + readonly latestTurnRows: ReadonlyArray; + readonly stateRows: ReadonlyArray; + }) => { + const messagesByThread = new Map>(); + const proposedPlansByThread = new Map>(); + const activitiesByThread = new Map>(); + const checkpointsByThread = new Map>(); + const sessionsByThread = new Map(); + const latestTurnByThread = new Map(); + + let updatedAt: string | null = null; + + for (const row of input.projectRows) { + updatedAt = maxIso(updatedAt, row.updatedAt); + } + for (const row of input.threadRows) { + updatedAt = maxIso(updatedAt, row.updatedAt); + } + for (const row of input.stateRows) { + updatedAt = maxIso(updatedAt, row.updatedAt); + } + + for (const row of input.messageRows) { + updatedAt = maxIso(updatedAt, row.updatedAt); + const threadMessages = messagesByThread.get(row.threadId) ?? []; + threadMessages.push({ + id: row.messageId, + role: row.role, + text: row.text, + ...(row.attachments !== null ? { attachments: row.attachments } : {}), + turnId: row.turnId, + streaming: row.isStreaming === 1, + createdAt: row.createdAt, + updatedAt: row.updatedAt, + }); + messagesByThread.set(row.threadId, threadMessages); + } + + for (const row of input.proposedPlanRows) { + updatedAt = maxIso(updatedAt, row.updatedAt); + const threadProposedPlans = proposedPlansByThread.get(row.threadId) ?? []; + threadProposedPlans.push({ + id: row.planId, + turnId: row.turnId, + planMarkdown: row.planMarkdown, + implementedAt: row.implementedAt, + implementationThreadId: row.implementationThreadId, + createdAt: row.createdAt, + updatedAt: row.updatedAt, + }); + proposedPlansByThread.set(row.threadId, threadProposedPlans); + } + + for (const row of input.activityRows) { + updatedAt = maxIso(updatedAt, row.createdAt); + const threadActivities = activitiesByThread.get(row.threadId) ?? []; + threadActivities.push({ + id: row.activityId, + tone: row.tone, + kind: row.kind, + summary: row.summary, + payload: row.payload, + turnId: row.turnId, + ...(row.sequence !== null ? { sequence: row.sequence } : {}), + createdAt: row.createdAt, + }); + activitiesByThread.set(row.threadId, threadActivities); + } + + for (const row of input.checkpointRows) { + updatedAt = maxIso(updatedAt, row.completedAt); + const threadCheckpoints = checkpointsByThread.get(row.threadId) ?? []; + threadCheckpoints.push({ + turnId: row.turnId, + checkpointTurnCount: row.checkpointTurnCount, + checkpointRef: row.checkpointRef, + status: row.status, + files: row.files, + assistantMessageId: row.assistantMessageId, + completedAt: row.completedAt, + }); + checkpointsByThread.set(row.threadId, threadCheckpoints); + } + + for (const row of input.latestTurnRows) { + updatedAt = maxIso(updatedAt, row.requestedAt); + if (row.startedAt !== null) { + updatedAt = maxIso(updatedAt, row.startedAt); + } + if (row.completedAt !== null) { + updatedAt = maxIso(updatedAt, row.completedAt); + } + if (latestTurnByThread.has(row.threadId)) { + continue; + } + latestTurnByThread.set(row.threadId, { + turnId: row.turnId, + state: + row.state === "error" + ? "error" + : row.state === "interrupted" + ? "interrupted" + : row.state === "completed" + ? "completed" + : "running", + requestedAt: row.requestedAt, + startedAt: row.startedAt, + completedAt: row.completedAt, + assistantMessageId: row.assistantMessageId, + ...(row.sourceProposedPlanThreadId !== null && row.sourceProposedPlanId !== null + ? { + sourceProposedPlan: { + threadId: row.sourceProposedPlanThreadId, + planId: row.sourceProposedPlanId, + }, + } + : {}), + }); + } + + for (const row of input.sessionRows) { + updatedAt = maxIso(updatedAt, row.updatedAt); + sessionsByThread.set(row.threadId, { + threadId: row.threadId, + status: row.status, + providerName: row.providerName, + runtimeMode: row.runtimeMode, + activeTurnId: row.activeTurnId, + lastError: row.lastError, + updatedAt: row.updatedAt, + }); + } + + const projects: ReadonlyArray = input.projectRows.map((row) => ({ + id: row.projectId, + title: row.title, + workspaceRoot: row.workspaceRoot, + defaultModelSelection: row.defaultModelSelection, + scripts: row.scripts, + createdAt: row.createdAt, + updatedAt: row.updatedAt, + deletedAt: row.deletedAt, + })); + + const threads: ReadonlyArray = input.threadRows.map((row) => ({ + id: row.threadId, + projectId: row.projectId, + title: row.title, + modelSelection: row.modelSelection, + runtimeMode: row.runtimeMode, + interactionMode: row.interactionMode, + branch: row.branch, + worktreePath: row.worktreePath, + latestTurn: latestTurnByThread.get(row.threadId) ?? null, + createdAt: row.createdAt, + updatedAt: row.updatedAt, + archivedAt: row.archivedAt, + deletedAt: row.deletedAt, + messages: messagesByThread.get(row.threadId) ?? [], + proposedPlans: proposedPlansByThread.get(row.threadId) ?? [], + activities: activitiesByThread.get(row.threadId) ?? [], + checkpoints: checkpointsByThread.get(row.threadId) ?? [], + session: sessionsByThread.get(row.threadId) ?? null, + })); + + return decodeReadModel({ + snapshotSequence: computeSnapshotSequence(input.stateRows), + projects, + threads, + updatedAt: updatedAt ?? new Date(0).toISOString(), + }).pipe(Effect.mapError(toPersistenceDecodeError(`${input.operationPrefix}:decodeReadModel`))); + }; + + const getSnapshotFromQueries = (input: { + readonly operationPrefix: string; + readonly loadProjectRows: Effect.Effect< + ReadonlyArray, + ProjectionRepositoryError + >; + readonly loadThreadRows: Effect.Effect< + ReadonlyArray, + ProjectionRepositoryError + >; + readonly loadMessageRows: Effect.Effect< + ReadonlyArray, + ProjectionRepositoryError + >; + readonly loadProposedPlanRows: Effect.Effect< + ReadonlyArray, + ProjectionRepositoryError + >; + readonly loadActivityRows: Effect.Effect< + ReadonlyArray, + ProjectionRepositoryError + >; + readonly loadSessionRows: Effect.Effect< + ReadonlyArray, + ProjectionRepositoryError + >; + readonly loadCheckpointRows: Effect.Effect< + ReadonlyArray, + ProjectionRepositoryError + >; + readonly loadLatestTurnRows: Effect.Effect< + ReadonlyArray, + ProjectionRepositoryError + >; + }) => sql .withTransaction( Effect.gen(function* () { @@ -448,254 +910,36 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { latestTurnRows, stateRows, ] = yield* Effect.all([ - listProjectRows(undefined).pipe( - Effect.mapError( - toPersistenceSqlOrDecodeError( - "ProjectionSnapshotQuery.getSnapshot:listProjects:query", - "ProjectionSnapshotQuery.getSnapshot:listProjects:decodeRows", - ), - ), - ), - listThreadRows(undefined).pipe( - Effect.mapError( - toPersistenceSqlOrDecodeError( - "ProjectionSnapshotQuery.getSnapshot:listThreads:query", - "ProjectionSnapshotQuery.getSnapshot:listThreads:decodeRows", - ), - ), - ), - listThreadMessageRows(undefined).pipe( - Effect.mapError( - toPersistenceSqlOrDecodeError( - "ProjectionSnapshotQuery.getSnapshot:listThreadMessages:query", - "ProjectionSnapshotQuery.getSnapshot:listThreadMessages:decodeRows", - ), - ), - ), - listThreadProposedPlanRows(undefined).pipe( - Effect.mapError( - toPersistenceSqlOrDecodeError( - "ProjectionSnapshotQuery.getSnapshot:listThreadProposedPlans:query", - "ProjectionSnapshotQuery.getSnapshot:listThreadProposedPlans:decodeRows", - ), - ), - ), - listThreadActivityRows(undefined).pipe( - Effect.mapError( - toPersistenceSqlOrDecodeError( - "ProjectionSnapshotQuery.getSnapshot:listThreadActivities:query", - "ProjectionSnapshotQuery.getSnapshot:listThreadActivities:decodeRows", - ), - ), - ), - listThreadSessionRows(undefined).pipe( - Effect.mapError( - toPersistenceSqlOrDecodeError( - "ProjectionSnapshotQuery.getSnapshot:listThreadSessions:query", - "ProjectionSnapshotQuery.getSnapshot:listThreadSessions:decodeRows", - ), - ), - ), - listCheckpointRows(undefined).pipe( - Effect.mapError( - toPersistenceSqlOrDecodeError( - "ProjectionSnapshotQuery.getSnapshot:listCheckpoints:query", - "ProjectionSnapshotQuery.getSnapshot:listCheckpoints:decodeRows", - ), - ), - ), - listLatestTurnRows(undefined).pipe( - Effect.mapError( - toPersistenceSqlOrDecodeError( - "ProjectionSnapshotQuery.getSnapshot:listLatestTurns:query", - "ProjectionSnapshotQuery.getSnapshot:listLatestTurns:decodeRows", - ), - ), - ), + input.loadProjectRows, + input.loadThreadRows, + input.loadMessageRows, + input.loadProposedPlanRows, + input.loadActivityRows, + input.loadSessionRows, + input.loadCheckpointRows, + input.loadLatestTurnRows, listProjectionStateRows(undefined).pipe( Effect.mapError( toPersistenceSqlOrDecodeError( - "ProjectionSnapshotQuery.getSnapshot:listProjectionState:query", - "ProjectionSnapshotQuery.getSnapshot:listProjectionState:decodeRows", + `${input.operationPrefix}:listProjectionState:query`, + `${input.operationPrefix}:listProjectionState:decodeRows`, ), ), ), ]); - const messagesByThread = new Map>(); - const proposedPlansByThread = new Map>(); - const activitiesByThread = new Map>(); - const checkpointsByThread = new Map>(); - const sessionsByThread = new Map(); - const latestTurnByThread = new Map(); - - let updatedAt: string | null = null; - - for (const row of projectRows) { - updatedAt = maxIso(updatedAt, row.updatedAt); - } - for (const row of threadRows) { - updatedAt = maxIso(updatedAt, row.updatedAt); - } - for (const row of stateRows) { - updatedAt = maxIso(updatedAt, row.updatedAt); - } - - for (const row of messageRows) { - updatedAt = maxIso(updatedAt, row.updatedAt); - const threadMessages = messagesByThread.get(row.threadId) ?? []; - threadMessages.push({ - id: row.messageId, - role: row.role, - text: row.text, - ...(row.attachments !== null ? { attachments: row.attachments } : {}), - turnId: row.turnId, - streaming: row.isStreaming === 1, - createdAt: row.createdAt, - updatedAt: row.updatedAt, - }); - messagesByThread.set(row.threadId, threadMessages); - } - - for (const row of proposedPlanRows) { - updatedAt = maxIso(updatedAt, row.updatedAt); - const threadProposedPlans = proposedPlansByThread.get(row.threadId) ?? []; - threadProposedPlans.push({ - id: row.planId, - turnId: row.turnId, - planMarkdown: row.planMarkdown, - implementedAt: row.implementedAt, - implementationThreadId: row.implementationThreadId, - createdAt: row.createdAt, - updatedAt: row.updatedAt, - }); - proposedPlansByThread.set(row.threadId, threadProposedPlans); - } - - for (const row of activityRows) { - updatedAt = maxIso(updatedAt, row.createdAt); - const threadActivities = activitiesByThread.get(row.threadId) ?? []; - threadActivities.push({ - id: row.activityId, - tone: row.tone, - kind: row.kind, - summary: row.summary, - payload: row.payload, - turnId: row.turnId, - ...(row.sequence !== null ? { sequence: row.sequence } : {}), - createdAt: row.createdAt, - }); - activitiesByThread.set(row.threadId, threadActivities); - } - - for (const row of checkpointRows) { - updatedAt = maxIso(updatedAt, row.completedAt); - const threadCheckpoints = checkpointsByThread.get(row.threadId) ?? []; - threadCheckpoints.push({ - turnId: row.turnId, - checkpointTurnCount: row.checkpointTurnCount, - checkpointRef: row.checkpointRef, - status: row.status, - files: row.files, - assistantMessageId: row.assistantMessageId, - completedAt: row.completedAt, - }); - checkpointsByThread.set(row.threadId, threadCheckpoints); - } - - for (const row of latestTurnRows) { - updatedAt = maxIso(updatedAt, row.requestedAt); - if (row.startedAt !== null) { - updatedAt = maxIso(updatedAt, row.startedAt); - } - if (row.completedAt !== null) { - updatedAt = maxIso(updatedAt, row.completedAt); - } - if (latestTurnByThread.has(row.threadId)) { - continue; - } - latestTurnByThread.set(row.threadId, { - turnId: row.turnId, - state: - row.state === "error" - ? "error" - : row.state === "interrupted" - ? "interrupted" - : row.state === "completed" - ? "completed" - : "running", - requestedAt: row.requestedAt, - startedAt: row.startedAt, - completedAt: row.completedAt, - assistantMessageId: row.assistantMessageId, - ...(row.sourceProposedPlanThreadId !== null && row.sourceProposedPlanId !== null - ? { - sourceProposedPlan: { - threadId: row.sourceProposedPlanThreadId, - planId: row.sourceProposedPlanId, - }, - } - : {}), - }); - } - - for (const row of sessionRows) { - updatedAt = maxIso(updatedAt, row.updatedAt); - sessionsByThread.set(row.threadId, { - threadId: row.threadId, - status: row.status, - providerName: row.providerName, - runtimeMode: row.runtimeMode, - activeTurnId: row.activeTurnId, - lastError: row.lastError, - updatedAt: row.updatedAt, - }); - } - - const projects: ReadonlyArray = projectRows.map((row) => ({ - id: row.projectId, - title: row.title, - workspaceRoot: row.workspaceRoot, - defaultModelSelection: row.defaultModelSelection, - scripts: row.scripts, - createdAt: row.createdAt, - updatedAt: row.updatedAt, - deletedAt: row.deletedAt, - })); - - const threads: ReadonlyArray = threadRows.map((row) => ({ - id: row.threadId, - projectId: row.projectId, - title: row.title, - modelSelection: row.modelSelection, - runtimeMode: row.runtimeMode, - interactionMode: row.interactionMode, - branch: row.branch, - worktreePath: row.worktreePath, - latestTurn: latestTurnByThread.get(row.threadId) ?? null, - createdAt: row.createdAt, - updatedAt: row.updatedAt, - archivedAt: row.archivedAt, - deletedAt: row.deletedAt, - messages: messagesByThread.get(row.threadId) ?? [], - proposedPlans: proposedPlansByThread.get(row.threadId) ?? [], - activities: activitiesByThread.get(row.threadId) ?? [], - checkpoints: checkpointsByThread.get(row.threadId) ?? [], - session: sessionsByThread.get(row.threadId) ?? null, - })); - - const snapshot = { - snapshotSequence: computeSnapshotSequence(stateRows), - projects, - threads, - updatedAt: updatedAt ?? new Date(0).toISOString(), - }; - - return yield* decodeReadModel(snapshot).pipe( - Effect.mapError( - toPersistenceDecodeError("ProjectionSnapshotQuery.getSnapshot:decodeReadModel"), - ), - ); + return yield* buildSnapshot({ + operationPrefix: input.operationPrefix, + projectRows, + threadRows, + messageRows, + proposedPlanRows, + activityRows, + sessionRows, + checkpointRows, + latestTurnRows, + stateRows, + }); }), ) .pipe( @@ -703,10 +947,148 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { if (isPersistenceError(error)) { return error; } - return toPersistenceSqlError("ProjectionSnapshotQuery.getSnapshot:query")(error); + return toPersistenceSqlError(`${input.operationPrefix}:query`)(error); }), ); + const getSnapshot: ProjectionSnapshotQueryShape["getSnapshot"] = () => + getSnapshotFromQueries({ + operationPrefix: "ProjectionSnapshotQuery.getSnapshot", + loadProjectRows: listProjectRows(undefined).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getSnapshot:listProjects:query", + "ProjectionSnapshotQuery.getSnapshot:listProjects:decodeRows", + ), + ), + ), + loadThreadRows: listThreadRows(undefined).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getSnapshot:listThreads:query", + "ProjectionSnapshotQuery.getSnapshot:listThreads:decodeRows", + ), + ), + ), + loadMessageRows: listThreadMessageRows(undefined).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getSnapshot:listThreadMessages:query", + "ProjectionSnapshotQuery.getSnapshot:listThreadMessages:decodeRows", + ), + ), + ), + loadProposedPlanRows: listThreadProposedPlanRows(undefined).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getSnapshot:listThreadProposedPlans:query", + "ProjectionSnapshotQuery.getSnapshot:listThreadProposedPlans:decodeRows", + ), + ), + ), + loadActivityRows: listThreadActivityRows(undefined).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getSnapshot:listThreadActivities:query", + "ProjectionSnapshotQuery.getSnapshot:listThreadActivities:decodeRows", + ), + ), + ), + loadSessionRows: listThreadSessionRows(undefined).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getSnapshot:listThreadSessions:query", + "ProjectionSnapshotQuery.getSnapshot:listThreadSessions:decodeRows", + ), + ), + ), + loadCheckpointRows: listCheckpointRows(undefined).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getSnapshot:listCheckpoints:query", + "ProjectionSnapshotQuery.getSnapshot:listCheckpoints:decodeRows", + ), + ), + ), + loadLatestTurnRows: listLatestTurnRows(undefined).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getSnapshot:listLatestTurns:query", + "ProjectionSnapshotQuery.getSnapshot:listLatestTurns:decodeRows", + ), + ), + ), + }); + + const getActiveSnapshot: ProjectionSnapshotQueryShape["getActiveSnapshot"] = () => + getSnapshotFromQueries({ + operationPrefix: "ProjectionSnapshotQuery.getActiveSnapshot", + loadProjectRows: listActiveProjectRows(undefined).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getActiveSnapshot:listProjects:query", + "ProjectionSnapshotQuery.getActiveSnapshot:listProjects:decodeRows", + ), + ), + ), + loadThreadRows: listActiveThreadRows(undefined).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getActiveSnapshot:listThreads:query", + "ProjectionSnapshotQuery.getActiveSnapshot:listThreads:decodeRows", + ), + ), + ), + loadMessageRows: listActiveThreadMessageRows(undefined).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getActiveSnapshot:listThreadMessages:query", + "ProjectionSnapshotQuery.getActiveSnapshot:listThreadMessages:decodeRows", + ), + ), + ), + loadProposedPlanRows: listActiveThreadProposedPlanRows(undefined).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getActiveSnapshot:listThreadProposedPlans:query", + "ProjectionSnapshotQuery.getActiveSnapshot:listThreadProposedPlans:decodeRows", + ), + ), + ), + loadActivityRows: listActiveThreadActivityRows(undefined).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getActiveSnapshot:listThreadActivities:query", + "ProjectionSnapshotQuery.getActiveSnapshot:listThreadActivities:decodeRows", + ), + ), + ), + loadSessionRows: listActiveThreadSessionRows(undefined).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getActiveSnapshot:listThreadSessions:query", + "ProjectionSnapshotQuery.getActiveSnapshot:listThreadSessions:decodeRows", + ), + ), + ), + loadCheckpointRows: listActiveCheckpointRows(undefined).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getActiveSnapshot:listCheckpoints:query", + "ProjectionSnapshotQuery.getActiveSnapshot:listCheckpoints:decodeRows", + ), + ), + ), + loadLatestTurnRows: listActiveLatestTurnRows(undefined).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getActiveSnapshot:listLatestTurns:query", + "ProjectionSnapshotQuery.getActiveSnapshot:listLatestTurns:decodeRows", + ), + ), + ), + }); + const getCounts: ProjectionSnapshotQueryShape["getCounts"] = () => readProjectionCounts(undefined).pipe( Effect.mapError( @@ -723,6 +1105,16 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { ), ); + const listArchivedThreads: ProjectionSnapshotQueryShape["listArchivedThreads"] = () => + listArchivedThreadSummaryRows(undefined).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.listArchivedThreads:query", + "ProjectionSnapshotQuery.listArchivedThreads:decodeRows", + ), + ), + ); + const getActiveProjectByWorkspaceRoot: ProjectionSnapshotQueryShape["getActiveProjectByWorkspaceRoot"] = (workspaceRoot) => getActiveProjectRowByWorkspaceRoot({ workspaceRoot }).pipe( @@ -806,7 +1198,9 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { return { getSnapshot, + getActiveSnapshot, getCounts, + listArchivedThreads, getActiveProjectByWorkspaceRoot, getFirstActiveThreadIdByProjectId, getThreadCheckpointContext, diff --git a/apps/server/src/orchestration/Services/ProjectionSnapshotQuery.ts b/apps/server/src/orchestration/Services/ProjectionSnapshotQuery.ts index a7673dc32e..177a70ba6d 100644 --- a/apps/server/src/orchestration/Services/ProjectionSnapshotQuery.ts +++ b/apps/server/src/orchestration/Services/ProjectionSnapshotQuery.ts @@ -7,6 +7,7 @@ * @module ProjectionSnapshotQuery */ import type { + OrchestrationArchivedThreadSummary, OrchestrationCheckpointSummary, OrchestrationProject, OrchestrationReadModel, @@ -44,11 +45,28 @@ export interface ProjectionSnapshotQueryShape { */ readonly getSnapshot: () => Effect.Effect; + /** + * Read the latest UI-facing orchestration snapshot, omitting archived and + * deleted threads plus their related child rows. + */ + readonly getActiveSnapshot: () => Effect.Effect< + OrchestrationReadModel, + ProjectionRepositoryError + >; + /** * Read aggregate projection counts without hydrating the full read model. */ readonly getCounts: () => Effect.Effect; + /** + * Read archived thread summaries for the archive settings page. + */ + readonly listArchivedThreads: () => Effect.Effect< + ReadonlyArray, + ProjectionRepositoryError + >; + /** * Read the active project for an exact workspace root match. */ diff --git a/apps/server/src/server.test.ts b/apps/server/src/server.test.ts index 0e53eae43e..3fff95af02 100644 --- a/apps/server/src/server.test.ts +++ b/apps/server/src/server.test.ts @@ -217,6 +217,8 @@ const buildAppUnderTest = (options?: { Layer.provide( Layer.mock(ProjectionSnapshotQuery)({ getSnapshot: () => Effect.succeed(makeDefaultOrchestrationReadModel()), + getActiveSnapshot: () => Effect.succeed(makeDefaultOrchestrationReadModel()), + listArchivedThreads: () => Effect.succeed([]), ...options?.layers?.projectionSnapshotQuery, }), ), @@ -1098,11 +1100,26 @@ it.layer(NodeServices.layer)("server router seam", (it) => { }, ], }; + const archivedThreads = [ + { + threadId: ThreadId.makeUnsafe("thread-archived"), + projectId: ProjectId.makeUnsafe("project-a"), + projectTitle: "Project A", + workspaceRoot: "/tmp/project-a", + title: "Archived Thread", + worktreePath: null, + createdAt: now, + updatedAt: now, + archivedAt: now, + }, + ]; yield* buildAppUnderTest({ layers: { projectionSnapshotQuery: { getSnapshot: () => Effect.succeed(snapshot), + getActiveSnapshot: () => Effect.succeed(snapshot), + listArchivedThreads: () => Effect.succeed(archivedThreads), }, orchestrationEngine: { dispatch: () => Effect.succeed({ sequence: 7 }), @@ -1133,6 +1150,11 @@ it.layer(NodeServices.layer)("server router seam", (it) => { ); assert.equal(snapshotResult.snapshotSequence, 1); + const activeSnapshotResult = yield* Effect.scoped( + withWsRpcClient(wsUrl, (client) => client[ORCHESTRATION_WS_METHODS.getActiveSnapshot]({})), + ); + assert.equal(activeSnapshotResult.snapshotSequence, 1); + const dispatchResult = yield* Effect.scoped( withWsRpcClient(wsUrl, (client) => client[ORCHESTRATION_WS_METHODS.dispatchCommand]({ @@ -1166,6 +1188,13 @@ it.layer(NodeServices.layer)("server router seam", (it) => { ); assert.equal(fullDiffResult.diff, "full-diff"); + const archivedThreadsResult = yield* Effect.scoped( + withWsRpcClient(wsUrl, (client) => + client[ORCHESTRATION_WS_METHODS.listArchivedThreads]({}), + ), + ); + assert.deepEqual(archivedThreadsResult, archivedThreads); + const replayResult = yield* Effect.scoped( withWsRpcClient(wsUrl, (client) => client[ORCHESTRATION_WS_METHODS.replayEvents]({ diff --git a/apps/server/src/serverRuntimeStartup.test.ts b/apps/server/src/serverRuntimeStartup.test.ts index fc06d77566..f9eb376561 100644 --- a/apps/server/src/serverRuntimeStartup.test.ts +++ b/apps/server/src/serverRuntimeStartup.test.ts @@ -61,6 +61,7 @@ it.effect("launchStartupHeartbeat does not block the caller while counts are loa yield* launchStartupHeartbeat.pipe( Effect.provideService(ProjectionSnapshotQuery, { getSnapshot: () => Effect.die("unused"), + getActiveSnapshot: () => Effect.die("unused"), getCounts: () => Deferred.await(releaseCounts).pipe( Effect.as({ @@ -68,6 +69,7 @@ it.effect("launchStartupHeartbeat does not block the caller while counts are loa threadCount: 3, }), ), + listArchivedThreads: () => Effect.succeed([]), getActiveProjectByWorkspaceRoot: () => Effect.succeed(Option.none()), getFirstActiveThreadIdByProjectId: () => Effect.succeed(Option.none()), getThreadCheckpointContext: () => Effect.succeed(Option.none()), diff --git a/apps/server/src/ws.ts b/apps/server/src/ws.ts index cff7e26efa..5695c0b624 100644 --- a/apps/server/src/ws.ts +++ b/apps/server/src/ws.ts @@ -4,7 +4,9 @@ import { type GitManagerServiceError, OrchestrationDispatchCommandError, type OrchestrationEvent, + OrchestrationGetActiveSnapshotError, OrchestrationGetFullThreadDiffError, + OrchestrationListArchivedThreadsError, OrchestrationGetSnapshotError, OrchestrationGetTurnDiffError, ORCHESTRATION_WS_METHODS, @@ -82,6 +84,16 @@ const WsRpcLayer = WsRpcGroup.toLayer( }), ), ), + [ORCHESTRATION_WS_METHODS.getActiveSnapshot]: (_input) => + projectionSnapshotQuery.getActiveSnapshot().pipe( + Effect.mapError( + (cause) => + new OrchestrationGetActiveSnapshotError({ + message: "Failed to load active orchestration snapshot", + cause, + }), + ), + ), [ORCHESTRATION_WS_METHODS.dispatchCommand]: (command) => Effect.gen(function* () { const normalizedCommand = yield* normalizeDispatchCommand(command); @@ -116,6 +128,16 @@ const WsRpcLayer = WsRpcGroup.toLayer( }), ), ), + [ORCHESTRATION_WS_METHODS.listArchivedThreads]: (_input) => + projectionSnapshotQuery.listArchivedThreads().pipe( + Effect.mapError( + (cause) => + new OrchestrationListArchivedThreadsError({ + message: "Failed to load archived threads", + cause, + }), + ), + ), [ORCHESTRATION_WS_METHODS.replayEvents]: (input) => Stream.runCollect( orchestrationEngine.readEvents( diff --git a/apps/web/src/components/ChatView.browser.tsx b/apps/web/src/components/ChatView.browser.tsx index 0e5f573d54..e30327f642 100644 --- a/apps/web/src/components/ChatView.browser.tsx +++ b/apps/web/src/components/ChatView.browser.tsx @@ -622,7 +622,10 @@ function resolveWsRpc(body: NormalizedWsRpcRequestBody): unknown { return customResult; } const tag = body._tag; - if (tag === ORCHESTRATION_WS_METHODS.getSnapshot) { + if ( + tag === ORCHESTRATION_WS_METHODS.getSnapshot || + tag === ORCHESTRATION_WS_METHODS.getActiveSnapshot + ) { return fixture.snapshot; } if (tag === WS_METHODS.serverGetConfig) { diff --git a/apps/web/src/components/KeybindingsToast.browser.tsx b/apps/web/src/components/KeybindingsToast.browser.tsx index 9581e72cd2..ab76aa31cd 100644 --- a/apps/web/src/components/KeybindingsToast.browser.tsx +++ b/apps/web/src/components/KeybindingsToast.browser.tsx @@ -147,7 +147,10 @@ function buildFixture(): TestFixture { } function resolveWsRpc(tag: string): unknown { - if (tag === ORCHESTRATION_WS_METHODS.getSnapshot) { + if ( + tag === ORCHESTRATION_WS_METHODS.getSnapshot || + tag === ORCHESTRATION_WS_METHODS.getActiveSnapshot + ) { return fixture.snapshot; } if (tag === WS_METHODS.serverGetConfig) { diff --git a/apps/web/src/components/settings/SettingsPanels.tsx b/apps/web/src/components/settings/SettingsPanels.tsx index e3e620030b..1d4d7acb96 100644 --- a/apps/web/src/components/settings/SettingsPanels.tsx +++ b/apps/web/src/components/settings/SettingsPanels.tsx @@ -9,7 +9,7 @@ import { Undo2Icon, XIcon, } from "lucide-react"; -import { useQueryClient } from "@tanstack/react-query"; +import { useQuery, useQueryClient } from "@tanstack/react-query"; import { type ReactNode, useCallback, useEffect, useMemo, useRef, useState } from "react"; import { PROVIDER_DISPLAY_NAMES, @@ -40,13 +40,13 @@ import { setDesktopUpdateStateQueryData, useDesktopUpdateState, } from "../../lib/desktopUpdateReactQuery"; +import { archivedThreadsQueryOptions } from "../../lib/orchestrationReactQuery"; import { MAX_CUSTOM_MODEL_LENGTH, getCustomModelOptionsByProvider, resolveAppModelSelectionState, } from "../../modelSelection"; import { ensureNativeApi, readNativeApi } from "../../nativeApi"; -import { useStore } from "../../store"; import { formatRelativeTime, formatRelativeTimeLabel } from "../../timestampFormat"; import { cn } from "../../lib/utils"; import { Button } from "../ui/button"; @@ -1414,24 +1414,46 @@ export function GeneralSettingsPanel() { } export function ArchivedThreadsPanel() { - const projects = useStore((store) => store.projects); - const threads = useStore((store) => store.threads); - const { unarchiveThread, confirmAndDeleteThread } = useThreadActions(); + const { + data: archivedThreads = [], + error: archivedThreadsError, + isPending: archivedThreadsPending, + } = useQuery(archivedThreadsQueryOptions()); + const { unarchiveThread, deleteArchivedThread } = useThreadActions(); const archivedGroups = useMemo(() => { - const projectById = new Map(projects.map((project) => [project.id, project] as const)); - return [...projectById.values()] - .map((project) => ({ - project, - threads: threads - .filter((thread) => thread.projectId === project.id && thread.archivedAt !== null) - .toSorted((left, right) => { - const leftKey = left.archivedAt ?? left.createdAt; - const rightKey = right.archivedAt ?? right.createdAt; - return rightKey.localeCompare(leftKey) || right.id.localeCompare(left.id); - }), - })) - .filter((group) => group.threads.length > 0); - }, [projects, threads]); + const groups = new Map< + string, + { + project: { id: string; name: string; cwd: string }; + threads: Array<(typeof archivedThreads)[number]>; + } + >(); + + for (const thread of archivedThreads) { + const existing = groups.get(thread.projectId); + if (existing) { + existing.threads.push(thread); + continue; + } + groups.set(thread.projectId, { + project: { + id: thread.projectId, + name: thread.projectTitle, + cwd: thread.workspaceRoot, + }, + threads: [thread], + }); + } + + return Array.from(groups.values(), (group) => { + group.threads.sort( + (left, right) => + right.archivedAt.localeCompare(left.archivedAt) || + right.threadId.localeCompare(left.threadId), + ); + return group; + }); + }, [archivedThreads]); const handleArchivedThreadContextMenu = useCallback( async (threadId: ThreadId, position: { x: number; y: number }) => { @@ -1459,15 +1481,43 @@ export function ArchivedThreadsPanel() { } if (clicked === "delete") { - await confirmAndDeleteThread(threadId); + const thread = archivedThreads.find((entry) => entry.threadId === threadId); + if (!thread) { + return; + } + await deleteArchivedThread(thread, archivedThreads); } }, - [confirmAndDeleteThread, unarchiveThread], + [archivedThreads, deleteArchivedThread, unarchiveThread], ); return ( - {archivedGroups.length === 0 ? ( + {archivedThreadsError ? ( + + + + Could not load archived threads + + {archivedThreadsError instanceof Error + ? archivedThreadsError.message + : "An error occurred while loading archived threads."} + + + + + ) : null} + {!archivedThreadsError && archivedThreadsPending ? ( + + + + Loading archived threads + Fetching archived thread summaries. + + + + ) : null} + {!archivedThreadsError && !archivedThreadsPending && archivedGroups.length === 0 ? ( @@ -1479,56 +1529,58 @@ export function ArchivedThreadsPanel() { - ) : ( - archivedGroups.map(({ project, threads: projectThreads }) => ( - } - > - {projectThreads.map((thread) => ( -
{ - event.preventDefault(); - void handleArchivedThreadContextMenu(thread.id, { - x: event.clientX, - y: event.clientY, - }); - }} - > -
-

{thread.title}

-

- Archived {formatRelativeTimeLabel(thread.archivedAt ?? thread.createdAt)} - {" \u00b7 Created "} - {formatRelativeTimeLabel(thread.createdAt)} -

-
- -
- ))} -
- )) - )} +
+

{thread.title}

+

+ Archived {formatRelativeTimeLabel(thread.archivedAt)} + {" \u00b7 Created "} + {formatRelativeTimeLabel(thread.createdAt)} +

+
+ + + ))} + + )) + : null}
); } diff --git a/apps/web/src/hooks/useThreadActions.ts b/apps/web/src/hooks/useThreadActions.ts index d5557b4a96..c7444ee18e 100644 --- a/apps/web/src/hooks/useThreadActions.ts +++ b/apps/web/src/hooks/useThreadActions.ts @@ -1,4 +1,4 @@ -import { ThreadId } from "@t3tools/contracts"; +import { ThreadId, type OrchestrationArchivedThreadSummary } from "@t3tools/contracts"; import { useMutation, useQueryClient } from "@tanstack/react-query"; import { useNavigate, useParams } from "@tanstack/react-router"; import { useCallback } from "react"; @@ -7,6 +7,7 @@ import { getFallbackThreadIdAfterDelete } from "../components/Sidebar.logic"; import { useComposerDraftStore } from "../composerDraftStore"; import { useHandleNewThread } from "./useHandleNewThread"; import { gitRemoveWorktreeMutationOptions } from "../lib/gitReactQuery"; +import { orchestrationQueryKeys } from "../lib/orchestrationReactQuery"; import { newCommandId } from "../lib/utils"; import { readNativeApi } from "../nativeApi"; import { useStore } from "../store"; @@ -17,6 +18,7 @@ import { useSettings } from "./useSettings"; export function useThreadActions() { const appSettings = useSettings(); + const syncServerReadModel = useStore((store) => store.syncServerReadModel); const clearComposerDraftForThread = useComposerDraftStore((store) => store.clearDraftThread); const clearProjectDraftThreadById = useComposerDraftStore( (store) => store.clearProjectDraftThreadById, @@ -31,6 +33,13 @@ export function useThreadActions() { const queryClient = useQueryClient(); const removeWorktreeMutation = useMutation(gitRemoveWorktreeMutationOptions({ queryClient })); + const refreshActiveSnapshot = useCallback(async () => { + const api = readNativeApi(); + if (!api) return; + const snapshot = await api.orchestration.getActiveSnapshot(); + syncServerReadModel(snapshot); + }, [syncServerReadModel]); + const archiveThread = useCallback( async (threadId: ThreadId) => { const api = readNativeApi(); @@ -46,23 +55,29 @@ export function useThreadActions() { commandId: newCommandId(), threadId, }); + void queryClient.invalidateQueries({ queryKey: orchestrationQueryKeys.archivedThreads() }); if (routeThreadId === threadId) { await handleNewThread(thread.projectId); } }, - [handleNewThread, routeThreadId], + [handleNewThread, queryClient, routeThreadId], ); - const unarchiveThread = useCallback(async (threadId: ThreadId) => { - const api = readNativeApi(); - if (!api) return; - await api.orchestration.dispatchCommand({ - type: "thread.unarchive", - commandId: newCommandId(), - threadId, - }); - }, []); + const unarchiveThread = useCallback( + async (threadId: ThreadId) => { + const api = readNativeApi(); + if (!api) return; + await api.orchestration.dispatchCommand({ + type: "thread.unarchive", + commandId: newCommandId(), + threadId, + }); + await refreshActiveSnapshot(); + void queryClient.invalidateQueries({ queryKey: orchestrationQueryKeys.archivedThreads() }); + }, + [queryClient, refreshActiveSnapshot], + ); const deleteThread = useCallback( async (threadId: ThreadId, opts: { deletedThreadIds?: ReadonlySet } = {}) => { @@ -123,6 +138,7 @@ export function useThreadActions() { commandId: newCommandId(), threadId, }); + void queryClient.invalidateQueries({ queryKey: orchestrationQueryKeys.archivedThreads() }); clearComposerDraftForThread(threadId); clearProjectDraftThreadById(thread.projectId, thread.id); clearTerminalState(threadId); @@ -172,6 +188,110 @@ export function useThreadActions() { navigate, removeWorktreeMutation, routeThreadId, + queryClient, + ], + ); + + const deleteArchivedThread = useCallback( + async ( + thread: OrchestrationArchivedThreadSummary, + archivedThreads: ReadonlyArray, + ) => { + const api = readNativeApi(); + if (!api) return; + + const { projects, threads } = useStore.getState(); + const threadProject = projects.find((project) => project.id === thread.projectId); + + if (appSettings.confirmThreadDelete) { + const confirmed = await api.dialogs.confirm( + [ + `Delete thread "${thread.title}"?`, + "This permanently clears conversation history for this thread.", + ].join("\n"), + ); + if (!confirmed) { + return; + } + } + + const normalizedWorktreePath = thread.worktreePath?.trim() || null; + const hasSharedWorktree = + normalizedWorktreePath !== null && + (threads.some( + (entry) => + entry.id !== thread.threadId && + (entry.worktreePath?.trim() || null) === normalizedWorktreePath, + ) || + archivedThreads.some( + (entry) => + entry.threadId !== thread.threadId && + (entry.worktreePath?.trim() || null) === normalizedWorktreePath, + )); + const orphanedWorktreePath = hasSharedWorktree ? null : normalizedWorktreePath; + const displayWorktreePath = orphanedWorktreePath + ? formatWorktreePathForDisplay(orphanedWorktreePath) + : null; + const canDeleteWorktree = orphanedWorktreePath !== null && threadProject !== undefined; + const shouldDeleteWorktree = + canDeleteWorktree && + (await api.dialogs.confirm( + [ + "This thread is the only one linked to this worktree:", + displayWorktreePath ?? orphanedWorktreePath, + "", + "Delete the worktree too?", + ].join("\n"), + )); + + try { + await api.terminal.close({ threadId: thread.threadId, deleteHistory: true }); + } catch { + // Terminal may already be closed. + } + + await api.orchestration.dispatchCommand({ + type: "thread.delete", + commandId: newCommandId(), + threadId: thread.threadId, + }); + clearComposerDraftForThread(thread.threadId); + clearProjectDraftThreadById(thread.projectId, thread.threadId); + clearTerminalState(thread.threadId); + void queryClient.invalidateQueries({ queryKey: orchestrationQueryKeys.archivedThreads() }); + + if (!shouldDeleteWorktree || !orphanedWorktreePath || !threadProject) { + return; + } + + try { + await removeWorktreeMutation.mutateAsync({ + cwd: threadProject.cwd, + path: orphanedWorktreePath, + force: true, + }); + } catch (error) { + const message = error instanceof Error ? error.message : "Unknown error removing worktree."; + console.error("Failed to remove orphaned worktree after archived thread deletion", { + threadId: thread.threadId, + projectCwd: threadProject.cwd, + worktreePath: orphanedWorktreePath, + error, + }); + toastManager.add({ + type: "error", + title: "Thread deleted, but worktree removal failed", + description: `Could not remove ${displayWorktreePath ?? orphanedWorktreePath}. ${message}`, + }); + } + }, + [ + appSettings.confirmThreadDelete, + clearComposerDraftForThread, + clearProjectDraftThreadById, + clearTerminalState, + queryClient, + removeWorktreeMutation, ], ); @@ -203,6 +323,7 @@ export function useThreadActions() { archiveThread, unarchiveThread, deleteThread, + deleteArchivedThread, confirmAndDeleteThread, }; } diff --git a/apps/web/src/lib/orchestrationReactQuery.ts b/apps/web/src/lib/orchestrationReactQuery.ts new file mode 100644 index 0000000000..f746f492d4 --- /dev/null +++ b/apps/web/src/lib/orchestrationReactQuery.ts @@ -0,0 +1,23 @@ +import type { OrchestrationListArchivedThreadsResult } from "@t3tools/contracts"; +import { queryOptions } from "@tanstack/react-query"; +import { ensureNativeApi } from "~/nativeApi"; + +export const orchestrationQueryKeys = { + all: ["orchestration"] as const, + archivedThreads: () => ["orchestration", "archived-threads"] as const, +}; + +const DEFAULT_ARCHIVED_THREADS_STALE_TIME = 15_000; +const EMPTY_ARCHIVED_THREADS: OrchestrationListArchivedThreadsResult = []; + +export function archivedThreadsQueryOptions() { + return queryOptions({ + queryKey: orchestrationQueryKeys.archivedThreads(), + queryFn: async () => { + const api = ensureNativeApi(); + return api.orchestration.listArchivedThreads(); + }, + staleTime: DEFAULT_ARCHIVED_THREADS_STALE_TIME, + placeholderData: (previous) => previous ?? EMPTY_ARCHIVED_THREADS, + }); +} diff --git a/apps/web/src/routes/__root.tsx b/apps/web/src/routes/__root.tsx index 368c595a4b..88afe75883 100644 --- a/apps/web/src/routes/__root.tsx +++ b/apps/web/src/routes/__root.tsx @@ -39,6 +39,7 @@ import { terminalRunningSubprocessFromEvent } from "../terminalActivity"; import { migrateLocalSettingsToServer } from "../hooks/useSettings"; import { providerQueryKeys } from "../lib/providerReactQuery"; import { projectQueryKeys } from "../lib/projectReactQuery"; +import { orchestrationQueryKeys } from "../lib/orchestrationReactQuery"; import { collectActiveTerminalThreadIds } from "../lib/terminalStateCleanup"; import { deriveOrchestrationBatchEffects } from "../orchestrationEventEffects"; import { createOrchestrationRecoveryCoordinator } from "../orchestrationRecovery"; @@ -309,10 +310,21 @@ function EventRouter() { }, ); - const applyEventBatch = (events: ReadonlyArray) => { + const applyEventBatch = ( + events: ReadonlyArray, + ): "applied" | "snapshot-recovery-needed" => { const nextEvents = recovery.markEventBatchApplied(events); if (nextEvents.length === 0) { - return; + return "applied"; + } + + const knownThreadIds = new Set(useStore.getState().threads.map((thread) => thread.id)); + const needsActiveSnapshotRecovery = nextEvents.some( + (event) => + event.type === "thread.unarchived" && !knownThreadIds.has(event.payload.threadId), + ); + if (needsActiveSnapshotRecovery) { + return "snapshot-recovery-needed"; } const batchEffects = deriveOrchestrationBatchEffects(nextEvents); @@ -327,6 +339,16 @@ function EventRouter() { needsProviderInvalidation = true; void queryInvalidationThrottler.maybeExecute(); } + if ( + nextEvents.some( + (event) => + event.type === "thread.archived" || + event.type === "thread.unarchived" || + event.type === "thread.deleted", + ) + ) { + void queryClient.invalidateQueries({ queryKey: orchestrationQueryKeys.archivedThreads() }); + } applyOrchestrationEvents(nextEvents); if (needsProjectUiSync) { @@ -356,6 +378,7 @@ function EventRouter() { for (const threadId of batchEffects.removeTerminalStateThreadIds) { removeTerminalState(threadId); } + return "applied"; }; const recoverFromSequenceGap = async (): Promise => { @@ -366,7 +389,12 @@ function EventRouter() { try { const events = await api.orchestration.replayEvents(recovery.getState().latestSequence); if (!disposed) { - applyEventBatch(events); + const result = applyEventBatch(events); + if (result === "snapshot-recovery-needed") { + recovery.failReplayRecovery(); + void fallbackToSnapshotRecovery(); + return; + } } } catch { recovery.failReplayRecovery(); @@ -385,7 +413,7 @@ function EventRouter() { } try { - const snapshot = await api.orchestration.getSnapshot(); + const snapshot = await api.orchestration.getActiveSnapshot(); if (!disposed) { syncServerReadModel(snapshot); reconcileSnapshotDerivedState(); @@ -410,7 +438,10 @@ function EventRouter() { const unsubDomainEvent = api.orchestration.onDomainEvent((event) => { const action = recovery.classifyDomainEvent(event.sequence); if (action === "apply") { - applyEventBatch([event]); + const result = applyEventBatch([event]); + if (result === "snapshot-recovery-needed") { + void fallbackToSnapshotRecovery(); + } return; } if (action === "recover") { diff --git a/apps/web/src/wsNativeApi.test.ts b/apps/web/src/wsNativeApi.test.ts index c9bcc7ffd2..b4cfde03f5 100644 --- a/apps/web/src/wsNativeApi.test.ts +++ b/apps/web/src/wsNativeApi.test.ts @@ -76,6 +76,8 @@ const rpcClientMock = { }, orchestration: { getSnapshot: vi.fn(), + getActiveSnapshot: vi.fn(), + listArchivedThreads: vi.fn(), dispatchCommand: vi.fn(), getTurnDiff: vi.fn(), getFullThreadDiff: vi.fn(), diff --git a/apps/web/src/wsNativeApi.ts b/apps/web/src/wsNativeApi.ts index 31160dfa1c..99558a02f6 100644 --- a/apps/web/src/wsNativeApi.ts +++ b/apps/web/src/wsNativeApi.ts @@ -91,6 +91,8 @@ export function createWsNativeApi(): NativeApi { }, orchestration: { getSnapshot: rpcClient.orchestration.getSnapshot, + getActiveSnapshot: rpcClient.orchestration.getActiveSnapshot, + listArchivedThreads: rpcClient.orchestration.listArchivedThreads, dispatchCommand: rpcClient.orchestration.dispatchCommand, getTurnDiff: rpcClient.orchestration.getTurnDiff, getFullThreadDiff: rpcClient.orchestration.getFullThreadDiff, diff --git a/apps/web/src/wsRpcClient.ts b/apps/web/src/wsRpcClient.ts index 60f51ba707..c02d4447bf 100644 --- a/apps/web/src/wsRpcClient.ts +++ b/apps/web/src/wsRpcClient.ts @@ -87,9 +87,15 @@ export interface WsRpcClient { }; readonly orchestration: { readonly getSnapshot: RpcUnaryNoArgMethod; + readonly getActiveSnapshot: RpcUnaryNoArgMethod< + typeof ORCHESTRATION_WS_METHODS.getActiveSnapshot + >; readonly dispatchCommand: RpcUnaryMethod; readonly getTurnDiff: RpcUnaryMethod; readonly getFullThreadDiff: RpcUnaryMethod; + readonly listArchivedThreads: RpcUnaryNoArgMethod< + typeof ORCHESTRATION_WS_METHODS.listArchivedThreads + >; readonly replayEvents: RpcUnaryMethod; readonly onDomainEvent: RpcStreamMethod; }; @@ -187,12 +193,16 @@ export function createWsRpcClient(transport = new WsTransport()): WsRpcClient { orchestration: { getSnapshot: () => transport.request((client) => client[ORCHESTRATION_WS_METHODS.getSnapshot]({})), + getActiveSnapshot: () => + transport.request((client) => client[ORCHESTRATION_WS_METHODS.getActiveSnapshot]({})), dispatchCommand: (input) => transport.request((client) => client[ORCHESTRATION_WS_METHODS.dispatchCommand](input)), getTurnDiff: (input) => transport.request((client) => client[ORCHESTRATION_WS_METHODS.getTurnDiff](input)), getFullThreadDiff: (input) => transport.request((client) => client[ORCHESTRATION_WS_METHODS.getFullThreadDiff](input)), + listArchivedThreads: () => + transport.request((client) => client[ORCHESTRATION_WS_METHODS.listArchivedThreads]({})), replayEvents: (input) => transport .request((client) => client[ORCHESTRATION_WS_METHODS.replayEvents](input)) diff --git a/packages/contracts/src/ipc.ts b/packages/contracts/src/ipc.ts index 3114f6f5be..3724ebed43 100644 --- a/packages/contracts/src/ipc.ts +++ b/packages/contracts/src/ipc.ts @@ -44,6 +44,7 @@ import type { OrchestrationGetFullThreadDiffResult, OrchestrationGetTurnDiffInput, OrchestrationGetTurnDiffResult, + OrchestrationListArchivedThreadsResult, OrchestrationEvent, OrchestrationReadModel, } from "./orchestration"; @@ -174,6 +175,8 @@ export interface NativeApi { }; orchestration: { getSnapshot: () => Promise; + getActiveSnapshot: () => Promise; + listArchivedThreads: () => Promise; dispatchCommand: (command: ClientOrchestrationCommand) => Promise<{ sequence: number }>; getTurnDiff: (input: OrchestrationGetTurnDiffInput) => Promise; getFullThreadDiff: ( diff --git a/packages/contracts/src/orchestration.ts b/packages/contracts/src/orchestration.ts index bd7e374f32..ea58cb0efe 100644 --- a/packages/contracts/src/orchestration.ts +++ b/packages/contracts/src/orchestration.ts @@ -17,9 +17,11 @@ import { export const ORCHESTRATION_WS_METHODS = { getSnapshot: "orchestration.getSnapshot", + getActiveSnapshot: "orchestration.getActiveSnapshot", dispatchCommand: "orchestration.dispatchCommand", getTurnDiff: "orchestration.getTurnDiff", getFullThreadDiff: "orchestration.getFullThreadDiff", + listArchivedThreads: "orchestration.listArchivedThreads", replayEvents: "orchestration.replayEvents", } as const; @@ -985,6 +987,31 @@ export type OrchestrationGetSnapshotInput = typeof OrchestrationGetSnapshotInput const OrchestrationGetSnapshotResult = OrchestrationReadModel; export type OrchestrationGetSnapshotResult = typeof OrchestrationGetSnapshotResult.Type; +export const OrchestrationGetActiveSnapshotInput = Schema.Struct({}); +export type OrchestrationGetActiveSnapshotInput = typeof OrchestrationGetActiveSnapshotInput.Type; +const OrchestrationGetActiveSnapshotResult = OrchestrationReadModel; +export type OrchestrationGetActiveSnapshotResult = typeof OrchestrationGetActiveSnapshotResult.Type; + +export const OrchestrationArchivedThreadSummary = Schema.Struct({ + threadId: ThreadId, + projectId: ProjectId, + projectTitle: TrimmedNonEmptyString, + workspaceRoot: TrimmedNonEmptyString, + title: TrimmedNonEmptyString, + worktreePath: Schema.NullOr(TrimmedNonEmptyString), + createdAt: IsoDateTime, + updatedAt: IsoDateTime, + archivedAt: IsoDateTime, +}); +export type OrchestrationArchivedThreadSummary = typeof OrchestrationArchivedThreadSummary.Type; + +export const OrchestrationListArchivedThreadsInput = Schema.Struct({}); +export type OrchestrationListArchivedThreadsInput = + typeof OrchestrationListArchivedThreadsInput.Type; +const OrchestrationListArchivedThreadsResult = Schema.Array(OrchestrationArchivedThreadSummary); +export type OrchestrationListArchivedThreadsResult = + typeof OrchestrationListArchivedThreadsResult.Type; + export const OrchestrationGetTurnDiffInput = TurnCountRange.mapFields( Struct.assign({ threadId: ThreadId }), { unsafePreserveChecks: true }, @@ -1016,6 +1043,10 @@ export const OrchestrationRpcSchemas = { input: OrchestrationGetSnapshotInput, output: OrchestrationGetSnapshotResult, }, + getActiveSnapshot: { + input: OrchestrationGetActiveSnapshotInput, + output: OrchestrationGetActiveSnapshotResult, + }, dispatchCommand: { input: ClientOrchestrationCommand, output: DispatchResult, @@ -1028,6 +1059,10 @@ export const OrchestrationRpcSchemas = { input: OrchestrationGetFullThreadDiffInput, output: OrchestrationGetFullThreadDiffResult, }, + listArchivedThreads: { + input: OrchestrationListArchivedThreadsInput, + output: OrchestrationListArchivedThreadsResult, + }, replayEvents: { input: OrchestrationReplayEventsInput, output: OrchestrationReplayEventsResult, @@ -1042,6 +1077,14 @@ export class OrchestrationGetSnapshotError extends Schema.TaggedErrorClass()( + "OrchestrationGetActiveSnapshotError", + { + message: TrimmedNonEmptyString, + cause: Schema.optional(Schema.Defect), + }, +) {} + export class OrchestrationDispatchCommandError extends Schema.TaggedErrorClass()( "OrchestrationDispatchCommandError", { @@ -1066,6 +1109,14 @@ export class OrchestrationGetFullThreadDiffError extends Schema.TaggedErrorClass }, ) {} +export class OrchestrationListArchivedThreadsError extends Schema.TaggedErrorClass()( + "OrchestrationListArchivedThreadsError", + { + message: TrimmedNonEmptyString, + cause: Schema.optional(Schema.Defect), + }, +) {} + export class OrchestrationReplayEventsError extends Schema.TaggedErrorClass()( "OrchestrationReplayEventsError", { diff --git a/packages/contracts/src/rpc.ts b/packages/contracts/src/rpc.ts index 34968e66ec..2fc8fb3375 100644 --- a/packages/contracts/src/rpc.ts +++ b/packages/contracts/src/rpc.ts @@ -31,12 +31,16 @@ import { OrchestrationEvent, ORCHESTRATION_WS_METHODS, OrchestrationDispatchCommandError, + OrchestrationGetActiveSnapshotError, + OrchestrationGetActiveSnapshotInput, OrchestrationGetFullThreadDiffError, OrchestrationGetFullThreadDiffInput, OrchestrationGetSnapshotError, OrchestrationGetSnapshotInput, OrchestrationGetTurnDiffError, OrchestrationGetTurnDiffInput, + OrchestrationListArchivedThreadsError, + OrchestrationListArchivedThreadsInput, OrchestrationReplayEventsError, OrchestrationReplayEventsInput, OrchestrationRpcSchemas, @@ -263,6 +267,15 @@ export const WsOrchestrationGetSnapshotRpc = Rpc.make(ORCHESTRATION_WS_METHODS.g error: OrchestrationGetSnapshotError, }); +export const WsOrchestrationGetActiveSnapshotRpc = Rpc.make( + ORCHESTRATION_WS_METHODS.getActiveSnapshot, + { + payload: OrchestrationGetActiveSnapshotInput, + success: OrchestrationRpcSchemas.getActiveSnapshot.output, + error: OrchestrationGetActiveSnapshotError, + }, +); + export const WsOrchestrationDispatchCommandRpc = Rpc.make( ORCHESTRATION_WS_METHODS.dispatchCommand, { @@ -287,6 +300,15 @@ export const WsOrchestrationGetFullThreadDiffRpc = Rpc.make( }, ); +export const WsOrchestrationListArchivedThreadsRpc = Rpc.make( + ORCHESTRATION_WS_METHODS.listArchivedThreads, + { + payload: OrchestrationListArchivedThreadsInput, + success: OrchestrationRpcSchemas.listArchivedThreads.output, + error: OrchestrationListArchivedThreadsError, + }, +); + export const WsOrchestrationReplayEventsRpc = Rpc.make(ORCHESTRATION_WS_METHODS.replayEvents, { payload: OrchestrationReplayEventsInput, success: OrchestrationRpcSchemas.replayEvents.output, @@ -352,8 +374,10 @@ export const WsRpcGroup = RpcGroup.make( WsSubscribeServerConfigRpc, WsSubscribeServerLifecycleRpc, WsOrchestrationGetSnapshotRpc, + WsOrchestrationGetActiveSnapshotRpc, WsOrchestrationDispatchCommandRpc, WsOrchestrationGetTurnDiffRpc, WsOrchestrationGetFullThreadDiffRpc, + WsOrchestrationListArchivedThreadsRpc, WsOrchestrationReplayEventsRpc, ); From 4b155dbec80ce03ec6a3bfd48fa0b75f9cf9dee9 Mon Sep 17 00:00:00 2001 From: Julius Marminge Date: Wed, 1 Apr 2026 15:17:31 -0700 Subject: [PATCH 2/2] Remove dead orchestration snapshot RPC Co-authored-by: codex --- apps/server/src/server.test.ts | 21 +++++++------------ apps/server/src/ws.ts | 11 ---------- apps/web/src/components/ChatView.browser.tsx | 5 +---- .../components/KeybindingsToast.browser.tsx | 5 +---- apps/web/src/wsNativeApi.test.ts | 1 - apps/web/src/wsNativeApi.ts | 1 - apps/web/src/wsRpcClient.ts | 3 --- packages/contracts/src/ipc.ts | 1 - packages/contracts/src/orchestration.ts | 18 ---------------- packages/contracts/src/rpc.ts | 17 ++++----------- 10 files changed, 14 insertions(+), 69 deletions(-) diff --git a/apps/server/src/server.test.ts b/apps/server/src/server.test.ts index 3fff95af02..c0afb383c7 100644 --- a/apps/server/src/server.test.ts +++ b/apps/server/src/server.test.ts @@ -1145,11 +1145,6 @@ it.layer(NodeServices.layer)("server router seam", (it) => { }); const wsUrl = yield* getWsServerUrl("/ws"); - const snapshotResult = yield* Effect.scoped( - withWsRpcClient(wsUrl, (client) => client[ORCHESTRATION_WS_METHODS.getSnapshot]({})), - ); - assert.equal(snapshotResult.snapshotSequence, 1); - const activeSnapshotResult = yield* Effect.scoped( withWsRpcClient(wsUrl, (client) => client[ORCHESTRATION_WS_METHODS.getActiveSnapshot]({})), ); @@ -1266,15 +1261,15 @@ it.layer(NodeServices.layer)("server router seam", (it) => { }).pipe(Effect.provide(NodeHttpServer.layerTest)), ); - it.effect("routes websocket rpc orchestration.getSnapshot errors", () => + it.effect("routes websocket rpc orchestration.getActiveSnapshot errors", () => Effect.gen(function* () { yield* buildAppUnderTest({ layers: { projectionSnapshotQuery: { - getSnapshot: () => + getActiveSnapshot: () => Effect.fail( new PersistenceSqlError({ - operation: "ProjectionSnapshotQuery.getSnapshot", + operation: "ProjectionSnapshotQuery.getActiveSnapshot", detail: "projection unavailable", }), ), @@ -1284,14 +1279,14 @@ it.layer(NodeServices.layer)("server router seam", (it) => { const wsUrl = yield* getWsServerUrl("/ws"); const result = yield* Effect.scoped( - withWsRpcClient(wsUrl, (client) => client[ORCHESTRATION_WS_METHODS.getSnapshot]({})).pipe( - Effect.result, - ), + withWsRpcClient(wsUrl, (client) => + client[ORCHESTRATION_WS_METHODS.getActiveSnapshot]({}), + ).pipe(Effect.result), ); assertTrue(result._tag === "Failure"); - assertTrue(result.failure._tag === "OrchestrationGetSnapshotError"); - assertInclude(result.failure.message, "Failed to load orchestration snapshot"); + assertTrue(result.failure._tag === "OrchestrationGetActiveSnapshotError"); + assertInclude(result.failure.message, "Failed to load active orchestration snapshot"); }).pipe(Effect.provide(NodeHttpServer.layerTest)), ); diff --git a/apps/server/src/ws.ts b/apps/server/src/ws.ts index 5695c0b624..a488d7dcfc 100644 --- a/apps/server/src/ws.ts +++ b/apps/server/src/ws.ts @@ -7,7 +7,6 @@ import { OrchestrationGetActiveSnapshotError, OrchestrationGetFullThreadDiffError, OrchestrationListArchivedThreadsError, - OrchestrationGetSnapshotError, OrchestrationGetTurnDiffError, ORCHESTRATION_WS_METHODS, ProjectSearchEntriesError, @@ -74,16 +73,6 @@ const WsRpcLayer = WsRpcGroup.toLayer( }); return WsRpcGroup.of({ - [ORCHESTRATION_WS_METHODS.getSnapshot]: (_input) => - projectionSnapshotQuery.getSnapshot().pipe( - Effect.mapError( - (cause) => - new OrchestrationGetSnapshotError({ - message: "Failed to load orchestration snapshot", - cause, - }), - ), - ), [ORCHESTRATION_WS_METHODS.getActiveSnapshot]: (_input) => projectionSnapshotQuery.getActiveSnapshot().pipe( Effect.mapError( diff --git a/apps/web/src/components/ChatView.browser.tsx b/apps/web/src/components/ChatView.browser.tsx index e30327f642..aebf7b8d87 100644 --- a/apps/web/src/components/ChatView.browser.tsx +++ b/apps/web/src/components/ChatView.browser.tsx @@ -622,10 +622,7 @@ function resolveWsRpc(body: NormalizedWsRpcRequestBody): unknown { return customResult; } const tag = body._tag; - if ( - tag === ORCHESTRATION_WS_METHODS.getSnapshot || - tag === ORCHESTRATION_WS_METHODS.getActiveSnapshot - ) { + if (tag === ORCHESTRATION_WS_METHODS.getActiveSnapshot) { return fixture.snapshot; } if (tag === WS_METHODS.serverGetConfig) { diff --git a/apps/web/src/components/KeybindingsToast.browser.tsx b/apps/web/src/components/KeybindingsToast.browser.tsx index ab76aa31cd..2bb6c1ef3c 100644 --- a/apps/web/src/components/KeybindingsToast.browser.tsx +++ b/apps/web/src/components/KeybindingsToast.browser.tsx @@ -147,10 +147,7 @@ function buildFixture(): TestFixture { } function resolveWsRpc(tag: string): unknown { - if ( - tag === ORCHESTRATION_WS_METHODS.getSnapshot || - tag === ORCHESTRATION_WS_METHODS.getActiveSnapshot - ) { + if (tag === ORCHESTRATION_WS_METHODS.getActiveSnapshot) { return fixture.snapshot; } if (tag === WS_METHODS.serverGetConfig) { diff --git a/apps/web/src/wsNativeApi.test.ts b/apps/web/src/wsNativeApi.test.ts index b4cfde03f5..a5a0d84f4a 100644 --- a/apps/web/src/wsNativeApi.test.ts +++ b/apps/web/src/wsNativeApi.test.ts @@ -75,7 +75,6 @@ const rpcClientMock = { subscribeLifecycle: vi.fn(), }, orchestration: { - getSnapshot: vi.fn(), getActiveSnapshot: vi.fn(), listArchivedThreads: vi.fn(), dispatchCommand: vi.fn(), diff --git a/apps/web/src/wsNativeApi.ts b/apps/web/src/wsNativeApi.ts index 99558a02f6..e9220b68d3 100644 --- a/apps/web/src/wsNativeApi.ts +++ b/apps/web/src/wsNativeApi.ts @@ -90,7 +90,6 @@ export function createWsNativeApi(): NativeApi { updateSettings: rpcClient.server.updateSettings, }, orchestration: { - getSnapshot: rpcClient.orchestration.getSnapshot, getActiveSnapshot: rpcClient.orchestration.getActiveSnapshot, listArchivedThreads: rpcClient.orchestration.listArchivedThreads, dispatchCommand: rpcClient.orchestration.dispatchCommand, diff --git a/apps/web/src/wsRpcClient.ts b/apps/web/src/wsRpcClient.ts index c02d4447bf..4509cbf666 100644 --- a/apps/web/src/wsRpcClient.ts +++ b/apps/web/src/wsRpcClient.ts @@ -86,7 +86,6 @@ export interface WsRpcClient { readonly subscribeLifecycle: RpcStreamMethod; }; readonly orchestration: { - readonly getSnapshot: RpcUnaryNoArgMethod; readonly getActiveSnapshot: RpcUnaryNoArgMethod< typeof ORCHESTRATION_WS_METHODS.getActiveSnapshot >; @@ -191,8 +190,6 @@ export function createWsRpcClient(transport = new WsTransport()): WsRpcClient { transport.subscribe((client) => client[WS_METHODS.subscribeServerLifecycle]({}), listener), }, orchestration: { - getSnapshot: () => - transport.request((client) => client[ORCHESTRATION_WS_METHODS.getSnapshot]({})), getActiveSnapshot: () => transport.request((client) => client[ORCHESTRATION_WS_METHODS.getActiveSnapshot]({})), dispatchCommand: (input) => diff --git a/packages/contracts/src/ipc.ts b/packages/contracts/src/ipc.ts index 3724ebed43..944070a41f 100644 --- a/packages/contracts/src/ipc.ts +++ b/packages/contracts/src/ipc.ts @@ -174,7 +174,6 @@ export interface NativeApi { updateSettings: (patch: ServerSettingsPatch) => Promise; }; orchestration: { - getSnapshot: () => Promise; getActiveSnapshot: () => Promise; listArchivedThreads: () => Promise; dispatchCommand: (command: ClientOrchestrationCommand) => Promise<{ sequence: number }>; diff --git a/packages/contracts/src/orchestration.ts b/packages/contracts/src/orchestration.ts index ea58cb0efe..60926e4c33 100644 --- a/packages/contracts/src/orchestration.ts +++ b/packages/contracts/src/orchestration.ts @@ -16,7 +16,6 @@ import { } from "./baseSchemas"; export const ORCHESTRATION_WS_METHODS = { - getSnapshot: "orchestration.getSnapshot", getActiveSnapshot: "orchestration.getActiveSnapshot", dispatchCommand: "orchestration.dispatchCommand", getTurnDiff: "orchestration.getTurnDiff", @@ -982,11 +981,6 @@ export const DispatchResult = Schema.Struct({ }); export type DispatchResult = typeof DispatchResult.Type; -export const OrchestrationGetSnapshotInput = Schema.Struct({}); -export type OrchestrationGetSnapshotInput = typeof OrchestrationGetSnapshotInput.Type; -const OrchestrationGetSnapshotResult = OrchestrationReadModel; -export type OrchestrationGetSnapshotResult = typeof OrchestrationGetSnapshotResult.Type; - export const OrchestrationGetActiveSnapshotInput = Schema.Struct({}); export type OrchestrationGetActiveSnapshotInput = typeof OrchestrationGetActiveSnapshotInput.Type; const OrchestrationGetActiveSnapshotResult = OrchestrationReadModel; @@ -1039,10 +1033,6 @@ const OrchestrationReplayEventsResult = Schema.Array(OrchestrationEvent); export type OrchestrationReplayEventsResult = typeof OrchestrationReplayEventsResult.Type; export const OrchestrationRpcSchemas = { - getSnapshot: { - input: OrchestrationGetSnapshotInput, - output: OrchestrationGetSnapshotResult, - }, getActiveSnapshot: { input: OrchestrationGetActiveSnapshotInput, output: OrchestrationGetActiveSnapshotResult, @@ -1069,14 +1059,6 @@ export const OrchestrationRpcSchemas = { }, } as const; -export class OrchestrationGetSnapshotError extends Schema.TaggedErrorClass()( - "OrchestrationGetSnapshotError", - { - message: TrimmedNonEmptyString, - cause: Schema.optional(Schema.Defect), - }, -) {} - export class OrchestrationGetActiveSnapshotError extends Schema.TaggedErrorClass()( "OrchestrationGetActiveSnapshotError", { diff --git a/packages/contracts/src/rpc.ts b/packages/contracts/src/rpc.ts index 2fc8fb3375..3d25984c1f 100644 --- a/packages/contracts/src/rpc.ts +++ b/packages/contracts/src/rpc.ts @@ -35,8 +35,6 @@ import { OrchestrationGetActiveSnapshotInput, OrchestrationGetFullThreadDiffError, OrchestrationGetFullThreadDiffInput, - OrchestrationGetSnapshotError, - OrchestrationGetSnapshotInput, OrchestrationGetTurnDiffError, OrchestrationGetTurnDiffInput, OrchestrationListArchivedThreadsError, @@ -261,12 +259,6 @@ export const WsTerminalCloseRpc = Rpc.make(WS_METHODS.terminalClose, { error: TerminalError, }); -export const WsOrchestrationGetSnapshotRpc = Rpc.make(ORCHESTRATION_WS_METHODS.getSnapshot, { - payload: OrchestrationGetSnapshotInput, - success: OrchestrationRpcSchemas.getSnapshot.output, - error: OrchestrationGetSnapshotError, -}); - export const WsOrchestrationGetActiveSnapshotRpc = Rpc.make( ORCHESTRATION_WS_METHODS.getActiveSnapshot, { @@ -369,15 +361,14 @@ export const WsRpcGroup = RpcGroup.make( WsTerminalClearRpc, WsTerminalRestartRpc, WsTerminalCloseRpc, - WsSubscribeOrchestrationDomainEventsRpc, - WsSubscribeTerminalEventsRpc, - WsSubscribeServerConfigRpc, - WsSubscribeServerLifecycleRpc, - WsOrchestrationGetSnapshotRpc, WsOrchestrationGetActiveSnapshotRpc, WsOrchestrationDispatchCommandRpc, WsOrchestrationGetTurnDiffRpc, WsOrchestrationGetFullThreadDiffRpc, WsOrchestrationListArchivedThreadsRpc, WsOrchestrationReplayEventsRpc, + WsSubscribeOrchestrationDomainEventsRpc, + WsSubscribeTerminalEventsRpc, + WsSubscribeServerConfigRpc, + WsSubscribeServerLifecycleRpc, );