Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions tests/helpers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ async def assert_eventually(
*,
timeout: timedelta = timedelta(seconds=10),
interval: timedelta = timedelta(milliseconds=200),
retry_on_rpc_cancelled: bool = True,
) -> T:
start_sec = time.monotonic()
while True:
Expand All @@ -80,6 +81,11 @@ async def assert_eventually(
except AssertionError:
if timedelta(seconds=time.monotonic() - start_sec) >= timeout:
raise
except RPCError as e:
if retry_on_rpc_cancelled and e.status == RPCStatusCode.CANCELLED:
continue
else:
raise
await asyncio.sleep(interval.total_seconds())


Expand Down
27 changes: 16 additions & 11 deletions tests/test_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from temporalio.service import RPCError, RPCStatusCode
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker
from tests.helpers import assert_eq_eventually
from tests.helpers import assert_eq_eventually, assert_eventually


@activity.defn
Expand Down Expand Up @@ -507,16 +507,21 @@ async def test_list_activities(client: Client, env: WorkflowEnvironment):
start_to_close_timeout=timedelta(seconds=5),
)

executions = [
e async for e in client.list_activities(f'ActivityId = "{activity_id}"')
]
assert len(executions) == 1
execution = executions[0]
assert execution.activity_id == activity_id
assert execution.activity_type == "increment"
assert execution.task_queue == task_queue
assert execution.status == ActivityExecutionStatus.RUNNING
assert execution.state_transition_count is None # Not set until activity completes
async def check_executions():
executions = [
e async for e in client.list_activities(f'ActivityId = "{activity_id}"')
]
assert len(executions) == 1
execution = executions[0]
assert execution.activity_id == activity_id
assert execution.activity_type == "increment"
assert execution.task_queue == task_queue
assert execution.status == ActivityExecutionStatus.RUNNING
assert (
execution.state_transition_count is None
) # Not set until activity completes

await assert_eventually(check_executions)


async def test_count_activities(client: Client, env: WorkflowEnvironment):
Expand Down
35 changes: 19 additions & 16 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -7725,38 +7725,38 @@ async def test_workflow_missing_local_activity_no_activities(client: Client):
async def heartbeat_activity(
catch_err: bool = True,
) -> temporalio.activity.ActivityCancellationDetails | None:
while True:
try:
try:
while True:
activity.heartbeat()
# If we have heartbeat details, we are on the second attempt, we have retried due to pause/unpause.
if activity.info().heartbeat_details:
return activity.cancellation_details()
await asyncio.sleep(0.1)
except (CancelledError, asyncio.CancelledError) as err:
if not catch_err:
raise err
return activity.cancellation_details()
finally:
activity.heartbeat("finally-complete")
except (CancelledError, asyncio.CancelledError) as err:
if not catch_err:
raise err
return activity.cancellation_details()
finally:
activity.heartbeat("finally-complete")


@activity.defn
def sync_heartbeat_activity(
catch_err: bool = True,
) -> temporalio.activity.ActivityCancellationDetails | None:
while True:
try:
try:
while True:
activity.heartbeat()
# If we have heartbeat details, we are on the second attempt, we have retried due to pause/unpause.
if activity.info().heartbeat_details:
return activity.cancellation_details()
time.sleep(0.1)
except (CancelledError, asyncio.CancelledError) as err:
if not catch_err:
raise err
return activity.cancellation_details()
finally:
activity.heartbeat("finally-complete")
except (CancelledError, asyncio.CancelledError) as err:
if not catch_err:
raise err
return activity.cancellation_details()
finally:
activity.heartbeat("finally-complete")


@workflow.defn
Expand All @@ -7769,6 +7769,7 @@ async def run(
result.append(
await workflow.execute_activity(
sync_heartbeat_activity,
True,
activity_id=activity_id,
start_to_close_timeout=timedelta(seconds=10),
heartbeat_timeout=timedelta(seconds=2),
Expand All @@ -7778,6 +7779,7 @@ async def run(
result.append(
await workflow.execute_activity(
heartbeat_activity,
True,
activity_id=f"{activity_id}-2",
start_to_close_timeout=timedelta(seconds=10),
heartbeat_timeout=timedelta(seconds=2),
Expand Down Expand Up @@ -8348,6 +8350,7 @@ async def test_previous_run_failure(client: Client):
task_queue=worker.task_queue,
retry_policy=RetryPolicy(
initial_interval=timedelta(milliseconds=10),
maximum_attempts=2,
),
)
result = await handle.result()
Expand Down
Loading