Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion sentry_sdk/integrations/openai_agents/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
_get_model,
_get_all_tools,
_run_single_turn,
_run_single_turn_streamed,
_create_run_wrapper,
_create_run_streamed_wrapper,
_patch_agent_run,
Expand Down Expand Up @@ -78,7 +79,7 @@ class OpenAIAgentsIntegration(Integration):
3. In a loop, the agent repeatedly calls the Responses API, maintaining a conversation history that includes previous messages and tool results, which is passed to each call.
- A Model instance is created at the start of the loop by calling the `Runner._get_model()`. We patch the Model instance using `patches._get_model()`.
- Available tools are also determined at the start of the loop, with `Runner._get_all_tools()`. We patch Tool instances by iterating through the returned tools in `patches._get_all_tools()`.
- In each loop iteration, `run_single_turn()` or `run_single_turn_streamed()` is responsible for calling the Responses API, patched with `patches._run_single_turn()` and `patched_run_single_turn_streamed()`.
- In each loop iteration, `run_single_turn()` or `run_single_turn_streamed()` is responsible for calling the Responses API, patched with `patches._run_single_turn()` and `patches._run_single_turn_streamed()`.
4. On loop termination, `RunImpl.execute_final_output()` is called. The function is patched with `patched_execute_final_output()`.

