Skip to content

fix: graceful stream shutdown on exceptions in streaming actions#680

Open
andreahlert wants to merge 2 commits intoapache:mainfrom
andreahlert:issue-581-repro
Open

fix: graceful stream shutdown on exceptions in streaming actions#680
andreahlert wants to merge 2 commits intoapache:mainfrom
andreahlert:issue-581-repro

Conversation

@andreahlert
Copy link
Collaborator

Summary

  • Wraps the generator consumption loop in _run_single_step_streaming_action, _arun_single_step_streaming_action, _run_multi_step_streaming_action, and _arun_multi_step_streaming_action with try/except
  • When a streaming action catches an exception and yields a final state update (e.g. in a finally block), the stream now completes gracefully instead of propagating the exception and killing the connection
  • If no state was yielded before the exception, the original behavior is preserved (exception propagates)

Closes #581

Test plan

  • Reproduced the bug using the exact snippet from broken stream when handling raised exceptions #581 in a Docker container (FastAPI + uvicorn). Confirmed curl: (18) transfer closed with outstanding read data remaining and ValueError stack trace on server
  • Applied fix and verified stream completes with exit code 0, server logs clean (no exceptions)
  • Ran all 29 streaming-related tests in test_application.py (all passing)
  • Ran full test_application.py + test_action.py suite: 175 tests, zero failures

When a streaming action catches an exception and yields a final state
in a try/except/finally block, the stream now completes gracefully
instead of propagating the exception and killing the connection.

If the generator yields a valid state_update before the exception
propagates, the exception is suppressed and the stream terminates
normally. If no state was yielded, the exception propagates as before.

Closes apache#581
@andreahlert
Copy link
Collaborator Author

@skrawcz @kajocina This PR fixes the broken stream issue reported in #581. The generator consumption loops now catch exceptions gracefully when the action has already yielded a final state (e.g. in a finally block), instead of letting the exception kill the connection. All existing streaming tests pass. Would appreciate a review when you get a chance.

@skrawcz
Copy link
Contributor

skrawcz commented Mar 21, 2026

pre-commit issue

Copy link
Contributor

@skrawcz skrawcz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good fix for a real issue! The try/except approach in the single-step functions is clean — checking state_update is None correctly distinguishes between "generator crashed before completing" and "generator yielded a final state in its finally block."

Two items to address:

  1. caught_exc should be logged in the multi-step functions — it's currently assigned (caught_exc = e) but never read afterward. Please log it (e.g., logger.warning(...)) so that exceptions aren't silently swallowed. Users need visibility into what went wrong even when the stream completes gracefully.

  2. Tests needed. The reproduction case from #581 translates directly into a test. Please add tests covering:

    • Streaming action with try/except/finally that yields state update in finally → completes gracefully
    • Streaming action that raises without yielding state → exception still propagates
    • Both single-step and multi-step variants

Also worth discussing: the multi-step guard (if result is None: raise) is more permissive than the single-step guard (if state_update is None: raise). In multi-step, if the generator crashes after yielding some items but before producing the "final" result, the reducer silently runs on partial data. Is this the intended behavior?

action=action.name,
app_id=app_id,
partition_key=partition_key,
sequence_id=sequence_id,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth discussing: when the generator crashes after yielding some items but before its "final" result, this guard (if result is None) means the reducer runs on whatever partial item was last received. In contrast, the single-step guard (if state_update is None) verifies the generator actually fulfilled its contract. Should this also log a warning when the exception is caught with a non-None result, so users know the reducer ran on potentially incomplete data?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a warning that explicitly says "the reducer will run on potentially partial data" when the exception is caught with a non-None result. This gives users full visibility without changing the current behavior.

…aceful shutdown

- Add logger.warning with exc_info in all 4 streaming except blocks
- Remove dead caught_exc variable in multi-step functions
- Fix count increment asymmetry between sync and async single-step
- Add 8 tests covering graceful exception handling and propagation
@andreahlert
Copy link
Collaborator Author

Good fix for a real issue! The try/except approach in the single-step functions is clean — checking state_update is None correctly distinguishes between "generator crashed before completing" and "generator yielded a final state in its finally block."

Two items to address:

  1. caught_exc should be logged in the multi-step functions — it's currently assigned (caught_exc = e) but never read afterward. Please log it (e.g., logger.warning(...)) so that exceptions aren't silently swallowed. Users need visibility into what went wrong even when the stream completes gracefully.

  2. Tests needed. The reproduction case from broken stream when handling raised exceptions #581 translates directly into a test. Please add tests covering:

    • Streaming action with try/except/finally that yields state update in finally → completes gracefully
    • Streaming action that raises without yielding state → exception still propagates
    • Both single-step and multi-step variants

Also worth discussing: the multi-step guard (if result is None: raise) is more permissive than the single-step guard (if state_update is None: raise). In multi-step, if the generator crashes after yielding some items but before producing the "final" result, the reducer silently runs on partial data. Is this the intended behavior?

All three points addressed. Added logger.warning(...) with exc_info=True in all 4 functions (not just multi-step). Removed the dead caught_exc variable entirely.

Added 8 tests covering graceful shutdown and exception propagation for both single-step and multi-step, sync and async.

On the multi-step guard discussion: kept if result is None: raise but the warning now explicitly tells the user the reducer is running on potentially partial data, so there's full visibility without changing the current behavior.

Also ran additional review passes (on an isolated environment - thanks to Claude for that) that caught a few extra issues beyond what was flagged: added logging in the single-step functions too (not just multi-step), fixed a count increment asymmetry between sync and async single-step variants, and fixed test fixtures where stream_run wasn't a proper generator function (missing yield after raise).

IMHO, now we're ready to merge. But I think worth take an extra look.

@andreahlert andreahlert requested a review from skrawcz March 24, 2026 12:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

broken stream when handling raised exceptions

2 participants