fix: graceful stream shutdown on exceptions in streaming actions#680
fix: graceful stream shutdown on exceptions in streaming actions#680andreahlert wants to merge 2 commits intoapache:mainfrom
Conversation
When a streaming action catches an exception and yields a final state in a try/except/finally block, the stream now completes gracefully instead of propagating the exception and killing the connection. If the generator yields a valid state_update before the exception propagates, the exception is suppressed and the stream terminates normally. If no state was yielded, the exception propagates as before. Closes apache#581
|
@skrawcz @kajocina This PR fixes the broken stream issue reported in #581. The generator consumption loops now catch exceptions gracefully when the action has already yielded a final state (e.g. in a |
|
pre-commit issue |
skrawcz
left a comment
There was a problem hiding this comment.
Good fix for a real issue! The try/except approach in the single-step functions is clean — checking state_update is None correctly distinguishes between "generator crashed before completing" and "generator yielded a final state in its finally block."
Two items to address:
-
caught_excshould be logged in the multi-step functions — it's currently assigned (caught_exc = e) but never read afterward. Please log it (e.g.,logger.warning(...)) so that exceptions aren't silently swallowed. Users need visibility into what went wrong even when the stream completes gracefully. -
Tests needed. The reproduction case from #581 translates directly into a test. Please add tests covering:
- Streaming action with
try/except/finallythat yields state update infinally→ completes gracefully - Streaming action that raises without yielding state → exception still propagates
- Both single-step and multi-step variants
- Streaming action with
Also worth discussing: the multi-step guard (if result is None: raise) is more permissive than the single-step guard (if state_update is None: raise). In multi-step, if the generator crashes after yielding some items but before producing the "final" result, the reducer silently runs on partial data. Is this the intended behavior?
| action=action.name, | ||
| app_id=app_id, | ||
| partition_key=partition_key, | ||
| sequence_id=sequence_id, |
There was a problem hiding this comment.
Worth discussing: when the generator crashes after yielding some items but before its "final" result, this guard (if result is None) means the reducer runs on whatever partial item was last received. In contrast, the single-step guard (if state_update is None) verifies the generator actually fulfilled its contract. Should this also log a warning when the exception is caught with a non-None result, so users know the reducer ran on potentially incomplete data?
There was a problem hiding this comment.
Added a warning that explicitly says "the reducer will run on potentially partial data" when the exception is caught with a non-None result. This gives users full visibility without changing the current behavior.
…aceful shutdown - Add logger.warning with exc_info in all 4 streaming except blocks - Remove dead caught_exc variable in multi-step functions - Fix count increment asymmetry between sync and async single-step - Add 8 tests covering graceful exception handling and propagation
All three points addressed. Added logger.warning(...) with exc_info=True in all 4 functions (not just multi-step). Removed the dead caught_exc variable entirely. Added 8 tests covering graceful shutdown and exception propagation for both single-step and multi-step, sync and async. On the multi-step guard discussion: kept if result is None: raise but the warning now explicitly tells the user the reducer is running on potentially partial data, so there's full visibility without changing the current behavior. Also ran additional review passes (on an isolated environment - thanks to Claude for that) that caught a few extra issues beyond what was flagged: added logging in the single-step functions too (not just multi-step), fixed a count increment asymmetry between sync and async single-step variants, and fixed test fixtures where stream_run wasn't a proper generator function (missing yield after raise). IMHO, now we're ready to merge. But I think worth take an extra look. |
Summary
_run_single_step_streaming_action,_arun_single_step_streaming_action,_run_multi_step_streaming_action, and_arun_multi_step_streaming_actionwithtry/exceptfinallyblock), the stream now completes gracefully instead of propagating the exception and killing the connectionCloses #581
Test plan
curl: (18) transfer closed with outstanding read data remainingandValueErrorstack trace on servertest_application.py(all passing)test_application.py+test_action.pysuite: 175 tests, zero failures