Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,10 @@ describe("CheckpointDiffQueryLive", () => {
Layer.succeed(ProjectionSnapshotQuery, {
getSnapshot: () =>
Effect.die("CheckpointDiffQuery should not request the full orchestration snapshot"),
getActiveSnapshot: () =>
Effect.die("CheckpointDiffQuery should not request the active orchestration snapshot"),
getCounts: () => Effect.succeed({ projectCount: 0, threadCount: 0 }),
listArchivedThreads: () => Effect.succeed([]),
getActiveProjectByWorkspaceRoot: () => Effect.succeed(Option.none()),
getFirstActiveThreadIdByProjectId: () => Effect.succeed(Option.none()),
getThreadCheckpointContext: () => Effect.succeed(Option.some(threadCheckpointContext)),
Expand Down Expand Up @@ -136,7 +139,10 @@ describe("CheckpointDiffQueryLive", () => {
Layer.succeed(ProjectionSnapshotQuery, {
getSnapshot: () =>
Effect.die("CheckpointDiffQuery should not request the full orchestration snapshot"),
getActiveSnapshot: () =>
Effect.die("CheckpointDiffQuery should not request the active orchestration snapshot"),
getCounts: () => Effect.succeed({ projectCount: 0, threadCount: 0 }),
listArchivedThreads: () => Effect.succeed([]),
getActiveProjectByWorkspaceRoot: () => Effect.succeed(Option.none()),
getFirstActiveThreadIdByProjectId: () => Effect.succeed(Option.none()),
getThreadCheckpointContext: () => Effect.succeed(Option.none()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,9 @@ describe("OrchestrationEngine", () => {
Layer.provide(
Layer.succeed(ProjectionSnapshotQuery, {
getSnapshot: () => Effect.succeed(projectionSnapshot),
getActiveSnapshot: () => Effect.succeed(projectionSnapshot),
getCounts: () => Effect.succeed({ projectCount: 1, threadCount: 1 }),
listArchivedThreads: () => Effect.succeed([]),
getActiveProjectByWorkspaceRoot: () => Effect.succeed(Option.none()),
getFirstActiveThreadIdByProjectId: () => Effect.succeed(Option.none()),
getThreadCheckpointContext: () => Effect.succeed(Option.none()),
Expand Down
259 changes: 258 additions & 1 deletion apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,21 @@ projectionSnapshotLayer("ProjectionSnapshotQuery", (it) => {
NULL,
NULL
),
(
'thread-archived',
'project-active',
'Archived Thread',
'{"provider":"codex","model":"gpt-5-codex"}',
'full-access',
'default',
NULL,
'/tmp/worktree-archived',
NULL,
'2026-03-01T00:00:04.000Z',
'2026-03-01T00:00:04.500Z',
'2026-03-01T00:00:04.750Z',
NULL
),
(
'thread-deleted',
'project-active',
Expand All @@ -451,7 +466,7 @@ projectionSnapshotLayer("ProjectionSnapshotQuery", (it) => {
const counts = yield* snapshotQuery.getCounts();
assert.deepEqual(counts, {
projectCount: 2,
threadCount: 3,
threadCount: 4,
});

const project = yield* snapshotQuery.getActiveProjectByWorkspaceRoot("/tmp/workspace");
Expand All @@ -470,9 +485,251 @@ projectionSnapshotLayer("ProjectionSnapshotQuery", (it) => {
if (firstThreadId._tag === "Some") {
assert.equal(firstThreadId.value, ThreadId.makeUnsafe("thread-first"));
}

const archivedThreads = yield* snapshotQuery.listArchivedThreads();
assert.deepEqual(archivedThreads, [
{
threadId: ThreadId.makeUnsafe("thread-archived"),
projectId: asProjectId("project-active"),
projectTitle: "Active Project",
workspaceRoot: "/tmp/workspace",
title: "Archived Thread",
worktreePath: "/tmp/worktree-archived",
createdAt: "2026-03-01T00:00:04.000Z",
updatedAt: "2026-03-01T00:00:04.500Z",
archivedAt: "2026-03-01T00:00:04.750Z",
},
]);
}),
);

it.effect("hydrates an active snapshot without archived or deleted thread data", () =>
Effect.gen(function* () {
const snapshotQuery = yield* ProjectionSnapshotQuery;
const sql = yield* SqlClient.SqlClient;

yield* sql`DELETE FROM projection_projects`;
yield* sql`DELETE FROM projection_threads`;
yield* sql`DELETE FROM projection_thread_messages`;
yield* sql`DELETE FROM projection_thread_proposed_plans`;
yield* sql`DELETE FROM projection_thread_activities`;
yield* sql`DELETE FROM projection_thread_sessions`;
yield* sql`DELETE FROM projection_turns`;
yield* sql`DELETE FROM projection_state`;

yield* sql`
INSERT INTO projection_projects (
project_id,
title,
workspace_root,
default_model_selection_json,
scripts_json,
created_at,
updated_at,
deleted_at
)
VALUES (
'project-active-only',
'Active-only Project',
'/tmp/active-only',
NULL,
'[]',
'2026-03-03T00:00:00.000Z',
'2026-03-03T00:00:01.000Z',
NULL
)
`;

yield* sql`
INSERT INTO projection_threads (
thread_id,
project_id,
title,
model_selection_json,
runtime_mode,
interaction_mode,
branch,
worktree_path,
latest_turn_id,
created_at,
updated_at,
archived_at,
deleted_at
)
VALUES
(
'thread-active-only',
'project-active-only',
'Active Thread',
'{"provider":"codex","model":"gpt-5-codex"}',
'full-access',
'default',
NULL,
NULL,
'turn-active-only',
'2026-03-03T00:00:02.000Z',
'2026-03-03T00:00:03.000Z',
NULL,
NULL
),
(
'thread-archived-only',
'project-active-only',
'Archived Thread',
'{"provider":"codex","model":"gpt-5-codex"}',
'full-access',
'default',
NULL,
NULL,
'turn-archived-only',
'2026-03-03T00:00:04.000Z',
'2026-03-03T00:00:05.000Z',
'2026-03-03T00:00:06.000Z',
NULL
),
(
'thread-deleted-only',
'project-active-only',
'Deleted Thread',
'{"provider":"codex","model":"gpt-5-codex"}',
'full-access',
'default',
NULL,
NULL,
NULL,
'2026-03-03T00:00:07.000Z',
'2026-03-03T00:00:08.000Z',
NULL,
'2026-03-03T00:00:09.000Z'
)
`;

yield* sql`
INSERT INTO projection_thread_messages (
message_id,
thread_id,
turn_id,
role,
text,
is_streaming,
created_at,
updated_at
)
VALUES
(
'message-active-only',
'thread-active-only',
'turn-active-only',
'assistant',
'active message',
0,
'2026-03-03T00:00:10.000Z',
'2026-03-03T00:00:11.000Z'
),
(
'message-archived-only',
'thread-archived-only',
'turn-archived-only',
'assistant',
'archived message',
0,
'2026-03-03T00:00:12.000Z',
'2026-03-03T00:00:13.000Z'
)
`;

yield* sql`
INSERT INTO projection_turns (
thread_id,
turn_id,
pending_message_id,
source_proposed_plan_thread_id,
source_proposed_plan_id,
assistant_message_id,
state,
requested_at,
started_at,
completed_at,
checkpoint_turn_count,
checkpoint_ref,
checkpoint_status,
checkpoint_files_json
)
VALUES
(
'thread-active-only',
'turn-active-only',
NULL,
NULL,
NULL,
'message-active-only',
'completed',
'2026-03-03T00:00:14.000Z',
'2026-03-03T00:00:14.000Z',
'2026-03-03T00:00:14.000Z',
1,
'checkpoint-active-only',
'ready',
'[]'
),
(
'thread-archived-only',
'turn-archived-only',
NULL,
NULL,
NULL,
'message-archived-only',
'completed',
'2026-03-03T00:00:15.000Z',
'2026-03-03T00:00:15.000Z',
'2026-03-03T00:00:15.000Z',
1,
'checkpoint-archived-only',
'ready',
'[]'
)
`;

let sequence = 9;
for (const projector of Object.values(ORCHESTRATION_PROJECTOR_NAMES)) {
yield* sql`
INSERT INTO projection_state (
projector,
last_applied_sequence,
updated_at
)
VALUES (
${projector},
${sequence},
'2026-03-03T00:00:16.000Z'
)
`;
sequence += 1;
}

const snapshot = yield* snapshotQuery.getActiveSnapshot();

assert.equal(snapshot.snapshotSequence, 9);
assert.equal(snapshot.projects.length, 1);
assert.deepEqual(
snapshot.threads.map((thread) => ({
id: thread.id,
title: thread.title,
messageCount: thread.messages.length,
checkpointCount: thread.checkpoints.length,
})),
[
{
id: ThreadId.makeUnsafe("thread-active-only"),
title: "Active Thread",
messageCount: 1,
checkpointCount: 1,
},
],
);
}),
);

it.effect("reads single-thread checkpoint context without hydrating unrelated threads", () =>
Effect.gen(function* () {
const snapshotQuery = yield* ProjectionSnapshotQuery;
Expand Down
Loading
Loading