Problem
Temporal activities that read/write files on local disk can't scale to multiple worker instances. Disk is local to each machine, so there's no shared state between workers.
The workaround is manually downloading from and uploading to object storage (S3/GCS) at the start and end of every activity. This is repetitive boilerplate that every team reimplements, and it's easy to get wrong (missing cleanup, uploading on error, etc.).
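For concreteness, that boilerplate looks roughly like the sketch below. A local directory stands in for the object-storage bucket, and all paths and names are hypothetical:

```python
import shutil
import tempfile
from pathlib import Path

# Hypothetical stand-in for an object-storage prefix like s3://bucket/state/job-123.
REMOTE = Path(tempfile.mkdtemp()) / "state" / "job-123"
REMOTE.mkdir(parents=True)
(REMOTE / "config.json").write_text('{"rows": 2}')


def process() -> str:
    """The boilerplate every disk-based activity repeats today."""
    scratch = Path(tempfile.mkdtemp())                         # 1. allocate scratch space
    try:
        shutil.copytree(REMOTE, scratch, dirs_exist_ok=True)   # 2. download inputs
        text = (scratch / "config.json").read_text()           # 3. the actual work
        (scratch / "result.txt").write_text(f"processed {text}")
        shutil.copytree(scratch, REMOTE, dirs_exist_ok=True)   # 4. upload outputs
        return (scratch / "result.txt").read_text()
    finally:
        shutil.rmtree(scratch)                                 # 5. cleanup (easy to forget)
```

Steps 1, 2, 4, and 5 are identical in every such activity; only step 3 varies.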
Other orchestrators have recognized this gap:
- Flyte has `FlyteDirectory`: automatic upload/download of directories between tasks
- Argo has artifact passing: declarative tarballing between steps
Temporal has no equivalent.
Proposal
A `contrib.workdir` module that provides a `Workspace`: a local directory backed by remote storage. Contents are pulled before the activity body runs and pushed back after it completes.
```python
import json

from temporalio import activity
from temporalio.contrib.workdir import Workspace


@activity.defn
async def process(input: ProcessInput) -> ProcessOutput:
    async with Workspace("gs://bucket/state/job-123") as ws:
        # ws.path is a local Path; read/write files normally
        config = json.loads((ws.path / "config.json").read_text())
        (ws.path / "result.csv").write_text(compute(config))
    # Clean exit: contents packed and uploaded
    # Exception: no upload, remote state unchanged
```

A thin Temporal decorator resolves template variables from `activity.info()`:
```python
from temporalio import activity
from temporalio.contrib.workdir import workspace, get_workspace_path


@workspace("gs://bucket/{workflow_id}/{activity_type}")
@activity.defn
async def process(input: ProcessInput) -> ProcessOutput:
    ws = get_workspace_path()
    ...
```

The storage backend is auto-detected from the URL scheme via fsspec (GCS, S3, Azure, local, memory).
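The pull/push contract can be sketched with a synchronous context manager. A local directory stands in for the remote URL here (the real module would resolve the backend through fsspec), so this is an illustrative assumption, not the proposed implementation:

```python
import shutil
import tempfile
from pathlib import Path


class LocalWorkspace:
    """Sketch of the proposed semantics: pull on enter, push only on clean exit."""

    def __init__(self, remote: Path) -> None:
        self.remote = Path(remote)
        self.path: Path

    def __enter__(self) -> "LocalWorkspace":
        self.path = Path(tempfile.mkdtemp())
        if self.remote.exists():
            # Pull: materialize remote state into the local scratch directory.
            shutil.copytree(self.remote, self.path, dirs_exist_ok=True)
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        try:
            if exc_type is None:
                # Push only on clean exit; an exception leaves remote state unchanged.
                shutil.copytree(self.path, self.remote, dirs_exist_ok=True)
        finally:
            shutil.rmtree(self.path)
        return False  # never swallow the activity's exception
```

With fsspec, the pull/push steps would instead dispatch on the URL scheme (e.g. `gs://` → GCS), but the enter/exit contract would stay the same.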
Questions
- Does a generic utility like this fit the `contrib` model? Existing modules are integrations with external services.
- Is `fsspec` acceptable as an optional dependency?

Happy to submit a PR if there's interest; wanted to check alignment first.