diff --git a/sentry_sdk/integrations/openai_agents/__init__.py b/sentry_sdk/integrations/openai_agents/__init__.py
index 0c551fd9bd..9b3a670c2c 100644
--- a/sentry_sdk/integrations/openai_agents/__init__.py
+++ b/sentry_sdk/integrations/openai_agents/__init__.py
@@ -4,7 +4,7 @@
 from functools import wraps

 from .patches import (
-    _create_get_model_wrapper,
+    _get_model,
     _get_all_tools,
     _create_run_wrapper,
     _create_run_streamed_wrapper,
@@ -30,9 +30,10 @@
 try:
     # AgentRunner methods moved in v0.8
     # https://github.com/openai/openai-agents-python/commit/3ce7c24d349b77bb750062b7e0e856d9ff48a5d5#diff-7470b3a5c5cbe2fcbb2703dc24f326f45a5819d853be2b1f395d122d278cd911
-    from agents.run_internal import run_loop
+    from agents.run_internal import run_loop, turn_preparation
 except ImportError:
     run_loop = None
+    turn_preparation = None


 def _patch_runner() -> None:
@@ -52,12 +53,6 @@ def _patch_runner() -> None:
     _patch_agent_run()


-def _patch_model() -> None:
-    agents.run.AgentRunner._get_model = classmethod(
-        _create_get_model_wrapper(agents.run.AgentRunner._get_model),
-    )
-
-
 class OpenAIAgentsIntegration(Integration):
     """
     NOTE: With version 0.8.0, the class methods below have been refactored to functions.
@@ -73,7 +68,7 @@ class OpenAIAgentsIntegration(Integration):
        - `Runner.run()` and `Runner.run_streamed()` are thin wrappers for `DEFAULT_AGENT_RUNNER.run()` and `DEFAULT_AGENT_RUNNER.run_streamed()`.
        - `DEFAULT_AGENT_RUNNER.run()` and `DEFAULT_AGENT_RUNNER.run_streamed()` are patched in `_patch_runner()` with `_create_run_wrapper()` and `_create_run_streamed_wrapper()`, respectively.
     3. In a loop, the agent repeatedly calls the Responses API, maintaining a conversation history that includes previous messages and tool results, which is passed to each call.
-       - A Model instance is created at the start of the loop by calling `Runner._get_model()`. We patch the Model instance using `_create_get_model_wrapper()` in `_patch_model()`.
+       - A Model instance is created at the start of the loop by calling `Runner._get_model()`. We patch the Model instance using `patches._get_model()`.
        - Available tools are also determined at the start of the loop, with `Runner._get_all_tools()`. We patch Tool instances by iterating through the returned tools in `patches._get_all_tools()`.
        - In each loop iteration, `run_single_turn()` or `run_single_turn_streamed()` is responsible for calling the Responses API, patched with `patched_run_single_turn()` and `patched_run_single_turn_streamed()`.
     4. On loop termination, `RunImpl.execute_final_output()` is called. The function is patched with `patched_execute_final_output()`.
@@ -90,7 +85,6 @@ class OpenAIAgentsIntegration(Integration):
     @staticmethod
     def setup_once() -> None:
         _patch_error_tracing()
-        _patch_model()
         _patch_runner()

         library_version = parse_version(OPENAI_AGENTS_VERSION)
@@ -109,6 +103,14 @@ async def new_wrapped_get_all_tools(
             )

         agents.run.get_all_tools = new_wrapped_get_all_tools
+
+        @wraps(turn_preparation.get_model)
+        def new_wrapped_get_model(
+            agent: "agents.Agent", run_config: "agents.RunConfig"
+        ) -> "agents.Model":
+            return _get_model(turn_preparation.get_model, agent, run_config)
+
+        agents.run_internal.run_loop.get_model = new_wrapped_get_model
         return

     original_get_all_tools = AgentRunner._get_all_tools
@@ -122,3 +124,13 @@ async def old_wrapped_get_all_tools(
         return await _get_all_tools(original_get_all_tools, agent, context_wrapper)

     agents.run.AgentRunner._get_all_tools = classmethod(old_wrapped_get_all_tools)
+
+    original_get_model = AgentRunner._get_model
+
+    @wraps(AgentRunner._get_model.__func__)
+    def old_wrapped_get_model(
+        cls: "agents.Runner", agent: "agents.Agent", run_config: "agents.RunConfig"
+    ) -> "agents.Model":
+        return _get_model(original_get_model, agent, run_config)
+
+    agents.run.AgentRunner._get_model = classmethod(old_wrapped_get_model)
diff --git a/sentry_sdk/integrations/openai_agents/patches/__init__.py b/sentry_sdk/integrations/openai_agents/patches/__init__.py
index ab3948bdc1..fe06200793 100644
--- a/sentry_sdk/integrations/openai_agents/patches/__init__.py
+++ b/sentry_sdk/integrations/openai_agents/patches/__init__.py
@@ -1,4 +1,4 @@
-from .models import _create_get_model_wrapper  # noqa: F401
+from .models import _get_model  # noqa: F401
 from .tools import _get_all_tools  # noqa: F401
 from .runner import _create_run_wrapper, _create_run_streamed_wrapper  # noqa: F401
 from .agent_run import _patch_agent_run  # noqa: F401
diff --git a/sentry_sdk/integrations/openai_agents/patches/models.py b/sentry_sdk/integrations/openai_agents/patches/models.py
index 5d4d71185f..6b5dceef97 100644
--- a/sentry_sdk/integrations/openai_agents/patches/models.py
+++ b/sentry_sdk/integrations/openai_agents/patches/models.py
@@ -66,141 +66,128 @@ def _inject_trace_propagation_headers(
         headers[key] = value


-def _create_get_model_wrapper(
-    original_get_model: "Callable[..., Any]",
-) -> "Callable[..., Any]":
+def _get_model(
+    original_get_model: "Callable[..., agents.Model]",
+    agent: "agents.Agent",
+    run_config: "agents.RunConfig",
+) -> "agents.Model":
     """
-    Wraps the agents.Runner._get_model method to wrap the get_response method of the model to create an AI client span.
-
     Responsible for
     - creating and managing AI client spans.
     - adding trace propagation headers to tools with type HostedMCPTool.
     - setting the response model on agent invocation spans.
     """
+    # copy the model to avoid double patching its methods. We use copy on purpose here (instead of deepcopy)
+    # because we only patch its direct methods, all underlying data can remain unchanged.
+    model = copy.copy(original_get_model(agent, run_config))
+
+    # Capture the request model name for spans (agent.model can be None when using defaults)
+    request_model_name = model.model if hasattr(model, "model") else str(model)
+    agent._sentry_request_model = request_model_name
+
+    # Wrap _fetch_response if it exists (for OpenAI models) to capture response model
+    if hasattr(model, "_fetch_response"):
+        original_fetch_response = model._fetch_response
+
+        @wraps(original_fetch_response)
+        async def wrapped_fetch_response(*args: "Any", **kwargs: "Any") -> "Any":
+            response = await original_fetch_response(*args, **kwargs)
+            if hasattr(response, "model") and response.model:
+                agent._sentry_response_model = str(response.model)
+            return response
+
+        model._fetch_response = wrapped_fetch_response
+
+    original_get_response = model.get_response
+
+    @wraps(original_get_response)
+    async def wrapped_get_response(*args: "Any", **kwargs: "Any") -> "Any":
+        mcp_tools = kwargs.get("tools")
+        hosted_tools = []
+        if mcp_tools is not None:
+            hosted_tools = [
+                tool for tool in mcp_tools if isinstance(tool, HostedMCPTool)
+            ]
+
+        with ai_client_span(agent, kwargs) as span:
+            for hosted_tool in hosted_tools:
+                _inject_trace_propagation_headers(hosted_tool, span=span)
+
+            result = await original_get_response(*args, **kwargs)
+
+            # Get response model captured from _fetch_response and clean up
+            response_model = getattr(agent, "_sentry_response_model", None)
+            if response_model:
+                delattr(agent, "_sentry_response_model")
+
+            _set_response_model_on_agent_span(agent, response_model)
+            update_ai_client_span(span, result, response_model, agent)
+
+            return result
+
+    model.get_response = wrapped_get_response
+
+    # Also wrap stream_response for streaming support
+    if hasattr(model, "stream_response"):
+        original_stream_response = model.stream_response
+
+        @wraps(original_stream_response)
+        async def wrapped_stream_response(*args: "Any", **kwargs: "Any") -> "Any":
+            # Uses explicit try/finally instead of context manager to ensure cleanup
+            # even if the consumer abandons the stream (GeneratorExit).
+            span_kwargs = dict(kwargs)
+            if len(args) > 0:
+                span_kwargs["system_instructions"] = args[0]
+            if len(args) > 1:
+                span_kwargs["input"] = args[1]
-
-    @wraps(
-        original_get_model.__func__
-        if hasattr(original_get_model, "__func__")
-        else original_get_model
-    )
-    def wrapped_get_model(
-        cls: "agents.Runner", agent: "agents.Agent", run_config: "agents.RunConfig"
-    ) -> "agents.Model":
-        # copy the model to avoid double patching its methods. We use copy on purpose here (instead of deepcopy)
-        # because we only patch its direct methods, all underlying data can remain unchanged.
-        model = copy.copy(original_get_model(agent, run_config))
-
-        # Capture the request model name for spans (agent.model can be None when using defaults)
-        request_model_name = model.model if hasattr(model, "model") else str(model)
-        agent._sentry_request_model = request_model_name
-
-        # Wrap _fetch_response if it exists (for OpenAI models) to capture response model
-        if hasattr(model, "_fetch_response"):
-            original_fetch_response = model._fetch_response
-
-            @wraps(original_fetch_response)
-            async def wrapped_fetch_response(*args: "Any", **kwargs: "Any") -> "Any":
-                response = await original_fetch_response(*args, **kwargs)
-                if hasattr(response, "model") and response.model:
-                    agent._sentry_response_model = str(response.model)
-                return response
-
-            model._fetch_response = wrapped_fetch_response
-
-        original_get_response = model.get_response
-
-        @wraps(original_get_response)
-        async def wrapped_get_response(*args: "Any", **kwargs: "Any") -> "Any":
-            mcp_tools = kwargs.get("tools")
             hosted_tools = []
-            if mcp_tools is not None:
-                hosted_tools = [
-                    tool for tool in mcp_tools if isinstance(tool, HostedMCPTool)
-                ]
+            if len(args) > 3:
+                mcp_tools = args[3]
+
+                if mcp_tools is not None:
+                    hosted_tools = [
+                        tool for tool in mcp_tools if isinstance(tool, HostedMCPTool)
+                    ]

-            with ai_client_span(agent, kwargs) as span:
+            with ai_client_span(agent, span_kwargs) as span:
                 for hosted_tool in hosted_tools:
                     _inject_trace_propagation_headers(hosted_tool, span=span)

-                result = await original_get_response(*args, **kwargs)
-
-                # Get response model captured from _fetch_response and clean up
-                response_model = getattr(agent, "_sentry_response_model", None)
-                if response_model:
-                    delattr(agent, "_sentry_response_model")
-
-                _set_response_model_on_agent_span(agent, response_model)
-                update_ai_client_span(span, result, response_model, agent)
-
-                return result
-
-        model.get_response = wrapped_get_response
-
-        # Also wrap stream_response for streaming support
-        if hasattr(model, "stream_response"):
-            original_stream_response = model.stream_response
-
-            @wraps(original_stream_response)
-            async def wrapped_stream_response(*args: "Any", **kwargs: "Any") -> "Any":
-                # Uses explicit try/finally instead of context manager to ensure cleanup
-                # even if the consumer abandons the stream (GeneratorExit).
-                span_kwargs = dict(kwargs)
-                if len(args) > 0:
-                    span_kwargs["system_instructions"] = args[0]
-                if len(args) > 1:
-                    span_kwargs["input"] = args[1]
-
-                hosted_tools = []
-                if len(args) > 3:
-                    mcp_tools = args[3]
-
-                    if mcp_tools is not None:
-                        hosted_tools = [
-                            tool
-                            for tool in mcp_tools
-                            if isinstance(tool, HostedMCPTool)
-                        ]
-
-                with ai_client_span(agent, span_kwargs) as span:
-                    for hosted_tool in hosted_tools:
-                        _inject_trace_propagation_headers(hosted_tool, span=span)
-
-                    span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True)
-
-                    streaming_response = None
-                    ttft_recorded = False
-                    # Capture start time locally to avoid race conditions with concurrent requests
-                    start_time = time.perf_counter()
-
-                    async for event in original_stream_response(*args, **kwargs):
-                        # Detect first content token (text delta event)
-                        if not ttft_recorded and hasattr(event, "delta"):
-                            ttft = time.perf_counter() - start_time
-                            span.set_data(
-                                SPANDATA.GEN_AI_RESPONSE_TIME_TO_FIRST_TOKEN, ttft
-                            )
-                            ttft_recorded = True
-
-                        # Capture the full response from ResponseCompletedEvent
-                        if hasattr(event, "response"):
-                            streaming_response = event.response
-                        yield event
-
-                    # Update span with response data (usage, output, model)
-                    if streaming_response:
-                        response_model = (
-                            str(streaming_response.model)
-                            if hasattr(streaming_response, "model")
-                            and streaming_response.model
-                            else None
-                        )
-                        _set_response_model_on_agent_span(agent, response_model)
-                        update_ai_client_span(
-                            span, streaming_response, response_model, agent
-                        )
-
-            model.stream_response = wrapped_stream_response
+                span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True)

-        return model
+                streaming_response = None
+                ttft_recorded = False
+                # Capture start time locally to avoid race conditions with concurrent requests
+                start_time = time.perf_counter()

-    return wrapped_get_model
+                async for event in original_stream_response(*args, **kwargs):
+                    # Detect first content token (text delta event)
+                    if not ttft_recorded and hasattr(event, "delta"):
+                        ttft = time.perf_counter() - start_time
+                        span.set_data(
+                            SPANDATA.GEN_AI_RESPONSE_TIME_TO_FIRST_TOKEN, ttft
+                        )
+                        ttft_recorded = True
+
+                    # Capture the full response from ResponseCompletedEvent
+                    if hasattr(event, "response"):
+                        streaming_response = event.response
+                    yield event
+
+                # Update span with response data (usage, output, model)
+                if streaming_response:
+                    response_model = (
+                        str(streaming_response.model)
+                        if hasattr(streaming_response, "model")
+                        and streaming_response.model
+                        else None
+                    )
+                    _set_response_model_on_agent_span(agent, response_model)
+                    update_ai_client_span(
+                        span, streaming_response, response_model, agent
+                    )
+
+        model.stream_response = wrapped_stream_response
+
+    return model
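
Two patch paths coexist in __init__.py above because openai-agents v0.8 turned AgentRunner._get_model (a classmethod) into a module-level function. A minimal, self-contained sketch of that dispatch, using hypothetical stand-ins (LegacyRunner, loop_get_model, and patch() are illustrative, not SDK or sentry_sdk API):

# version_dispatch_sketch.py -- hypothetical names, not the SDK's API
from functools import wraps

class LegacyRunner:
    @classmethod
    def _get_model(cls, agent, run_config):
        return f"model-for-{agent}"

def loop_get_model(agent, run_config):
    # Stands in for run_loop.get_model / turn_preparation.get_model in v0.8+.
    return f"model-for-{agent}"

def _get_model(original, agent, run_config):
    # Stands in for patches._get_model: call through, then instrument the result.
    model = original(agent, run_config)
    print(f"instrumenting {model}")
    return model

def patch(run_loop_available: bool) -> None:
    global loop_get_model
    if run_loop_available:
        # v0.8+: get_model is a plain module-level function, so a plain wrapper suffices.
        original = loop_get_model

        @wraps(original)
        def new_wrapped_get_model(agent, run_config):
            return _get_model(original, agent, run_config)

        loop_get_model = new_wrapped_get_model
        return

    # Older releases: _get_model is a classmethod, so re-wrap with classmethod()
    # and accept the extra cls argument; @wraps targets the underlying __func__.
    original = LegacyRunner._get_model

    @wraps(LegacyRunner._get_model.__func__)
    def old_wrapped_get_model(cls, agent, run_config):
        return _get_model(original, agent, run_config)

    LegacyRunner._get_model = classmethod(old_wrapped_get_model)

patch(run_loop_available=False)
print(LegacyRunner._get_model("assistant", None))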
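
The body of the new patches._get_model relies on instance-level wrapping of a shallow copy: the wrapper is attached to the copy only, so a model object shared or cached elsewhere is never mutated, and the underlying data is not duplicated. A rough sketch of the pattern (FakeModel and instrument() are hypothetical):

# copy_and_wrap_sketch.py -- illustrative only
import asyncio
import copy
from functools import wraps

class FakeModel:
    # Stand-in for agents.Model; not an SDK class.
    def __init__(self, model: str) -> None:
        self.model = model

    async def get_response(self, prompt: str) -> str:
        return f"{self.model}: {prompt}"

def instrument(model: FakeModel) -> FakeModel:
    # Shallow copy: cheap, and enough because only direct method attributes
    # are replaced; all underlying data stays shared and unchanged.
    patched = copy.copy(model)
    original = patched.get_response  # bound to the copy

    @wraps(original)
    async def wrapped(*args, **kwargs):
        print("span opened")  # ai_client_span(...) in the real integration
        try:
            return await original(*args, **kwargs)
        finally:
            print("span closed")

    # Assigning on the instance shadows the class attribute for this copy only.
    patched.get_response = wrapped
    return patched

shared = FakeModel("gpt-4o")
instrumented = instrument(shared)
print(asyncio.run(instrumented.get_response("hi")))  # instrumented call
print(asyncio.run(shared.get_response("hi")))        # original object untouched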
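
get_response alone cannot tell which concrete model the server resolved (agent.model may be None or an alias), so the diff stashes the value on the agent from inside the _fetch_response wrapper, then reads and deletes it after get_response returns. A sketch of that hand-off, under the same hypothetical-names caveat (the Fake* classes are invented):

# response_model_handoff_sketch.py -- illustrative only
import asyncio
from functools import wraps

class FakeAgent:
    pass

class FakeResponse:
    model = "gpt-4o-2024-08-06"  # concrete model resolved by the server

class FakeModel:
    async def _fetch_response(self):
        return FakeResponse()

    async def get_response(self):
        return await self._fetch_response()

async def main() -> None:
    agent, model = FakeAgent(), FakeModel()
    original_fetch = model._fetch_response

    @wraps(original_fetch)
    async def wrapped_fetch():
        response = await original_fetch()
        # Park the resolved model name on the agent; the get_response
        # wrapper reads it once the outer call returns.
        agent._sentry_response_model = str(response.model)
        return response

    model._fetch_response = wrapped_fetch

    await model.get_response()
    response_model = getattr(agent, "_sentry_response_model", None)
    if response_model:
        delattr(agent, "_sentry_response_model")  # clean up the stash
    print(response_model)  # -> gpt-4o-2024-08-06

asyncio.run(main())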
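
For streaming, the wrapper measures time-to-first-token by timestamping before iteration and recording the delta on the first event that carries a `delta` attribute; keeping start_time local to each call is what prevents races between concurrent streams. A standalone sketch with invented event objects:

# ttft_sketch.py -- illustrative only
import asyncio
import time
from types import SimpleNamespace
from typing import Any, AsyncIterator

async def fake_stream() -> "AsyncIterator[Any]":
    # Hypothetical event stream; only delta events carry generated text.
    for chunk in ("Hel", "lo"):
        await asyncio.sleep(0.05)
        yield SimpleNamespace(delta=chunk)

async def timed_stream(inner: "AsyncIterator[Any]") -> "AsyncIterator[Any]":
    ttft_recorded = False
    start_time = time.perf_counter()  # local, so concurrent streams cannot race
    async for event in inner:
        if not ttft_recorded and hasattr(event, "delta"):
            ttft = time.perf_counter() - start_time
            print(f"time to first token: {ttft:.3f}s")  # span.set_data(...) in the SDK
            ttft_recorded = True
        yield event

async def main() -> None:
    async for event in timed_stream(fake_stream()):
        print(event.delta)

asyncio.run(main())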