Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
430 changes: 430 additions & 0 deletions docs/events.md

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,10 @@ npx -y @modelcontextprotocol/inspector
1. **[Install](installation.md)** the MCP SDK
2. **[Build servers](server.md)** - tools, resources, prompts, transports, ASGI mounting
3. **[Write clients](client.md)** - connect to servers, use tools/resources/prompts
4. **[Explore authorization](authorization.md)** - add security to your servers
5. **[Use low-level APIs](low-level-server.md)** - for advanced customization
6. **[Protocol features](protocol.md)** - MCP primitives, server capabilities
4. **[Push events](events.md)** - topic-based server-to-client notifications
5. **[Explore authorization](authorization.md)** - add security to your servers
6. **[Use low-level APIs](low-level-server.md)** - for advanced customization
7. **[Protocol features](protocol.md)** - MCP primitives, server capabilities

## API Reference

Expand Down
2 changes: 2 additions & 0 deletions docs/protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ MCP servers declare capabilities during initialization:
| `tools` | `listChanged` | Tool discovery and execution |
| `logging` | - | Server logging configuration |
| `completions`| - | Argument completion suggestions |
| `events` | `topics` | Topic-based server-to-client push |

## Ping

Expand Down Expand Up @@ -87,6 +88,7 @@ During initialization, the client and server exchange capability declarations. T
- `tools` -- declared when a `list_tools` handler is registered
- `logging` -- declared when a `set_logging_level` handler is registered
- `completions` -- declared when a `completion` handler is registered
- `events` -- declared when an `EventSubscribeRequest` handler is registered

After initialization, clients can inspect server capabilities:

Expand Down
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ nav:
- Writing Clients: client.md
- Protocol Features: protocol.md
- Low-Level Server: low-level-server.md
- Events: events.md
- Authorization: authorization.md
- Testing: testing.md
- Experimental:
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ dependencies = [
"pyjwt[crypto]>=2.10.1",
"typing-extensions>=4.9.0",
"typing-inspection>=0.4.1",
"python-ulid>=3.0.0",
]

[project.optional-dependencies]
Expand Down
150 changes: 150 additions & 0 deletions src/mcp/client/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
"""Client-side event utilities for MCP.

ProvenanceEnvelope wraps events with client-assessed provenance metadata
for safe injection into LLM context. EventQueue provides priority-aware
buffering for events waiting to be processed.
"""

from __future__ import annotations

from collections import deque
from dataclasses import dataclass
from typing import Any, ClassVar

from mcp.types import EventParams

__all__ = ["EventQueue", "ProvenanceEnvelope"]


@dataclass
class ProvenanceEnvelope:
"""Client-side provenance wrapper for events injected into LLM context.

Clients generate this locally when honoring inject_context effects.
The server_trust field MUST be client-assessed, never server-supplied.
"""

server: str
server_trust: str # Client-assessed trust tier (e.g., "trusted", "unknown")
topic: str
source: str | None = None
event_id: str | None = None
received_at: str | None = None # ISO 8601, client-stamped

def to_dict(self) -> dict[str, Any]:
"""Serialize to dict, omitting None values."""
d: dict[str, Any] = {
"server": self.server,
"server_trust": self.server_trust,
"topic": self.topic,
}
if self.source is not None:
d["source"] = self.source
if self.event_id is not None:
d["event_id"] = self.event_id
if self.received_at is not None:
d["received_at"] = self.received_at
return d

def to_xml(self, payload_text: str = "") -> str:
"""Format as XML element for LLM context injection.

Args:
payload_text: The event payload as a string (JSON or otherwise).
Inserted as the element body.

Note: All attribute values are XML-escaped via quoteattr to prevent
injection from attacker-controlled field values.
"""
from xml.sax.saxutils import escape, quoteattr # noqa: PLC0415

attrs = " ".join(f"{k}={quoteattr(str(v))}" for k, v in self.to_dict().items())
return f"<mcp:event {attrs}>{escape(payload_text)}</mcp:event>"

@classmethod
def from_event(
cls,
event: EventParams,
*,
server: str,
server_trust: str,
) -> ProvenanceEnvelope:
"""Create an envelope from an EventParams notification.

Extracts topic, source, and event_id from the event and stamps
received_at with the current UTC time.
"""
from datetime import datetime, timezone # noqa: PLC0415

return cls(
server=server,
server_trust=server_trust,
topic=event.topic,
source=event.source,
event_id=event.eventId,
received_at=datetime.now(timezone.utc).isoformat(),
)


