Skip to content

MCP Events: Topic-based PUB/SUB server-to-client events#1

Open
elijahr wants to merge 8 commits intomainfrom
mcp-events
Open

MCP Events: Topic-based PUB/SUB server-to-client events#1
elijahr wants to merge 8 commits intomainfrom
mcp-events

Conversation

@elijahr
Copy link
Copy Markdown

@elijahr elijahr commented Apr 8, 2026

Summary

  • Adds events/subscribe, events/unsubscribe, events/list, and events/emit as new MCP protocol methods
  • SubscriptionRegistry with MQTT-style wildcard matching (+/#) and at-most-once delivery
  • RetainedValueStore with TTL expiry and async safety
  • Client-side event handler with topic filtering and subscription tracking
  • 64 tests covering types, registry, and end-to-end roundtrip

Context

Reference implementation for MCP SEP: topic-based server-to-client events.
This is the protocol foundation layer that fastmcp and opencode build on.

Test plan

  • uv run pytest tests/test_event_types.py tests/test_subscription_registry.py tests/test_event_roundtrip.py -x -v

Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request restructures the project documentation into a dedicated docs/ directory and introduces a new events system for server-to-client notifications. It also implements idle session cleanup for Streamable HTTP, adds metadata support for resources, and improves transport reliability and UTF-8 handling. Review feedback suggests enhancing logging consistency in the conformance client and refactoring duplicated validation and error-response logic in the HTTP session manager to improve maintainability.

Comment on lines +344 to +346
if len(sys.argv) < 2:
print(f"Usage: {sys.argv[0]} <server-url>", file=sys.stderr)
sys.exit(1)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The main entry point uses print() to stderr for usage instructions. Consider using logging.error() for consistency with the rest of the script's logging setup.

Comment on lines +75 to +78
if session_idle_timeout is not None and session_idle_timeout <= 0:
raise ValueError("session_idle_timeout must be a positive number of seconds")
if stateless and session_idle_timeout is not None:
raise RuntimeError("session_idle_timeout is not supported in stateless mode")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic for validating session_idle_timeout is duplicated in the constructor. Consider moving this validation to a helper method or using a property setter to ensure consistency.

Comment on lines +311 to +317
error_response = JSONRPCError(
jsonrpc="2.0",
id="server-error",
error=ErrorData(
code=INVALID_REQUEST,
message="Session not found",
),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The error response construction is repeated. Consider defining a helper method to generate standard JSON-RPC error responses to reduce code duplication.

@elijahr
Copy link
Copy Markdown
Author

elijahr commented Apr 8, 2026

@gemini-code-assist please re-review

Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request implements an events system with publish/subscribe functionality, adds metadata support to resources, and improves OAuth 2.1 conformance. It also refactors the documentation, introduces conformance testing tools, and adds idle session management for Streamable HTTP. The review identified a bug in the MQTT-style topic matching regex for root wildcards and recommended refactoring duplicated matching logic into a shared utility. Additional feedback suggested caching compiled regex patterns for performance and moving local imports to the module level.

raise ValueError("'#' wildcard is only valid as the last segment")
# Use (/.*)?$ so that # matches zero or more trailing segments.
# e.g. "a/#" -> "^a(/.*)?$" matches "a", "a/b", "a/b/c"
return re.compile("^" + "/".join(regex_parts) + "(/.*)?$")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The regex generated for the MQTT multi-level wildcard # is incorrect when it is the only segment in the pattern. If pattern is #, regex_parts will be empty, and the resulting regex will be ^(/.*)?$. This regex only matches strings starting with a forward slash (or an empty string), failing to match common topics like foo. Per MQTT specification, a root # should match all topics.

Suggested change
return re.compile("^" + "/".join(regex_parts) + "(/.*)?$")
return re.compile("^" + "/".join(regex_parts) + "(/.*)?$") if regex_parts else re.compile("^.*$")

return event

async def get_matching(self, pattern: str) -> list[RetainedEvent]:
"""Return all non-expired retained events whose topic matches *pattern*."""
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The regex for the topic pattern is compiled on every call to get_matching. For better performance, especially when many retained events are stored, consider caching the compiled regex patterns, similar to how SubscriptionRegistry does it.

Comment on lines +283 to +304
def _topic_matches_subscriptions(self, topic: str) -> bool:
"""Check if a topic matches any of our subscribed patterns."""
import re as _re

for pattern in self._subscribed_patterns:
parts = pattern.split("/")
regex_parts: list[str] = []
for i, part in enumerate(parts):
if part == "#":
regex = "^" + "/".join(regex_parts) + "(/.*)?$"
if _re.match(regex, topic):
return True
break
elif part == "+":
regex_parts.append("[^/]+")
else:
regex_parts.append(_re.escape(part))
else:
regex = "^" + "/".join(regex_parts) + "$"
if _re.match(regex, topic):
return True
return False
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The MQTT topic matching logic is duplicated here and in _handle_event. It also contains the same bug regarding the root # wildcard as found in src/mcp/server/events.py. Consider moving the pattern-to-regex conversion logic to a shared utility module (e.g., src/mcp/shared/topic_utils.py) to ensure consistency and maintainability.


def _topic_matches_subscriptions(self, topic: str) -> bool:
"""Check if a topic matches any of our subscribed patterns."""
import re as _re
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Importing re inside a method that may be called frequently (on every incoming event) is inefficient. Move the import to the top level of the module.

Comment on lines +221 to +225
from ulid import ULID

event_id = str(ULID())
if timestamp is None:
from datetime import datetime, timezone
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Since python-ulid is now a required dependency in pyproject.toml, these imports should be moved to the top level of the module instead of being inside the method.

@elijahr
Copy link
Copy Markdown
Author

elijahr commented Apr 8, 2026

@gemini-code-assist please re-review

Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request performs a major documentation restructure, moving content from the README to a dedicated docs/ directory and adding several project policy files. It introduces a new events system for server-to-client notifications with wildcard subscriptions, adds metadata support to resources, and improves server reliability through idle session timeouts and better transport-close handling. A review comment correctly identifies an opportunity to reduce code duplication by centralizing the MQTT-style topic matching logic into a shared utility.

Comment on lines +284 to +338
def _topic_matches_subscriptions(self, topic: str) -> bool:
"""Check if a topic matches any of our subscribed patterns."""
for pattern in self._subscribed_patterns:
parts = pattern.split("/")
regex_parts: list[str] = []
for i, part in enumerate(parts):
if part == "#":
if regex_parts:
regex = "^" + "/".join(regex_parts) + "(/.*)?$"
else:
regex = "^.*$"
if re.match(regex, topic):
return True
break
elif part == "+":
regex_parts.append("[^/]+")
else:
regex_parts.append(re.escape(part))
else:
regex = "^" + "/".join(regex_parts) + "$"
if re.match(regex, topic):
return True
return False

async def _handle_event(self, params: types.EventParams) -> None:
"""Dispatch an incoming event to the registered handler."""
if self._event_handler is None:
return

if self._subscribed_patterns and not self._topic_matches_subscriptions(params.topic):
return

if self._event_topic_filter is not None:
parts = self._event_topic_filter.split("/")
regex_parts: list[str] = []
matched = False
for i, part in enumerate(parts):
if part == "#":
if regex_parts:
regex = "^" + "/".join(regex_parts) + "(/.*)?$"
else:
regex = "^.*$"
matched = bool(re.match(regex, params.topic))
break
elif part == "+":
regex_parts.append("[^/]+")
else:
regex_parts.append(re.escape(part))
else:
regex = "^" + "/".join(regex_parts) + "$"
matched = bool(re.match(regex, params.topic))
if not matched:
return

await self._event_handler(params)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic for converting an MQTT-style topic pattern to a regular expression and matching it against a topic is duplicated here in _topic_matches_subscriptions and _handle_event. A similar function, _pattern_to_regex, also exists in src/mcp/server/events.py.

To improve maintainability and reduce code duplication, consider extracting this logic into a shared utility function. This function could take a pattern and a topic and return whether they match. You could place this utility in a shared module, for example mcp.shared.events_utils.

For example, you could have a function like:

# in a new mcp/shared/events_utils.py
import re

def topic_matches_pattern(topic: str, pattern: str) -> bool:
    # ... implementation of regex conversion and matching ...

This would simplify both _topic_matches_subscriptions and _handle_event significantly.

@elijahr
Copy link
Copy Markdown
Author

elijahr commented Apr 9, 2026

@gemini-code-assist please re-review

Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a comprehensive event system for the MCP Python SDK, enabling server-to-client push notifications with MQTT-style topic wildcards and retained values. Key enhancements include metadata support for resources, session idle timeouts for Streamable HTTP, improved transport closure handling, and a significant overhaul of the documentation. Feedback focuses on performance optimizations for event matching, specifically recommending the caching of compiled regex patterns to avoid overhead during high event volumes. Additionally, a logic correction was suggested to ensure that events are dropped when no matching subscriptions exist, aligning the implementation with the provided documentation.

Comment on lines +265 to +273
def set_event_handler(
self,
handler: EventHandlerFnT,
*,
topic_filter: str | None = None,
) -> None:
"""Register a callback for incoming event notifications."""
self._event_handler = handler
self._event_topic_filter = topic_filter
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The topic_filter provided to set_event_handler should be pre-compiled into a regex pattern during registration rather than being reconstructed and recompiled on every incoming event in _handle_event. This will improve performance when processing high volumes of events.

if self._event_handler is None:
return

if self._subscribed_patterns and not self._topic_matches_subscriptions(params.topic):
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The subscription enforcement logic here contradicts the documentation and creates inconsistent behavior.

According to docs/events.md:247, events that do not match any active subscription should be dropped. However, the current implementation if self._subscribed_patterns and ... only performs the check if the client has at least one subscription. This means a client with zero subscriptions will receive all events pushed by the server, but as soon as it subscribes to a single topic, it becomes restricted.

Removing the self._subscribed_patterns and check will ensure that events are only processed if they match an active subscription, aligning the code with the documented behavior.

Suggested change
if self._subscribed_patterns and not self._topic_matches_subscriptions(params.topic):
if not self._topic_matches_subscriptions(params.topic):

Comment on lines +284 to +307
def _topic_matches_subscriptions(self, topic: str) -> bool:
"""Check if a topic matches any of our subscribed patterns."""
for pattern in self._subscribed_patterns:
parts = pattern.split("/")
regex_parts: list[str] = []
for i, part in enumerate(parts):
if part == "#":
if regex_parts:
regex = "^" + "/".join(regex_parts) + "(/.*)?$"
else:
regex = "^.*$"
if re.match(regex, topic):
return True
break
elif part == "+":
regex_parts.append("[^/]+")
else:
regex_parts.append(re.escape(part))
else:
regex = "^" + "/".join(regex_parts) + "$"
if re.match(regex, topic):
return True
return False

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The MQTT-style topic matching logic in _topic_matches_subscriptions is inefficient and duplicated.

  1. Performance: The code repeatedly splits pattern strings and constructs regex patterns for every incoming event. This will become a significant CPU bottleneck for clients with multiple subscriptions or high event volume. Compiled regexes should be cached.
  2. Duplication: The logic for converting MQTT wildcards (+, #) to regex is duplicated in _handle_event (lines 316-336) and is also present in src/mcp/server/events.py.
  3. Validation: This implementation lacks the validation enforced on the server side (e.g., ensuring # is the last segment).

Consider refactoring the conversion logic into a shared utility function and caching the compiled regex patterns within the ClientSession (e.g., using a dict[str, re.Pattern]).

@elijahr
Copy link
Copy Markdown
Author

elijahr commented Apr 9, 2026

@gemini-code-assist please re-review

Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a major update to the MCP Python SDK, featuring a new event system for server-to-client push notifications, support for resource metadata, and an idle timeout mechanism for stateful HTTP sessions. The project structure is refined by moving detailed documentation to a dedicated docs/ directory and adding formal roadmap, versioning, and dependency policies. Feedback suggests standardizing timeout validation logic and error messages, avoiding hardcoded identifiers in session error responses, and optimizing memory usage when closing response streams by avoiding full dictionary copies.

Comment on lines +75 to +76
if session_idle_timeout is not None and session_idle_timeout <= 0:
raise ValueError("session_idle_timeout must be a positive number of seconds")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The validation logic for session_idle_timeout should be consistent with other timeout configurations in the SDK. Consider using a more descriptive error message or a standard validation pattern if available.

Comment on lines +311 to +317
error_response = JSONRPCError(
jsonrpc="2.0",
id="server-error",
error=ErrorData(
code=INVALID_REQUEST,
message="Session not found",
),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The error response for a missing session ID uses a hardcoded 'server-error' ID. Consider using a more dynamic or standard identifier if the protocol allows, or ensure this ID is documented as a specific error case.

for id, stream in self._response_streams.items():
# Snapshot: stream.send() wakes the waiter, whose finally pops
# from _response_streams before the next __next__() call.
for id, stream in list(self._response_streams.items()):
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using list(self._response_streams.items()) creates a full copy of the dictionary items. If this dictionary is large, consider using an iterator or a more memory-efficient approach to avoid unnecessary allocations during connection closure.

elijahr added 4 commits April 8, 2026 23:08
…nt events

Implements events/subscribe, events/unsubscribe, events/list, and events/emit
as new MCP protocol methods. Includes subscription registry with MQTT-style
wildcard matching (+/#), retained value store with TTL expiry, and client-side
event handler with topic filtering and subscription tracking.

Reference implementation for MCP SEP (Specification Enhancement Proposal)
for topic-based server-to-client events.
elijahr added 4 commits April 8, 2026 23:27
Read session_id from InitializeResult._meta after initialize and expose
as session.session_id property. Enables client-side construction of
session-scoped event topics for the {session_id} authorization convention.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant