Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions bun.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/opencode/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
"@hono/standard-validator": "0.1.5",
"@hono/zod-validator": "catalog:",
"@lydell/node-pty": "1.2.0-beta.10",
"@modelcontextprotocol/core": "github:axiomantic/typescript-sdk#mcp-events",
"@modelcontextprotocol/sdk": "1.27.1",
"@npmcli/arborist": "9.4.0",
"@octokit/graphql": "9.0.2",
Expand Down
10 changes: 10 additions & 0 deletions packages/opencode/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,12 @@ export namespace Config {
return list.toReversed()
}

const McpEventsPermissions = z.object({
inject_context: z.boolean().optional().default(false),
notify_user: z.boolean().optional().default(true),
trigger_turn: z.boolean().optional().default(false),
}).optional()

export const McpLocal = z
.object({
type: z.literal("local").describe("Type of MCP server connection"),
Expand All @@ -385,6 +391,8 @@ export namespace Config {
.positive()
.optional()
.describe("Timeout in ms for MCP server requests. Defaults to 5000 (5 seconds) if not specified."),
events: McpEventsPermissions
.describe("Per-effect permissions for MCP events. Controls which event effects this server is allowed to request."),
})
.strict()
.meta({
Expand Down Expand Up @@ -424,6 +432,8 @@ export namespace Config {
.positive()
.optional()
.describe("Timeout in ms for MCP server requests. Defaults to 5000 (5 seconds) if not specified."),
events: McpEventsPermissions
.describe("Per-effect permissions for MCP events. Controls which event effects this server is allowed to request."),
})
.strict()
.meta({
Expand Down
153 changes: 150 additions & 3 deletions packages/opencode/src/mcp/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
type Tool as MCPToolDef,
ToolListChangedNotificationSchema,
} from "@modelcontextprotocol/sdk/types.js"
import { EventEmitNotificationSchema, EventSubscribeResultSchema, EventUnsubscribeResultSchema } from "@modelcontextprotocol/core/packages/core/src/types/schemas.js"
import { Config } from "../config/config"
import { Log } from "../util/log"
import { NamedError } from "@opencode-ai/util/error"
Expand All @@ -30,6 +31,28 @@ import { makeRuntime } from "@/effect/run-service"
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"

/**
* Resolve per-effect permissions for an MCP server config.
* When no events config is present, defaults to safe values:
* inject_context=false, notify_user=true, trigger_turn=false
*/
export function resolveEventPermissions(mcp: Config.Mcp) {
const events = mcp.events
return {
inject_context: events?.inject_context ?? false,
notify_user: events?.notify_user ?? true,
trigger_turn: events?.trigger_turn ?? false,
}
}

/**
* Convert topic patterns with {param} placeholders to MQTT-style + wildcards
* for MCP event subscription.
*/
export function convertTopicPatterns(topics: Array<{ pattern: string }>): string[] {
return topics.map((t) => t.pattern.replace(/\{[^}]+\}/g, "+"))
}

export namespace MCP {
const log = Log.create({ service: "mcp" })
const DEFAULT_TIMEOUT = 30_000
Expand All @@ -52,6 +75,29 @@ export namespace MCP {
}),
)

export const McpEvent = BusEvent.define(
"mcp.event",
z.object({
server: z.string(),
topic: z.string(),
payload: z.unknown(),
event_id: z.string(),
retained: z.boolean().optional(),
requested_effects: z.array(z.object({
type: z.enum(["inject_context", "notify_user", "trigger_turn"]),
priority: z.enum(["low", "normal", "high", "urgent"]).optional(),
})).optional(),
permissions: z.object({
inject_context: z.boolean(),
notify_user: z.boolean(),
trigger_turn: z.boolean(),
}).optional(),
source: z.string().optional(),
correlation_id: z.string().optional(),
expires_at: z.string().optional(),
}),
)

export const BrowserOpenFailed = BusEvent.define(
"mcp.browser.open.failed",
z.object({
Expand Down Expand Up @@ -435,6 +481,7 @@ export namespace MCP {
log.info("create() successfully created client", { key, toolCount: listed.length })
return { mcpClient, status, defs: listed } satisfies CreateResult
})
const subscriptions = new Map<string, string[]>()
const cfgSvc = yield* Config.Service

const descendants = Effect.fnUntraced(
Expand Down Expand Up @@ -463,7 +510,7 @@ export namespace MCP {
Effect.catch(() => Effect.succeed([] as number[])),
)

function watch(s: State, name: string, client: MCPClient, timeout?: number) {
function watch(s: State, name: string, client: MCPClient, timeout?: number, eventPermissions?: { inject_context: boolean; notify_user: boolean; trigger_turn: boolean }) {
client.setNotificationHandler(ToolListChangedNotificationSchema, async () => {
log.info("tools list changed notification received", { server: name })
if (s.clients[name] !== client || s.status[name]?.status !== "connected") return
Expand All @@ -475,8 +522,90 @@ export namespace MCP {
s.defs[name] = listed
await Effect.runPromise(bus.publish(ToolsChanged, { server: name }).pipe(Effect.ignore))
})

client.setNotificationHandler(EventEmitNotificationSchema, async (notification) => {
const event = notification.params
log.info("event received", { server: name, topic: event.topic, event_id: event.event_id })
await Effect.runPromise(
bus.publish(McpEvent, {
server: name,
topic: event.topic,
payload: event.payload,
event_id: event.event_id,
retained: event.retained,
requested_effects: event.requested_effects,
permissions: eventPermissions,
source: event.source,
correlation_id: event.correlation_id,
expires_at: event.expires_at,
}).pipe(Effect.ignore),
)
})
}

const autoSubscribeEvents = Effect.fn("MCP.autoSubscribeEvents")(function* (
key: string,
client: MCPClient,
eventPermissions?: { inject_context: boolean; notify_user: boolean; trigger_turn: boolean },
) {
yield* Effect.tryPromise({
try: async () => {
// Check server capabilities for events
// SDK compatibility: MCPClient.getServerCapabilities() exists at runtime but
// is not exposed in the published type definitions as of @modelcontextprotocol/sdk 1.x.
// Cast to any until the SDK exposes this in its public API.
const serverCapabilities = (client as any).getServerCapabilities?.() as
| { events?: { topics?: Array<{ pattern: string }> } }
| undefined
const eventsCapability = serverCapabilities?.events
if (!eventsCapability?.topics?.length) return

const topics = eventsCapability.topics as Array<{ pattern: string }>
// Convert {param} placeholders to + wildcards for subscription
const patterns = convertTopicPatterns(topics)

// SDK compatibility: EventSubscribeResultSchema is a custom schema not part
// of the official MCP SDK types. Cast required because client.request() expects
// the SDK's own result schema types. The returned shape is validated by the
// zod schema at runtime.
const subscribeResult = await client.request(
{ method: "events/subscribe", params: { topics: patterns } },
EventSubscribeResultSchema as any,
)

// Publish retained values as bus events
for (const retained of subscribeResult.retained ?? []) {
try {
await Effect.runPromise(
bus.publish(McpEvent, {
server: key,
topic: retained.topic,
payload: retained.payload,
event_id: retained.event_id,
retained: true,
permissions: eventPermissions,
source: retained.source,
correlation_id: retained.correlation_id,
expires_at: retained.expires_at,
}).pipe(Effect.ignore),
)
} catch (e) {
log.warn("failed to publish retained event", { topic: retained.topic, error: e })
}
}
subscriptions.set(key, (subscribeResult.subscribed ?? []).map((s: { pattern: string }) => s.pattern))
log.info("subscribed to events", {
server: key,
subscribed: (subscribeResult.subscribed ?? []).length,
})
},
catch: (e) => {
log.warn("failed to subscribe to events", { server: key, error: String(e) })
return e
},
}).pipe(Effect.ignore)
})

const state = yield* InstanceState.make<State>(
Effect.fn("MCP.state")(function* () {
const cfg = yield* cfgSvc.get()
Expand Down Expand Up @@ -508,7 +637,9 @@ export namespace MCP {
if (result.mcpClient) {
s.clients[key] = result.mcpClient
s.defs[key] = result.defs!
watch(s, key, result.mcpClient, mcp.timeout)
const perms = resolveEventPermissions(mcp)
watch(s, key, result.mcpClient, mcp.timeout, perms)
yield* autoSubscribeEvents(key, result.mcpClient, perms)
}
}),
{ concurrency: "unbounded" },
Expand Down Expand Up @@ -574,15 +705,19 @@ export namespace MCP {

s.status[name] = result.status
if (!result.mcpClient) {
subscriptions.delete(name)
yield* closeClient(s, name)
delete s.clients[name]
return result.status
}

subscriptions.delete(name)
yield* closeClient(s, name)
s.clients[name] = result.mcpClient
s.defs[name] = result.defs!
watch(s, name, result.mcpClient, mcp.timeout)
const perms = resolveEventPermissions(mcp)
watch(s, name, result.mcpClient, mcp.timeout, perms)
yield* autoSubscribeEvents(name, result.mcpClient, perms)
return result.status
})

Expand All @@ -603,6 +738,18 @@ export namespace MCP {

const disconnect = Effect.fn("MCP.disconnect")(function* (name: string) {
const s = yield* InstanceState.get(state)
// Best-effort unsubscribe before closing transport
const subs = subscriptions.get(name) ?? []
if (subs.length > 0 && s.clients[name]) {
yield* Effect.tryPromise({
try: () => s.clients[name]!.request(
{ method: "events/unsubscribe", params: { topics: subs } },
EventUnsubscribeResultSchema as any,
),
Comment on lines +745 to +748
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The best-effort events/unsubscribe request uses the default timeout (likely 30 seconds). If a server is unresponsive or hanging, this will delay the disconnection process significantly. Since this is a best-effort cleanup, it should use a much shorter timeout.

Suggested change
try: () => s.clients[name]!.request(
{ method: "events/unsubscribe", params: { topics: subs } },
EventUnsubscribeResultSchema as any,
),
try: () => s.clients[name]!.request(
{ method: "events/unsubscribe", params: { topics: subs } },
EventUnsubscribeResultSchema as any,
{ timeout: 2000 },
),

catch: () => undefined,
}).pipe(Effect.ignore)
}
subscriptions.delete(name)
yield* closeClient(s, name)
delete s.clients[name]
s.status[name] = { status: "disabled" }
Expand Down
Loading
Loading