-
Notifications
You must be signed in to change notification settings - Fork 0
MCP Events: Topic-based PUB/SUB server-to-client events #1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
59c3265
ac9f3eb
980d377
692460f
2c5e19a
29cb386
85cc652
4a81199
8740f5b
85d574a
1b08567
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,150 @@ | ||
| """Client-side event utilities for MCP. | ||
|
|
||
| ProvenanceEnvelope wraps events with client-assessed provenance metadata | ||
| for safe injection into LLM context. EventQueue provides priority-aware | ||
| buffering for events waiting to be processed. | ||
| """ | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| from collections import deque | ||
| from dataclasses import dataclass | ||
| from typing import Any, ClassVar | ||
|
|
||
| from mcp.types import EventParams | ||
|
|
||
| __all__ = ["EventQueue", "ProvenanceEnvelope"] | ||
|
|
||
|
|
||
| @dataclass | ||
| class ProvenanceEnvelope: | ||
| """Client-side provenance wrapper for events injected into LLM context. | ||
|
|
||
| Clients generate this locally when honoring inject_context effects. | ||
| The server_trust field MUST be client-assessed, never server-supplied. | ||
| """ | ||
|
|
||
| server: str | ||
| server_trust: str # Client-assessed trust tier (e.g., "trusted", "unknown") | ||
| topic: str | ||
| source: str | None = None | ||
| event_id: str | None = None | ||
| received_at: str | None = None # ISO 8601, client-stamped | ||
|
|
||
| def to_dict(self) -> dict[str, Any]: | ||
| """Serialize to dict, omitting None values.""" | ||
| d: dict[str, Any] = { | ||
| "server": self.server, | ||
| "server_trust": self.server_trust, | ||
| "topic": self.topic, | ||
| } | ||
| if self.source is not None: | ||
| d["source"] = self.source | ||
| if self.event_id is not None: | ||
| d["event_id"] = self.event_id | ||
| if self.received_at is not None: | ||
| d["received_at"] = self.received_at | ||
| return d | ||
|
|
||
| def to_xml(self, payload_text: str = "") -> str: | ||
| """Format as XML element for LLM context injection. | ||
|
|
||
| Args: | ||
| payload_text: The event payload as a string (JSON or otherwise). | ||
| Inserted as the element body. | ||
|
|
||
| Note: All attribute values are XML-escaped via quoteattr to prevent | ||
| injection from attacker-controlled field values. | ||
| """ | ||
| from xml.sax.saxutils import escape, quoteattr # noqa: PLC0415 | ||
|
|
||
| attrs = " ".join(f"{k}={quoteattr(str(v))}" for k, v in self.to_dict().items()) | ||
| return f"<mcp:event {attrs}>{escape(payload_text)}</mcp:event>" | ||
|
|
||
| @classmethod | ||
| def from_event( | ||
| cls, | ||
| event: EventParams, | ||
| *, | ||
| server: str, | ||
| server_trust: str, | ||
| ) -> ProvenanceEnvelope: | ||
| """Create an envelope from an EventParams notification. | ||
|
|
||
| Extracts topic, source, and event_id from the event and stamps | ||
| received_at with the current UTC time. | ||
| """ | ||
| from datetime import datetime, timezone # noqa: PLC0415 | ||
|
|
||
| return cls( | ||
| server=server, | ||
| server_trust=server_trust, | ||
| topic=event.topic, | ||
| source=event.source, | ||
| event_id=event.eventId, | ||
| received_at=datetime.now(timezone.utc).isoformat(), | ||
| ) | ||
|
|
||
|
|
||
| class EventQueue: | ||
| """Priority-aware event buffer for client-side processing. | ||
|
|
||
| Events are enqueued with a priority derived from their requested_effects. | ||
| drain() returns events in priority order (urgent > high > normal > low). | ||
| """ | ||
|
|
||
| _PRIORITY_ORDER: ClassVar[dict[str, int]] = { | ||
| "urgent": 0, | ||
| "high": 1, | ||
| "normal": 2, | ||
| "low": 3, | ||
| } | ||
|
|
||
| def __init__(self) -> None: | ||
| self._queues: dict[str, deque[EventParams]] = {p: deque() for p in self._PRIORITY_ORDER} | ||
|
|
||
| def enqueue(self, event: EventParams) -> None: | ||
| """Add an event to the appropriate priority queue. | ||
|
|
||
| Priority is derived from the highest-priority requested_effect. | ||
| Events with no requested_effects default to "normal". | ||
| """ | ||
| priority = self._resolve_priority(event) | ||
| self._queues[priority].append(event) | ||
|
|
||
| def drain(self, max_count: int | None = None) -> list[EventParams]: | ||
| """Remove and return events in priority order. | ||
|
|
||
| Args: | ||
| max_count: Maximum events to return. None means drain all. | ||
|
|
||
| Returns: | ||
| Events ordered urgent -> high -> normal -> low. | ||
| """ | ||
| result: list[EventParams] = [] | ||
| for priority in self._PRIORITY_ORDER: | ||
| q = self._queues[priority] | ||
| while q: | ||
| if max_count is not None and len(result) >= max_count: | ||
| return result | ||
| result.append(q.popleft()) | ||
| return result | ||
|
|
||
| def __len__(self) -> int: | ||
| return sum(len(q) for q in self._queues.values()) | ||
|
|
||
| def __bool__(self) -> bool: | ||
| return any(self._queues.values()) | ||
|
|
||
| def _resolve_priority(self, event: EventParams) -> str: | ||
| """Determine priority from highest-priority requested_effect.""" | ||
| if not event.requestedEffects: | ||
| return "normal" | ||
| best = "low" | ||
| best_rank = self._PRIORITY_ORDER["low"] | ||
| for effect in event.requestedEffects: | ||
| rank = self._PRIORITY_ORDER.get(effect.priority, best_rank) | ||
| if rank < best_rank: | ||
| best = effect.priority | ||
| best_rank = rank | ||
| return best |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -1,4 +1,5 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import logging | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import re | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from datetime import timedelta | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from typing import Any, Protocol, overload | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -13,6 +14,7 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from mcp.shared.context import RequestContext | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from mcp.shared.message import SessionMessage | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from mcp.shared.session import BaseSession, ProgressFnT, RequestResponder | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from mcp.shared.topic_patterns import pattern_to_regex | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from mcp.shared.version import SUPPORTED_PROTOCOL_VERSIONS | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| DEFAULT_CLIENT_INFO = types.Implementation(name="mcp", version="0.1.0") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -42,6 +44,10 @@ async def __call__( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) -> types.ListRootsResult | types.ErrorData: ... # pragma: no branch | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| class EventHandlerFnT(Protocol): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| async def __call__(self, params: types.EventParams) -> None: ... # pragma: no branch | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| class LoggingFnT(Protocol): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| async def __call__( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -140,7 +146,15 @@ def __init__( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._message_handler = message_handler or _default_message_handler | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._tool_output_schemas: dict[str, dict[str, Any] | None] = {} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._server_capabilities: types.ServerCapabilities | None = None | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._session_id: str | None = None | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._experimental_features: ExperimentalClientFeatures | None = None | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._event_handler: EventHandlerFnT | None = None | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._event_topic_filter: str | None = None | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._event_topic_filter_regex: re.Pattern[str] | None = None | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._subscribed_patterns: set[str] = set() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Cache compiled regexes for subscription patterns to avoid | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # recompiling on every incoming event. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._subscription_regex_cache: dict[str, re.Pattern[str]] = {} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Experimental: Task handlers (use defaults if not provided) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._task_handlers = experimental_task_handlers or ExperimentalTaskHandlers() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -192,6 +206,16 @@ async def initialize(self) -> types.InitializeResult: | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._server_capabilities = result.capabilities | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # FastMCP servers inject a server-assigned session_id into | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # InitializeResult._meta so clients can synchronously read it after | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # connect (e.g. to subscribe to session-scoped event topics like | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # ``sessions/{session_id}/messages``). Non-FastMCP servers typically | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # omit this, in which case ``self._session_id`` stays ``None``. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if result.meta is not None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| meta_session_id = result.meta.get("session_id") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if isinstance(meta_session_id, str): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._session_id = meta_session_id | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await self.send_notification(types.ClientNotification(types.InitializedNotification())) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return result | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -203,6 +227,17 @@ def get_server_capabilities(self) -> types.ServerCapabilities | None: | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return self._server_capabilities | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| @property | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def session_id(self) -> str | None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """The server-assigned session ID from InitializeResult._meta, if present. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| This is set by FastMCP servers to enable client-side subscription to | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| session-scoped event topics like ``sessions/{session_id}/messages``. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Returns None if the server did not provide a session_id (e.g., | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| non-FastMCP server). | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return self._session_id | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| @property | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def experimental(self) -> ExperimentalClientFeatures: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Experimental APIs for tasks and other features. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -217,6 +252,114 @@ def experimental(self) -> ExperimentalClientFeatures: | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._experimental_features = ExperimentalClientFeatures(self) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return self._experimental_features | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # ----- Event methods ----- | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| async def subscribe_events(self, topics: list[str]) -> types.EventSubscribeResult: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Send an events/subscribe request.""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| result = await self.send_request( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| types.ClientRequest( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| types.EventSubscribeRequest( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| params=types.EventSubscribeParams(topics=topics), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| types.EventSubscribeResult, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for sub in result.subscribed: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._subscribed_patterns.add(sub.pattern) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if sub.pattern not in self._subscription_regex_cache: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._subscription_regex_cache[sub.pattern] = pattern_to_regex(sub.pattern) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return result | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| async def unsubscribe_events(self, topics: list[str]) -> types.EventUnsubscribeResult: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Send an events/unsubscribe request.""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| result = await self.send_request( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| types.ClientRequest( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| types.EventUnsubscribeRequest( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| params=types.EventUnsubscribeParams(topics=topics), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| types.EventUnsubscribeResult, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for pattern in result.unsubscribed: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._subscribed_patterns.discard(pattern) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._subscription_regex_cache.pop(pattern, None) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return result | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| async def list_events(self) -> types.EventListResult: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Send an events/list request.""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return await self.send_request( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| types.ClientRequest(types.EventListRequest()), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| types.EventListResult, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def set_event_handler( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| handler: EventHandlerFnT, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| *, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| topic_filter: str | None = None, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) -> None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Register a callback for incoming event notifications. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| If *topic_filter* is provided, it is compiled once here and the | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| cached regex is reused for every incoming event. The filter uses | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| the same MQTT-style wildcard syntax as subscription patterns | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| (``+`` for a single segment, ``#`` as a trailing multi-segment | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| wildcard). | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._event_handler = handler | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._event_topic_filter = topic_filter | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+295
to
+310
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._event_topic_filter_regex = pattern_to_regex(topic_filter) if topic_filter is not None else None | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+295
to
+311
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Renaming this to
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def on_event(self, topic_filter: str | None = None): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Decorator for registering an event handler.""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def decorator(fn: EventHandlerFnT) -> EventHandlerFnT: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.set_event_handler(fn, topic_filter=topic_filter) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return fn | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return decorator | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def _topic_matches_subscriptions(self, topic: str) -> bool: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Check if *topic* matches any of our subscribed patterns. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Compiled regexes are cached per subscription pattern so incoming | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| events do not pay a recompile cost on every match attempt. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for pattern in self._subscribed_patterns: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| regex = self._subscription_regex_cache.get(pattern) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if regex is None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| regex = pattern_to_regex(pattern) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._subscription_regex_cache[pattern] = regex | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if regex.match(topic): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return True | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return False | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+322
to
+335
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The MQTT topic matching logic is duplicated here and in |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+322
to
+336
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The MQTT-style topic matching logic in
Consider refactoring the conversion logic into a shared utility function and caching the compiled regex patterns within the |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| async def _handle_event(self, params: types.EventParams) -> None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Dispatch an incoming event to the registered handler. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Filtering order: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 1. If no handler is registered, drop the event. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 2. If the client has any active subscriptions, the topic must | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| match at least one of them. Events for unsubscribed topics | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| are dropped. (A client with zero subscriptions accepts any | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| topic the server chooses to deliver; this is the "pass | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| through" fallback documented in ``docs/events.md``.) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 3. If an additional ``topic_filter`` was provided to | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ``set_event_handler``, the topic must also match that | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| filter. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if self._event_handler is None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if self._subscribed_patterns and not self._topic_matches_subscriptions(params.topic): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The subscription enforcement logic here contradicts the documentation and creates inconsistent behavior. According to Removing the
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if self._event_topic_filter_regex is not None and not self._event_topic_filter_regex.match(params.topic): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await self._event_handler(params) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+322
to
+361
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The logic for converting an MQTT-style topic pattern to a regular expression and matching it against a topic is duplicated here in To improve maintainability and reduce code duplication, consider extracting this logic into a shared utility function. This function could take a pattern and a topic and return whether they match. You could place this utility in a shared module, for example For example, you could have a function like: # in a new mcp/shared/events_utils.py
import re
def topic_matches_pattern(topic: str, pattern: str) -> bool:
# ... implementation of regex conversion and matching ...This would simplify both
Comment on lines
+352
to
+361
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Awaiting the event handler directly in the reader loop means that a slow or blocking handler will stop the client from receiving any further messages (including responses to pending requests). Consider spawning a background task for each handler execution, though be mindful of event ordering if that is a requirement. |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| async def send_ping(self) -> types.EmptyResult: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Send a ping request.""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return await self.send_request( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -611,5 +754,7 @@ async def _received_notification(self, notification: types.ServerNotification) - | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Clients MAY use this to retry requests or update UI | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # The notification contains the elicitationId of the completed elicitation | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| pass | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case types.EventEmitNotification(params=params): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await self._handle_event(params) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case _: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| pass | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The current implementation only supports a single global event handler. If a user uses the
@on_eventdecorator multiple times, each subsequent call will overwrite the previous handler. This is likely unexpected behavior for a PUB/SUB system. Consider maintaining a list of handlers and their associated filters.