Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 90 additions & 4 deletions src/mcp/client/stdio.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import logging
import os
import subprocess
import sys
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Literal, TextIO
from typing import Callable, Literal, TextIO

import anyio
import anyio.lowlevel
Expand All @@ -24,6 +25,32 @@

logger = logging.getLogger(__name__)


def _is_jupyter_environment() -> bool:
"""Detect if code is running in a Jupyter notebook environment.

In Jupyter environments, sys.stderr doesn't work as expected when passed
to subprocess, so we need to handle stderr differently.

Returns:
bool: True if running in Jupyter/IPython notebook environment
"""
try:
# Check for IPython kernel
from IPython import get_ipython

ipython = get_ipython()
if ipython is not None:
# Check if it's a notebook kernel (not just IPython terminal)
if "IPKernelApp" in ipython.config:
return True
# Also check for ZMQInteractiveShell which indicates notebook
if ipython.__class__.__name__ == "ZMQInteractiveShell":
return True
except (ImportError, AttributeError):
pass
return False

# Environment variables to inherit by default
DEFAULT_INHERITED_ENV_VARS = (
[
Expand Down Expand Up @@ -105,6 +132,12 @@ class StdioServerParameters(BaseModel):
async def stdio_client(server: StdioServerParameters, errlog: TextIO = sys.stderr):
"""Client transport for stdio: this will connect to a server by spawning a
process and communicating with it over stdin/stdout.

Args:
server: Parameters for the server process
errlog: TextIO stream for stderr output. In Jupyter environments,
stderr is captured and printed to work around Jupyter's
limitations with subprocess stderr handling.
"""
read_stream: MemoryObjectReceiveStream[SessionMessage | Exception]
read_stream_writer: MemoryObjectSendStream[SessionMessage | Exception]
Expand All @@ -115,16 +148,20 @@ async def stdio_client(server: StdioServerParameters, errlog: TextIO = sys.stder
read_stream_writer, read_stream = anyio.create_memory_object_stream(0)
write_stream, write_stream_reader = anyio.create_memory_object_stream(0)

# Detect Jupyter environment for stderr handling
is_jupyter = _is_jupyter_environment()

try:
command = _get_executable_command(server.command)

# Open process with stderr piped for capture
# Open process with stderr handling based on environment
process = await _create_platform_compatible_process(
command=command,
args=server.args,
env=({**get_default_environment(), **server.env} if server.env is not None else get_default_environment()),
errlog=errlog,
cwd=server.cwd,
capture_stderr=is_jupyter,
)
except OSError:
# Clean up streams if process creation fails
Expand Down Expand Up @@ -177,9 +214,41 @@ async def stdin_writer():
except anyio.ClosedResourceError: # pragma: no cover
await anyio.lowlevel.checkpoint()

async def stderr_reader():
"""Read stderr from the process and output it appropriately.

In Jupyter environments, stderr is captured as a pipe and printed
to make it visible in the notebook output. In normal environments,
stderr is passed directly to sys.stderr.

See: https://github.com/modelcontextprotocol/python-sdk/issues/156
"""
if not process.stderr:
return

try:
async for chunk in TextReceiveStream(
process.stderr,
encoding=server.encoding,
errors=server.encoding_error_handler,
):
# In Jupyter, print to stdout with red color for visibility
# In normal environments, write to the provided errlog
if is_jupyter:
# Use ANSI red color for stderr in Jupyter
print(f"\033[91m{chunk}\033[0m", end="", flush=True)
else:
errlog.write(chunk)
errlog.flush()
except anyio.ClosedResourceError:
await anyio.lowlevel.checkpoint()

async with anyio.create_task_group() as tg, process:
tg.start_soon(stdout_reader)
tg.start_soon(stdin_writer)
# Only start stderr reader if we're capturing stderr (Jupyter mode)
if is_jupyter and process.stderr:
tg.start_soon(stderr_reader)
try:
yield read_stream, write_stream
finally:
Expand Down Expand Up @@ -232,19 +301,36 @@ async def _create_platform_compatible_process(
env: dict[str, str] | None = None,
errlog: TextIO = sys.stderr,
cwd: Path | str | None = None,
capture_stderr: bool = False,
):
"""Creates a subprocess in a platform-compatible way.

Unix: Creates process in a new session/process group for killpg support
Windows: Creates process in a Job Object for reliable child termination

Args:
command: The executable command to run
args: Command line arguments
env: Environment variables for the process
errlog: TextIO stream for stderr (used when capture_stderr=False)
cwd: Working directory for the process
capture_stderr: If True, stderr is captured as a pipe for async reading.
This is needed for Jupyter environments where passing
sys.stderr directly doesn't work properly.

Returns:
Process with stdin, stdout, and optionally stderr streams
"""
# Determine stderr handling: PIPE for capture, or redirect to errlog
stderr_target = subprocess.PIPE if capture_stderr else errlog

if sys.platform == "win32": # pragma: no cover
process = await create_windows_process(command, args, env, errlog, cwd)
process = await create_windows_process(command, args, env, stderr_target, cwd)
else: # pragma: lax no cover
process = await anyio.open_process(
[command, *args],
env=env,
stderr=errlog,
stderr=stderr_target,
cwd=cwd,
start_new_session=True,
)
Expand Down
21 changes: 13 additions & 8 deletions src/mcp/os/win32/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import subprocess
import sys
from pathlib import Path
from typing import BinaryIO, TextIO, cast
from typing import BinaryIO, TextIO, Union, cast

import anyio
from anyio import to_thread
Expand Down Expand Up @@ -66,7 +66,7 @@ class FallbackProcess:
"""A fallback process wrapper for Windows to handle async I/O
when using subprocess.Popen, which provides sync-only FileIO objects.

This wraps stdin and stdout into async-compatible
This wraps stdin, stdout, and optionally stderr into async-compatible
streams (FileReadStream, FileWriteStream),
so that MCP clients expecting async streams can work properly.
"""
Expand All @@ -75,10 +75,12 @@ def __init__(self, popen_obj: subprocess.Popen[bytes]):
self.popen: subprocess.Popen[bytes] = popen_obj
self.stdin_raw = popen_obj.stdin # type: ignore[assignment]
self.stdout_raw = popen_obj.stdout # type: ignore[assignment]
self.stderr = popen_obj.stderr # type: ignore[assignment]
self.stderr_raw = popen_obj.stderr # type: ignore[assignment]

self.stdin = FileWriteStream(cast(BinaryIO, self.stdin_raw)) if self.stdin_raw else None
self.stdout = FileReadStream(cast(BinaryIO, self.stdout_raw)) if self.stdout_raw else None
# Wrap stderr as async stream if it was captured as PIPE
self.stderr = FileReadStream(cast(BinaryIO, self.stderr_raw)) if self.stderr_raw else None

async def __aenter__(self):
"""Support async context manager entry."""
Expand All @@ -99,12 +101,14 @@ async def __aexit__(
await self.stdin.aclose()
if self.stdout:
await self.stdout.aclose()
if self.stderr:
await self.stderr.aclose()
if self.stdin_raw:
self.stdin_raw.close()
if self.stdout_raw:
self.stdout_raw.close()
if self.stderr:
self.stderr.close()
if self.stderr_raw:
self.stderr_raw.close()

async def wait(self):
"""Async wait for process completion."""
Expand Down Expand Up @@ -133,7 +137,7 @@ async def create_windows_process(
command: str,
args: list[str],
env: dict[str, str] | None = None,
errlog: TextIO | None = sys.stderr,
errlog: Union[TextIO, int, None] = sys.stderr,
cwd: Path | str | None = None,
) -> Process | FallbackProcess:
"""Creates a subprocess in a Windows-compatible way with Job Object support.
Expand All @@ -150,7 +154,8 @@ async def create_windows_process(
command (str): The executable to run
args (list[str]): List of command line arguments
env (dict[str, str] | None): Environment variables
errlog (TextIO | None): Where to send stderr output (defaults to sys.stderr)
errlog: Where to send stderr output. Can be a TextIO stream (like sys.stderr),
subprocess.PIPE (-1) for capturing stderr, or None.
cwd (Path | str | None): Working directory for the subprocess

Returns:
Expand Down Expand Up @@ -191,7 +196,7 @@ async def _create_windows_fallback_process(
command: str,
args: list[str],
env: dict[str, str] | None = None,
errlog: TextIO | None = sys.stderr,
errlog: Union[TextIO, int, None] = sys.stderr,
cwd: Path | str | None = None,
) -> FallbackProcess:
"""Create a subprocess using subprocess.Popen as a fallback when anyio fails.
Expand Down
79 changes: 79 additions & 0 deletions tests/client/test_stdio.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from mcp.client.stdio import (
StdioServerParameters,
_create_platform_compatible_process,
_is_jupyter_environment,
_terminate_process_tree,
stdio_client,
)
Expand Down Expand Up @@ -620,3 +621,81 @@ def sigterm_handler(signum, frame):
f"stdio_client cleanup took {elapsed:.1f} seconds for stdin-ignoring process. "
f"Expected between 2-4 seconds (2s stdin timeout + termination time)."
)


class TestJupyterStderrSupport:
"""Tests for Jupyter notebook stderr logging support.

See: https://github.com/modelcontextprotocol/python-sdk/issues/156
"""

def test_jupyter_detection_not_in_jupyter(self):
"""Test that _is_jupyter_environment returns False when not in Jupyter."""
# In a normal Python environment (like pytest), this should return False
result = _is_jupyter_environment()
assert result is False, "Should not detect Jupyter in normal Python environment"

def test_jupyter_detection_handles_missing_ipython(self):
"""Test that _is_jupyter_environment handles missing IPython gracefully."""
# This test verifies the ImportError handling works
# by calling the function when IPython may or may not be installed
result = _is_jupyter_environment()
# Should return False (not crash) regardless of IPython availability
assert isinstance(result, bool)

@pytest.mark.anyio
async def test_stderr_captured_in_process(self):
"""Test that stderr output from a subprocess can be captured."""
# Create a script that writes to stderr
script = textwrap.dedent(
'''
import sys
sys.stderr.write("test error message\\n")
sys.stderr.flush()
# Exit immediately
sys.exit(0)
'''
)

server_params = StdioServerParameters(
command=sys.executable,
args=["-c", script],
)

# The stdio_client should handle this without hanging
with anyio.move_on_after(3.0) as cancel_scope:
async with stdio_client(server_params) as (read_stream, write_stream):
await anyio.sleep(0.5) # Give process time to write and exit

assert not cancel_scope.cancelled_caught, "stdio_client should not hang on stderr output"

@pytest.mark.anyio
async def test_stderr_with_continuous_output(self):
"""Test that continuous stderr output doesn't block the client."""
# Create a script that writes to stderr continuously then exits
script = textwrap.dedent(
'''
import sys
import time

for i in range(5):
sys.stderr.write(f"stderr line {i}\\n")
sys.stderr.flush()
time.sleep(0.1)

# Exit after writing
sys.exit(0)
'''
)

server_params = StdioServerParameters(
command=sys.executable,
args=["-c", script],
)

# The client should handle continuous stderr without blocking
with anyio.move_on_after(5.0) as cancel_scope:
async with stdio_client(server_params) as (read_stream, write_stream):
await anyio.sleep(1.0) # Wait for stderr output

assert not cancel_scope.cancelled_caught, "stdio_client should handle continuous stderr output"
Loading