MCP Events: Topic-based PUB/SUB server-to-client events#1
MCP Events: Topic-based PUB/SUB server-to-client events#1
Conversation
There was a problem hiding this comment.
Code Review
This pull request restructures the project documentation into a dedicated docs/ directory and introduces a new events system for server-to-client notifications. It also implements idle session cleanup for Streamable HTTP, adds metadata support for resources, and improves transport reliability and UTF-8 handling. Review feedback suggests enhancing logging consistency in the conformance client and refactoring duplicated validation and error-response logic in the HTTP session manager to improve maintainability.
| if len(sys.argv) < 2: | ||
| print(f"Usage: {sys.argv[0]} <server-url>", file=sys.stderr) | ||
| sys.exit(1) |
| if session_idle_timeout is not None and session_idle_timeout <= 0: | ||
| raise ValueError("session_idle_timeout must be a positive number of seconds") | ||
| if stateless and session_idle_timeout is not None: | ||
| raise RuntimeError("session_idle_timeout is not supported in stateless mode") |
| error_response = JSONRPCError( | ||
| jsonrpc="2.0", | ||
| id="server-error", | ||
| error=ErrorData( | ||
| code=INVALID_REQUEST, | ||
| message="Session not found", | ||
| ), |
|
@gemini-code-assist please re-review |
There was a problem hiding this comment.
Code Review
This pull request implements an events system with publish/subscribe functionality, adds metadata support to resources, and improves OAuth 2.1 conformance. It also refactors the documentation, introduces conformance testing tools, and adds idle session management for Streamable HTTP. The review identified a bug in the MQTT-style topic matching regex for root wildcards and recommended refactoring duplicated matching logic into a shared utility. Additional feedback suggested caching compiled regex patterns for performance and moving local imports to the module level.
src/mcp/server/events.py
Outdated
| raise ValueError("'#' wildcard is only valid as the last segment") | ||
| # Use (/.*)?$ so that # matches zero or more trailing segments. | ||
| # e.g. "a/#" -> "^a(/.*)?$" matches "a", "a/b", "a/b/c" | ||
| return re.compile("^" + "/".join(regex_parts) + "(/.*)?$") |
There was a problem hiding this comment.
The regex generated for the MQTT multi-level wildcard # is incorrect when it is the only segment in the pattern. If pattern is #, regex_parts will be empty, and the resulting regex will be ^(/.*)?$. This regex only matches strings starting with a forward slash (or an empty string), failing to match common topics like foo. Per MQTT specification, a root # should match all topics.
| return re.compile("^" + "/".join(regex_parts) + "(/.*)?$") | |
| return re.compile("^" + "/".join(regex_parts) + "(/.*)?$") if regex_parts else re.compile("^.*$") |
| return event | ||
|
|
||
| async def get_matching(self, pattern: str) -> list[RetainedEvent]: | ||
| """Return all non-expired retained events whose topic matches *pattern*.""" |
| def _topic_matches_subscriptions(self, topic: str) -> bool: | ||
| """Check if a topic matches any of our subscribed patterns.""" | ||
| import re as _re | ||
|
|
||
| for pattern in self._subscribed_patterns: | ||
| parts = pattern.split("/") | ||
| regex_parts: list[str] = [] | ||
| for i, part in enumerate(parts): | ||
| if part == "#": | ||
| regex = "^" + "/".join(regex_parts) + "(/.*)?$" | ||
| if _re.match(regex, topic): | ||
| return True | ||
| break | ||
| elif part == "+": | ||
| regex_parts.append("[^/]+") | ||
| else: | ||
| regex_parts.append(_re.escape(part)) | ||
| else: | ||
| regex = "^" + "/".join(regex_parts) + "$" | ||
| if _re.match(regex, topic): | ||
| return True | ||
| return False |
There was a problem hiding this comment.
The MQTT topic matching logic is duplicated here and in _handle_event. It also contains the same bug regarding the root # wildcard as found in src/mcp/server/events.py. Consider moving the pattern-to-regex conversion logic to a shared utility module (e.g., src/mcp/shared/topic_utils.py) to ensure consistency and maintainability.
src/mcp/client/session.py
Outdated
|
|
||
| def _topic_matches_subscriptions(self, topic: str) -> bool: | ||
| """Check if a topic matches any of our subscribed patterns.""" | ||
| import re as _re |
src/mcp/server/session.py
Outdated
| from ulid import ULID | ||
|
|
||
| event_id = str(ULID()) | ||
| if timestamp is None: | ||
| from datetime import datetime, timezone |
|
@gemini-code-assist please re-review |
There was a problem hiding this comment.
Code Review
This pull request performs a major documentation restructure, moving content from the README to a dedicated docs/ directory and adding several project policy files. It introduces a new events system for server-to-client notifications with wildcard subscriptions, adds metadata support to resources, and improves server reliability through idle session timeouts and better transport-close handling. A review comment correctly identifies an opportunity to reduce code duplication by centralizing the MQTT-style topic matching logic into a shared utility.
| def _topic_matches_subscriptions(self, topic: str) -> bool: | ||
| """Check if a topic matches any of our subscribed patterns.""" | ||
| for pattern in self._subscribed_patterns: | ||
| parts = pattern.split("/") | ||
| regex_parts: list[str] = [] | ||
| for i, part in enumerate(parts): | ||
| if part == "#": | ||
| if regex_parts: | ||
| regex = "^" + "/".join(regex_parts) + "(/.*)?$" | ||
| else: | ||
| regex = "^.*$" | ||
| if re.match(regex, topic): | ||
| return True | ||
| break | ||
| elif part == "+": | ||
| regex_parts.append("[^/]+") | ||
| else: | ||
| regex_parts.append(re.escape(part)) | ||
| else: | ||
| regex = "^" + "/".join(regex_parts) + "$" | ||
| if re.match(regex, topic): | ||
| return True | ||
| return False | ||
|
|
||
| async def _handle_event(self, params: types.EventParams) -> None: | ||
| """Dispatch an incoming event to the registered handler.""" | ||
| if self._event_handler is None: | ||
| return | ||
|
|
||
| if self._subscribed_patterns and not self._topic_matches_subscriptions(params.topic): | ||
| return | ||
|
|
||
| if self._event_topic_filter is not None: | ||
| parts = self._event_topic_filter.split("/") | ||
| regex_parts: list[str] = [] | ||
| matched = False | ||
| for i, part in enumerate(parts): | ||
| if part == "#": | ||
| if regex_parts: | ||
| regex = "^" + "/".join(regex_parts) + "(/.*)?$" | ||
| else: | ||
| regex = "^.*$" | ||
| matched = bool(re.match(regex, params.topic)) | ||
| break | ||
| elif part == "+": | ||
| regex_parts.append("[^/]+") | ||
| else: | ||
| regex_parts.append(re.escape(part)) | ||
| else: | ||
| regex = "^" + "/".join(regex_parts) + "$" | ||
| matched = bool(re.match(regex, params.topic)) | ||
| if not matched: | ||
| return | ||
|
|
||
| await self._event_handler(params) |
There was a problem hiding this comment.
The logic for converting an MQTT-style topic pattern to a regular expression and matching it against a topic is duplicated here in _topic_matches_subscriptions and _handle_event. A similar function, _pattern_to_regex, also exists in src/mcp/server/events.py.
To improve maintainability and reduce code duplication, consider extracting this logic into a shared utility function. This function could take a pattern and a topic and return whether they match. You could place this utility in a shared module, for example mcp.shared.events_utils.
For example, you could have a function like:
# in a new mcp/shared/events_utils.py
import re
def topic_matches_pattern(topic: str, pattern: str) -> bool:
# ... implementation of regex conversion and matching ...This would simplify both _topic_matches_subscriptions and _handle_event significantly.
|
@gemini-code-assist please re-review |
There was a problem hiding this comment.
Code Review
This pull request introduces a comprehensive event system for the MCP Python SDK, enabling server-to-client push notifications with MQTT-style topic wildcards and retained values. Key enhancements include metadata support for resources, session idle timeouts for Streamable HTTP, improved transport closure handling, and a significant overhaul of the documentation. Feedback focuses on performance optimizations for event matching, specifically recommending the caching of compiled regex patterns to avoid overhead during high event volumes. Additionally, a logic correction was suggested to ensure that events are dropped when no matching subscriptions exist, aligning the implementation with the provided documentation.
| def set_event_handler( | ||
| self, | ||
| handler: EventHandlerFnT, | ||
| *, | ||
| topic_filter: str | None = None, | ||
| ) -> None: | ||
| """Register a callback for incoming event notifications.""" | ||
| self._event_handler = handler | ||
| self._event_topic_filter = topic_filter |
There was a problem hiding this comment.
| if self._event_handler is None: | ||
| return | ||
|
|
||
| if self._subscribed_patterns and not self._topic_matches_subscriptions(params.topic): |
There was a problem hiding this comment.
The subscription enforcement logic here contradicts the documentation and creates inconsistent behavior.
According to docs/events.md:247, events that do not match any active subscription should be dropped. However, the current implementation if self._subscribed_patterns and ... only performs the check if the client has at least one subscription. This means a client with zero subscriptions will receive all events pushed by the server, but as soon as it subscribes to a single topic, it becomes restricted.
Removing the self._subscribed_patterns and check will ensure that events are only processed if they match an active subscription, aligning the code with the documented behavior.
| if self._subscribed_patterns and not self._topic_matches_subscriptions(params.topic): | |
| if not self._topic_matches_subscriptions(params.topic): | |
| def _topic_matches_subscriptions(self, topic: str) -> bool: | ||
| """Check if a topic matches any of our subscribed patterns.""" | ||
| for pattern in self._subscribed_patterns: | ||
| parts = pattern.split("/") | ||
| regex_parts: list[str] = [] | ||
| for i, part in enumerate(parts): | ||
| if part == "#": | ||
| if regex_parts: | ||
| regex = "^" + "/".join(regex_parts) + "(/.*)?$" | ||
| else: | ||
| regex = "^.*$" | ||
| if re.match(regex, topic): | ||
| return True | ||
| break | ||
| elif part == "+": | ||
| regex_parts.append("[^/]+") | ||
| else: | ||
| regex_parts.append(re.escape(part)) | ||
| else: | ||
| regex = "^" + "/".join(regex_parts) + "$" | ||
| if re.match(regex, topic): | ||
| return True | ||
| return False | ||
|
|
There was a problem hiding this comment.
The MQTT-style topic matching logic in _topic_matches_subscriptions is inefficient and duplicated.
- Performance: The code repeatedly splits pattern strings and constructs regex patterns for every incoming event. This will become a significant CPU bottleneck for clients with multiple subscriptions or high event volume. Compiled regexes should be cached.
- Duplication: The logic for converting MQTT wildcards (
+,#) to regex is duplicated in_handle_event(lines 316-336) and is also present insrc/mcp/server/events.py. - Validation: This implementation lacks the validation enforced on the server side (e.g., ensuring
#is the last segment).
Consider refactoring the conversion logic into a shared utility function and caching the compiled regex patterns within the ClientSession (e.g., using a dict[str, re.Pattern]).
|
@gemini-code-assist please re-review |
There was a problem hiding this comment.
Code Review
This pull request introduces a major update to the MCP Python SDK, featuring a new event system for server-to-client push notifications, support for resource metadata, and an idle timeout mechanism for stateful HTTP sessions. The project structure is refined by moving detailed documentation to a dedicated docs/ directory and adding formal roadmap, versioning, and dependency policies. Feedback suggests standardizing timeout validation logic and error messages, avoiding hardcoded identifiers in session error responses, and optimizing memory usage when closing response streams by avoiding full dictionary copies.
| if session_idle_timeout is not None and session_idle_timeout <= 0: | ||
| raise ValueError("session_idle_timeout must be a positive number of seconds") |
| error_response = JSONRPCError( | ||
| jsonrpc="2.0", | ||
| id="server-error", | ||
| error=ErrorData( | ||
| code=INVALID_REQUEST, | ||
| message="Session not found", | ||
| ), |
| for id, stream in self._response_streams.items(): | ||
| # Snapshot: stream.send() wakes the waiter, whose finally pops | ||
| # from _response_streams before the next __next__() call. | ||
| for id, stream in list(self._response_streams.items()): |
…nt events Implements events/subscribe, events/unsubscribe, events/list, and events/emit as new MCP protocol methods. Includes subscription registry with MQTT-style wildcard matching (+/#), retained value store with TTL expiry, and client-side event handler with topic filtering and subscription tracking. Reference implementation for MCP SEP (Specification Enhancement Proposal) for topic-based server-to-client events.
Read session_id from InitializeResult._meta after initialize and expose
as session.session_id property. Enables client-side construction of
session-scoped event topics for the {session_id} authorization convention.
Summary
Context
Reference implementation for MCP SEP: topic-based server-to-client events.
This is the protocol foundation layer that fastmcp and opencode build on.
Test plan
uv run pytest tests/test_event_types.py tests/test_subscription_registry.py tests/test_event_roundtrip.py -x -v