class EventQueue:
"""Priority-aware event buffer for client-side processing.

Events are enqueued with a priority derived from their requested_effects.
drain() returns events in priority order (urgent > high > normal > low).
"""

_PRIORITY_ORDER: ClassVar[dict[str, int]] = {
"urgent": 0,
"high": 1,
"normal": 2,
"low": 3,
}

def __init__(self) -> None:
self._queues: dict[str, deque[EventParams]] = {p: deque() for p in self._PRIORITY_ORDER}

def enqueue(self, event: EventParams) -> None:
"""Add an event to the appropriate priority queue.

Priority is derived from the highest-priority requested_effect.
Events with no requested_effects default to "normal".
"""
priority = self._resolve_priority(event)
self._queues[priority].append(event)

def drain(self, max_count: int | None = None) -> list[EventParams]:
"""Remove and return events in priority order.

Args:
max_count: Maximum events to return. None means drain all.

Returns:
Events ordered urgent -> high -> normal -> low.
"""
result: list[EventParams] = []
for priority in self._PRIORITY_ORDER:
q = self._queues[priority]
while q:
if max_count is not None and len(result) >= max_count:
return result
result.append(q.popleft())
return result

def __len__(self) -> int:
return sum(len(q) for q in self._queues.values())

def __bool__(self) -> bool:
return any(self._queues.values())

def _resolve_priority(self, event: EventParams) -> str:
"""Determine priority from highest-priority requested_effect."""
if not event.requestedEffects:
return "normal"
best = "low"
best_rank = self._PRIORITY_ORDER["low"]
for effect in event.requestedEffects:
rank = self._PRIORITY_ORDER.get(effect.priority, best_rank)
if rank < best_rank:
best = effect.priority
best_rank = rank
return best
145 changes: 145 additions & 0 deletions src/mcp/client/session.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import re
from datetime import timedelta
from typing import Any, Protocol, overload

Expand All @@ -13,6 +14,7 @@
from mcp.shared.context import RequestContext
from mcp.shared.message import SessionMessage
from mcp.shared.session import BaseSession, ProgressFnT, RequestResponder
from mcp.shared.topic_patterns import pattern_to_regex
from mcp.shared.version import SUPPORTED_PROTOCOL_VERSIONS

DEFAULT_CLIENT_INFO = types.Implementation(name="mcp", version="0.1.0")
Expand Down Expand Up @@ -42,6 +44,10 @@ async def __call__(
) -> types.ListRootsResult | types.ErrorData: ... # pragma: no branch


class EventHandlerFnT(Protocol):
async def __call__(self, params: types.EventParams) -> None: ... # pragma: no branch