Local tools are run based on the return value from the Responses API as a post-API call step in the above loop.
Expand Down Expand Up @@ -128,6 +129,16 @@ async def new_wrapped_run_single_turn(

agents.run.run_single_turn = new_wrapped_run_single_turn

@wraps(run_loop.run_single_turn_streamed)
async def new_wrapped_run_single_turn_streamed(
    *args: "Any", **kwargs: "Any"
) -> "SingleStepResult":
    """
    Replacement for the module-level `run_single_turn_streamed()`.

    Delegates to `_run_single_turn_streamed()`, handing it the function found on
    `run_loop` at call time as the callable to await, and forwarding every
    positional and keyword argument unchanged.
    """
    wrapped_callable = run_loop.run_single_turn_streamed
    step_result = await _run_single_turn_streamed(wrapped_callable, *args, **kwargs)
    return step_result

agents.run.run_single_turn_streamed = new_wrapped_run_single_turn_streamed

return

original_get_all_tools = AgentRunner._get_all_tools
Expand Down Expand Up @@ -163,3 +174,17 @@ async def old_wrapped_run_single_turn(
agents.run.AgentRunner._run_single_turn = classmethod(
old_wrapped_run_single_turn
)

original_run_single_turn_streamed = AgentRunner._run_single_turn_streamed

@wraps(AgentRunner._run_single_turn_streamed.__func__)
async def old_wrapped_run_single_turn_streamed(
    cls: "agents.Runner", *args: "Any", **kwargs: "Any"
) -> "SingleStepResult":
    """
    Replacement for the `AgentRunner._run_single_turn_streamed()` classmethod.

    Delegates to `_run_single_turn_streamed()` with the previously captured
    `original_run_single_turn_streamed` as the callable to await. `cls` is
    accepted because this function is installed via `classmethod(...)`, but it
    is not forwarded; only `*args` and `**kwargs` are passed on.
    """
    wrapped_callable = original_run_single_turn_streamed
    step_result = await _run_single_turn_streamed(wrapped_callable, *args, **kwargs)
    return step_result

agents.run.AgentRunner._run_single_turn_streamed = classmethod(
old_wrapped_run_single_turn_streamed
)
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from .models import _get_model # noqa: F401
from .tools import _get_all_tools # noqa: F401
from .runner import _create_run_wrapper, _create_run_streamed_wrapper # noqa: F401
from .agent_run import _run_single_turn, _patch_agent_run # noqa: F401
from .agent_run import _run_single_turn, _run_single_turn_streamed, _patch_agent_run # noqa: F401
from .error_tracing import _patch_error_tracing # noqa: F401
122 changes: 57 additions & 65 deletions sentry_sdk/integrations/openai_agents/patches/agent_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,70 @@ async def _run_single_turn(
return result


async def _run_single_turn_streamed(
original_run_single_turn_streamed: "Callable[..., Awaitable[SingleStepResult]]",
*args: "Any",
**kwargs: "Any",
) -> "SingleStepResult":
"""
Patched _run_single_turn_streamed that
- creates agent invocation spans for streaming if there is no already active agent invocation span.
- ends the agent invocation span if and only if `_run_single_turn_streamed()` raises an exception.

`original_run_single_turn_streamed` is the callable being wrapped. It is awaited with
`*args` and `**kwargs` passed through unchanged and its result is returned as-is.
If it raises an `Exception`, span bookkeeping is performed and the same exception is re-raised.

Note: Unlike _run_single_turn which uses keyword-only arguments (*,),
_run_single_turn_streamed uses positional arguments. The call signature is:
_run_single_turn_streamed(
streamed_result, # args[0]
agent, # args[1]
hooks, # args[2]
context_wrapper, # args[3]
run_config, # args[4]
should_run_agent_start_hooks, # args[5]
tool_use_tracker, # args[6]
all_tools, # args[7]
server_conversation_tracker, # args[8] (optional)
)

Each value used below is read from its positional slot when that slot is present,
otherwise from the keyword argument of the same name.
"""
# Extract only the arguments needed for span handling (positional first, keyword fallback).
streamed_result = args[0] if len(args) > 0 else kwargs.get("streamed_result")
agent = args[1] if len(args) > 1 else kwargs.get("agent")
context_wrapper = args[3] if len(args) > 3 else kwargs.get("context_wrapper")
# Coerced to bool; defaults to False when the flag is supplied neither way.
should_run_agent_start_hooks = bool(
args[5] if len(args) > 5 else kwargs.get("should_run_agent_start_hooks", False)
)

# Forward the run's input to the span helper only when the result object is truthy and has `.input`.
span_kwargs: "dict[str, Any]" = {}
if streamed_result and hasattr(streamed_result, "input"):
span_kwargs["original_input"] = streamed_result.input

# May be None (the error path below checks for that before touching the span).
span = _maybe_start_agent_span(
context_wrapper,
agent,
should_run_agent_start_hooks,
span_kwargs,
is_streaming=True,
)

try:
result = await original_run_single_turn_streamed(*args, **kwargs)
# Only `Exception` subclasses are handled; errors deriving solely from BaseException
# propagate without any of the bookkeeping below.
except Exception as exc:
# Capture exc_info up front so the original exception and traceback can be re-raised.
exc_info = sys.exc_info()
# NOTE(review): presumably this guard keeps SDK-internal failures from masking the
# original error -- confirm against capture_internal_exceptions().
with capture_internal_exceptions():
# The exception is recorded only on a span that exists and has no timestamp yet.
if span is not None and span.timestamp is None:
_record_exception_on_span(span, exc)
end_invoke_agent_span(context_wrapper, agent)
_close_streaming_workflow_span(agent)
reraise(*exc_info)

# Success path: the span is intentionally not ended here (see docstring).
return result


def _patch_agent_run() -> None:
"""
Patches AgentRunner methods to create agent invocation spans.
This directly patches the execution flow to track when agents start and stop.
"""

# Store original methods
original_run_single_turn_streamed = agents.run.AgentRunner._run_single_turn_streamed
original_execute_handoffs = agents._run_impl.RunImpl.execute_handoffs
original_execute_final_output = agents._run_impl.RunImpl.execute_final_output

Expand Down Expand Up @@ -193,71 +249,7 @@ async def patched_execute_final_output(

return result

# wraps() is given the plain function: `__func__` is unwrapped when the captured
# original exposes it, otherwise the original is used directly.
@wraps(
original_run_single_turn_streamed.__func__
if hasattr(original_run_single_turn_streamed, "__func__")
else original_run_single_turn_streamed
)
async def patched_run_single_turn_streamed(
cls: "agents.Runner", *args: "Any", **kwargs: "Any"
) -> "Any":
"""
Patched _run_single_turn_streamed that
- creates agent invocation spans for streaming if there is no already active agent invocation span.
- ends the agent invocation span if and only if `_run_single_turn_streamed()` raises an exception.

`cls` is accepted but not forwarded: the captured `original_run_single_turn_streamed`
is awaited with `*args` and `**kwargs` only, and its result is returned as-is.
If it raises an `Exception`, span bookkeeping is performed and the same exception is re-raised.

Note: Unlike _run_single_turn which uses keyword-only arguments (*,),
_run_single_turn_streamed uses positional arguments. The call signature is:
_run_single_turn_streamed(
streamed_result, # args[0]
agent, # args[1]
hooks, # args[2]
context_wrapper, # args[3]
run_config, # args[4]
should_run_agent_start_hooks, # args[5]
tool_use_tracker, # args[6]
all_tools, # args[7]
server_conversation_tracker, # args[8] (optional)
)

Each value used below is read from its positional slot when that slot is present,
otherwise from the keyword argument of the same name.
"""
# Extract only the arguments needed for span handling (positional first, keyword fallback).
streamed_result = args[0] if len(args) > 0 else kwargs.get("streamed_result")
agent = args[1] if len(args) > 1 else kwargs.get("agent")
context_wrapper = args[3] if len(args) > 3 else kwargs.get("context_wrapper")
# Coerced to bool; defaults to False when the flag is supplied neither way.
should_run_agent_start_hooks = bool(
args[5]
if len(args) > 5
else kwargs.get("should_run_agent_start_hooks", False)
)

# Forward the run's input to the span helper only when the result object is truthy and has `.input`.
span_kwargs: "dict[str, Any]" = {}
if streamed_result and hasattr(streamed_result, "input"):
span_kwargs["original_input"] = streamed_result.input

# May be None (the error path below checks for that before touching the span).
span = _maybe_start_agent_span(
context_wrapper,
agent,
should_run_agent_start_hooks,
span_kwargs,
is_streaming=True,
)

try:
result = await original_run_single_turn_streamed(*args, **kwargs)
# Only `Exception` subclasses are handled; errors deriving solely from BaseException
# propagate without any of the bookkeeping below.
except Exception as exc:
# Capture exc_info up front so the original exception and traceback can be re-raised.
exc_info = sys.exc_info()
# NOTE(review): presumably this guard keeps SDK-internal failures from masking the
# original error -- confirm against capture_internal_exceptions().
with capture_internal_exceptions():
# The exception is recorded only on a span that exists and has no timestamp yet.
if span is not None and span.timestamp is None:
_record_exception_on_span(span, exc)
end_invoke_agent_span(context_wrapper, agent)
_close_streaming_workflow_span(agent)
reraise(*exc_info)

# Success path: the span is intentionally not ended here (see docstring).
return result

# Apply patches
agents.run.AgentRunner._run_single_turn_streamed = classmethod(
patched_run_single_turn_streamed
)
agents._run_impl.RunImpl.execute_handoffs = classmethod(patched_execute_handoffs)
agents._run_impl.RunImpl.execute_final_output = classmethod(
patched_execute_final_output
Expand Down
Loading