From 8b725da7af56cbb803782f9dad21818f7b13c3bd Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Mon, 9 Mar 2026 16:41:50 -0700 Subject: [PATCH 1/5] Invert loop and try catch. Exception can rarely occur outside the catch --- tests/worker/test_workflow.py | 35 +++++++++++++++++++---------------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index deedae964..f91a85881 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -7665,38 +7665,38 @@ async def test_workflow_missing_local_activity_no_activities(client: Client): async def heartbeat_activity( catch_err: bool = True, ) -> temporalio.activity.ActivityCancellationDetails | None: - while True: - try: + try: + while True: activity.heartbeat() # If we have heartbeat details, we are on the second attempt, we have retried due to pause/unpause. if activity.info().heartbeat_details: return activity.cancellation_details() await asyncio.sleep(0.1) - except (CancelledError, asyncio.CancelledError) as err: - if not catch_err: - raise err - return activity.cancellation_details() - finally: - activity.heartbeat("finally-complete") + except (CancelledError, asyncio.CancelledError) as err: + if not catch_err: + raise err + return activity.cancellation_details() + finally: + activity.heartbeat("finally-complete") @activity.defn def sync_heartbeat_activity( catch_err: bool = True, ) -> temporalio.activity.ActivityCancellationDetails | None: - while True: - try: + try: + while True: activity.heartbeat() # If we have heartbeat details, we are on the second attempt, we have retried due to pause/unpause. if activity.info().heartbeat_details: return activity.cancellation_details() time.sleep(0.1) - except (CancelledError, asyncio.CancelledError) as err: - if not catch_err: - raise err - return activity.cancellation_details() - finally: - activity.heartbeat("finally-complete") + except (CancelledError, asyncio.CancelledError) as err: + if not catch_err: + raise err + return activity.cancellation_details() + finally: + activity.heartbeat("finally-complete") @workflow.defn @@ -7709,6 +7709,7 @@ async def run( result.append( await workflow.execute_activity( sync_heartbeat_activity, + True, activity_id=activity_id, start_to_close_timeout=timedelta(seconds=10), heartbeat_timeout=timedelta(seconds=2), @@ -7718,6 +7719,7 @@ async def run( result.append( await workflow.execute_activity( heartbeat_activity, + True, activity_id=f"{activity_id}-2", start_to_close_timeout=timedelta(seconds=10), heartbeat_timeout=timedelta(seconds=2), @@ -8288,6 +8290,7 @@ async def test_previous_run_failure(client: Client): task_queue=worker.task_queue, retry_policy=RetryPolicy( initial_interval=timedelta(milliseconds=10), + maximum_attempts=2, ), ) result = await handle.result() From 9b5bbe87dd6c5d8ca6a49757ccffa9322a466889 Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Tue, 10 Mar 2026 08:27:19 -0700 Subject: [PATCH 2/5] Add sleep before validating list activities --- tests/test_activity.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_activity.py b/tests/test_activity.py index 0d9bfecad..a356340e8 100644 --- a/tests/test_activity.py +++ b/tests/test_activity.py @@ -506,7 +506,7 @@ async def test_list_activities(client: Client, env: WorkflowEnvironment): task_queue=task_queue, start_to_close_timeout=timedelta(seconds=5), ) - + await asyncio.sleep(1) executions = [ e async for e in client.list_activities(f'ActivityId = "{activity_id}"') ] From 86e48c39827c75d367626cacfd57c47c823ccfac Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Thu, 19 Mar 2026 11:53:47 -0700 Subject: [PATCH 3/5] Add RPC cancelled retries to assert_eventually --- tests/helpers/__init__.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/helpers/__init__.py b/tests/helpers/__init__.py index 1d2e886cc..b783b9003 100644 --- a/tests/helpers/__init__.py +++ b/tests/helpers/__init__.py @@ -71,6 +71,7 @@ async def assert_eventually( *, timeout: timedelta = timedelta(seconds=10), interval: timedelta = timedelta(milliseconds=200), + retry_on_rpc_cancelled: bool = True, ) -> T: start_sec = time.monotonic() while True: @@ -80,6 +81,11 @@ async def assert_eventually( except AssertionError: if timedelta(seconds=time.monotonic() - start_sec) >= timeout: raise + except RPCError as e: + if retry_on_rpc_cancelled and e.status == RPCStatusCode.CANCELLED: + continue + else: + raise await asyncio.sleep(interval.total_seconds()) From fe87ae5d2278e59b8336ec16ba104adecefb7d9a Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Thu, 19 Mar 2026 16:49:45 -0700 Subject: [PATCH 4/5] Switch from sleep to assert_eventually --- tests/test_activity.py | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/tests/test_activity.py b/tests/test_activity.py index a356340e8..cb171f734 100644 --- a/tests/test_activity.py +++ b/tests/test_activity.py @@ -28,7 +28,7 @@ from temporalio.service import RPCError, RPCStatusCode from temporalio.testing import WorkflowEnvironment from temporalio.worker import Worker -from tests.helpers import assert_eq_eventually +from tests.helpers import assert_eq_eventually, assert_eventually @activity.defn @@ -506,17 +506,19 @@ async def test_list_activities(client: Client, env: WorkflowEnvironment): task_queue=task_queue, start_to_close_timeout=timedelta(seconds=5), ) - await asyncio.sleep(1) - executions = [ - e async for e in client.list_activities(f'ActivityId = "{activity_id}"') - ] - assert len(executions) == 1 - execution = executions[0] - assert execution.activity_id == activity_id - assert execution.activity_type == "increment" - assert execution.task_queue == task_queue - assert execution.status == ActivityExecutionStatus.RUNNING - assert execution.state_transition_count is None # Not set until activity completes + async def check_executions(): + executions = [ + e async for e in client.list_activities(f'ActivityId = "{activity_id}"') + ] + assert len(executions) == 1 + execution = executions[0] + assert execution.activity_id == activity_id + assert execution.activity_type == "increment" + assert execution.task_queue == task_queue + assert execution.status == ActivityExecutionStatus.RUNNING + assert execution.state_transition_count is None # Not set until activity completes + + await assert_eventually(check_executions) async def test_count_activities(client: Client, env: WorkflowEnvironment): From f3add6893dcf62083d93a1dbca0d2d0e326e8850 Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Thu, 19 Mar 2026 16:52:42 -0700 Subject: [PATCH 5/5] Format --- tests/test_activity.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/test_activity.py b/tests/test_activity.py index cb171f734..8e851c8b1 100644 --- a/tests/test_activity.py +++ b/tests/test_activity.py @@ -506,6 +506,7 @@ async def test_list_activities(client: Client, env: WorkflowEnvironment): task_queue=task_queue, start_to_close_timeout=timedelta(seconds=5), ) + async def check_executions(): executions = [ e async for e in client.list_activities(f'ActivityId = "{activity_id}"') @@ -516,7 +517,9 @@ async def check_executions(): assert execution.activity_type == "increment" assert execution.task_queue == task_queue assert execution.status == ActivityExecutionStatus.RUNNING - assert execution.state_transition_count is None # Not set until activity completes + assert ( + execution.state_transition_count is None + ) # Not set until activity completes await assert_eventually(check_executions)