class LoggingFnT(Protocol):
async def __call__(
self,
Expand Down Expand Up @@ -140,7 +146,15 @@ def __init__(
self._message_handler = message_handler or _default_message_handler
self._tool_output_schemas: dict[str, dict[str, Any] | None] = {}
self._server_capabilities: types.ServerCapabilities | None = None
self._session_id: str | None = None
self._experimental_features: ExperimentalClientFeatures | None = None
self._event_handler: EventHandlerFnT | None = None
self._event_topic_filter: str | None = None
self._event_topic_filter_regex: re.Pattern[str] | None = None
Comment on lines +151 to +153
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The current implementation only supports a single global event handler. If a user uses the @on_event decorator multiple times, each subsequent call will overwrite the previous handler. This is likely unexpected behavior for a PUB/SUB system. Consider maintaining a list of handlers and their associated filters.

Suggested change
self._event_handler: EventHandlerFnT | None = None
self._event_topic_filter: str | None = None
self._event_topic_filter_regex: re.Pattern[str] | None = None
self._event_handlers: list[tuple[EventHandlerFnT, re.Pattern[str] | None]] = []

self._subscribed_patterns: set[str] = set()
# Cache compiled regexes for subscription patterns to avoid
# recompiling on every incoming event.
self._subscription_regex_cache: dict[str, re.Pattern[str]] = {}

# Experimental: Task handlers (use defaults if not provided)
self._task_handlers = experimental_task_handlers or ExperimentalTaskHandlers()
Expand Down Expand Up @@ -192,6 +206,16 @@ async def initialize(self) -> types.InitializeResult:

self._server_capabilities = result.capabilities

# FastMCP servers inject a server-assigned session_id into
# InitializeResult._meta so clients can synchronously read it after
# connect (e.g. to subscribe to session-scoped event topics like
# ``sessions/{session_id}/messages``). Non-FastMCP servers typically
# omit this, in which case ``self._session_id`` stays ``None``.
if result.meta is not None:
meta_session_id = result.meta.get("session_id")
if isinstance(meta_session_id, str):
self._session_id = meta_session_id

await self.send_notification(types.ClientNotification(types.InitializedNotification()))

return result
Expand All @@ -203,6 +227,17 @@ def get_server_capabilities(self) -> types.ServerCapabilities | None:
"""
return self._server_capabilities

@property
def session_id(self) -> str | None:
"""The server-assigned session ID from InitializeResult._meta, if present.

This is set by FastMCP servers to enable client-side subscription to
session-scoped event topics like ``sessions/{session_id}/messages``.
Returns None if the server did not provide a session_id (e.g.,
non-FastMCP server).
"""
return self._session_id

@property
def experimental(self) -> ExperimentalClientFeatures:
"""Experimental APIs for tasks and other features.
Expand All @@ -217,6 +252,114 @@ def experimental(self) -> ExperimentalClientFeatures:
self._experimental_features = ExperimentalClientFeatures(self)
return self._experimental_features

# ----- Event methods -----

async def subscribe_events(self, topics: list[str]) -> types.EventSubscribeResult:
"""Send an events/subscribe request."""
result = await self.send_request(
types.ClientRequest(
types.EventSubscribeRequest(
params=types.EventSubscribeParams(topics=topics),
)
),
types.EventSubscribeResult,
)
for sub in result.subscribed:
self._subscribed_patterns.add(sub.pattern)
if sub.pattern not in self._subscription_regex_cache:
self._subscription_regex_cache[sub.pattern] = pattern_to_regex(sub.pattern)
return result

async def unsubscribe_events(self, topics: list[str]) -> types.EventUnsubscribeResult:
"""Send an events/unsubscribe request."""
result = await self.send_request(
types.ClientRequest(
types.EventUnsubscribeRequest(
params=types.EventUnsubscribeParams(topics=topics),
)
),
types.EventUnsubscribeResult,
)
for pattern in result.unsubscribed:
self._subscribed_patterns.discard(pattern)
self._subscription_regex_cache.pop(pattern, None)
return result

async def list_events(self) -> types.EventListResult:
"""Send an events/list request."""
return await self.send_request(
types.ClientRequest(types.EventListRequest()),
types.EventListResult,
)

def set_event_handler(
self,
handler: EventHandlerFnT,
*,
topic_filter: str | None = None,
) -> None:
"""Register a callback for incoming event notifications.

If *topic_filter* is provided, it is compiled once here and the
cached regex is reused for every incoming event. The filter uses
the same MQTT-style wildcard syntax as subscription patterns
(``+`` for a single segment, ``#`` as a trailing multi-segment
wildcard).
"""
self._event_handler = handler
self._event_topic_filter = topic_filter
Comment on lines +295 to +310
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The topic_filter provided to set_event_handler should be pre-compiled into a regex pattern during registration rather than being reconstructed and recompiled on every incoming event in _handle_event. This will improve performance when processing high volumes of events.

self._event_topic_filter_regex = pattern_to_regex(topic_filter) if topic_filter is not None else None
Comment on lines +295 to +311
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Renaming this to add_event_handler and appending to a list would allow multiple handlers to coexist, fixing the overwriting issue when using the decorator or multiple manual registrations.

Suggested change
def set_event_handler(
self,
handler: EventHandlerFnT,
*,
topic_filter: str | None = None,
) -> None:
"""Register a callback for incoming event notifications.
If *topic_filter* is provided, it is compiled once here and the
cached regex is reused for every incoming event. The filter uses
the same MQTT-style wildcard syntax as subscription patterns
(``+`` for a single segment, ``#`` as a trailing multi-segment
wildcard).
"""
self._event_handler = handler
self._event_topic_filter = topic_filter
self._event_topic_filter_regex = pattern_to_regex(topic_filter) if topic_filter is not None else None
def add_event_handler(
self,
handler: EventHandlerFnT,
*,
topic_filter: str | None = None,
) -> None:
"""Register a callback for incoming event notifications.
If *topic_filter* is provided, it is compiled once here and the
cached regex is reused for every incoming event. The filter uses
the same MQTT-style wildcard syntax as subscription patterns
(``+`` for a single segment, ``#`` as a trailing multi-segment
wildcard).
"""
regex = pattern_to_regex(topic_filter) if topic_filter is not None else None
self._event_handlers.append((handler, regex))


def on_event(self, topic_filter: str | None = None):
"""Decorator for registering an event handler."""

def decorator(fn: EventHandlerFnT) -> EventHandlerFnT:
self.set_event_handler(fn, topic_filter=topic_filter)
return fn

return decorator

def _topic_matches_subscriptions(self, topic: str) -> bool:
"""Check if *topic* matches any of our subscribed patterns.

Compiled regexes are cached per subscription pattern so incoming
events do not pay a recompile cost on every match attempt.
"""
for pattern in self._subscribed_patterns:
regex = self._subscription_regex_cache.get(pattern)
if regex is None:
regex = pattern_to_regex(pattern)
self._subscription_regex_cache[pattern] = regex
if regex.match(topic):
return True
return False
Comment on lines +322 to +335
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The MQTT topic matching logic is duplicated here and in _handle_event. It also contains the same bug regarding the root # wildcard as found in src/mcp/server/events.py. Consider moving the pattern-to-regex conversion logic to a shared utility module (e.g., src/mcp/shared/topic_utils.py) to ensure consistency and maintainability.


Comment on lines +322 to +336
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The MQTT-style topic matching logic in _topic_matches_subscriptions is inefficient and duplicated.

  1. Performance: The code repeatedly splits pattern strings and constructs regex patterns for every incoming event. This will become a significant CPU bottleneck for clients with multiple subscriptions or high event volume. Compiled regexes should be cached.
  2. Duplication: The logic for converting MQTT wildcards (+, #) to regex is duplicated in _handle_event (lines 316-336) and is also present in src/mcp/server/events.py.
  3. Validation: This implementation lacks the validation enforced on the server side (e.g., ensuring # is the last segment).

Consider refactoring the conversion logic into a shared utility function and caching the compiled regex patterns within the ClientSession (e.g., using a dict[str, re.Pattern]).

async def _handle_event(self, params: types.EventParams) -> None:
"""Dispatch an incoming event to the registered handler.

Filtering order:

1. If no handler is registered, drop the event.
2. If the client has any active subscriptions, the topic must
match at least one of them. Events for unsubscribed topics
are dropped. (A client with zero subscriptions accepts any
topic the server chooses to deliver; this is the "pass
through" fallback documented in ``docs/events.md``.)
3. If an additional ``topic_filter`` was provided to
``set_event_handler``, the topic must also match that
filter.
"""
if self._event_handler is None:
return

if self._subscribed_patterns and not self._topic_matches_subscriptions(params.topic):
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The subscription enforcement logic here contradicts the documentation and creates inconsistent behavior.

According to docs/events.md:247, events that do not match any active subscription should be dropped. However, the current implementation if self._subscribed_patterns and ... only performs the check if the client has at least one subscription. This means a client with zero subscriptions will receive all events pushed by the server, but as soon as it subscribes to a single topic, it becomes restricted.

Removing the self._subscribed_patterns and check will ensure that events are only processed if they match an active subscription, aligning the code with the documented behavior.

Suggested change
if self._subscribed_patterns and not self._topic_matches_subscriptions(params.topic):
if not self._topic_matches_subscriptions(params.topic):

return

if self._event_topic_filter_regex is not None and not self._event_topic_filter_regex.match(params.topic):
return

await self._event_handler(params)
Comment on lines +322 to +361
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic for converting an MQTT-style topic pattern to a regular expression and matching it against a topic is duplicated here in _topic_matches_subscriptions and _handle_event. A similar function, _pattern_to_regex, also exists in src/mcp/server/events.py.

To improve maintainability and reduce code duplication, consider extracting this logic into a shared utility function. This function could take a pattern and a topic and return whether they match. You could place this utility in a shared module, for example mcp.shared.events_utils.

For example, you could have a function like:

# in a new mcp/shared/events_utils.py
import re

def topic_matches_pattern(topic: str, pattern: str) -> bool:
    # ... implementation of regex conversion and matching ...

This would simplify both _topic_matches_subscriptions and _handle_event significantly.

Comment on lines +352 to +361
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Awaiting the event handler directly in the reader loop means that a slow or blocking handler will stop the client from receiving any further messages (including responses to pending requests). Consider spawning a background task for each handler execution, though be mindful of event ordering if that is a requirement.


async def send_ping(self) -> types.EmptyResult:
"""Send a ping request."""
return await self.send_request(
Expand Down Expand Up @@ -611,5 +754,7 @@ async def _received_notification(self, notification: types.ServerNotification) -
# Clients MAY use this to retry requests or update UI
# The notification contains the elicitationId of the completed elicitation
pass
case types.EventEmitNotification(params=params):
await self._handle_event(params)
case _:
pass
Loading
Loading