From 723472a6d1a1311713c8af5261aa21131df7c6a0 Mon Sep 17 00:00:00 2001 From: justsomelegs <145564979+justsomelegs@users.noreply.github.com> Date: Tue, 31 Mar 2026 22:00:01 +0100 Subject: [PATCH 1/6] Bootstrap orchestration engine from projections --- .../Layers/OrchestrationEngine.test.ts | 124 ++- .../Layers/OrchestrationEngine.ts | 43 +- .../Layers/ProjectionSnapshotQuery.ts | 718 ++++++++++-------- 3 files changed, 560 insertions(+), 325 deletions(-) diff --git a/apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts b/apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts index 6aa889991e..c818aad3f9 100644 --- a/apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts +++ b/apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts @@ -9,18 +9,21 @@ import { type OrchestrationEvent, } from "@t3tools/contracts"; import { Effect, Layer, ManagedRuntime, Queue, Stream } from "effect"; +import * as SqlClient from "effect/unstable/sql/SqlClient"; import { describe, expect, it } from "vitest"; import { PersistenceSqlError } from "../../persistence/Errors.ts"; import { OrchestrationCommandReceiptRepositoryLive } from "../../persistence/Layers/OrchestrationCommandReceipts.ts"; import { OrchestrationEventStoreLive } from "../../persistence/Layers/OrchestrationEventStore.ts"; import { SqlitePersistenceMemory } from "../../persistence/Layers/Sqlite.ts"; +import { createEmptyReadModel, projectEvent } from "../projector.ts"; import { OrchestrationEventStore, type OrchestrationEventStoreShape, } from "../../persistence/Services/OrchestrationEventStore.ts"; import { OrchestrationEngineLive } from "./OrchestrationEngine.ts"; import { OrchestrationProjectionPipelineLive } from "./ProjectionPipeline.ts"; +import { makeProjectionSnapshotQuery } from "./ProjectionSnapshotQuery.ts"; import { OrchestrationEngineService } from "../Services/OrchestrationEngine.ts"; import { OrchestrationProjectionPipeline, @@ -28,6 +31,8 @@ import { } from "../Services/ProjectionPipeline.ts"; import { ServerConfig } from "../../config.ts"; import * as NodeServices from "@effect/platform-node/NodeServices"; +import * as NodeFileSystem from "@effect/platform-node/NodeFileSystem"; +import * as NodePath from "@effect/platform-node/NodePath"; const asProjectId = (value: string): ProjectId => ProjectId.makeUnsafe(value); const asMessageId = (value: string): MessageId => MessageId.makeUnsafe(value); @@ -38,19 +43,48 @@ async function createOrchestrationSystem() { const ServerConfigLayer = ServerConfig.layerTest(process.cwd(), { prefix: "t3-orchestration-engine-test-", }); + const envLayer = Layer.merge( + ServerConfigLayer, + Layer.merge(NodeServices.layer, Layer.merge(NodeFileSystem.layer, NodePath.layer)), + ); + const sqlLayer = SqlitePersistenceMemory.pipe(Layer.provideMerge(envLayer)); + const runtimeDependencies = Layer.merge(envLayer, sqlLayer); + const eventStoreLayer = OrchestrationEventStoreLive.pipe(Layer.provideMerge(sqlLayer)); + const commandReceiptLayer = OrchestrationCommandReceiptRepositoryLive.pipe( + Layer.provideMerge(sqlLayer), + ); + const projectionPipelineLayer = OrchestrationProjectionPipelineLive.pipe( + Layer.provideMerge(eventStoreLayer), + Layer.provideMerge(runtimeDependencies), + ); const orchestrationLayer = OrchestrationEngineLive.pipe( - Layer.provide(OrchestrationProjectionPipelineLive), - Layer.provide(OrchestrationEventStoreLive), - Layer.provide(OrchestrationCommandReceiptRepositoryLive), - Layer.provide(SqlitePersistenceMemory), - Layer.provideMerge(ServerConfigLayer), - Layer.provideMerge(NodeServices.layer), + Layer.provideMerge(projectionPipelineLayer), + Layer.provideMerge(eventStoreLayer), + Layer.provideMerge(commandReceiptLayer), + Layer.provideMerge(sqlLayer), + ); + const runtimeLayer = Layer.merge(orchestrationLayer, Layer.merge(eventStoreLayer, sqlLayer)).pipe( + Layer.provideMerge(envLayer), + ); + const runtime = ManagedRuntime.make( + runtimeLayer as Layer.Layer< + Layer.Success, + Layer.Error, + never + >, ); - const runtime = ManagedRuntime.make(orchestrationLayer); const engine = await runtime.runPromise(Effect.service(OrchestrationEngineService)); + const eventStore = await runtime.runPromise(Effect.service(OrchestrationEventStore)); return { engine, - run: (effect: Effect.Effect) => runtime.runPromise(effect), + eventStore, + run: ( + effect: Effect.Effect< + A, + E, + OrchestrationEngineService | OrchestrationEventStore | SqlClient.SqlClient + >, + ) => runtime.runPromise(effect), dispose: () => runtime.dispose(), }; } @@ -245,6 +279,80 @@ describe("OrchestrationEngine", () => { await system.dispose(); }); + it("hydrates the same read model from projections as full replay", async () => { + const system = await createOrchestrationSystem(); + const { engine, eventStore } = system; + const createdAt = now(); + + await system.run( + engine.dispatch({ + type: "project.create", + commandId: CommandId.makeUnsafe("cmd-project-snapshot-create"), + projectId: asProjectId("project-snapshot"), + title: "Snapshot Project", + workspaceRoot: "/tmp/project-snapshot", + defaultModelSelection: { + provider: "codex", + model: "gpt-5-codex", + }, + createdAt, + }), + ); + await system.run( + engine.dispatch({ + type: "thread.create", + commandId: CommandId.makeUnsafe("cmd-thread-snapshot-create"), + threadId: ThreadId.makeUnsafe("thread-snapshot"), + projectId: asProjectId("project-snapshot"), + title: "Snapshot Thread", + modelSelection: { + provider: "codex", + model: "gpt-5-codex", + }, + interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, + runtimeMode: "approval-required", + branch: null, + worktreePath: null, + createdAt, + }), + ); + await system.run( + engine.dispatch({ + type: "thread.turn.start", + commandId: CommandId.makeUnsafe("cmd-thread-snapshot-turn-start"), + threadId: ThreadId.makeUnsafe("thread-snapshot"), + message: { + messageId: asMessageId("msg-snapshot-1"), + role: "user", + text: "hello snapshot", + attachments: [], + }, + interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, + runtimeMode: "approval-required", + createdAt, + }), + ); + + let replayedReadModel = createEmptyReadModel(new Date().toISOString()); + const replayedEvents = await system.run( + Stream.runCollect(eventStore.readAll()).pipe( + Effect.map((chunk): OrchestrationEvent[] => Array.from(chunk)), + ), + ); + for (const event of replayedEvents) { + replayedReadModel = await system.run(projectEvent(replayedReadModel, event)); + } + + const snapshotReadModel = await system.run( + Effect.flatMap(makeProjectionSnapshotQuery(), (snapshotQuery) => snapshotQuery.getSnapshot()), + ); + const engineReadModel = await system.run(engine.getReadModel()); + + expect(snapshotReadModel).toEqual(replayedReadModel); + expect(engineReadModel).toEqual(replayedReadModel); + await system.dispose(); + }); + it("streams persisted domain events in order", async () => { const system = await createOrchestrationSystem(); const { engine } = system; diff --git a/apps/server/src/orchestration/Layers/OrchestrationEngine.ts b/apps/server/src/orchestration/Layers/OrchestrationEngine.ts index 5c52379f47..0800b24738 100644 --- a/apps/server/src/orchestration/Layers/OrchestrationEngine.ts +++ b/apps/server/src/orchestration/Layers/OrchestrationEngine.ts @@ -23,6 +23,7 @@ import { OrchestrationEngineService, type OrchestrationEngineShape, } from "../Services/OrchestrationEngine.ts"; +import { makeProjectionSnapshotQuery } from "./ProjectionSnapshotQuery.ts"; interface CommandEnvelope { command: OrchestrationCommand; @@ -54,6 +55,7 @@ const makeOrchestrationEngine = Effect.gen(function* () { const eventStore = yield* OrchestrationEventStore; const commandReceiptRepository = yield* OrchestrationCommandReceiptRepository; const projectionPipeline = yield* OrchestrationProjectionPipeline; + const projectionSnapshotQuery = yield* makeProjectionSnapshotQuery(); let readModel = createEmptyReadModel(new Date().toISOString()); @@ -194,13 +196,44 @@ const makeOrchestrationEngine = Effect.gen(function* () { ); }; + const bootstrapReadModelFromReplay = Effect.gen(function* () { + let replayedReadModel = createEmptyReadModel(new Date().toISOString()); + yield* Stream.runForEach(eventStore.readAll(), (event) => + Effect.gen(function* () { + replayedReadModel = yield* projectEvent(replayedReadModel, event); + }), + ); + return replayedReadModel; + }); + + const bootstrapReadModelFromProjectionSnapshot = Effect.gen(function* () { + const snapshot = yield* projectionSnapshotQuery.getSnapshot(); + if (snapshot.snapshotSequence === 0) { + const firstPersistedEvent = yield* Stream.runHead(eventStore.readFromSequence(0, 1)); + if (Option.isSome(firstPersistedEvent)) { + return yield* Effect.fail( + new Error("Projection snapshot is incomplete despite persisted orchestration events."), + ); + } + } + return snapshot; + }); + yield* projectionPipeline.bootstrap; - // bootstrap in-memory read model from event store - yield* Stream.runForEach(eventStore.readAll(), (event) => - Effect.gen(function* () { - readModel = yield* projectEvent(readModel, event); - }), + readModel = yield* bootstrapReadModelFromProjectionSnapshot.pipe( + Effect.catch((error) => + Effect.gen(function* () { + yield* Effect.logWarning( + "failed to bootstrap orchestration read model from projection snapshot; falling back to event replay", + ).pipe( + Effect.annotateLogs({ + detail: error instanceof Error ? error.message : String(error), + }), + ); + return yield* bootstrapReadModelFromReplay; + }), + ), ); const worker = Effect.forever(Queue.take(commandQueue).pipe(Effect.flatMap(processEnvelope))); diff --git a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts index f951c54b5b..585c97a245 100644 --- a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts +++ b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts @@ -43,6 +43,32 @@ import { type ProjectionSnapshotQueryShape, } from "../Services/ProjectionSnapshotQuery.ts"; +export type ProjectionSnapshotStageName = + | "listProjects" + | "listThreads" + | "listThreadMessages" + | "listThreadProposedPlans" + | "listThreadActivities" + | "listThreadSessions" + | "listCheckpoints" + | "listLatestTurns" + | "listProjectionState" + | "loadRows" + | "assembleMaps" + | "materializeProjects" + | "materializeThreads" + | "decodeReadModel" + | "total"; + +export type ProjectionSnapshotStageSample = { + readonly stage: ProjectionSnapshotStageName; + readonly durationMs: number; +}; + +type ProjectionSnapshotInstrumentation = { + readonly onStage?: (sample: ProjectionSnapshotStageSample) => void; +}; + const decodeReadModel = Schema.decodeUnknownEffect(OrchestrationReadModel); const ProjectionProjectDbRowSchema = ProjectionProject.mapFields( Struct.assign({ @@ -135,14 +161,30 @@ function toPersistenceSqlOrDecodeError(sqlOperation: string, decodeOperation: st : toPersistenceSqlError(sqlOperation)(cause); } -const makeProjectionSnapshotQuery = Effect.gen(function* () { - const sql = yield* SqlClient.SqlClient; +function recordStage( + instrumentation: ProjectionSnapshotInstrumentation | undefined, + stage: ProjectionSnapshotStageName, + durationMs: number, +) { + instrumentation?.onStage?.({ + stage, + durationMs, + }); +} + +function nowMs() { + return performance.now(); +} + +export const makeProjectionSnapshotQuery = (instrumentation?: ProjectionSnapshotInstrumentation) => + Effect.gen(function* () { + const sql = yield* SqlClient.SqlClient; - const listProjectRows = SqlSchema.findAll({ - Request: Schema.Void, - Result: ProjectionProjectDbRowSchema, - execute: () => - sql` + const listProjectRows = SqlSchema.findAll({ + Request: Schema.Void, + Result: ProjectionProjectDbRowSchema, + execute: () => + sql` SELECT project_id AS "projectId", title, @@ -155,13 +197,13 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { FROM projection_projects ORDER BY created_at ASC, project_id ASC `, - }); + }); - const listThreadRows = SqlSchema.findAll({ - Request: Schema.Void, - Result: ProjectionThreadDbRowSchema, - execute: () => - sql` + const listThreadRows = SqlSchema.findAll({ + Request: Schema.Void, + Result: ProjectionThreadDbRowSchema, + execute: () => + sql` SELECT thread_id AS "threadId", project_id AS "projectId", @@ -179,13 +221,13 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { FROM projection_threads ORDER BY created_at ASC, thread_id ASC `, - }); + }); - const listThreadMessageRows = SqlSchema.findAll({ - Request: Schema.Void, - Result: ProjectionThreadMessageDbRowSchema, - execute: () => - sql` + const listThreadMessageRows = SqlSchema.findAll({ + Request: Schema.Void, + Result: ProjectionThreadMessageDbRowSchema, + execute: () => + sql` SELECT message_id AS "messageId", thread_id AS "threadId", @@ -199,13 +241,13 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { FROM projection_thread_messages ORDER BY thread_id ASC, created_at ASC, message_id ASC `, - }); + }); - const listThreadProposedPlanRows = SqlSchema.findAll({ - Request: Schema.Void, - Result: ProjectionThreadProposedPlanDbRowSchema, - execute: () => - sql` + const listThreadProposedPlanRows = SqlSchema.findAll({ + Request: Schema.Void, + Result: ProjectionThreadProposedPlanDbRowSchema, + execute: () => + sql` SELECT plan_id AS "planId", thread_id AS "threadId", @@ -218,13 +260,13 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { FROM projection_thread_proposed_plans ORDER BY thread_id ASC, created_at ASC, plan_id ASC `, - }); + }); - const listThreadActivityRows = SqlSchema.findAll({ - Request: Schema.Void, - Result: ProjectionThreadActivityDbRowSchema, - execute: () => - sql` + const listThreadActivityRows = SqlSchema.findAll({ + Request: Schema.Void, + Result: ProjectionThreadActivityDbRowSchema, + execute: () => + sql` SELECT activity_id AS "activityId", thread_id AS "threadId", @@ -243,13 +285,13 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { created_at ASC, activity_id ASC `, - }); + }); - const listThreadSessionRows = SqlSchema.findAll({ - Request: Schema.Void, - Result: ProjectionThreadSessionDbRowSchema, - execute: () => - sql` + const listThreadSessionRows = SqlSchema.findAll({ + Request: Schema.Void, + Result: ProjectionThreadSessionDbRowSchema, + execute: () => + sql` SELECT thread_id AS "threadId", status, @@ -263,13 +305,13 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { FROM projection_thread_sessions ORDER BY thread_id ASC `, - }); + }); - const listCheckpointRows = SqlSchema.findAll({ - Request: Schema.Void, - Result: ProjectionCheckpointDbRowSchema, - execute: () => - sql` + const listCheckpointRows = SqlSchema.findAll({ + Request: Schema.Void, + Result: ProjectionCheckpointDbRowSchema, + execute: () => + sql` SELECT thread_id AS "threadId", turn_id AS "turnId", @@ -283,13 +325,13 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { WHERE checkpoint_turn_count IS NOT NULL ORDER BY thread_id ASC, checkpoint_turn_count ASC `, - }); + }); - const listLatestTurnRows = SqlSchema.findAll({ - Request: Schema.Void, - Result: ProjectionLatestTurnDbRowSchema, - execute: () => - sql` + const listLatestTurnRows = SqlSchema.findAll({ + Request: Schema.Void, + Result: ProjectionLatestTurnDbRowSchema, + execute: () => + sql` SELECT thread_id AS "threadId", turn_id AS "turnId", @@ -304,301 +346,353 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { WHERE turn_id IS NOT NULL ORDER BY thread_id ASC, requested_at DESC, turn_id DESC `, - }); + }); - const listProjectionStateRows = SqlSchema.findAll({ - Request: Schema.Void, - Result: ProjectionStateDbRowSchema, - execute: () => - sql` + const listProjectionStateRows = SqlSchema.findAll({ + Request: Schema.Void, + Result: ProjectionStateDbRowSchema, + execute: () => + sql` SELECT projector, last_applied_sequence AS "lastAppliedSequence", updated_at AS "updatedAt" FROM projection_state `, - }); - - const getSnapshot: ProjectionSnapshotQueryShape["getSnapshot"] = () => - sql - .withTransaction( - Effect.gen(function* () { - const [ - projectRows, - threadRows, - messageRows, - proposedPlanRows, - activityRows, - sessionRows, - checkpointRows, - latestTurnRows, - stateRows, - ] = yield* Effect.all([ - listProjectRows(undefined).pipe( - Effect.mapError( - toPersistenceSqlOrDecodeError( - "ProjectionSnapshotQuery.getSnapshot:listProjects:query", - "ProjectionSnapshotQuery.getSnapshot:listProjects:decodeRows", + }); + + const timed = ( + stage: ProjectionSnapshotStageName, + effect: Effect.Effect, + ): Effect.Effect => + Effect.gen(function* () { + const startedAt = nowMs(); + const result = yield* effect; + recordStage(instrumentation, stage, nowMs() - startedAt); + return result; + }); + + const getSnapshot: ProjectionSnapshotQueryShape["getSnapshot"] = () => + sql + .withTransaction( + Effect.gen(function* () { + const totalStartedAt = nowMs(); + const loadRowsStartedAt = nowMs(); + const [ + projectRows, + threadRows, + messageRows, + proposedPlanRows, + activityRows, + sessionRows, + checkpointRows, + latestTurnRows, + stateRows, + ] = yield* Effect.all([ + timed( + "listProjects", + listProjectRows(undefined).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getSnapshot:listProjects:query", + "ProjectionSnapshotQuery.getSnapshot:listProjects:decodeRows", + ), + ), ), ), - ), - listThreadRows(undefined).pipe( - Effect.mapError( - toPersistenceSqlOrDecodeError( - "ProjectionSnapshotQuery.getSnapshot:listThreads:query", - "ProjectionSnapshotQuery.getSnapshot:listThreads:decodeRows", + timed( + "listThreads", + listThreadRows(undefined).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getSnapshot:listThreads:query", + "ProjectionSnapshotQuery.getSnapshot:listThreads:decodeRows", + ), + ), ), ), - ), - listThreadMessageRows(undefined).pipe( - Effect.mapError( - toPersistenceSqlOrDecodeError( - "ProjectionSnapshotQuery.getSnapshot:listThreadMessages:query", - "ProjectionSnapshotQuery.getSnapshot:listThreadMessages:decodeRows", + timed( + "listThreadMessages", + listThreadMessageRows(undefined).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getSnapshot:listThreadMessages:query", + "ProjectionSnapshotQuery.getSnapshot:listThreadMessages:decodeRows", + ), + ), ), ), - ), - listThreadProposedPlanRows(undefined).pipe( - Effect.mapError( - toPersistenceSqlOrDecodeError( - "ProjectionSnapshotQuery.getSnapshot:listThreadProposedPlans:query", - "ProjectionSnapshotQuery.getSnapshot:listThreadProposedPlans:decodeRows", + timed( + "listThreadProposedPlans", + listThreadProposedPlanRows(undefined).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getSnapshot:listThreadProposedPlans:query", + "ProjectionSnapshotQuery.getSnapshot:listThreadProposedPlans:decodeRows", + ), + ), ), ), - ), - listThreadActivityRows(undefined).pipe( - Effect.mapError( - toPersistenceSqlOrDecodeError( - "ProjectionSnapshotQuery.getSnapshot:listThreadActivities:query", - "ProjectionSnapshotQuery.getSnapshot:listThreadActivities:decodeRows", + timed( + "listThreadActivities", + listThreadActivityRows(undefined).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getSnapshot:listThreadActivities:query", + "ProjectionSnapshotQuery.getSnapshot:listThreadActivities:decodeRows", + ), + ), ), ), - ), - listThreadSessionRows(undefined).pipe( - Effect.mapError( - toPersistenceSqlOrDecodeError( - "ProjectionSnapshotQuery.getSnapshot:listThreadSessions:query", - "ProjectionSnapshotQuery.getSnapshot:listThreadSessions:decodeRows", + timed( + "listThreadSessions", + listThreadSessionRows(undefined).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getSnapshot:listThreadSessions:query", + "ProjectionSnapshotQuery.getSnapshot:listThreadSessions:decodeRows", + ), + ), ), ), - ), - listCheckpointRows(undefined).pipe( - Effect.mapError( - toPersistenceSqlOrDecodeError( - "ProjectionSnapshotQuery.getSnapshot:listCheckpoints:query", - "ProjectionSnapshotQuery.getSnapshot:listCheckpoints:decodeRows", + timed( + "listCheckpoints", + listCheckpointRows(undefined).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getSnapshot:listCheckpoints:query", + "ProjectionSnapshotQuery.getSnapshot:listCheckpoints:decodeRows", + ), + ), ), ), - ), - listLatestTurnRows(undefined).pipe( - Effect.mapError( - toPersistenceSqlOrDecodeError( - "ProjectionSnapshotQuery.getSnapshot:listLatestTurns:query", - "ProjectionSnapshotQuery.getSnapshot:listLatestTurns:decodeRows", + timed( + "listLatestTurns", + listLatestTurnRows(undefined).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getSnapshot:listLatestTurns:query", + "ProjectionSnapshotQuery.getSnapshot:listLatestTurns:decodeRows", + ), + ), ), ), - ), - listProjectionStateRows(undefined).pipe( - Effect.mapError( - toPersistenceSqlOrDecodeError( - "ProjectionSnapshotQuery.getSnapshot:listProjectionState:query", - "ProjectionSnapshotQuery.getSnapshot:listProjectionState:decodeRows", + timed( + "listProjectionState", + listProjectionStateRows(undefined).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getSnapshot:listProjectionState:query", + "ProjectionSnapshotQuery.getSnapshot:listProjectionState:decodeRows", + ), + ), ), ), - ), - ]); - - const messagesByThread = new Map>(); - const proposedPlansByThread = new Map>(); - const activitiesByThread = new Map>(); - const checkpointsByThread = new Map>(); - const sessionsByThread = new Map(); - const latestTurnByThread = new Map(); - - let updatedAt: string | null = null; - - for (const row of projectRows) { - updatedAt = maxIso(updatedAt, row.updatedAt); - } - for (const row of threadRows) { - updatedAt = maxIso(updatedAt, row.updatedAt); - } - for (const row of stateRows) { - updatedAt = maxIso(updatedAt, row.updatedAt); - } - - for (const row of messageRows) { - updatedAt = maxIso(updatedAt, row.updatedAt); - const threadMessages = messagesByThread.get(row.threadId) ?? []; - threadMessages.push({ - id: row.messageId, - role: row.role, - text: row.text, - ...(row.attachments !== null ? { attachments: row.attachments } : {}), - turnId: row.turnId, - streaming: row.isStreaming === 1, - createdAt: row.createdAt, - updatedAt: row.updatedAt, - }); - messagesByThread.set(row.threadId, threadMessages); - } - - for (const row of proposedPlanRows) { - updatedAt = maxIso(updatedAt, row.updatedAt); - const threadProposedPlans = proposedPlansByThread.get(row.threadId) ?? []; - threadProposedPlans.push({ - id: row.planId, - turnId: row.turnId, - planMarkdown: row.planMarkdown, - implementedAt: row.implementedAt, - implementationThreadId: row.implementationThreadId, - createdAt: row.createdAt, - updatedAt: row.updatedAt, - }); - proposedPlansByThread.set(row.threadId, threadProposedPlans); - } - - for (const row of activityRows) { - updatedAt = maxIso(updatedAt, row.createdAt); - const threadActivities = activitiesByThread.get(row.threadId) ?? []; - threadActivities.push({ - id: row.activityId, - tone: row.tone, - kind: row.kind, - summary: row.summary, - payload: row.payload, - turnId: row.turnId, - ...(row.sequence !== null ? { sequence: row.sequence } : {}), - createdAt: row.createdAt, - }); - activitiesByThread.set(row.threadId, threadActivities); - } - - for (const row of checkpointRows) { - updatedAt = maxIso(updatedAt, row.completedAt); - const threadCheckpoints = checkpointsByThread.get(row.threadId) ?? []; - threadCheckpoints.push({ - turnId: row.turnId, - checkpointTurnCount: row.checkpointTurnCount, - checkpointRef: row.checkpointRef, - status: row.status, - files: row.files, - assistantMessageId: row.assistantMessageId, - completedAt: row.completedAt, - }); - checkpointsByThread.set(row.threadId, threadCheckpoints); - } - - for (const row of latestTurnRows) { - updatedAt = maxIso(updatedAt, row.requestedAt); - if (row.startedAt !== null) { - updatedAt = maxIso(updatedAt, row.startedAt); + ]); + recordStage(instrumentation, "loadRows", nowMs() - loadRowsStartedAt); + + const messagesByThread = new Map>(); + const proposedPlansByThread = new Map>(); + const activitiesByThread = new Map>(); + const checkpointsByThread = new Map>(); + const sessionsByThread = new Map(); + const latestTurnByThread = new Map(); + + let updatedAt: string | null = null; + const assembleMapsStartedAt = nowMs(); + + for (const row of projectRows) { + updatedAt = maxIso(updatedAt, row.updatedAt); + } + for (const row of threadRows) { + updatedAt = maxIso(updatedAt, row.updatedAt); + } + for (const row of stateRows) { + updatedAt = maxIso(updatedAt, row.updatedAt); } - if (row.completedAt !== null) { + + for (const row of messageRows) { + updatedAt = maxIso(updatedAt, row.updatedAt); + const threadMessages = messagesByThread.get(row.threadId) ?? []; + threadMessages.push({ + id: row.messageId, + role: row.role, + text: row.text, + ...(row.attachments !== null ? { attachments: row.attachments } : {}), + turnId: row.turnId, + streaming: row.isStreaming === 1, + createdAt: row.createdAt, + updatedAt: row.updatedAt, + }); + messagesByThread.set(row.threadId, threadMessages); + } + + for (const row of proposedPlanRows) { + updatedAt = maxIso(updatedAt, row.updatedAt); + const threadProposedPlans = proposedPlansByThread.get(row.threadId) ?? []; + threadProposedPlans.push({ + id: row.planId, + turnId: row.turnId, + planMarkdown: row.planMarkdown, + implementedAt: row.implementedAt, + implementationThreadId: row.implementationThreadId, + createdAt: row.createdAt, + updatedAt: row.updatedAt, + }); + proposedPlansByThread.set(row.threadId, threadProposedPlans); + } + + for (const row of activityRows) { + updatedAt = maxIso(updatedAt, row.createdAt); + const threadActivities = activitiesByThread.get(row.threadId) ?? []; + threadActivities.push({ + id: row.activityId, + tone: row.tone, + kind: row.kind, + summary: row.summary, + payload: row.payload, + turnId: row.turnId, + ...(row.sequence !== null ? { sequence: row.sequence } : {}), + createdAt: row.createdAt, + }); + activitiesByThread.set(row.threadId, threadActivities); + } + + for (const row of checkpointRows) { updatedAt = maxIso(updatedAt, row.completedAt); + const threadCheckpoints = checkpointsByThread.get(row.threadId) ?? []; + threadCheckpoints.push({ + turnId: row.turnId, + checkpointTurnCount: row.checkpointTurnCount, + checkpointRef: row.checkpointRef, + status: row.status, + files: row.files, + assistantMessageId: row.assistantMessageId, + completedAt: row.completedAt, + }); + checkpointsByThread.set(row.threadId, threadCheckpoints); + } + + for (const row of latestTurnRows) { + updatedAt = maxIso(updatedAt, row.requestedAt); + if (row.startedAt !== null) { + updatedAt = maxIso(updatedAt, row.startedAt); + } + if (row.completedAt !== null) { + updatedAt = maxIso(updatedAt, row.completedAt); + } + if (latestTurnByThread.has(row.threadId)) { + continue; + } + latestTurnByThread.set(row.threadId, { + turnId: row.turnId, + state: + row.state === "error" + ? "error" + : row.state === "interrupted" + ? "interrupted" + : row.state === "completed" + ? "completed" + : "running", + requestedAt: row.requestedAt, + startedAt: row.startedAt, + completedAt: row.completedAt, + assistantMessageId: row.assistantMessageId, + ...(row.sourceProposedPlanThreadId !== null && row.sourceProposedPlanId !== null + ? { + sourceProposedPlan: { + threadId: row.sourceProposedPlanThreadId, + planId: row.sourceProposedPlanId, + }, + } + : {}), + }); } - if (latestTurnByThread.has(row.threadId)) { - continue; + + for (const row of sessionRows) { + updatedAt = maxIso(updatedAt, row.updatedAt); + sessionsByThread.set(row.threadId, { + threadId: row.threadId, + status: row.status, + providerName: row.providerName, + runtimeMode: row.runtimeMode, + activeTurnId: row.activeTurnId, + lastError: row.lastError, + updatedAt: row.updatedAt, + }); } - latestTurnByThread.set(row.threadId, { - turnId: row.turnId, - state: - row.state === "error" - ? "error" - : row.state === "interrupted" - ? "interrupted" - : row.state === "completed" - ? "completed" - : "running", - requestedAt: row.requestedAt, - startedAt: row.startedAt, - completedAt: row.completedAt, - assistantMessageId: row.assistantMessageId, - ...(row.sourceProposedPlanThreadId !== null && row.sourceProposedPlanId !== null - ? { - sourceProposedPlan: { - threadId: row.sourceProposedPlanThreadId, - planId: row.sourceProposedPlanId, - }, - } - : {}), - }); - } - - for (const row of sessionRows) { - updatedAt = maxIso(updatedAt, row.updatedAt); - sessionsByThread.set(row.threadId, { - threadId: row.threadId, - status: row.status, - providerName: row.providerName, + recordStage(instrumentation, "assembleMaps", nowMs() - assembleMapsStartedAt); + + const projectsStartedAt = nowMs(); + const projects: ReadonlyArray = projectRows.map((row) => ({ + id: row.projectId, + title: row.title, + workspaceRoot: row.workspaceRoot, + defaultModelSelection: row.defaultModelSelection, + scripts: row.scripts, + createdAt: row.createdAt, + updatedAt: row.updatedAt, + deletedAt: row.deletedAt, + })); + recordStage(instrumentation, "materializeProjects", nowMs() - projectsStartedAt); + + const threadsStartedAt = nowMs(); + const threads: ReadonlyArray = threadRows.map((row) => ({ + id: row.threadId, + projectId: row.projectId, + title: row.title, + modelSelection: row.modelSelection, runtimeMode: row.runtimeMode, - activeTurnId: row.activeTurnId, - lastError: row.lastError, + interactionMode: row.interactionMode, + branch: row.branch, + worktreePath: row.worktreePath, + latestTurn: latestTurnByThread.get(row.threadId) ?? null, + createdAt: row.createdAt, updatedAt: row.updatedAt, - }); - } - - const projects: ReadonlyArray = projectRows.map((row) => ({ - id: row.projectId, - title: row.title, - workspaceRoot: row.workspaceRoot, - defaultModelSelection: row.defaultModelSelection, - scripts: row.scripts, - createdAt: row.createdAt, - updatedAt: row.updatedAt, - deletedAt: row.deletedAt, - })); - - const threads: ReadonlyArray = threadRows.map((row) => ({ - id: row.threadId, - projectId: row.projectId, - title: row.title, - modelSelection: row.modelSelection, - runtimeMode: row.runtimeMode, - interactionMode: row.interactionMode, - branch: row.branch, - worktreePath: row.worktreePath, - latestTurn: latestTurnByThread.get(row.threadId) ?? null, - createdAt: row.createdAt, - updatedAt: row.updatedAt, - archivedAt: row.archivedAt, - deletedAt: row.deletedAt, - messages: messagesByThread.get(row.threadId) ?? [], - proposedPlans: proposedPlansByThread.get(row.threadId) ?? [], - activities: activitiesByThread.get(row.threadId) ?? [], - checkpoints: checkpointsByThread.get(row.threadId) ?? [], - session: sessionsByThread.get(row.threadId) ?? null, - })); - - const snapshot = { - snapshotSequence: computeSnapshotSequence(stateRows), - projects, - threads, - updatedAt: updatedAt ?? new Date(0).toISOString(), - }; - - return yield* decodeReadModel(snapshot).pipe( - Effect.mapError( - toPersistenceDecodeError("ProjectionSnapshotQuery.getSnapshot:decodeReadModel"), - ), - ); - }), - ) - .pipe( - Effect.mapError((error) => { - if (isPersistenceError(error)) { - return error; - } - return toPersistenceSqlError("ProjectionSnapshotQuery.getSnapshot:query")(error); - }), - ); - - return { - getSnapshot, - } satisfies ProjectionSnapshotQueryShape; -}); + archivedAt: row.archivedAt, + deletedAt: row.deletedAt, + messages: messagesByThread.get(row.threadId) ?? [], + proposedPlans: proposedPlansByThread.get(row.threadId) ?? [], + activities: activitiesByThread.get(row.threadId) ?? [], + checkpoints: checkpointsByThread.get(row.threadId) ?? [], + session: sessionsByThread.get(row.threadId) ?? null, + })); + recordStage(instrumentation, "materializeThreads", nowMs() - threadsStartedAt); + + const snapshot = { + snapshotSequence: computeSnapshotSequence(stateRows), + projects, + threads, + updatedAt: updatedAt ?? new Date(0).toISOString(), + }; + + const decoded = yield* timed( + "decodeReadModel", + decodeReadModel(snapshot).pipe( + Effect.mapError( + toPersistenceDecodeError("ProjectionSnapshotQuery.getSnapshot:decodeReadModel"), + ), + ), + ); + recordStage(instrumentation, "total", nowMs() - totalStartedAt); + return decoded; + }), + ) + .pipe( + Effect.mapError((error) => { + if (isPersistenceError(error)) { + return error; + } + return toPersistenceSqlError("ProjectionSnapshotQuery.getSnapshot:query")(error); + }), + ); + + return { + getSnapshot, + } satisfies ProjectionSnapshotQueryShape; + }); export const OrchestrationProjectionSnapshotQueryLive = Layer.effect( ProjectionSnapshotQuery, - makeProjectionSnapshotQuery, + makeProjectionSnapshotQuery(), ); From e06c7256f330ae5fe285ea496daa488aa4df3bd8 Mon Sep 17 00:00:00 2001 From: justsomelegs <145564979+justsomelegs@users.noreply.github.com> Date: Tue, 31 Mar 2026 22:25:42 +0100 Subject: [PATCH 2/6] Trim engine bootstrap thread activities --- .../orchestration/Layers/OrchestrationEngine.test.ts | 11 +++++++++-- .../src/orchestration/Layers/OrchestrationEngine.ts | 9 ++++++++- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts b/apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts index c818aad3f9..54c4b698f5 100644 --- a/apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts +++ b/apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts @@ -279,7 +279,7 @@ describe("OrchestrationEngine", () => { await system.dispose(); }); - it("hydrates the same read model from projections as full replay", async () => { + it("hydrates engine bootstrap state from projections without thread activities", async () => { const system = await createOrchestrationSystem(); const { engine, eventStore } = system; const createdAt = now(); @@ -347,9 +347,16 @@ describe("OrchestrationEngine", () => { Effect.flatMap(makeProjectionSnapshotQuery(), (snapshotQuery) => snapshotQuery.getSnapshot()), ); const engineReadModel = await system.run(engine.getReadModel()); + const replayedReadModelWithoutActivities = { + ...replayedReadModel, + threads: replayedReadModel.threads.map((thread) => ({ + ...thread, + activities: [], + })), + }; expect(snapshotReadModel).toEqual(replayedReadModel); - expect(engineReadModel).toEqual(replayedReadModel); + expect(engineReadModel).toEqual(replayedReadModelWithoutActivities); await system.dispose(); }); diff --git a/apps/server/src/orchestration/Layers/OrchestrationEngine.ts b/apps/server/src/orchestration/Layers/OrchestrationEngine.ts index 0800b24738..63869b88cf 100644 --- a/apps/server/src/orchestration/Layers/OrchestrationEngine.ts +++ b/apps/server/src/orchestration/Layers/OrchestrationEngine.ts @@ -216,7 +216,14 @@ const makeOrchestrationEngine = Effect.gen(function* () { ); } } - return snapshot; + return { + ...snapshot, + threads: snapshot.threads.map((thread) => + Object.assign({}, thread, { + activities: [], + }), + ), + }; }); yield* projectionPipeline.bootstrap; From 41e8e77114121b356f26f970403e071cc2c67cce Mon Sep 17 00:00:00 2001 From: justsomelegs <145564979+justsomelegs@users.noreply.github.com> Date: Tue, 31 Mar 2026 22:52:32 +0100 Subject: [PATCH 3/6] Use startup-specific projection queries --- .../Layers/ProjectionStartupQuery.ts | 136 ++++++++++++++++++ .../Services/ProjectionStartupQuery.ts | 52 +++++++ apps/server/src/server.ts | 2 + apps/server/src/serverRuntimeStartup.ts | 33 ++--- 4 files changed, 205 insertions(+), 18 deletions(-) create mode 100644 apps/server/src/orchestration/Layers/ProjectionStartupQuery.ts create mode 100644 apps/server/src/orchestration/Services/ProjectionStartupQuery.ts diff --git a/apps/server/src/orchestration/Layers/ProjectionStartupQuery.ts b/apps/server/src/orchestration/Layers/ProjectionStartupQuery.ts new file mode 100644 index 0000000000..236a33711d --- /dev/null +++ b/apps/server/src/orchestration/Layers/ProjectionStartupQuery.ts @@ -0,0 +1,136 @@ +import { ModelSelection, ProjectId, ThreadId } from "@t3tools/contracts"; +import { Effect, Layer, Option, Schema } from "effect"; +import * as SqlClient from "effect/unstable/sql/SqlClient"; +import * as SqlSchema from "effect/unstable/sql/SqlSchema"; + +import { toPersistenceSqlError, type ProjectionRepositoryError } from "../../persistence/Errors.ts"; +import { + GetProjectionAutoBootstrapStateInput, + ProjectionStartupQuery, + type ProjectionAutoBootstrapState, + type ProjectionStartupQueryShape, +} from "../Services/ProjectionStartupQuery.ts"; + +const ProjectionStartupCountsRow = Schema.Struct({ + projectCount: Schema.Number, + threadCount: Schema.Number, +}); + +const ProjectionStartupProjectRow = Schema.Struct({ + projectId: ProjectId, + defaultModelSelection: Schema.NullOr(Schema.fromJsonString(ModelSelection)), +}); + +const ProjectionStartupThreadRow = Schema.Struct({ + threadId: ThreadId, +}); + +const EMPTY_AUTO_BOOTSTRAP_STATE: ProjectionAutoBootstrapState = { + project: null, + threadId: null, +}; + +function toStartupQuerySqlError(operation: string) { + return (cause: unknown): ProjectionRepositoryError => toPersistenceSqlError(operation)(cause); +} + +const makeProjectionStartupQuery = Effect.gen(function* () { + const sql = yield* SqlClient.SqlClient; + + const readStartupCountsRow = SqlSchema.findOne({ + Request: Schema.Void, + Result: ProjectionStartupCountsRow, + execute: () => + sql` + SELECT + ( + SELECT COUNT(*) + FROM projection_projects + WHERE deleted_at IS NULL + ) AS "projectCount", + ( + SELECT COUNT(*) + FROM projection_threads + WHERE deleted_at IS NULL + ) AS "threadCount" + `, + }); + + const findAutoBootstrapProjectRow = SqlSchema.findOneOption({ + Request: GetProjectionAutoBootstrapStateInput, + Result: ProjectionStartupProjectRow, + execute: ({ workspaceRoot }) => + sql` + SELECT + project_id AS "projectId", + default_model_selection_json AS "defaultModelSelection" + FROM projection_projects + WHERE workspace_root = ${workspaceRoot} + AND deleted_at IS NULL + ORDER BY created_at ASC, project_id ASC + LIMIT 1 + `, + }); + + const findAutoBootstrapThreadRow = SqlSchema.findOneOption({ + Request: Schema.Struct({ projectId: ProjectId }), + Result: ProjectionStartupThreadRow, + execute: ({ projectId }) => + sql` + SELECT + thread_id AS "threadId" + FROM projection_threads + WHERE project_id = ${projectId} + AND deleted_at IS NULL + ORDER BY created_at ASC, thread_id ASC + LIMIT 1 + `, + }); + + const getStartupCounts: ProjectionStartupQueryShape["getStartupCounts"] = () => + readStartupCountsRow(undefined).pipe( + Effect.mapError(toStartupQuerySqlError("ProjectionStartupQuery.getStartupCounts:query")), + ); + + const getAutoBootstrapState: ProjectionStartupQueryShape["getAutoBootstrapState"] = (input) => + Effect.gen(function* () { + const projectOption = yield* findAutoBootstrapProjectRow(input).pipe( + Effect.mapError( + toStartupQuerySqlError("ProjectionStartupQuery.getAutoBootstrapState:projectQuery"), + ), + ); + + if (Option.isNone(projectOption)) { + return EMPTY_AUTO_BOOTSTRAP_STATE; + } + + const threadOption = yield* findAutoBootstrapThreadRow({ + projectId: projectOption.value.projectId, + }).pipe( + Effect.mapError( + toStartupQuerySqlError("ProjectionStartupQuery.getAutoBootstrapState:threadQuery"), + ), + ); + + return { + project: { + id: projectOption.value.projectId, + defaultModelSelection: projectOption.value.defaultModelSelection, + }, + threadId: Option.match(threadOption, { + onNone: () => null, + onSome: (row) => row.threadId, + }), + } satisfies ProjectionAutoBootstrapState; + }); + + return { + getStartupCounts, + getAutoBootstrapState, + } satisfies ProjectionStartupQueryShape; +}); + +export const OrchestrationProjectionStartupQueryLive = Layer.effect( + ProjectionStartupQuery, + makeProjectionStartupQuery, +); diff --git a/apps/server/src/orchestration/Services/ProjectionStartupQuery.ts b/apps/server/src/orchestration/Services/ProjectionStartupQuery.ts new file mode 100644 index 0000000000..0a350fa7e5 --- /dev/null +++ b/apps/server/src/orchestration/Services/ProjectionStartupQuery.ts @@ -0,0 +1,52 @@ +/** + * ProjectionStartupQuery - Lightweight projection queries used during startup. + * + * Avoids building the full orchestration snapshot for small startup lookups + * like heartbeat counts and auto-bootstrap existence checks. + * + * @module ProjectionStartupQuery + */ +import { ModelSelection, ProjectId, ThreadId } from "@t3tools/contracts"; +import { Schema, ServiceMap } from "effect"; +import type { Effect } from "effect"; + +import type { ProjectionRepositoryError } from "../../persistence/Errors.ts"; + +export const ProjectionStartupCounts = Schema.Struct({ + projectCount: Schema.Number, + threadCount: Schema.Number, +}); +export type ProjectionStartupCounts = typeof ProjectionStartupCounts.Type; + +export const GetProjectionAutoBootstrapStateInput = Schema.Struct({ + workspaceRoot: Schema.String, +}); +export type GetProjectionAutoBootstrapStateInput = typeof GetProjectionAutoBootstrapStateInput.Type; + +export const ProjectionAutoBootstrapProject = Schema.Struct({ + id: ProjectId, + defaultModelSelection: Schema.NullOr(ModelSelection), +}); +export type ProjectionAutoBootstrapProject = typeof ProjectionAutoBootstrapProject.Type; + +export const ProjectionAutoBootstrapState = Schema.Struct({ + project: Schema.NullOr(ProjectionAutoBootstrapProject), + threadId: Schema.NullOr(ThreadId), +}); +export type ProjectionAutoBootstrapState = typeof ProjectionAutoBootstrapState.Type; + +export interface ProjectionStartupQueryShape { + readonly getStartupCounts: () => Effect.Effect< + ProjectionStartupCounts, + ProjectionRepositoryError + >; + + readonly getAutoBootstrapState: ( + input: GetProjectionAutoBootstrapStateInput, + ) => Effect.Effect; +} + +export class ProjectionStartupQuery extends ServiceMap.Service< + ProjectionStartupQuery, + ProjectionStartupQueryShape +>()("t3/orchestration/Services/ProjectionStartupQuery") {} diff --git a/apps/server/src/server.ts b/apps/server/src/server.ts index 40a8eb09bc..5e53eb93a6 100644 --- a/apps/server/src/server.ts +++ b/apps/server/src/server.ts @@ -22,6 +22,7 @@ import { OrchestrationEventStoreLive } from "./persistence/Layers/OrchestrationE import { OrchestrationCommandReceiptRepositoryLive } from "./persistence/Layers/OrchestrationCommandReceipts"; import { CheckpointDiffQueryLive } from "./checkpointing/Layers/CheckpointDiffQuery"; import { OrchestrationProjectionSnapshotQueryLive } from "./orchestration/Layers/ProjectionSnapshotQuery"; +import { OrchestrationProjectionStartupQueryLive } from "./orchestration/Layers/ProjectionStartupQuery"; import { CheckpointStoreLive } from "./checkpointing/Layers/CheckpointStore"; import { GitCoreLive } from "./git/Layers/GitCore"; import { GitHubCliLive } from "./git/Layers/GitHubCli"; @@ -100,6 +101,7 @@ const ReactorLayerLive = Layer.empty.pipe( ); const OrchestrationLayerLive = Layer.empty.pipe( + Layer.provideMerge(OrchestrationProjectionStartupQueryLive), Layer.provideMerge(OrchestrationProjectionSnapshotQueryLive), Layer.provideMerge(OrchestrationEngineLive), Layer.provideMerge(OrchestrationProjectionPipelineLive), diff --git a/apps/server/src/serverRuntimeStartup.ts b/apps/server/src/serverRuntimeStartup.ts index 2457f6ffe8..1aed8c9203 100644 --- a/apps/server/src/serverRuntimeStartup.ts +++ b/apps/server/src/serverRuntimeStartup.ts @@ -11,7 +11,7 @@ import { ServerConfig } from "./config"; import { Keybindings } from "./keybindings"; import { Open } from "./open"; import { OrchestrationEngineService } from "./orchestration/Services/OrchestrationEngine"; -import { ProjectionSnapshotQuery } from "./orchestration/Services/ProjectionSnapshotQuery"; +import { ProjectionStartupQuery } from "./orchestration/Services/ProjectionStartupQuery"; import { OrchestrationReactor } from "./orchestration/Services/OrchestrationReactor"; import { ServerLifecycleEvents } from "./serverLifecycleEvents"; import { ServerSettingsService } from "./serverSettings"; @@ -107,15 +107,11 @@ export const makeCommandGate = Effect.gen(function* () { const recordStartupHeartbeat = Effect.gen(function* () { const analytics = yield* AnalyticsService; - const projectionSnapshotQuery = yield* ProjectionSnapshotQuery; + const projectionStartupQuery = yield* ProjectionStartupQuery; - const { threadCount, projectCount } = yield* projectionSnapshotQuery.getSnapshot().pipe( - Effect.map((snapshot) => ({ - threadCount: snapshot.threads.length, - projectCount: snapshot.projects.length, - })), + const { threadCount, projectCount } = yield* projectionStartupQuery.getStartupCounts().pipe( Effect.catch((cause) => - Effect.logWarning("failed to gather startup snapshot for telemetry", { cause }).pipe( + Effect.logWarning("failed to gather startup counts for telemetry", { cause }).pipe( Effect.as({ threadCount: 0, projectCount: 0, @@ -132,7 +128,7 @@ const recordStartupHeartbeat = Effect.gen(function* () { const autoBootstrapWelcome = Effect.gen(function* () { const serverConfig = yield* ServerConfig; - const projectionReadModelQuery = yield* ProjectionSnapshotQuery; + const projectionStartupQuery = yield* ProjectionStartupQuery; const orchestrationEngine = yield* OrchestrationEngineService; const path = yield* Path.Path; @@ -141,10 +137,10 @@ const autoBootstrapWelcome = Effect.gen(function* () { if (serverConfig.autoBootstrapProjectFromCwd) { yield* Effect.gen(function* () { - const snapshot = yield* projectionReadModelQuery.getSnapshot(); - const existingProject = snapshot.projects.find( - (project) => project.workspaceRoot === serverConfig.cwd && project.deletedAt === null, - ); + const autoBootstrapState = yield* projectionStartupQuery.getAutoBootstrapState({ + workspaceRoot: serverConfig.cwd, + }); + const existingProject = autoBootstrapState.project; let nextProjectId: ProjectId; let nextProjectDefaultModelSelection: ModelSelection; @@ -173,10 +169,11 @@ const autoBootstrapWelcome = Effect.gen(function* () { }; } - const existingThread = snapshot.threads.find( - (thread) => thread.projectId === nextProjectId && thread.deletedAt === null, - ); - if (!existingThread) { + const existingThreadId = + existingProject && existingProject.id === nextProjectId + ? autoBootstrapState.threadId + : null; + if (existingThreadId === null) { const createdAt = new Date().toISOString(); const createdThreadId = ThreadId.makeUnsafe(crypto.randomUUID()); yield* orchestrationEngine.dispatch({ @@ -196,7 +193,7 @@ const autoBootstrapWelcome = Effect.gen(function* () { bootstrapThreadId = createdThreadId; } else { bootstrapProjectId = nextProjectId; - bootstrapThreadId = existingThread.id; + bootstrapThreadId = existingThreadId; } }); } From 4bdf2543d7ff61a27b95d022ac1628e12edc775b Mon Sep 17 00:00:00 2001 From: justsomelegs <145564979+justsomelegs@users.noreply.github.com> Date: Tue, 31 Mar 2026 23:08:15 +0100 Subject: [PATCH 4/6] Skip bootstrap activity loading --- .../Layers/OrchestrationEngine.ts | 13 ++--- .../Layers/ProjectionSnapshotQuery.ts | 58 +++++++++++-------- 2 files changed, 39 insertions(+), 32 deletions(-) diff --git a/apps/server/src/orchestration/Layers/OrchestrationEngine.ts b/apps/server/src/orchestration/Layers/OrchestrationEngine.ts index 63869b88cf..dab5736beb 100644 --- a/apps/server/src/orchestration/Layers/OrchestrationEngine.ts +++ b/apps/server/src/orchestration/Layers/OrchestrationEngine.ts @@ -55,7 +55,9 @@ const makeOrchestrationEngine = Effect.gen(function* () { const eventStore = yield* OrchestrationEventStore; const commandReceiptRepository = yield* OrchestrationCommandReceiptRepository; const projectionPipeline = yield* OrchestrationProjectionPipeline; - const projectionSnapshotQuery = yield* makeProjectionSnapshotQuery(); + const projectionSnapshotQuery = yield* makeProjectionSnapshotQuery(undefined, { + includeActivities: false, + }); let readModel = createEmptyReadModel(new Date().toISOString()); @@ -216,14 +218,7 @@ const makeOrchestrationEngine = Effect.gen(function* () { ); } } - return { - ...snapshot, - threads: snapshot.threads.map((thread) => - Object.assign({}, thread, { - activities: [], - }), - ), - }; + return snapshot; }); yield* projectionPipeline.bootstrap; diff --git a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts index 585c97a245..7a4d6e10c6 100644 --- a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts +++ b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts @@ -69,6 +69,10 @@ type ProjectionSnapshotInstrumentation = { readonly onStage?: (sample: ProjectionSnapshotStageSample) => void; }; +type ProjectionSnapshotQueryOptions = { + readonly includeActivities?: boolean; +}; + const decodeReadModel = Schema.decodeUnknownEffect(OrchestrationReadModel); const ProjectionProjectDbRowSchema = ProjectionProject.mapFields( Struct.assign({ @@ -176,9 +180,13 @@ function nowMs() { return performance.now(); } -export const makeProjectionSnapshotQuery = (instrumentation?: ProjectionSnapshotInstrumentation) => +export const makeProjectionSnapshotQuery = ( + instrumentation?: ProjectionSnapshotInstrumentation, + options?: ProjectionSnapshotQueryOptions, +) => Effect.gen(function* () { const sql = yield* SqlClient.SqlClient; + const includeActivities = options?.includeActivities ?? true; const listProjectRows = SqlSchema.findAll({ Request: Schema.Void, @@ -435,14 +443,16 @@ export const makeProjectionSnapshotQuery = (instrumentation?: ProjectionSnapshot ), timed( "listThreadActivities", - listThreadActivityRows(undefined).pipe( - Effect.mapError( - toPersistenceSqlOrDecodeError( - "ProjectionSnapshotQuery.getSnapshot:listThreadActivities:query", - "ProjectionSnapshotQuery.getSnapshot:listThreadActivities:decodeRows", - ), - ), - ), + includeActivities + ? listThreadActivityRows(undefined).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getSnapshot:listThreadActivities:query", + "ProjectionSnapshotQuery.getSnapshot:listThreadActivities:decodeRows", + ), + ), + ) + : Effect.succeed([]), ), timed( "listThreadSessions", @@ -542,20 +552,22 @@ export const makeProjectionSnapshotQuery = (instrumentation?: ProjectionSnapshot proposedPlansByThread.set(row.threadId, threadProposedPlans); } - for (const row of activityRows) { - updatedAt = maxIso(updatedAt, row.createdAt); - const threadActivities = activitiesByThread.get(row.threadId) ?? []; - threadActivities.push({ - id: row.activityId, - tone: row.tone, - kind: row.kind, - summary: row.summary, - payload: row.payload, - turnId: row.turnId, - ...(row.sequence !== null ? { sequence: row.sequence } : {}), - createdAt: row.createdAt, - }); - activitiesByThread.set(row.threadId, threadActivities); + if (includeActivities) { + for (const row of activityRows) { + updatedAt = maxIso(updatedAt, row.createdAt); + const threadActivities = activitiesByThread.get(row.threadId) ?? []; + threadActivities.push({ + id: row.activityId, + tone: row.tone, + kind: row.kind, + summary: row.summary, + payload: row.payload, + turnId: row.turnId, + ...(row.sequence !== null ? { sequence: row.sequence } : {}), + createdAt: row.createdAt, + }); + activitiesByThread.set(row.threadId, threadActivities); + } } for (const row of checkpointRows) { From 892ca20a8ab4b0acb7e4ebf56f28250908c010f0 Mon Sep 17 00:00:00 2001 From: justsomelegs <145564979+justsomelegs@users.noreply.github.com> Date: Tue, 31 Mar 2026 23:40:34 +0100 Subject: [PATCH 5/6] Bootstrap UI from compact orchestration snapshot --- .../Layers/CheckpointDiffQuery.test.ts | 8 ++ .../Layers/ProjectionSnapshotQuery.ts | 68 ++++++++++------- .../Services/ProjectionSnapshotQuery.ts | 11 +++ apps/server/src/ws.ts | 10 +++ apps/web/src/routes/__root.tsx | 75 +++++++++++++++++-- apps/web/src/wsNativeApi.ts | 1 + apps/web/src/wsRpcClient.ts | 5 ++ packages/contracts/src/ipc.ts | 2 + packages/contracts/src/orchestration.ts | 12 +++ packages/contracts/src/rpc.ts | 11 +++ 10 files changed, 173 insertions(+), 30 deletions(-) diff --git a/apps/server/src/checkpointing/Layers/CheckpointDiffQuery.test.ts b/apps/server/src/checkpointing/Layers/CheckpointDiffQuery.test.ts index 9fb2500ce4..b0b03a2f0a 100644 --- a/apps/server/src/checkpointing/Layers/CheckpointDiffQuery.test.ts +++ b/apps/server/src/checkpointing/Layers/CheckpointDiffQuery.test.ts @@ -125,6 +125,7 @@ describe("CheckpointDiffQueryLive", () => { Layer.provideMerge(Layer.succeed(CheckpointStore, checkpointStore)), Layer.provideMerge( Layer.succeed(ProjectionSnapshotQuery, { + getBootstrapSnapshot: () => Effect.succeed(snapshot), getSnapshot: () => Effect.succeed(snapshot), }), ), @@ -174,6 +175,13 @@ describe("CheckpointDiffQueryLive", () => { Layer.provideMerge(Layer.succeed(CheckpointStore, checkpointStore)), Layer.provideMerge( Layer.succeed(ProjectionSnapshotQuery, { + getBootstrapSnapshot: () => + Effect.succeed({ + snapshotSequence: 0, + projects: [], + threads: [], + updatedAt: "2026-01-01T00:00:00.000Z", + } satisfies OrchestrationReadModel), getSnapshot: () => Effect.succeed({ snapshotSequence: 0, diff --git a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts index 7a4d6e10c6..0bc437b65a 100644 --- a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts +++ b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts @@ -70,6 +70,7 @@ type ProjectionSnapshotInstrumentation = { }; type ProjectionSnapshotQueryOptions = { + readonly includeMessages?: boolean; readonly includeActivities?: boolean; }; @@ -186,7 +187,6 @@ export const makeProjectionSnapshotQuery = ( ) => Effect.gen(function* () { const sql = yield* SqlClient.SqlClient; - const includeActivities = options?.includeActivities ?? true; const listProjectRows = SqlSchema.findAll({ Request: Schema.Void, @@ -380,8 +380,11 @@ export const makeProjectionSnapshotQuery = ( return result; }); - const getSnapshot: ProjectionSnapshotQueryShape["getSnapshot"] = () => - sql + const runSnapshotQuery = (queryOptions?: ProjectionSnapshotQueryOptions) => { + const includeMessages = queryOptions?.includeMessages ?? true; + const includeActivities = queryOptions?.includeActivities ?? true; + + return sql .withTransaction( Effect.gen(function* () { const totalStartedAt = nowMs(); @@ -421,14 +424,16 @@ export const makeProjectionSnapshotQuery = ( ), timed( "listThreadMessages", - listThreadMessageRows(undefined).pipe( - Effect.mapError( - toPersistenceSqlOrDecodeError( - "ProjectionSnapshotQuery.getSnapshot:listThreadMessages:query", - "ProjectionSnapshotQuery.getSnapshot:listThreadMessages:decodeRows", - ), - ), - ), + includeMessages + ? listThreadMessageRows(undefined).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getSnapshot:listThreadMessages:query", + "ProjectionSnapshotQuery.getSnapshot:listThreadMessages:decodeRows", + ), + ), + ) + : Effect.succeed([]), ), timed( "listThreadProposedPlans", @@ -521,20 +526,22 @@ export const makeProjectionSnapshotQuery = ( updatedAt = maxIso(updatedAt, row.updatedAt); } - for (const row of messageRows) { - updatedAt = maxIso(updatedAt, row.updatedAt); - const threadMessages = messagesByThread.get(row.threadId) ?? []; - threadMessages.push({ - id: row.messageId, - role: row.role, - text: row.text, - ...(row.attachments !== null ? { attachments: row.attachments } : {}), - turnId: row.turnId, - streaming: row.isStreaming === 1, - createdAt: row.createdAt, - updatedAt: row.updatedAt, - }); - messagesByThread.set(row.threadId, threadMessages); + if (includeMessages) { + for (const row of messageRows) { + updatedAt = maxIso(updatedAt, row.updatedAt); + const threadMessages = messagesByThread.get(row.threadId) ?? []; + threadMessages.push({ + id: row.messageId, + role: row.role, + text: row.text, + ...(row.attachments !== null ? { attachments: row.attachments } : {}), + turnId: row.turnId, + streaming: row.isStreaming === 1, + createdAt: row.createdAt, + updatedAt: row.updatedAt, + }); + messagesByThread.set(row.threadId, threadMessages); + } } for (const row of proposedPlanRows) { @@ -698,8 +705,19 @@ export const makeProjectionSnapshotQuery = ( return toPersistenceSqlError("ProjectionSnapshotQuery.getSnapshot:query")(error); }), ); + }; + + const getSnapshot: ProjectionSnapshotQueryShape["getSnapshot"] = () => + runSnapshotQuery(options); + + const getBootstrapSnapshot: ProjectionSnapshotQueryShape["getBootstrapSnapshot"] = () => + runSnapshotQuery({ + includeMessages: false, + includeActivities: false, + }); return { + getBootstrapSnapshot, getSnapshot, } satisfies ProjectionSnapshotQueryShape; }); diff --git a/apps/server/src/orchestration/Services/ProjectionSnapshotQuery.ts b/apps/server/src/orchestration/Services/ProjectionSnapshotQuery.ts index 91e42f02ff..d2ee5b7bce 100644 --- a/apps/server/src/orchestration/Services/ProjectionSnapshotQuery.ts +++ b/apps/server/src/orchestration/Services/ProjectionSnapshotQuery.ts @@ -16,6 +16,17 @@ import type { ProjectionRepositoryError } from "../../persistence/Errors.ts"; * ProjectionSnapshotQueryShape - Service API for read-model snapshots. */ export interface ProjectionSnapshotQueryShape { + /** + * Read a lightweight bootstrap snapshot for initial UI hydration. + * + * Omits heavyweight per-thread history collections that are not needed for + * the first shell/sidebar paint. + */ + readonly getBootstrapSnapshot: () => Effect.Effect< + OrchestrationReadModel, + ProjectionRepositoryError + >; + /** * Read the latest orchestration projection snapshot. * diff --git a/apps/server/src/ws.ts b/apps/server/src/ws.ts index cff7e26efa..8eaf0af258 100644 --- a/apps/server/src/ws.ts +++ b/apps/server/src/ws.ts @@ -72,6 +72,16 @@ const WsRpcLayer = WsRpcGroup.toLayer( }); return WsRpcGroup.of({ + [ORCHESTRATION_WS_METHODS.getBootstrapSnapshot]: (_input) => + projectionSnapshotQuery.getBootstrapSnapshot().pipe( + Effect.mapError( + (cause) => + new OrchestrationGetSnapshotError({ + message: "Failed to load orchestration bootstrap snapshot", + cause, + }), + ), + ), [ORCHESTRATION_WS_METHODS.getSnapshot]: (_input) => projectionSnapshotQuery.getSnapshot().pipe( Effect.mapError( diff --git a/apps/web/src/routes/__root.tsx b/apps/web/src/routes/__root.tsx index 368c595a4b..8442921064 100644 --- a/apps/web/src/routes/__root.tsx +++ b/apps/web/src/routes/__root.tsx @@ -1,5 +1,6 @@ import { OrchestrationEvent, + type OrchestrationReadModel, ThreadId, type ServerLifecycleWelcomePayload, } from "@t3tools/contracts"; @@ -269,6 +270,8 @@ function EventRouter() { disposedRef.current = false; const recovery = createOrchestrationRecoveryCoordinator(); let needsProviderInvalidation = false; + let fullSnapshotHydrationQueued = false; + let fullSnapshotHydrationTimer: ReturnType | null = null; const reconcileSnapshotDerivedState = () => { const threads = useStore.getState().threads; @@ -379,13 +382,23 @@ function EventRouter() { } }; - const runSnapshotRecovery = async (reason: "bootstrap" | "replay-failed"): Promise => { + const clearFullSnapshotHydrationTimer = () => { + if (fullSnapshotHydrationTimer !== null) { + globalThis.clearTimeout(fullSnapshotHydrationTimer); + fullSnapshotHydrationTimer = null; + } + }; + + const runSnapshotRecovery = async ( + reason: "bootstrap" | "replay-failed", + fetchSnapshot: () => Promise, + ): Promise<"applied" | "deferred" | "failed"> => { if (!recovery.beginSnapshotRecovery(reason)) { - return; + return "deferred"; } try { - const snapshot = await api.orchestration.getSnapshot(); + const snapshot = await fetchSnapshot(); if (!disposed) { syncServerReadModel(snapshot); reconcileSnapshotDerivedState(); @@ -396,16 +409,66 @@ function EventRouter() { } catch { // Keep prior state and wait for welcome or a later replay attempt. recovery.failSnapshotRecovery(); + return "failed"; + } + return "applied"; + }; + + const drainFullSnapshotHydration = async (): Promise => { + if (disposed || !fullSnapshotHydrationQueued) { + return; + } + + const result = await runSnapshotRecovery("bootstrap", () => api.orchestration.getSnapshot()); + if (disposed) { + return; + } + + if (result === "applied") { + fullSnapshotHydrationQueued = false; + clearFullSnapshotHydrationTimer(); + return; + } + + if (result === "failed") { + fullSnapshotHydrationQueued = false; + clearFullSnapshotHydrationTimer(); + return; } + + clearFullSnapshotHydrationTimer(); + fullSnapshotHydrationTimer = globalThis.setTimeout(() => { + void drainFullSnapshotHydration(); + }, 50); + }; + + const scheduleFullSnapshotHydration = () => { + if (disposed || fullSnapshotHydrationQueued) { + return; + } + fullSnapshotHydrationQueued = true; + clearFullSnapshotHydrationTimer(); + fullSnapshotHydrationTimer = globalThis.setTimeout(() => { + void drainFullSnapshotHydration(); + }, 0); }; const bootstrapFromSnapshot = async (): Promise => { - await runSnapshotRecovery("bootstrap"); + const result = await runSnapshotRecovery("bootstrap", () => + api.orchestration.getBootstrapSnapshot(), + ); + if (result === "applied" && !disposed) { + scheduleFullSnapshotHydration(); + return; + } + if (result === "failed" && !disposed) { + await runSnapshotRecovery("bootstrap", () => api.orchestration.getSnapshot()); + } }; bootstrapFromSnapshotRef.current = bootstrapFromSnapshot; const fallbackToSnapshotRecovery = async (): Promise => { - await runSnapshotRecovery("replay-failed"); + await runSnapshotRecovery("replay-failed", () => api.orchestration.getSnapshot()); }; const unsubDomainEvent = api.orchestration.onDomainEvent((event) => { const action = recovery.classifyDomainEvent(event.sequence); @@ -434,6 +497,8 @@ function EventRouter() { disposed = true; disposedRef.current = true; needsProviderInvalidation = false; + fullSnapshotHydrationQueued = false; + clearFullSnapshotHydrationTimer(); queryInvalidationThrottler.cancel(); unsubDomainEvent(); unsubTerminalEvent(); diff --git a/apps/web/src/wsNativeApi.ts b/apps/web/src/wsNativeApi.ts index 31160dfa1c..d658ada84f 100644 --- a/apps/web/src/wsNativeApi.ts +++ b/apps/web/src/wsNativeApi.ts @@ -90,6 +90,7 @@ export function createWsNativeApi(): NativeApi { updateSettings: rpcClient.server.updateSettings, }, orchestration: { + getBootstrapSnapshot: rpcClient.orchestration.getBootstrapSnapshot, getSnapshot: rpcClient.orchestration.getSnapshot, dispatchCommand: rpcClient.orchestration.dispatchCommand, getTurnDiff: rpcClient.orchestration.getTurnDiff, diff --git a/apps/web/src/wsRpcClient.ts b/apps/web/src/wsRpcClient.ts index 60f51ba707..5ee1d3657a 100644 --- a/apps/web/src/wsRpcClient.ts +++ b/apps/web/src/wsRpcClient.ts @@ -86,6 +86,9 @@ export interface WsRpcClient { readonly subscribeLifecycle: RpcStreamMethod; }; readonly orchestration: { + readonly getBootstrapSnapshot: RpcUnaryNoArgMethod< + typeof ORCHESTRATION_WS_METHODS.getBootstrapSnapshot + >; readonly getSnapshot: RpcUnaryNoArgMethod; readonly dispatchCommand: RpcUnaryMethod; readonly getTurnDiff: RpcUnaryMethod; @@ -185,6 +188,8 @@ export function createWsRpcClient(transport = new WsTransport()): WsRpcClient { transport.subscribe((client) => client[WS_METHODS.subscribeServerLifecycle]({}), listener), }, orchestration: { + getBootstrapSnapshot: () => + transport.request((client) => client[ORCHESTRATION_WS_METHODS.getBootstrapSnapshot]({})), getSnapshot: () => transport.request((client) => client[ORCHESTRATION_WS_METHODS.getSnapshot]({})), dispatchCommand: (input) => diff --git a/packages/contracts/src/ipc.ts b/packages/contracts/src/ipc.ts index 3114f6f5be..1665992015 100644 --- a/packages/contracts/src/ipc.ts +++ b/packages/contracts/src/ipc.ts @@ -40,6 +40,7 @@ import type { import type { ServerUpsertKeybindingInput } from "./server"; import type { ClientOrchestrationCommand, + OrchestrationGetBootstrapSnapshotResult, OrchestrationGetFullThreadDiffInput, OrchestrationGetFullThreadDiffResult, OrchestrationGetTurnDiffInput, @@ -173,6 +174,7 @@ export interface NativeApi { updateSettings: (patch: ServerSettingsPatch) => Promise; }; orchestration: { + getBootstrapSnapshot: () => Promise; getSnapshot: () => Promise; dispatchCommand: (command: ClientOrchestrationCommand) => Promise<{ sequence: number }>; getTurnDiff: (input: OrchestrationGetTurnDiffInput) => Promise; diff --git a/packages/contracts/src/orchestration.ts b/packages/contracts/src/orchestration.ts index bd7e374f32..c9cfd085c3 100644 --- a/packages/contracts/src/orchestration.ts +++ b/packages/contracts/src/orchestration.ts @@ -16,6 +16,7 @@ import { } from "./baseSchemas"; export const ORCHESTRATION_WS_METHODS = { + getBootstrapSnapshot: "orchestration.getBootstrapSnapshot", getSnapshot: "orchestration.getSnapshot", dispatchCommand: "orchestration.dispatchCommand", getTurnDiff: "orchestration.getTurnDiff", @@ -985,6 +986,13 @@ export type OrchestrationGetSnapshotInput = typeof OrchestrationGetSnapshotInput const OrchestrationGetSnapshotResult = OrchestrationReadModel; export type OrchestrationGetSnapshotResult = typeof OrchestrationGetSnapshotResult.Type; +export const OrchestrationGetBootstrapSnapshotInput = Schema.Struct({}); +export type OrchestrationGetBootstrapSnapshotInput = + typeof OrchestrationGetBootstrapSnapshotInput.Type; +const OrchestrationGetBootstrapSnapshotResult = OrchestrationReadModel; +export type OrchestrationGetBootstrapSnapshotResult = + typeof OrchestrationGetBootstrapSnapshotResult.Type; + export const OrchestrationGetTurnDiffInput = TurnCountRange.mapFields( Struct.assign({ threadId: ThreadId }), { unsafePreserveChecks: true }, @@ -1012,6 +1020,10 @@ const OrchestrationReplayEventsResult = Schema.Array(OrchestrationEvent); export type OrchestrationReplayEventsResult = typeof OrchestrationReplayEventsResult.Type; export const OrchestrationRpcSchemas = { + getBootstrapSnapshot: { + input: OrchestrationGetBootstrapSnapshotInput, + output: OrchestrationGetBootstrapSnapshotResult, + }, getSnapshot: { input: OrchestrationGetSnapshotInput, output: OrchestrationGetSnapshotResult, diff --git a/packages/contracts/src/rpc.ts b/packages/contracts/src/rpc.ts index 34968e66ec..998d5d8ad1 100644 --- a/packages/contracts/src/rpc.ts +++ b/packages/contracts/src/rpc.ts @@ -29,6 +29,7 @@ import { KeybindingsConfigError } from "./keybindings"; import { ClientOrchestrationCommand, OrchestrationEvent, + OrchestrationGetBootstrapSnapshotInput, ORCHESTRATION_WS_METHODS, OrchestrationDispatchCommandError, OrchestrationGetFullThreadDiffError, @@ -263,6 +264,15 @@ export const WsOrchestrationGetSnapshotRpc = Rpc.make(ORCHESTRATION_WS_METHODS.g error: OrchestrationGetSnapshotError, }); +export const WsOrchestrationGetBootstrapSnapshotRpc = Rpc.make( + ORCHESTRATION_WS_METHODS.getBootstrapSnapshot, + { + payload: OrchestrationGetBootstrapSnapshotInput, + success: OrchestrationRpcSchemas.getBootstrapSnapshot.output, + error: OrchestrationGetSnapshotError, + }, +); + export const WsOrchestrationDispatchCommandRpc = Rpc.make( ORCHESTRATION_WS_METHODS.dispatchCommand, { @@ -351,6 +361,7 @@ export const WsRpcGroup = RpcGroup.make( WsSubscribeTerminalEventsRpc, WsSubscribeServerConfigRpc, WsSubscribeServerLifecycleRpc, + WsOrchestrationGetBootstrapSnapshotRpc, WsOrchestrationGetSnapshotRpc, WsOrchestrationDispatchCommandRpc, WsOrchestrationGetTurnDiffRpc, From 49dc85ffdb92f56df879c6311e822b4d1b7502bc Mon Sep 17 00:00:00 2001 From: justsomelegs <145564979+justsomelegs@users.noreply.github.com> Date: Wed, 1 Apr 2026 10:36:21 +0100 Subject: [PATCH 6/6] fix(orchestration): keep bootstrap activity shape consistent --- .../Layers/OrchestrationEngine.test.ts | 128 ++++++++++++++++++ .../Layers/OrchestrationEngine.ts | 14 +- 2 files changed, 140 insertions(+), 2 deletions(-) diff --git a/apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts b/apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts index 54c4b698f5..b606cb305d 100644 --- a/apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts +++ b/apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts @@ -2,6 +2,7 @@ import { CheckpointRef, CommandId, DEFAULT_PROVIDER_INTERACTION_MODE, + EventId, MessageId, ProjectId, ThreadId, @@ -360,6 +361,133 @@ describe("OrchestrationEngine", () => { await system.dispose(); }); + it("strips historical thread activities when bootstrap falls back to replay", async () => { + const createdAt = now(); + const ServerConfigLayer = ServerConfig.layerTest(process.cwd(), { + prefix: "t3-orchestration-engine-test-", + }); + const envLayer = Layer.merge( + ServerConfigLayer, + Layer.merge(NodeServices.layer, Layer.merge(NodeFileSystem.layer, NodePath.layer)), + ); + const sqlLayer = SqlitePersistenceMemory.pipe(Layer.provideMerge(envLayer)); + const eventStoreLayer = OrchestrationEventStoreLive.pipe(Layer.provideMerge(sqlLayer)); + const commandReceiptLayer = OrchestrationCommandReceiptRepositoryLive.pipe( + Layer.provideMerge(sqlLayer), + ); + const runtimeDependencies = Layer.merge(envLayer, sqlLayer); + const noBootstrapProjectionPipeline: OrchestrationProjectionPipelineShape = { + bootstrap: Effect.void, + projectEvent: () => Effect.void, + }; + const orchestrationLayer = OrchestrationEngineLive.pipe( + Layer.provide(Layer.succeed(OrchestrationProjectionPipeline, noBootstrapProjectionPipeline)), + Layer.provideMerge(eventStoreLayer), + Layer.provideMerge(commandReceiptLayer), + Layer.provideMerge(sqlLayer), + ); + + const runtimeLayer = Layer.merge(orchestrationLayer, eventStoreLayer).pipe( + Layer.provideMerge(runtimeDependencies), + ); + const runtime = ManagedRuntime.make( + runtimeLayer as Layer.Layer< + Layer.Success, + Layer.Error, + never + >, + ); + + const eventStore = await runtime.runPromise(Effect.service(OrchestrationEventStore)); + await runtime.runPromise( + eventStore.append({ + type: "project.created", + eventId: EventId.makeUnsafe("evt-replay-bootstrap-project-created"), + aggregateKind: "project", + aggregateId: asProjectId("project-replay-bootstrap"), + occurredAt: createdAt, + commandId: CommandId.makeUnsafe("cmd-replay-bootstrap-project-created"), + causationEventId: null, + correlationId: CommandId.makeUnsafe("cmd-replay-bootstrap-project-created"), + metadata: {}, + payload: { + projectId: asProjectId("project-replay-bootstrap"), + title: "Replay bootstrap project", + workspaceRoot: "/tmp/project-replay-bootstrap", + defaultModelSelection: null, + scripts: [], + createdAt, + updatedAt: createdAt, + }, + }), + ); + await runtime.runPromise( + eventStore.append({ + type: "thread.created", + eventId: EventId.makeUnsafe("evt-replay-bootstrap-thread-created"), + aggregateKind: "thread", + aggregateId: ThreadId.makeUnsafe("thread-replay-bootstrap"), + occurredAt: createdAt, + commandId: CommandId.makeUnsafe("cmd-replay-bootstrap-thread-created"), + causationEventId: null, + correlationId: CommandId.makeUnsafe("cmd-replay-bootstrap-thread-created"), + metadata: {}, + payload: { + threadId: ThreadId.makeUnsafe("thread-replay-bootstrap"), + projectId: asProjectId("project-replay-bootstrap"), + title: "Replay bootstrap thread", + modelSelection: { + provider: "codex", + model: "gpt-5-codex", + }, + runtimeMode: "approval-required", + interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, + branch: null, + worktreePath: null, + createdAt, + updatedAt: createdAt, + }, + }), + ); + await runtime.runPromise( + eventStore.append({ + type: "thread.activity-appended", + eventId: EventId.makeUnsafe("evt-replay-bootstrap-activity-appended"), + aggregateKind: "thread", + aggregateId: ThreadId.makeUnsafe("thread-replay-bootstrap"), + occurredAt: createdAt, + commandId: CommandId.makeUnsafe("cmd-replay-bootstrap-activity-appended"), + causationEventId: null, + correlationId: CommandId.makeUnsafe("cmd-replay-bootstrap-activity-appended"), + metadata: {}, + payload: { + threadId: ThreadId.makeUnsafe("thread-replay-bootstrap"), + activity: { + id: EventId.makeUnsafe("activity-replay-bootstrap"), + tone: "info", + kind: "runtime.warning", + summary: "Replay bootstrap activity", + payload: { + message: "should be stripped from engine bootstrap", + }, + turnId: null, + createdAt, + }, + }, + }), + ); + + const engine = await runtime.runPromise(Effect.service(OrchestrationEngineService)); + const readModel = await runtime.runPromise(engine.getReadModel()); + + expect(readModel.snapshotSequence).toBe(3); + expect( + readModel.threads.find((thread) => thread.id === "thread-replay-bootstrap")?.activities, + ).toEqual([]); + + await runtime.dispose(); + }); + it("streams persisted domain events in order", async () => { const system = await createOrchestrationSystem(); const { engine } = system; diff --git a/apps/server/src/orchestration/Layers/OrchestrationEngine.ts b/apps/server/src/orchestration/Layers/OrchestrationEngine.ts index dab5736beb..c11e394d5d 100644 --- a/apps/server/src/orchestration/Layers/OrchestrationEngine.ts +++ b/apps/server/src/orchestration/Layers/OrchestrationEngine.ts @@ -50,6 +50,16 @@ function commandToAggregateRef(command: OrchestrationCommand): { } } +function stripHistoricalActivities(snapshot: OrchestrationReadModel): OrchestrationReadModel { + return { + ...snapshot, + threads: snapshot.threads.map((thread) => ({ + ...thread, + activities: [], + })), + }; +} + const makeOrchestrationEngine = Effect.gen(function* () { const sql = yield* SqlClient.SqlClient; const eventStore = yield* OrchestrationEventStore; @@ -205,7 +215,7 @@ const makeOrchestrationEngine = Effect.gen(function* () { replayedReadModel = yield* projectEvent(replayedReadModel, event); }), ); - return replayedReadModel; + return stripHistoricalActivities(replayedReadModel); }); const bootstrapReadModelFromProjectionSnapshot = Effect.gen(function* () { @@ -218,7 +228,7 @@ const makeOrchestrationEngine = Effect.gen(function* () { ); } } - return snapshot; + return stripHistoricalActivities(snapshot); }); yield* projectionPipeline.bootstrap;