fix(openai-agents): Patch models functions following library refactor #5449
base: master
Changes from all commits: 02af5df, a7f08cc, 3679c96, 0047eb3, 722bf00, b9e9d6b, 0a2ede8, e2987b8, 0970319, 168e405, 48aff66, 70873f6
@@ -66,141 +66,128 @@ def _inject_trace_propagation_headers( | |||||
| headers[key] = value | ||||||
|
|
||||||
|
|
||||||
| def _create_get_model_wrapper( | ||||||
| original_get_model: "Callable[..., Any]", | ||||||
| ) -> "Callable[..., Any]": | ||||||
| def _get_model( | ||||||
| original_get_model: "Callable[..., agents.Model]", | ||||||
| agent: "agents.Agent", | ||||||
| run_config: "agents.RunConfig", | ||||||
| ) -> "agents.Model": | ||||||
| """ | ||||||
| Wraps the agents.Runner._get_model method to wrap the get_response method of the model to create a AI client span. | ||||||
|
|
||||||
| Responsible for | ||||||
| - creating and managing AI client spans. | ||||||
| - adding trace propagation headers to tools with type HostedMCPTool. | ||||||
| - setting the response model on agent invocation spans. | ||||||
| """ | ||||||
| # copy the model to double patching its methods. We use copy on purpose here (instead of deepcopy) | ||||||
| # because we only patch its direct methods, all underlying data can remain unchanged. | ||||||
| model = copy.copy(original_get_model(agent, run_config)) | ||||||
|
|
||||||
| # Capture the request model name for spans (agent.model can be None when using defaults) | ||||||
| request_model_name = model.model if hasattr(model, "model") else str(model) | ||||||
| agent._sentry_request_model = request_model_name | ||||||
|
|
||||||
| # Wrap _fetch_response if it exists (for OpenAI models) to capture response model | ||||||
| if hasattr(model, "_fetch_response"): | ||||||
| original_fetch_response = model._fetch_response | ||||||
|
|
||||||
| @wraps(original_fetch_response) | ||||||
| async def wrapped_fetch_response(*args: "Any", **kwargs: "Any") -> "Any": | ||||||
| response = await original_fetch_response(*args, **kwargs) | ||||||
| if hasattr(response, "model") and response.model: | ||||||
|
Member:
Nitpick: I think an alternative approach you could take here, in order to make this conditional more concise, would be to drop the hasattr check. This would also make things consistent with what you have on line 119.

Contributor (Author):
We need to rethink this response model business, because we probably shouldn't be setting response models on agent spans either. This was put in before I knew as much about agents, but there are libraries that let you vary the request model during agent execution as well (in particular Claude Code wrapped by the Claude Agent SDK). Created #5458 to track this; the plan would be to clean it up as part of that issue.
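A rough sketch of the simplification being suggested (the exact suggested change is not reproduced above, so the getattr form here is an assumption):

    # Hypothetical rewrite of the conditional: getattr with a default folds the
    # hasattr check and the truthiness check into one expression.
    response_model = getattr(response, "model", None)
    if response_model:
        agent._sentry_response_model = str(response_model)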
                agent._sentry_response_model = str(response.model)
            return response

        model._fetch_response = wrapped_fetch_response

    original_get_response = model.get_response

    @wraps(original_get_response)
    async def wrapped_get_response(*args: "Any", **kwargs: "Any") -> "Any":
-        mcp_tools = kwargs.get("tools")
-        hosted_tools = []
-        if mcp_tools is not None:
-            hosted_tools = [
-                tool for tool in mcp_tools if isinstance(tool, HostedMCPTool)
-            ]
+        span_kwargs = dict(kwargs)
+        if len(args) > 0:
+            span_kwargs["system_instructions"] = args[0]
+        if len(args) > 1:
+            span_kwargs["input"] = args[1]
+
+        hosted_tools = []
+        if len(args) > 3:
+            mcp_tools = args[3]
+
+            if mcp_tools is not None:
+                hosted_tools = [
+                    tool for tool in mcp_tools if isinstance(tool, HostedMCPTool)
+                ]

-        with ai_client_span(agent, kwargs) as span:
+        with ai_client_span(agent, span_kwargs) as span:
            for hosted_tool in hosted_tools:
                _inject_trace_propagation_headers(hosted_tool, span=span)

            result = await original_get_response(*args, **kwargs)

            # Get the response model captured from _fetch_response and clean up
            response_model = getattr(agent, "_sentry_response_model", None)
            if response_model:
                delattr(agent, "_sentry_response_model")

            _set_response_model_on_agent_span(agent, response_model)
            update_ai_client_span(span, result, response_model, agent)

        return result

    model.get_response = wrapped_get_response

    # Also wrap stream_response for streaming support
    if hasattr(model, "stream_response"):
        original_stream_response = model.stream_response

        @wraps(original_stream_response)
        async def wrapped_stream_response(*args: "Any", **kwargs: "Any") -> "Any":
            # Uses explicit try/finally instead of a context manager to ensure cleanup
            # even if the consumer abandons the stream (GeneratorExit).
Comment on lines +136 to +137

Member:
This comment looks like it needs to be updated or removed, as there's no try/finally below and the code uses a context manager 😅

Contributor (Author):
You're completely right, and it doesn't look like we catch GeneratorExit either. I'll keep this commit atomic since we're moving the contents of this function.
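For reference, a minimal sketch (not code from this PR) of the try/finally pattern the stale comment refers to, where cleanup still runs if the consumer abandons the stream; the names are illustrative only:

    # Sketch only: guarantee cleanup for an async-generator wrapper with try/finally.
    # The finally block runs on normal completion, on errors, and on GeneratorExit
    # (i.e. when the consumer stops iterating early).
    async def stream_with_cleanup(inner_stream, on_finish):
        try:
            async for event in inner_stream:
                yield event
        finally:
            on_finish()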
            span_kwargs = dict(kwargs)
            if len(args) > 0:
                span_kwargs["system_instructions"] = args[0]
            if len(args) > 1:
                span_kwargs["input"] = args[1]

            hosted_tools = []
            if len(args) > 3:
                mcp_tools = args[3]

                if mcp_tools is not None:
                    hosted_tools = [
                        tool
                        for tool in mcp_tools
                        if isinstance(tool, HostedMCPTool)
                    ]

            with ai_client_span(agent, span_kwargs) as span:
                for hosted_tool in hosted_tools:
                    _inject_trace_propagation_headers(hosted_tool, span=span)

                span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True)

                streaming_response = None
                ttft_recorded = False
                # Capture the start time locally to avoid race conditions with concurrent requests
                start_time = time.perf_counter()

                async for event in original_stream_response(*args, **kwargs):
                    # Detect the first content token (text delta event)
                    if not ttft_recorded and hasattr(event, "delta"):
                        ttft = time.perf_counter() - start_time
                        span.set_data(
                            SPANDATA.GEN_AI_RESPONSE_TIME_TO_FIRST_TOKEN, ttft
                        )
                        ttft_recorded = True

                    # Capture the full response from ResponseCompletedEvent
                    if hasattr(event, "response"):
                        streaming_response = event.response
                    yield event

                # Update the span with response data (usage, output, model)
                if streaming_response:
                    response_model = (
                        str(streaming_response.model)
                        if hasattr(streaming_response, "model")
                        and streaming_response.model
Comment on lines +182 to +183

Member:
Also a nitpick, but similar to the comment I left above.
                        else None
                    )
                    _set_response_model_on_agent_span(agent, response_model)
                    update_ai_client_span(
                        span, streaming_response, response_model, agent
                    )

        model.stream_response = wrapped_stream_response

    return model
-
-    return wrapped_get_model
Model patch applied to wrong module
Medium Severity
In the v0.8+ branch, the wrapper is built from turn_preparation.get_model but assigned to agents.run_internal.run_loop.get_model. This leaves turn_preparation.get_model unpatched, so the new model path can bypass _get_model instrumentation in patches/models.py.
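A rough illustration of the concern; the module paths come from the comment above, while the helper name and import locations are assumptions rather than the SDK's actual patch code:

    # Illustration only: the wrapper must be assigned back onto the module attribute
    # that the library actually resolves at call time. Building it from
    # turn_preparation.get_model but assigning it only to run_loop.get_model leaves
    # the turn_preparation path uninstrumented.
    from agents.run_internal import run_loop, turn_preparation  # assumed import paths

    original = turn_preparation.get_model
    wrapped = _build_wrapper(original)  # hypothetical wrapper factory

    turn_preparation.get_model = wrapped
    run_loop.get_model = wrapped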