diff --git a/docs/events.md b/docs/events.md
new file mode 100644
index 000000000..f13129d3d
--- /dev/null
+++ b/docs/events.md
@@ -0,0 +1,430 @@
+# Events
+
+Events enable server-to-client push notifications over named topics. Clients subscribe to topic patterns, and servers emit events that are delivered to all matching subscribers. Events support MQTT-style wildcard patterns, retained values, and advisory effect hints.
+
+## When to Use Events
+
+Events are designed for:
+
+- Real-time state changes (e.g., a build finished, a file changed)
+- Progress or status broadcasts that multiple clients may care about
+- Lightweight notifications where a full tool call or resource read is unnecessary
+
+If the client needs to _request_ data, use resources or tools instead. Events are for server-initiated pushes.
+
+## Topic Patterns
+
+Topics are `/`-separated strings with a maximum depth of 8 segments. Clients subscribe using MQTT-style wildcard patterns:
+
+| Pattern | Matches | Does Not Match |
+|---------|---------|----------------|
+| `build/status` | `build/status` | `build/status/detail` |
+| `build/+` | `build/status`, `build/log` | `build/status/detail` |
+| `build/#` | `build`, `build/status`, `build/status/detail` | `deploy/status` |
+| `+/status` | `build/status`, `deploy/status` | `build/sub/status` |
+| `#` | Everything | (matches all topics) |
+
+- `+` matches exactly one segment
+- `#` matches zero or more trailing segments (must be the last segment)
+
+### Session-Scoped Topics
+
+Servers may use a `{session_id}` placeholder in topic patterns to scope topics to individual sessions (e.g., `app/sessions/{session_id}/messages`). When a topic contains `{session_id}`, the server enforces that subscribers can only substitute their own session UUID -- wildcards and other session IDs are rejected. Sessions use `{session_id}` to receive topics scoped to themselves -- useful for targeted notifications, status updates, task assignments, or any application-level filtering that should be per-session. This convention is not part of the core MCP spec but is a common server-side pattern (used by FastMCP, among others).
+
+## Server-Side
+
+### Declaring Event Topics
+
+Servers declare available topics through `EventTopicDescriptor` entries on the `EventsCapability`. The SDK auto-declares the `events` capability when an `EventSubscribeRequest` handler is registered.
+
+### Emitting Events
+
+Use `ServerSession.emit_event()` to push an event to the connected client:
+
+```python
+await server_session.emit_event(
+ topic="build/status",
+ payload={"project": "myapp", "status": "success"},
+)
+```
+
+`emit_event()` accepts these keyword arguments:
+
+| Parameter | Type | Default | Description |
+|-----------|------|---------|-------------|
+| `topic` | `str` | (required) | Topic string to publish on |
+| `payload` | `Any` | (required) | Event data (any JSON-serializable value) |
+| `event_id` | `str \| None` | auto-generated ULID | Unique event identifier |
+| `timestamp` | `str \| None` | current UTC ISO 8601 | Event timestamp |
+| `retained` | `bool` | `False` | Whether to treat as a retained value |
+| `source` | `str \| None` | `None` | Opaque source identifier |
+| `correlation_id` | `str \| None` | `None` | Links related events together |
+| `requested_effects` | `list[EventEffect] \| None` | `None` | Advisory hints for client behavior |
+| `expires_at` | `str \| None` | `None` | ISO 8601 expiry for retained values |
+
+### Requested Effects
+
+`EventEffect` provides advisory hints about how the client should handle an event:
+
+```python
+from mcp.types import EventEffect
+
+await server_session.emit_event(
+ topic="alert/critical",
+ payload={"message": "Disk full"},
+ requested_effects=[
+ EventEffect(type="notify_user", priority="urgent"),
+ ],
+)
+```
+
+| Effect Type | Description |
+|-------------|-------------|
+| `inject_context` | Suggest injecting the event payload into the LLM context |
+| `notify_user` | Suggest notifying the user |
+| `trigger_turn` | Suggest triggering an LLM turn |
+
+Priority levels: `low`, `normal` (default), `high`, `urgent`.
+
+### Subscription Registry
+
+`SubscriptionRegistry` manages which sessions are subscribed to which patterns. It handles wildcard matching and guarantees at-most-once delivery per session per event:
+
+```python
+from mcp.server.events import SubscriptionRegistry
+
+registry = SubscriptionRegistry()
+
+# Track a session's subscription
+await registry.add(session_id, "build/+")
+
+# Find all sessions that should receive an event
+matching_sessions = await registry.match("build/status")
+
+# Clean up on disconnect
+await registry.remove_all(session_id)
+```
+
+### Retained Value Store
+
+`RetainedValueStore` caches the most recent event per topic so new subscribers receive the current state immediately:
+
+```python
+from mcp.server.events import RetainedValueStore
+from mcp.types import RetainedEvent
+
+store = RetainedValueStore()
+
+# Store a retained value
+await store.set(
+ "sensor/temperature",
+ RetainedEvent(topic="sensor/temperature", eventId="evt-1", payload=22.5),
+ expires_at="2025-12-31T23:59:59Z", # optional expiry
+)
+
+# Retrieve retained values matching a pattern
+retained = await store.get_matching("sensor/+")
+```
+
+Retained values with an `expires_at` in the past are automatically cleaned up on access.
+
+### Handling Subscriptions (Low-Level Server)
+
+Register request handlers for `EventSubscribeRequest`, `EventUnsubscribeRequest`, and `EventListRequest` on the low-level `Server`:
+
+```python
+from mcp.server.lowlevel.server import Server, request_ctx
+from mcp.server.events import SubscriptionRegistry, RetainedValueStore
+from mcp.types import (
+ EventSubscribeRequest,
+ EventSubscribeResult,
+ EventUnsubscribeRequest,
+ EventUnsubscribeResult,
+ EventListRequest,
+ EventListResult,
+ EventTopicDescriptor,
+ RetainedEvent,
+ ServerResult,
+ SubscribedTopic,
+)
+
+registry = SubscriptionRegistry()
+store = RetainedValueStore()
+
+topics = [
+ EventTopicDescriptor(pattern="build/+", description="Build events"),
+ EventTopicDescriptor(
+ pattern="config/current",
+ description="Current config",
+ retained=True,
+ ),
+]
+
+server = Server("my-server")
+
+
+async def handle_subscribe(req: EventSubscribeRequest):
+ ctx = request_ctx.get()
+ subscribed = []
+ for pattern in req.params.topics:
+ await registry.add(str(ctx.request_id), pattern)
+ subscribed.append(SubscribedTopic(pattern=pattern))
+
+ retained: list[RetainedEvent] = []
+ for pattern in req.params.topics:
+ retained.extend(await store.get_matching(pattern))
+
+ return ServerResult(
+ EventSubscribeResult(subscribed=subscribed, retained=retained)
+ )
+
+
+async def handle_unsubscribe(req: EventUnsubscribeRequest):
+ ctx = request_ctx.get()
+ for pattern in req.params.topics:
+ await registry.remove(str(ctx.request_id), pattern)
+ return ServerResult(
+ EventUnsubscribeResult(unsubscribed=req.params.topics)
+ )
+
+
+async def handle_list(req: EventListRequest):
+ return ServerResult(EventListResult(topics=topics))
+
+
+server.request_handlers[EventSubscribeRequest] = handle_subscribe
+server.request_handlers[EventUnsubscribeRequest] = handle_unsubscribe
+server.request_handlers[EventListRequest] = handle_list
+```
+
+## Client-Side
+
+### Session ID
+
+After initialization, `session.session_id` returns the server-assigned session ID (`str | None`), sourced from `InitializeResult._meta["session_id"]`. This is useful for constructing session-scoped topic patterns:
+
+```python
+topic = f"app/sessions/{session.session_id}/messages"
+await session.subscribe_events([topic])
+```
+
+Session-scoped topics work for any per-session delivery pattern:
+
+```python
+# Non-messaging examples of session-scoped topics:
+build_topic = f"app/sessions/{session.session_id}/builds"
+task_topic = f"app/sessions/{session.session_id}/tasks"
+alerts_topic = f"app/sessions/{session.session_id}/alerts"
+```
+
+Returns `None` if the server does not provide a session ID in `_meta`.
+
+### Subscribing to Events
+
+Use `subscribe_events()` to register interest in one or more topic patterns:
+
+```python
+result = await session.subscribe_events(["build/+", "deploy/#"])
+
+for sub in result.subscribed:
+ print(f"Subscribed: {sub.pattern}")
+
+for rej in result.rejected:
+ print(f"Rejected: {rej.pattern} ({rej.reason})")
+
+# Retained values are delivered inline
+for event in result.retained:
+ print(f"Retained: {event.topic} = {event.payload}")
+```
+
+The `EventSubscribeResult` contains:
+
+| Field | Type | Description |
+|-------|------|-------------|
+| `subscribed` | `list[SubscribedTopic]` | Patterns the server accepted |
+| `rejected` | `list[RejectedTopic]` | Patterns the server refused, with reasons |
+| `retained` | `list[RetainedEvent]` | Current retained values for subscribed topics |
+
+### Receiving Events
+
+Register a handler to process incoming events. Two approaches:
+
+**Using `set_event_handler()`:**
+
+```python
+async def on_event(params: EventParams) -> None:
+ print(f"[{params.topic}] {params.payload}")
+
+session.set_event_handler(on_event)
+```
+
+**Using the `@on_event` decorator:**
+
+```python
+@session.on_event(topic_filter="build/+")
+async def on_build_event(params: EventParams) -> None:
+ print(f"Build: {params.payload}")
+```
+
+The optional `topic_filter` applies an additional client-side filter using the same wildcard syntax as subscription patterns. The filter is compiled once when the handler is registered and reused for every incoming event. Events that do not match the filter are silently dropped before reaching the handler.
+
+The client also tracks subscribed patterns internally. Once a client has at least one active subscription, events whose topic does not match any subscribed pattern are dropped before reaching the handler, even if the server sends them. A client that never calls `subscribe_events` has no subscription patterns registered and will pass every event received from the server through to the handler, subject only to the optional `topic_filter`. If you want strict subscription-only delivery, subscribe explicitly.
+
+### Unsubscribing
+
+```python
+result = await session.unsubscribe_events(["build/+"])
+# result.unsubscribed contains the patterns that were removed
+```
+
+### Listing Available Topics
+
+```python
+result = await session.list_events()
+for topic in result.topics:
+ print(f"{topic.pattern}: {topic.description} (retained={topic.retained})")
+```
+
+## Full Example
+
+A complete server and client exchanging events over an in-memory transport:
+
+```python
+import anyio
+from mcp.client.session import ClientSession
+from mcp.server.events import SubscriptionRegistry
+from mcp.server.lowlevel.server import Server, request_ctx
+from mcp.server.lowlevel import NotificationOptions
+from mcp.server.models import InitializationOptions
+from mcp.server.session import ServerSession
+from mcp.shared.message import SessionMessage
+from mcp.shared.session import RequestResponder
+from mcp.types import (
+ EventListRequest,
+ EventListResult,
+ EventParams,
+ EventSubscribeRequest,
+ EventSubscribeResult,
+ EventTopicDescriptor,
+ EventUnsubscribeRequest,
+ EventUnsubscribeResult,
+ ServerResult,
+ SubscribedTopic,
+)
+import mcp.types as types
+
+registry = SubscriptionRegistry()
+descriptors = [EventTopicDescriptor(pattern="chat/+", description="Chat messages")]
+
+
+def create_server() -> Server:
+ server = Server("event-demo")
+
+ async def on_subscribe(req: EventSubscribeRequest):
+ ctx = request_ctx.get()
+ subscribed = []
+ for p in req.params.topics:
+ await registry.add("demo", p)
+ subscribed.append(SubscribedTopic(pattern=p))
+ return ServerResult(EventSubscribeResult(subscribed=subscribed))
+
+ async def on_unsubscribe(req: EventUnsubscribeRequest):
+ for p in req.params.topics:
+ await registry.remove("demo", p)
+ return ServerResult(EventUnsubscribeResult(unsubscribed=req.params.topics))
+
+ async def on_list(req: EventListRequest):
+ return ServerResult(EventListResult(topics=descriptors))
+
+ server.request_handlers[EventSubscribeRequest] = on_subscribe
+ server.request_handlers[EventUnsubscribeRequest] = on_unsubscribe
+ server.request_handlers[EventListRequest] = on_list
+ return server
+
+
+async def main():
+ server = create_server()
+ s2c_send, s2c_recv = anyio.create_memory_object_stream[SessionMessage](10)
+ c2s_send, c2s_recv = anyio.create_memory_object_stream[SessionMessage](10)
+
+ received: list[EventParams] = []
+
+ async with (
+ ServerSession(
+ c2s_recv,
+ s2c_send,
+ InitializationOptions(
+ server_name="demo",
+ server_version="0.1.0",
+ capabilities=server.get_capabilities(NotificationOptions(), {}),
+ ),
+ ) as server_session,
+ ClientSession(s2c_recv, c2s_send) as client_session,
+ anyio.create_task_group() as tg,
+ ):
+
+ async def run_server():
+ async for msg in server_session.incoming_messages:
+ if isinstance(msg, RequestResponder):
+ with msg:
+ handler = server.request_handlers.get(type(msg.request.root))
+ if handler:
+ token = request_ctx.set(
+ types.RequestContext(
+ request_id=msg.request_id,
+ meta=msg.request_meta,
+ session=server_session,
+ lifespan_context={},
+ )
+ )
+ try:
+ await msg.respond(await handler(msg.request.root))
+ finally:
+ request_ctx.reset(token)
+
+ tg.start_soon(run_server)
+ await client_session.initialize()
+
+ # Subscribe and set handler
+ await client_session.subscribe_events(["chat/+"])
+
+ @client_session.on_event()
+ async def handle(params: EventParams) -> None:
+ received.append(params)
+
+ # Server emits an event
+ await server_session.emit_event(
+ topic="chat/general",
+ payload={"user": "alice", "text": "hello"},
+ )
+
+ await anyio.sleep(0.1)
+ print(f"Received {len(received)} event(s)")
+ for ev in received:
+ print(f" [{ev.topic}] {ev.payload}")
+
+ tg.cancel_scope.cancel()
+
+
+anyio.run(main)
+```
+
+## Types Reference
+
+| Type | Description |
+|------|-------------|
+| `EventParams` | Notification payload: topic, eventId, payload, timestamp, effects |
+| `EventEmitNotification` | Server-to-client notification wrapping `EventParams` |
+| `EventEffect` | Advisory effect hint (type + priority) |
+| `EventTopicDescriptor` | Describes a topic the server can publish to |
+| `EventsCapability` | Server capability declaration for events |
+| `EventSubscribeParams` | Client request parameters for subscribing |
+| `EventSubscribeResult` | Subscribe response: subscribed, rejected, retained |
+| `EventUnsubscribeParams` | Client request parameters for unsubscribing |
+| `EventUnsubscribeResult` | Unsubscribe response: list of removed patterns |
+| `EventListResult` | Response listing available topic descriptors |
+| `SubscribedTopic` | A successfully subscribed pattern |
+| `RejectedTopic` | A rejected pattern with reason |
+| `RetainedEvent` | A cached event delivered on subscribe |
+| `SubscriptionRegistry` | Server-side session-to-pattern registry with wildcard matching |
+| `RetainedValueStore` | Server-side per-topic retained value cache with expiry |
diff --git a/docs/index.md b/docs/index.md
index 061a2f5bc..61d97412c 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -60,9 +60,10 @@ npx -y @modelcontextprotocol/inspector
1. **[Install](installation.md)** the MCP SDK
2. **[Build servers](server.md)** - tools, resources, prompts, transports, ASGI mounting
3. **[Write clients](client.md)** - connect to servers, use tools/resources/prompts
-4. **[Explore authorization](authorization.md)** - add security to your servers
-5. **[Use low-level APIs](low-level-server.md)** - for advanced customization
-6. **[Protocol features](protocol.md)** - MCP primitives, server capabilities
+4. **[Push events](events.md)** - topic-based server-to-client notifications
+5. **[Explore authorization](authorization.md)** - add security to your servers
+6. **[Use low-level APIs](low-level-server.md)** - for advanced customization
+7. **[Protocol features](protocol.md)** - MCP primitives, server capabilities
## API Reference
diff --git a/docs/protocol.md b/docs/protocol.md
index 2c4604d8c..6941b0a6b 100644
--- a/docs/protocol.md
+++ b/docs/protocol.md
@@ -23,6 +23,7 @@ MCP servers declare capabilities during initialization:
| `tools` | `listChanged` | Tool discovery and execution |
| `logging` | - | Server logging configuration |
| `completions`| - | Argument completion suggestions |
+| `events` | `topics` | Topic-based server-to-client push |
## Ping
@@ -87,6 +88,7 @@ During initialization, the client and server exchange capability declarations. T
- `tools` -- declared when a `list_tools` handler is registered
- `logging` -- declared when a `set_logging_level` handler is registered
- `completions` -- declared when a `completion` handler is registered
+- `events` -- declared when an `EventSubscribeRequest` handler is registered
After initialization, clients can inspect server capabilities:
diff --git a/mkdocs.yml b/mkdocs.yml
index 6f327d006..2ad6cf443 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -18,6 +18,7 @@ nav:
- Writing Clients: client.md
- Protocol Features: protocol.md
- Low-Level Server: low-level-server.md
+ - Events: events.md
- Authorization: authorization.md
- Testing: testing.md
- Experimental:
diff --git a/pyproject.toml b/pyproject.toml
index 3d3e7a72c..f50d3ba98 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -36,6 +36,7 @@ dependencies = [
"pyjwt[crypto]>=2.10.1",
"typing-extensions>=4.9.0",
"typing-inspection>=0.4.1",
+ "python-ulid>=3.0.0",
]
[project.optional-dependencies]
diff --git a/src/mcp/client/events.py b/src/mcp/client/events.py
new file mode 100644
index 000000000..347d623d7
--- /dev/null
+++ b/src/mcp/client/events.py
@@ -0,0 +1,150 @@
+"""Client-side event utilities for MCP.
+
+ProvenanceEnvelope wraps events with client-assessed provenance metadata
+for safe injection into LLM context. EventQueue provides priority-aware
+buffering for events waiting to be processed.
+"""
+
+from __future__ import annotations
+
+from collections import deque
+from dataclasses import dataclass
+from typing import Any, ClassVar
+
+from mcp.types import EventParams
+
+__all__ = ["EventQueue", "ProvenanceEnvelope"]
+
+
+@dataclass
+class ProvenanceEnvelope:
+ """Client-side provenance wrapper for events injected into LLM context.
+
+ Clients generate this locally when honoring inject_context effects.
+ The server_trust field MUST be client-assessed, never server-supplied.
+ """
+
+ server: str
+ server_trust: str # Client-assessed trust tier (e.g., "trusted", "unknown")
+ topic: str
+ source: str | None = None
+ event_id: str | None = None
+ received_at: str | None = None # ISO 8601, client-stamped
+
+ def to_dict(self) -> dict[str, Any]:
+ """Serialize to dict, omitting None values."""
+ d: dict[str, Any] = {
+ "server": self.server,
+ "server_trust": self.server_trust,
+ "topic": self.topic,
+ }
+ if self.source is not None:
+ d["source"] = self.source
+ if self.event_id is not None:
+ d["event_id"] = self.event_id
+ if self.received_at is not None:
+ d["received_at"] = self.received_at
+ return d
+
+ def to_xml(self, payload_text: str = "") -> str:
+ """Format as XML element for LLM context injection.
+
+ Args:
+ payload_text: The event payload as a string (JSON or otherwise).
+ Inserted as the element body.
+
+ Note: All attribute values are XML-escaped via quoteattr to prevent
+ injection from attacker-controlled field values.
+ """
+ from xml.sax.saxutils import escape, quoteattr # noqa: PLC0415
+
+ attrs = " ".join(f"{k}={quoteattr(str(v))}" for k, v in self.to_dict().items())
+ return f"{escape(payload_text)}"
+
+ @classmethod
+ def from_event(
+ cls,
+ event: EventParams,
+ *,
+ server: str,
+ server_trust: str,
+ ) -> ProvenanceEnvelope:
+ """Create an envelope from an EventParams notification.
+
+ Extracts topic, source, and event_id from the event and stamps
+ received_at with the current UTC time.
+ """
+ from datetime import datetime, timezone # noqa: PLC0415
+
+ return cls(
+ server=server,
+ server_trust=server_trust,
+ topic=event.topic,
+ source=event.source,
+ event_id=event.eventId,
+ received_at=datetime.now(timezone.utc).isoformat(),
+ )
+
+
+class EventQueue:
+ """Priority-aware event buffer for client-side processing.
+
+ Events are enqueued with a priority derived from their requested_effects.
+ drain() returns events in priority order (urgent > high > normal > low).
+ """
+
+ _PRIORITY_ORDER: ClassVar[dict[str, int]] = {
+ "urgent": 0,
+ "high": 1,
+ "normal": 2,
+ "low": 3,
+ }
+
+ def __init__(self) -> None:
+ self._queues: dict[str, deque[EventParams]] = {p: deque() for p in self._PRIORITY_ORDER}
+
+ def enqueue(self, event: EventParams) -> None:
+ """Add an event to the appropriate priority queue.
+
+ Priority is derived from the highest-priority requested_effect.
+ Events with no requested_effects default to "normal".
+ """
+ priority = self._resolve_priority(event)
+ self._queues[priority].append(event)
+
+ def drain(self, max_count: int | None = None) -> list[EventParams]:
+ """Remove and return events in priority order.
+
+ Args:
+ max_count: Maximum events to return. None means drain all.
+
+ Returns:
+ Events ordered urgent -> high -> normal -> low.
+ """
+ result: list[EventParams] = []
+ for priority in self._PRIORITY_ORDER:
+ q = self._queues[priority]
+ while q:
+ if max_count is not None and len(result) >= max_count:
+ return result
+ result.append(q.popleft())
+ return result
+
+ def __len__(self) -> int:
+ return sum(len(q) for q in self._queues.values())
+
+ def __bool__(self) -> bool:
+ return any(self._queues.values())
+
+ def _resolve_priority(self, event: EventParams) -> str:
+ """Determine priority from highest-priority requested_effect."""
+ if not event.requestedEffects:
+ return "normal"
+ best = "low"
+ best_rank = self._PRIORITY_ORDER["low"]
+ for effect in event.requestedEffects:
+ rank = self._PRIORITY_ORDER.get(effect.priority, best_rank)
+ if rank < best_rank:
+ best = effect.priority
+ best_rank = rank
+ return best
diff --git a/src/mcp/client/session.py b/src/mcp/client/session.py
index 8519f15ce..6fca13109 100644
--- a/src/mcp/client/session.py
+++ b/src/mcp/client/session.py
@@ -1,4 +1,5 @@
import logging
+import re
from datetime import timedelta
from typing import Any, Protocol, overload
@@ -13,6 +14,7 @@
from mcp.shared.context import RequestContext
from mcp.shared.message import SessionMessage
from mcp.shared.session import BaseSession, ProgressFnT, RequestResponder
+from mcp.shared.topic_patterns import pattern_to_regex
from mcp.shared.version import SUPPORTED_PROTOCOL_VERSIONS
DEFAULT_CLIENT_INFO = types.Implementation(name="mcp", version="0.1.0")
@@ -42,6 +44,10 @@ async def __call__(
) -> types.ListRootsResult | types.ErrorData: ... # pragma: no branch
+class EventHandlerFnT(Protocol):
+ async def __call__(self, params: types.EventParams) -> None: ... # pragma: no branch
+
+
class LoggingFnT(Protocol):
async def __call__(
self,
@@ -140,7 +146,15 @@ def __init__(
self._message_handler = message_handler or _default_message_handler
self._tool_output_schemas: dict[str, dict[str, Any] | None] = {}
self._server_capabilities: types.ServerCapabilities | None = None
+ self._session_id: str | None = None
self._experimental_features: ExperimentalClientFeatures | None = None
+ self._event_handler: EventHandlerFnT | None = None
+ self._event_topic_filter: str | None = None
+ self._event_topic_filter_regex: re.Pattern[str] | None = None
+ self._subscribed_patterns: set[str] = set()
+ # Cache compiled regexes for subscription patterns to avoid
+ # recompiling on every incoming event.
+ self._subscription_regex_cache: dict[str, re.Pattern[str]] = {}
# Experimental: Task handlers (use defaults if not provided)
self._task_handlers = experimental_task_handlers or ExperimentalTaskHandlers()
@@ -192,6 +206,16 @@ async def initialize(self) -> types.InitializeResult:
self._server_capabilities = result.capabilities
+ # FastMCP servers inject a server-assigned session_id into
+ # InitializeResult._meta so clients can synchronously read it after
+ # connect (e.g. to subscribe to session-scoped event topics like
+ # ``sessions/{session_id}/messages``). Non-FastMCP servers typically
+ # omit this, in which case ``self._session_id`` stays ``None``.
+ if result.meta is not None:
+ meta_session_id = result.meta.get("session_id")
+ if isinstance(meta_session_id, str):
+ self._session_id = meta_session_id
+
await self.send_notification(types.ClientNotification(types.InitializedNotification()))
return result
@@ -203,6 +227,17 @@ def get_server_capabilities(self) -> types.ServerCapabilities | None:
"""
return self._server_capabilities
+ @property
+ def session_id(self) -> str | None:
+ """The server-assigned session ID from InitializeResult._meta, if present.
+
+ This is set by FastMCP servers to enable client-side subscription to
+ session-scoped event topics like ``sessions/{session_id}/messages``.
+ Returns None if the server did not provide a session_id (e.g.,
+ non-FastMCP server).
+ """
+ return self._session_id
+
@property
def experimental(self) -> ExperimentalClientFeatures:
"""Experimental APIs for tasks and other features.
@@ -217,6 +252,114 @@ def experimental(self) -> ExperimentalClientFeatures:
self._experimental_features = ExperimentalClientFeatures(self)
return self._experimental_features
+ # ----- Event methods -----
+
+ async def subscribe_events(self, topics: list[str]) -> types.EventSubscribeResult:
+ """Send an events/subscribe request."""
+ result = await self.send_request(
+ types.ClientRequest(
+ types.EventSubscribeRequest(
+ params=types.EventSubscribeParams(topics=topics),
+ )
+ ),
+ types.EventSubscribeResult,
+ )
+ for sub in result.subscribed:
+ self._subscribed_patterns.add(sub.pattern)
+ if sub.pattern not in self._subscription_regex_cache:
+ self._subscription_regex_cache[sub.pattern] = pattern_to_regex(sub.pattern)
+ return result
+
+ async def unsubscribe_events(self, topics: list[str]) -> types.EventUnsubscribeResult:
+ """Send an events/unsubscribe request."""
+ result = await self.send_request(
+ types.ClientRequest(
+ types.EventUnsubscribeRequest(
+ params=types.EventUnsubscribeParams(topics=topics),
+ )
+ ),
+ types.EventUnsubscribeResult,
+ )
+ for pattern in result.unsubscribed:
+ self._subscribed_patterns.discard(pattern)
+ self._subscription_regex_cache.pop(pattern, None)
+ return result
+
+ async def list_events(self) -> types.EventListResult:
+ """Send an events/list request."""
+ return await self.send_request(
+ types.ClientRequest(types.EventListRequest()),
+ types.EventListResult,
+ )
+
+ def set_event_handler(
+ self,
+ handler: EventHandlerFnT,
+ *,
+ topic_filter: str | None = None,
+ ) -> None:
+ """Register a callback for incoming event notifications.
+
+ If *topic_filter* is provided, it is compiled once here and the
+ cached regex is reused for every incoming event. The filter uses
+ the same MQTT-style wildcard syntax as subscription patterns
+ (``+`` for a single segment, ``#`` as a trailing multi-segment
+ wildcard).
+ """
+ self._event_handler = handler
+ self._event_topic_filter = topic_filter
+ self._event_topic_filter_regex = pattern_to_regex(topic_filter) if topic_filter is not None else None
+
+ def on_event(self, topic_filter: str | None = None):
+ """Decorator for registering an event handler."""
+
+ def decorator(fn: EventHandlerFnT) -> EventHandlerFnT:
+ self.set_event_handler(fn, topic_filter=topic_filter)
+ return fn
+
+ return decorator
+
+ def _topic_matches_subscriptions(self, topic: str) -> bool:
+ """Check if *topic* matches any of our subscribed patterns.
+
+ Compiled regexes are cached per subscription pattern so incoming
+ events do not pay a recompile cost on every match attempt.
+ """
+ for pattern in self._subscribed_patterns:
+ regex = self._subscription_regex_cache.get(pattern)
+ if regex is None:
+ regex = pattern_to_regex(pattern)
+ self._subscription_regex_cache[pattern] = regex
+ if regex.match(topic):
+ return True
+ return False
+
+ async def _handle_event(self, params: types.EventParams) -> None:
+ """Dispatch an incoming event to the registered handler.
+
+ Filtering order:
+
+ 1. If no handler is registered, drop the event.
+ 2. If the client has any active subscriptions, the topic must
+ match at least one of them. Events for unsubscribed topics
+ are dropped. (A client with zero subscriptions accepts any
+ topic the server chooses to deliver; this is the "pass
+ through" fallback documented in ``docs/events.md``.)
+ 3. If an additional ``topic_filter`` was provided to
+ ``set_event_handler``, the topic must also match that
+ filter.
+ """
+ if self._event_handler is None:
+ return
+
+ if self._subscribed_patterns and not self._topic_matches_subscriptions(params.topic):
+ return
+
+ if self._event_topic_filter_regex is not None and not self._event_topic_filter_regex.match(params.topic):
+ return
+
+ await self._event_handler(params)
+
async def send_ping(self) -> types.EmptyResult:
"""Send a ping request."""
return await self.send_request(
@@ -611,5 +754,7 @@ async def _received_notification(self, notification: types.ServerNotification) -
# Clients MAY use this to retry requests or update UI
# The notification contains the elicitationId of the completed elicitation
pass
+ case types.EventEmitNotification(params=params):
+ await self._handle_event(params)
case _:
pass
diff --git a/src/mcp/server/events.py b/src/mcp/server/events.py
new file mode 100644
index 000000000..5341c79ec
--- /dev/null
+++ b/src/mcp/server/events.py
@@ -0,0 +1,165 @@
+"""Event subscription registry and retained value store for MCP events.
+
+This module provides the server-side infrastructure for managing event
+subscriptions using MQTT-style topic wildcards.
+
+Wildcard rules:
+- ``+`` matches exactly one segment (between ``/`` separators)
+- ``#`` matches zero or more trailing segments (must be last segment)
+- Literal segments match exactly
+"""
+
+from __future__ import annotations
+
+import asyncio
+import re
+from datetime import datetime, timezone
+
+from mcp.shared.topic_patterns import pattern_to_regex as _pattern_to_regex
+from mcp.types import RetainedEvent
+
+
+class SubscriptionRegistry:
+ """Thread-safe registry mapping session IDs to topic subscription patterns.
+
+ Supports MQTT-style wildcards (``+`` for single segment, ``#`` for
+ trailing multi-segment). ``match()`` guarantees at-most-once delivery
+ per session regardless of how many patterns overlap.
+ """
+
+ def __init__(self) -> None:
+ self._lock = asyncio.Lock()
+ # session_id -> set of raw pattern strings
+ self._subscriptions: dict[str, set[str]] = {}
+ # Cache compiled regexes: pattern string -> compiled regex
+ self._compiled: dict[str, re.Pattern[str]] = {}
+
+ def _compile(self, pattern: str) -> re.Pattern[str]:
+ if pattern not in self._compiled:
+ self._compiled[pattern] = _pattern_to_regex(pattern)
+ return self._compiled[pattern]
+
+ async def add(self, session_id: str, pattern: str) -> None:
+ """Register a subscription for *session_id* on *pattern*.
+
+ Raises:
+ ValueError: If *pattern* has more than 8 segments.
+ """
+ segments = pattern.split("/")
+ if len(segments) > 8:
+ raise ValueError(f"Topic pattern exceeds maximum depth of 8 segments (got {len(segments)}): {pattern}")
+ async with self._lock:
+ self._subscriptions.setdefault(session_id, set()).add(pattern)
+ self._compile(pattern)
+
+ async def remove(self, session_id: str, pattern: str) -> None:
+ """Remove a single subscription."""
+ async with self._lock:
+ if session_id in self._subscriptions: # pragma: no branch
+ self._subscriptions[session_id].discard(pattern)
+ if not self._subscriptions[session_id]: # pragma: no branch
+ del self._subscriptions[session_id]
+
+ async def remove_all(self, session_id: str) -> None:
+ """Remove all subscriptions for *session_id* (disconnect cleanup)."""
+ async with self._lock:
+ self._subscriptions.pop(session_id, None)
+
+ async def match(self, topic: str) -> set[str]:
+ """Return session IDs whose subscriptions match *topic*.
+
+ Each session appears at most once (at-most-once delivery guarantee).
+ """
+ async with self._lock:
+ result: set[str] = set()
+ for session_id, patterns in self._subscriptions.items():
+ for pattern in patterns:
+ regex = self._compile(pattern)
+ if regex.match(topic):
+ result.add(session_id)
+ break # at-most-once per session
+ return result
+
+ async def get_subscriptions(self, session_id: str) -> set[str]:
+ """Return the set of patterns a session is subscribed to."""
+ async with self._lock:
+ return set(self._subscriptions.get(session_id, set()))
+
+
+class RetainedValueStore:
+ """Stores the most recent event per topic for replay on subscribe.
+
+ This is an *application-level* retained value store, distinct from
+ ``fastmcp/server/event_store.py`` which is an SSE transport-level
+ event store for Streamable HTTP resumability.
+
+ All mutating and reading methods are async and protected by an
+ ``asyncio.Lock`` to ensure safety under concurrent access,
+ mirroring the pattern used by ``SubscriptionRegistry``.
+ """
+
+ def __init__(self) -> None:
+ self._lock = asyncio.Lock()
+ self._store: dict[str, RetainedEvent] = {}
+ self._expires: dict[str, str] = {} # topic -> ISO 8601 expires_at
+ self._regex_cache: dict[str, re.Pattern[str]] = {}
+
+ async def set(self, topic: str, event: RetainedEvent, expires_at: str | None = None) -> None:
+ """Store or replace the retained value for *topic*."""
+ async with self._lock:
+ self._store[topic] = event
+ if expires_at is not None:
+ self._expires[topic] = expires_at
+ else:
+ self._expires.pop(topic, None)
+
+ async def get(self, topic: str) -> RetainedEvent | None:
+ """Retrieve the retained value for *topic*, or ``None`` if expired/absent."""
+ async with self._lock:
+ event = self._store.get(topic)
+ if event is None:
+ return None
+ if self._is_expired(topic):
+ del self._store[topic]
+ self._expires.pop(topic, None)
+ return None
+ return event
+
+ async def get_matching(self, pattern: str) -> list[RetainedEvent]:
+ """Return all non-expired retained events whose topic matches *pattern*."""
+ async with self._lock:
+ if pattern not in self._regex_cache:
+ self._regex_cache[pattern] = _pattern_to_regex(pattern)
+ regex = self._regex_cache[pattern]
+ result: list[RetainedEvent] = []
+ expired_topics: list[str] = []
+ for topic, event in self._store.items():
+ if self._is_expired(topic):
+ expired_topics.append(topic)
+ continue
+ if regex.match(topic):
+ result.append(event)
+ # Clean up expired entries
+ for topic in expired_topics:
+ del self._store[topic]
+ self._expires.pop(topic, None)
+ return result
+
+ async def delete(self, topic: str) -> None:
+ """Remove the retained value for *topic*."""
+ async with self._lock:
+ self._store.pop(topic, None)
+ self._expires.pop(topic, None)
+
+ def _is_expired(self, topic: str) -> bool:
+ """Check if a retained value has expired based on its ``expires_at``."""
+ expires_at = self._expires.get(topic)
+ if expires_at is None:
+ return False
+ try:
+ expiry = datetime.fromisoformat(expires_at)
+ if expiry.tzinfo is None:
+ expiry = expiry.replace(tzinfo=timezone.utc)
+ return datetime.now(timezone.utc) >= expiry
+ except (ValueError, TypeError):
+ return False
diff --git a/src/mcp/server/lowlevel/server.py b/src/mcp/server/lowlevel/server.py
index 2dd1a8277..b9c2546fe 100644
--- a/src/mcp/server/lowlevel/server.py
+++ b/src/mcp/server/lowlevel/server.py
@@ -224,6 +224,11 @@ def get_capabilities(
if types.CompleteRequest in self.request_handlers:
completions_capability = types.CompletionsCapability()
+ # Set events capability if handler exists
+ events_capability = None
+ if types.EventSubscribeRequest in self.request_handlers:
+ events_capability = types.EventsCapability()
+
capabilities = types.ServerCapabilities(
prompts=prompts_capability,
resources=resources_capability,
@@ -231,6 +236,7 @@ def get_capabilities(
logging=logging_capability,
experimental=experimental_capabilities,
completions=completions_capability,
+ events=events_capability,
)
if self._experimental_handlers:
self._experimental_handlers.update_capabilities(capabilities)
diff --git a/src/mcp/server/session.py b/src/mcp/server/session.py
index 8f0baa3e9..8e89968cd 100644
--- a/src/mcp/server/session.py
+++ b/src/mcp/server/session.py
@@ -44,6 +44,7 @@ async def handle_list_prompts(ctx: RequestContext) -> list[types.Prompt]:
import anyio.lowlevel
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
from pydantic import AnyUrl
+from ulid import ULID
import mcp.types as types
from mcp.server.experimental.session_features import ExperimentalServerSessionFeatures
@@ -202,6 +203,46 @@ async def _received_notification(self, notification: types.ClientNotification) -
if self._initialization_state != InitializationState.Initialized: # pragma: no cover
raise RuntimeError("Received notification before initialization was complete")
+ async def emit_event(
+ self,
+ topic: str,
+ payload: Any,
+ *,
+ event_id: str | None = None,
+ timestamp: str | None = None,
+ retained: bool = False,
+ source: str | None = None,
+ correlation_id: str | None = None,
+ requested_effects: list[types.EventEffect] | None = None,
+ expires_at: str | None = None,
+ related_request_id: types.RequestId | None = None,
+ ) -> None:
+ """Push an event to the client on the given topic."""
+ if event_id is None:
+ event_id = str(ULID())
+ if timestamp is None:
+ from datetime import datetime, timezone
+
+ timestamp = datetime.now(timezone.utc).isoformat()
+ await self.send_notification(
+ types.ServerNotification(
+ types.EventEmitNotification(
+ params=types.EventParams(
+ topic=topic,
+ eventId=event_id,
+ payload=payload,
+ timestamp=timestamp,
+ retained=retained,
+ source=source,
+ correlationId=correlation_id,
+ requestedEffects=requested_effects,
+ expiresAt=expires_at,
+ ),
+ )
+ ),
+ related_request_id,
+ )
+
async def send_log_message(
self,
level: types.LoggingLevel,
diff --git a/src/mcp/shared/topic_patterns.py b/src/mcp/shared/topic_patterns.py
new file mode 100644
index 000000000..0552f353f
--- /dev/null
+++ b/src/mcp/shared/topic_patterns.py
@@ -0,0 +1,41 @@
+"""Shared helpers for MQTT-style topic pattern matching.
+
+Both the client (for subscription filtering) and the server (for the
+subscription registry and retained-event store) need to compile MQTT-style
+topic patterns into regular expressions. Keeping the implementation here
+avoids a client -> server import and guarantees identical semantics on both
+sides of the protocol.
+"""
+
+from __future__ import annotations
+
+import re
+
+__all__ = ["pattern_to_regex"]
+
+
+def pattern_to_regex(pattern: str) -> re.Pattern[str]:
+ """Convert an MQTT-style topic pattern to a compiled regex.
+
+ ``+`` becomes a single-segment match, ``#`` becomes a greedy
+ multi-segment match (only valid as the final segment).
+ """
+ parts = pattern.split("/")
+ regex_parts: list[str] = []
+ for i, part in enumerate(parts):
+ if part == "#":
+ if i != len(parts) - 1:
+ raise ValueError("'#' wildcard is only valid as the last segment")
+ # # matches zero or more trailing segments.
+ # If preceding segments exist, the / before # is optional
+ # so "myapp/#" matches both "myapp" and "myapp/anything".
+ # If # is the sole segment, it matches everything.
+ if regex_parts:
+ return re.compile("^" + "/".join(regex_parts) + "(/.*)?$")
+ else:
+ return re.compile("^.*$")
+ elif part == "+":
+ regex_parts.append("[^/]+")
+ else:
+ regex_parts.append(re.escape(part))
+ return re.compile("^" + "/".join(regex_parts) + "$")
diff --git a/src/mcp/types.py b/src/mcp/types.py
index 654c00660..a218ab1e7 100644
--- a/src/mcp/types.py
+++ b/src/mcp/types.py
@@ -520,9 +520,36 @@ class ServerCapabilities(BaseModel):
"""Present if the server offers autocompletion suggestions for prompts and resources."""
tasks: ServerTasksCapability | None = None
"""Present if the server supports task-augmented requests."""
+
+ events: "EventsCapability | None" = None
+ """Present if the server supports publishing events to clients."""
+
model_config = ConfigDict(extra="allow")
+class EventEffect(BaseModel):
+ """Advisory hint about how the client should handle an event."""
+
+ type: Literal["inject_context", "notify_user", "trigger_turn"]
+ priority: Literal["low", "normal", "high", "urgent"] = "normal"
+
+
+class EventTopicDescriptor(BaseModel):
+ """Describes a topic the server can publish to."""
+
+ pattern: str
+ description: str | None = None
+ retained: bool = False
+ schema_: dict[str, Any] | None = Field(None, alias="schema")
+
+
+class EventsCapability(BaseModel):
+ """Server capability for events."""
+
+ topics: list[EventTopicDescriptor] = []
+ instructions: str | None = None
+
+
TaskStatus = Literal["working", "input_required", "completed", "failed", "cancelled"]
# Task status constants
@@ -1419,6 +1446,128 @@ class LoggingMessageNotification(Notification[LoggingMessageNotificationParams,
params: LoggingMessageNotificationParams
+# ---------------------------------------------------------------------------
+# Events
+# ---------------------------------------------------------------------------
+
+
+class EventParams(NotificationParams):
+ """Parameters for events/emit notification."""
+
+ topic: str
+ eventId: str
+ payload: Any
+ timestamp: str | None = None
+ retained: bool = False
+ source: str | None = None
+ correlationId: str | None = None
+ requestedEffects: list[EventEffect] | None = None
+ expiresAt: str | None = None
+
+ @property
+ def event_id(self) -> str:
+ return self.eventId
+
+ @property
+ def correlation_id(self) -> str | None:
+ return self.correlationId
+
+ @property
+ def requested_effects(self) -> list[EventEffect] | None:
+ return self.requestedEffects
+
+ @property
+ def expires_at(self) -> str | None:
+ return self.expiresAt
+
+
+class EventEmitNotification(Notification[EventParams, Literal["events/emit"]]):
+ """Event notification sent from server to client."""
+
+ method: Literal["events/emit"] = "events/emit"
+ params: EventParams
+
+
+class EventSubscribeParams(RequestParams):
+ """Parameters for events/subscribe request."""
+
+ topics: list[str]
+
+
+class SubscribedTopic(BaseModel):
+ """A topic pattern that was successfully subscribed."""
+
+ pattern: str
+
+
+class RejectedTopic(BaseModel):
+ """A topic pattern that was rejected, with reason."""
+
+ pattern: str
+ reason: str
+
+
+class RetainedEvent(BaseModel):
+ """A retained event delivered on subscribe."""
+
+ topic: str
+ eventId: str
+ timestamp: str | None = None
+ payload: Any
+
+ @property
+ def event_id(self) -> str:
+ """Snake-case alias for eventId."""
+ return self.eventId
+
+
+class EventSubscribeResult(Result):
+ """Response to events/subscribe."""
+
+ subscribed: list[SubscribedTopic]
+ rejected: list[RejectedTopic] = []
+ retained: list[RetainedEvent] = []
+
+
+class EventSubscribeRequest(Request[EventSubscribeParams, Literal["events/subscribe"]]):
+ """Client request to subscribe to event topics."""
+
+ method: Literal["events/subscribe"] = "events/subscribe"
+ params: EventSubscribeParams
+
+
+class EventUnsubscribeParams(RequestParams):
+ """Parameters for events/unsubscribe request."""
+
+ topics: list[str]
+
+
+class EventUnsubscribeResult(Result):
+ """Response to events/unsubscribe."""
+
+ unsubscribed: list[str]
+
+
+class EventUnsubscribeRequest(Request[EventUnsubscribeParams, Literal["events/unsubscribe"]]):
+ """Client request to unsubscribe from event topics."""
+
+ method: Literal["events/unsubscribe"] = "events/unsubscribe"
+ params: EventUnsubscribeParams
+
+
+class EventListResult(Result):
+ """Response to events/list."""
+
+ topics: list[EventTopicDescriptor]
+
+
+class EventListRequest(Request[RequestParams | None, Literal["events/list"]]):
+ """Client request to list available event topics."""
+
+ method: Literal["events/list"] = "events/list"
+ params: RequestParams | None = None
+
+
IncludeContext = Literal["none", "thisServer", "allServers"]
@@ -1808,6 +1957,9 @@ class ElicitCompleteNotification(
| GetTaskPayloadRequest
| ListTasksRequest
| CancelTaskRequest
+ | EventSubscribeRequest
+ | EventUnsubscribeRequest
+ | EventListRequest
)
@@ -1969,6 +2121,7 @@ class ServerRequest(RootModel[ServerRequestType]):
| PromptListChangedNotification
| ElicitCompleteNotification
| TaskStatusNotification
+ | EventEmitNotification
)
@@ -1992,6 +2145,9 @@ class ServerNotification(RootModel[ServerNotificationType]):
| ListTasksResult
| CancelTaskResult
| CreateTaskResult
+ | EventSubscribeResult
+ | EventUnsubscribeResult
+ | EventListResult
)
diff --git a/tests/client/test_events.py b/tests/client/test_events.py
new file mode 100644
index 000000000..4d4e45605
--- /dev/null
+++ b/tests/client/test_events.py
@@ -0,0 +1,234 @@
+"""Tests for client-side event utilities: ProvenanceEnvelope and EventQueue."""
+
+from __future__ import annotations
+
+from datetime import datetime
+
+from mcp.client.events import EventQueue, ProvenanceEnvelope
+from mcp.types import EventEffect, EventParams
+
+# ---------------------------------------------------------------------------
+# ProvenanceEnvelope
+# ---------------------------------------------------------------------------
+
+
+class TestProvenanceEnvelope:
+ def test_to_dict_all_fields(self) -> None:
+ env = ProvenanceEnvelope(
+ server="ci-server",
+ server_trust="configured",
+ topic="builds/myapp/status",
+ source="ci/jenkins",
+ event_id="evt_a1b2c3d4",
+ received_at="2026-04-09T14:30:00Z",
+ )
+ d = env.to_dict()
+ assert d == {
+ "server": "ci-server",
+ "server_trust": "configured",
+ "topic": "builds/myapp/status",
+ "source": "ci/jenkins",
+ "event_id": "evt_a1b2c3d4",
+ "received_at": "2026-04-09T14:30:00Z",
+ }
+
+ def test_to_dict_optional_none(self) -> None:
+ env = ProvenanceEnvelope(
+ server="my-server",
+ server_trust="unknown",
+ topic="test/topic",
+ )
+ d = env.to_dict()
+ assert d == {
+ "server": "my-server",
+ "server_trust": "unknown",
+ "topic": "test/topic",
+ }
+ assert "source" not in d
+ assert "event_id" not in d
+ assert "received_at" not in d
+
+ def test_to_xml_basic(self) -> None:
+ env = ProvenanceEnvelope(
+ server="spellbook",
+ server_trust="trusted",
+ topic="spellbook/sessions/abc/messages",
+ source="tool/messaging_send",
+ )
+ xml = env.to_xml('{"text": "hello"}')
+ assert xml == (
+ ''
+ '{"text": "hello"}'
+ )
+ # event_id and received_at must not appear when not set
+ assert "event_id" not in xml
+ assert "received_at" not in xml
+
+ def test_to_xml_empty_payload(self) -> None:
+ env = ProvenanceEnvelope(server="s", server_trust="t", topic="x")
+ xml = env.to_xml()
+ assert xml.endswith(">")
+
+ def test_to_xml_with_special_chars_in_payload(self) -> None:
+ env = ProvenanceEnvelope(server="s", server_trust="t", topic="x")
+ xml = env.to_xml('')
+ # Payload body must be escaped
+ assert "