diff --git a/apps/server/src/orchestration/projector.ts b/apps/server/src/orchestration/projector.ts index deb8a6d44d..0237241d91 100644 --- a/apps/server/src/orchestration/projector.ts +++ b/apps/server/src/orchestration/projector.ts @@ -174,6 +174,249 @@ export function projectEvent( updatedAt: event.occurredAt, }; + const projectThreadCreated = Effect.fn("projectEvent.threadCreated")(function* () { + const payload = yield* decodeForEvent( + ThreadCreatedPayload, + event.payload, + event.type, + "payload", + ); + const thread: OrchestrationThread = yield* decodeForEvent( + OrchestrationThread, + { + id: payload.threadId, + projectId: payload.projectId, + title: payload.title, + modelSelection: payload.modelSelection, + runtimeMode: payload.runtimeMode, + interactionMode: payload.interactionMode, + branch: payload.branch, + worktreePath: payload.worktreePath, + latestTurn: null, + createdAt: payload.createdAt, + updatedAt: payload.updatedAt, + archivedAt: null, + deletedAt: null, + messages: [], + activities: [], + checkpoints: [], + session: null, + }, + event.type, + "thread", + ); + const existing = nextBase.threads.find((entry) => entry.id === thread.id); + return { + ...nextBase, + threads: existing + ? nextBase.threads.map((entry) => (entry.id === thread.id ? thread : entry)) + : [...nextBase.threads, thread], + }; + }); + + const projectThreadMessageSent = Effect.fn("projectEvent.threadMessageSent")(function* () { + const payload = yield* decodeForEvent( + MessageSentPayloadSchema, + event.payload, + event.type, + "payload", + ); + const thread = nextBase.threads.find((entry) => entry.id === payload.threadId); + if (!thread) { + return nextBase; + } + + const message: OrchestrationMessage = yield* decodeForEvent( + OrchestrationMessage, + { + id: payload.messageId, + role: payload.role, + text: payload.text, + ...(payload.attachments !== undefined ? { attachments: payload.attachments } : {}), + turnId: payload.turnId, + streaming: payload.streaming, + createdAt: payload.createdAt, + updatedAt: payload.updatedAt, + }, + event.type, + "message", + ); + + const existingMessage = thread.messages.find((entry) => entry.id === message.id); + const messages = existingMessage + ? thread.messages.map((entry) => + entry.id === message.id + ? { + ...entry, + text: message.streaming + ? `${entry.text}${message.text}` + : message.text.length > 0 + ? message.text + : entry.text, + streaming: message.streaming, + updatedAt: message.updatedAt, + turnId: message.turnId, + ...(message.attachments !== undefined ? { attachments: message.attachments } : {}), + } + : entry, + ) + : [...thread.messages, message]; + const cappedMessages = messages.slice(-MAX_THREAD_MESSAGES); + + return { + ...nextBase, + threads: updateThread(nextBase.threads, payload.threadId, { + messages: cappedMessages, + updatedAt: event.occurredAt, + }), + }; + }); + + const projectThreadSessionSet = Effect.fn("projectEvent.threadSessionSet")(function* () { + const payload = yield* decodeForEvent( + ThreadSessionSetPayload, + event.payload, + event.type, + "payload", + ); + const thread = nextBase.threads.find((entry) => entry.id === payload.threadId); + if (!thread) { + return nextBase; + } + + const session: OrchestrationSession = yield* decodeForEvent( + OrchestrationSession, + payload.session, + event.type, + "session", + ); + + return { + ...nextBase, + threads: updateThread(nextBase.threads, payload.threadId, { + session, + latestTurn: + session.status === "running" && session.activeTurnId !== null + ? { + turnId: session.activeTurnId, + state: "running", + requestedAt: + thread.latestTurn?.turnId === session.activeTurnId + ? thread.latestTurn.requestedAt + : session.updatedAt, + startedAt: + thread.latestTurn?.turnId === session.activeTurnId + ? (thread.latestTurn.startedAt ?? session.updatedAt) + : session.updatedAt, + completedAt: null, + assistantMessageId: + thread.latestTurn?.turnId === session.activeTurnId + ? thread.latestTurn.assistantMessageId + : null, + } + : thread.latestTurn, + updatedAt: event.occurredAt, + }), + }; + }); + + const projectThreadProposedPlanUpserted = Effect.fn("projectEvent.threadProposedPlanUpserted")( + function* () { + const payload = yield* decodeForEvent( + ThreadProposedPlanUpsertedPayload, + event.payload, + event.type, + "payload", + ); + const thread = nextBase.threads.find((entry) => entry.id === payload.threadId); + if (!thread) { + return nextBase; + } + + const proposedPlans = [ + ...thread.proposedPlans.filter((entry) => entry.id !== payload.proposedPlan.id), + payload.proposedPlan, + ] + .toSorted( + (left, right) => + left.createdAt.localeCompare(right.createdAt) || left.id.localeCompare(right.id), + ) + .slice(-200); + + return { + ...nextBase, + threads: updateThread(nextBase.threads, payload.threadId, { + proposedPlans, + updatedAt: event.occurredAt, + }), + }; + }, + ); + + const projectThreadTurnDiffCompleted = Effect.fn("projectEvent.threadTurnDiffCompleted")( + function* () { + const payload = yield* decodeForEvent( + ThreadTurnDiffCompletedPayload, + event.payload, + event.type, + "payload", + ); + const thread = nextBase.threads.find((entry) => entry.id === payload.threadId); + if (!thread) { + return nextBase; + } + + const checkpoint = yield* decodeForEvent( + OrchestrationCheckpointSummary, + { + turnId: payload.turnId, + checkpointTurnCount: payload.checkpointTurnCount, + checkpointRef: payload.checkpointRef, + status: payload.status, + files: payload.files, + assistantMessageId: payload.assistantMessageId, + completedAt: payload.completedAt, + }, + event.type, + "checkpoint", + ); + + const existing = thread.checkpoints.find((entry) => entry.turnId === checkpoint.turnId); + if (existing && existing.status !== "missing" && checkpoint.status === "missing") { + return nextBase; + } + + const checkpoints = [ + ...thread.checkpoints.filter((entry) => entry.turnId !== checkpoint.turnId), + checkpoint, + ] + .toSorted((left, right) => left.checkpointTurnCount - right.checkpointTurnCount) + .slice(-MAX_THREAD_CHECKPOINTS); + + return { + ...nextBase, + threads: updateThread(nextBase.threads, payload.threadId, { + checkpoints, + latestTurn: { + turnId: payload.turnId, + state: checkpointStatusToLatestTurnState(payload.status), + requestedAt: + thread.latestTurn?.turnId === payload.turnId + ? thread.latestTurn.requestedAt + : payload.completedAt, + startedAt: + thread.latestTurn?.turnId === payload.turnId + ? (thread.latestTurn.startedAt ?? payload.completedAt) + : payload.completedAt, + completedAt: payload.completedAt, + assistantMessageId: payload.assistantMessageId, + }, + updatedAt: event.occurredAt, + }), + }; + }, + ); + switch (event.type) { case "project.created": return decodeForEvent(ProjectCreatedPayload, event.payload, event.type, "payload").pipe( @@ -241,45 +484,7 @@ export function projectEvent( ); case "thread.created": - return Effect.gen(function* () { - const payload = yield* decodeForEvent( - ThreadCreatedPayload, - event.payload, - event.type, - "payload", - ); - const thread: OrchestrationThread = yield* decodeForEvent( - OrchestrationThread, - { - id: payload.threadId, - projectId: payload.projectId, - title: payload.title, - modelSelection: payload.modelSelection, - runtimeMode: payload.runtimeMode, - interactionMode: payload.interactionMode, - branch: payload.branch, - worktreePath: payload.worktreePath, - latestTurn: null, - createdAt: payload.createdAt, - updatedAt: payload.updatedAt, - archivedAt: null, - deletedAt: null, - messages: [], - activities: [], - checkpoints: [], - session: null, - }, - event.type, - "thread", - ); - const existing = nextBase.threads.find((entry) => entry.id === thread.id); - return { - ...nextBase, - threads: existing - ? nextBase.threads.map((entry) => (entry.id === thread.id ? thread : entry)) - : [...nextBase.threads, thread], - }; - }); + return projectThreadCreated(); case "thread.deleted": return decodeForEvent(ThreadDeletedPayload, event.payload, event.type, "payload").pipe( @@ -358,214 +563,16 @@ export function projectEvent( ); case "thread.message-sent": - return Effect.gen(function* () { - const payload = yield* decodeForEvent( - MessageSentPayloadSchema, - event.payload, - event.type, - "payload", - ); - const thread = nextBase.threads.find((entry) => entry.id === payload.threadId); - if (!thread) { - return nextBase; - } - - const message: OrchestrationMessage = yield* decodeForEvent( - OrchestrationMessage, - { - id: payload.messageId, - role: payload.role, - text: payload.text, - ...(payload.attachments !== undefined ? { attachments: payload.attachments } : {}), - turnId: payload.turnId, - streaming: payload.streaming, - createdAt: payload.createdAt, - updatedAt: payload.updatedAt, - }, - event.type, - "message", - ); - - const existingMessage = thread.messages.find((entry) => entry.id === message.id); - const messages = existingMessage - ? thread.messages.map((entry) => - entry.id === message.id - ? { - ...entry, - text: message.streaming - ? `${entry.text}${message.text}` - : message.text.length > 0 - ? message.text - : entry.text, - streaming: message.streaming, - updatedAt: message.updatedAt, - turnId: message.turnId, - ...(message.attachments !== undefined - ? { attachments: message.attachments } - : {}), - } - : entry, - ) - : [...thread.messages, message]; - const cappedMessages = messages.slice(-MAX_THREAD_MESSAGES); - - return { - ...nextBase, - threads: updateThread(nextBase.threads, payload.threadId, { - messages: cappedMessages, - updatedAt: event.occurredAt, - }), - }; - }); + return projectThreadMessageSent(); case "thread.session-set": - return Effect.gen(function* () { - const payload = yield* decodeForEvent( - ThreadSessionSetPayload, - event.payload, - event.type, - "payload", - ); - const thread = nextBase.threads.find((entry) => entry.id === payload.threadId); - if (!thread) { - return nextBase; - } - - const session: OrchestrationSession = yield* decodeForEvent( - OrchestrationSession, - payload.session, - event.type, - "session", - ); - - return { - ...nextBase, - threads: updateThread(nextBase.threads, payload.threadId, { - session, - latestTurn: - session.status === "running" && session.activeTurnId !== null - ? { - turnId: session.activeTurnId, - state: "running", - requestedAt: - thread.latestTurn?.turnId === session.activeTurnId - ? thread.latestTurn.requestedAt - : session.updatedAt, - startedAt: - thread.latestTurn?.turnId === session.activeTurnId - ? (thread.latestTurn.startedAt ?? session.updatedAt) - : session.updatedAt, - completedAt: null, - assistantMessageId: - thread.latestTurn?.turnId === session.activeTurnId - ? thread.latestTurn.assistantMessageId - : null, - } - : thread.latestTurn, - updatedAt: event.occurredAt, - }), - }; - }); + return projectThreadSessionSet(); case "thread.proposed-plan-upserted": - return Effect.gen(function* () { - const payload = yield* decodeForEvent( - ThreadProposedPlanUpsertedPayload, - event.payload, - event.type, - "payload", - ); - const thread = nextBase.threads.find((entry) => entry.id === payload.threadId); - if (!thread) { - return nextBase; - } - - const proposedPlans = [ - ...thread.proposedPlans.filter((entry) => entry.id !== payload.proposedPlan.id), - payload.proposedPlan, - ] - .toSorted( - (left, right) => - left.createdAt.localeCompare(right.createdAt) || left.id.localeCompare(right.id), - ) - .slice(-200); - - return { - ...nextBase, - threads: updateThread(nextBase.threads, payload.threadId, { - proposedPlans, - updatedAt: event.occurredAt, - }), - }; - }); + return projectThreadProposedPlanUpserted(); case "thread.turn-diff-completed": - return Effect.gen(function* () { - const payload = yield* decodeForEvent( - ThreadTurnDiffCompletedPayload, - event.payload, - event.type, - "payload", - ); - const thread = nextBase.threads.find((entry) => entry.id === payload.threadId); - if (!thread) { - return nextBase; - } - - const checkpoint = yield* decodeForEvent( - OrchestrationCheckpointSummary, - { - turnId: payload.turnId, - checkpointTurnCount: payload.checkpointTurnCount, - checkpointRef: payload.checkpointRef, - status: payload.status, - files: payload.files, - assistantMessageId: payload.assistantMessageId, - completedAt: payload.completedAt, - }, - event.type, - "checkpoint", - ); - - // Do not let a placeholder (status "missing") overwrite a checkpoint - // that has already been captured with a real git ref (status "ready"). - // ProviderRuntimeIngestion may fire multiple turn.diff.updated events - // per turn; without this guard later placeholders would clobber the - // real capture dispatched by CheckpointReactor. - const existing = thread.checkpoints.find((entry) => entry.turnId === checkpoint.turnId); - if (existing && existing.status !== "missing" && checkpoint.status === "missing") { - return nextBase; - } - - const checkpoints = [ - ...thread.checkpoints.filter((entry) => entry.turnId !== checkpoint.turnId), - checkpoint, - ] - .toSorted((left, right) => left.checkpointTurnCount - right.checkpointTurnCount) - .slice(-MAX_THREAD_CHECKPOINTS); - - return { - ...nextBase, - threads: updateThread(nextBase.threads, payload.threadId, { - checkpoints, - latestTurn: { - turnId: payload.turnId, - state: checkpointStatusToLatestTurnState(payload.status), - requestedAt: - thread.latestTurn?.turnId === payload.turnId - ? thread.latestTurn.requestedAt - : payload.completedAt, - startedAt: - thread.latestTurn?.turnId === payload.turnId - ? (thread.latestTurn.startedAt ?? payload.completedAt) - : payload.completedAt, - completedAt: payload.completedAt, - assistantMessageId: payload.assistantMessageId, - }, - updatedAt: event.occurredAt, - }), - }; - }); + return projectThreadTurnDiffCompleted(); case "thread.reverted": return decodeForEvent(ThreadRevertedPayload, event.payload, event.type, "payload").pipe(