# pawn-queue

A pure-Python async pub/sub queue library that turns any S3-compatible object store into a lightweight message queue: no AWS account required, no message broker to deploy.
Works with Hetzner Object Storage, MinIO, Cloudflare R2, Backblaze B2, and AWS S3.
```python
import asyncio

from pawn_queue import PawnQueue


async def main():
    async with await PawnQueue.from_config("config.yaml") as pawnqueue:
        # 1. Register a topic (folder in S3)
        await pawnqueue.create_topic("orders")

        # 2. Publish messages
        producer = await pawnqueue.register_producer("order-service")
        await producer.publish("orders", {"order_id": 42, "item": "widget"})

        # 3. Consume (pull mode)
        consumer = await pawnqueue.register_consumer("billing", topics=["orders"])
        messages = await consumer.poll()
        for msg in messages:
            print(msg.payload)  # {"order_id": 42, "item": "widget"}
            await msg.ack()     # removes from queue

asyncio.run(main())
```

```python
# Push mode: runs until cancelled
async def handler(msg):
    print(msg.payload)
    await msg.ack()

await consumer.listen(handler)
```

| Concept | S3 representation |
|---|---|
| Topic | A "folder" prefix `{topic}/`, marked by `{topic}/.topic` |
| Message | `{topic}/messages/{timestamp}-{uuid}.json` |
| Claim / Lease | `{topic}/leases/{msg-uuid}.lease` |
| Dead letter | `{topic}/dead-letter/{msg-uuid}.json` |
| Producer registration | `_registry/producers/{producer-id}.json` |
| Consumer registration | `_registry/consumers/{consumer-id}.json` |
- Message keys are prefixed with a UTC timestamp, so `list_objects_v2` returns them in FIFO order natively.
- Every producer and consumer gets a stable UUIDv4 at registration time, persisted in S3. Re-registering with the same name is idempotent: the same ID is returned.
- Concurrency is resolved entirely through S3 primitives: no Redis, no ZooKeeper, no sidecar process.
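The FIFO-by-key property is easy to illustrate. The `make_key` helper below is hypothetical, not part of the library's API; it only shows why a zero-padded UTC timestamp prefix makes lexicographic listing order match publish order:

```python
import uuid
from datetime import datetime, timezone

def make_key(topic: str, now: datetime) -> str:
    """Build a message key whose lexicographic order matches publish order."""
    # A zero-padded UTC timestamp sorts correctly as a plain string.
    ts = now.strftime("%Y%m%dT%H%M%S.%f")
    return f"{topic}/messages/{ts}-{uuid.uuid4()}.json"

earlier = make_key("orders", datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc))
later = make_key("orders", datetime(2024, 1, 1, 12, 0, 1, tzinfo=timezone.utc))

# Lexicographic sort (the order list_objects_v2 returns) preserves publish order.
assert earlier < later
```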
```text
{bucket}/
├── _registry/
│   ├── producers/{producer-uuid}.json
│   └── consumers/{consumer-uuid}.json
│
├── {topic}/
│   ├── .topic                        ← marker: topic exists
│   ├── messages/{ts}-{uuid}.json     ← pending messages (FIFO by key)
│   ├── leases/{uuid}.lease           ← active claim records
│   └── dead-letter/{uuid}.json       ← nacked / failed messages
│
└── _probe/{uuid}.probe               ← temporary: compat probe only
```
```mermaid
graph TB
    subgraph App["Your Application"]
        P1[Producer A]
        P2[Producer B]
        C1[Consumer X]
        C2[Consumer Y]
        C3[Consumer Z]
    end
    subgraph PawnQueue["pawn-queue Library"]
        REG[Registry]
        PROD[Producer]
        CONS[Consumer]
        LEASE[LeaseManager]
        CFG[Config\nYAML + env vars]
        JANITOR[Janitor\nbackground task]
    end
    subgraph S3["S3-Compatible Bucket (Hetzner / MinIO / R2 / AWS)"]
        direction TB
        subgraph REGDIR["_registry/"]
            PREG["producers/{id}.json"]
            CREG["consumers/{id}.json"]
        end
        subgraph TOPIC["orders/ (topic)"]
            MSG["messages/{ts}-{uuid}.json"]
            LEASEDIR["leases/{uuid}.lease"]
            DLQ["dead-letter/{uuid}.json"]
        end
    end
    CFG -->|configures| PawnQueue
    P1 & P2 --> PROD
    C1 & C2 & C3 --> CONS
    PROD -->|register + publish| REG
    CONS -->|register + poll/listen| REG
    REG -->|write| PREG & CREG
    PROD -->|PUT message JSON| MSG
    CONS -->|list → claim lease| LEASE
    LEASE -->|PUT lease| LEASEDIR
    LEASE -->|DELETE message + lease on ack| MSG
    LEASE -->|COPY to dead-letter on nack| DLQ
    JANITOR -->|DELETE expired leases| LEASEDIR
    JANITOR -.->|restores visibility| MSG
```
```mermaid
sequenceDiagram
    actor App
    participant PawnQueue
    participant S3 as S3 Bucket

    Note over App,S3: Bootstrap
    App->>PawnQueue: PawnQueue.from_config("config.yaml")
    App->>PawnQueue: await pawnqueue.setup()
    PawnQueue->>S3: HEAD bucket (verify access)
    PawnQueue->>S3: PUT _probe/{uuid} twice → detect conditional write support
    S3-->>PawnQueue: strategy resolved: csprng_verify (Hetzner) or conditional_write (AWS/R2)
    PawnQueue-->>App: ready

    Note over App,S3: Topic Registration
    App->>PawnQueue: create_topic("orders")
    PawnQueue->>S3: PUT orders/.topic
    S3-->>PawnQueue: 200 OK

    Note over App,S3: Producer Registration
    App->>PawnQueue: register_producer("order-svc")
    PawnQueue->>S3: list _registry/producers/ (idempotency check)
    PawnQueue->>S3: PUT _registry/producers/{uuid}.json
    S3-->>PawnQueue: 200 OK
    PawnQueue-->>App: Producer(id=uuid)

    Note over App,S3: Publish Message
    App->>PawnQueue: producer.publish("orders", payload)
    PawnQueue->>S3: PUT orders/messages/{timestamp}-{uuid}.json
    S3-->>PawnQueue: 200 OK + ETag
    PawnQueue-->>App: message_id

    Note over App,S3: Consumer Registration
    App->>PawnQueue: register_consumer("billing", topics=["orders"])
    PawnQueue->>S3: PUT _registry/consumers/{uuid}.json
    S3-->>PawnQueue: 200 OK
    PawnQueue-->>App: Consumer(id=uuid)

    Note over App,S3: Poll & Claim
    App->>PawnQueue: consumer.poll()
    PawnQueue->>S3: DELETE expired leases (janitor pass)
    PawnQueue->>S3: list_objects_v2(prefix="orders/messages/")
    S3-->>PawnQueue: [{ts}-{uuid}.json, ...]
    PawnQueue->>S3: GET orders/messages/{ts}-{uuid}.json
    S3-->>PawnQueue: message JSON
    PawnQueue->>S3: lease acquire (see Lease Acquisition below)
    S3-->>PawnQueue: lease confirmed
    PawnQueue-->>App: [Message(id, payload, ack(), nack())]

    Note over App,S3: Acknowledge
    App->>PawnQueue: await msg.ack()
    PawnQueue->>S3: DELETE orders/messages/{uuid}.json
    PawnQueue->>S3: DELETE orders/leases/{uuid}.lease
    S3-->>PawnQueue: 204 No Content (×2)

    Note over App,S3: Nack → Dead Letter
    App->>PawnQueue: await msg.nack()
    PawnQueue->>S3: COPY message → orders/dead-letter/{uuid}.json
    PawnQueue->>S3: DELETE orders/messages/{uuid}.json
    PawnQueue->>S3: DELETE orders/leases/{uuid}.lease
    S3-->>PawnQueue: 204 No Content
```
```mermaid
sequenceDiagram
    participant C as Consumer
    participant S3 as S3 Bucket

    alt conditional_write (AWS S3 / Cloudflare R2)
        C->>S3: PUT leases/{uuid}.lease [If-None-Match: *]
        alt Nobody else claimed first
            S3-->>C: 201 Created — LEASE GRANTED
        else Another consumer beat us
            S3-->>C: 412 PreconditionFailed — BACK OFF
        end
    else csprng_verify (Hetzner / Ceph / MinIO)
        Note over C,S3: Step 1: Write (last writer wins)
        C->>S3: PUT leases/{uuid}.lease
        S3-->>C: 200 OK
        Note over C,S3: Step 2: Jitter (let all PUTs reach S3)
        C->>C: sleep(random 100–400 ms)
        Note over C,S3: Step 3: Verify (check if our ETag survived)
        C->>S3: HEAD leases/{uuid}.lease
        S3-->>C: ETag: {stored_md5}
        alt our ETag matches all rounds
            Note over C,S3: Step 4: Post-verify (guard against late writes)
            C->>C: sleep(jitter_min_ms)
            C->>S3: HEAD leases/{uuid}.lease
            S3-->>C: ETag: {stored_md5}
            alt ETag still ours
                C-->>C: LEASE GRANTED ✓
            else ETag changed
                C-->>C: BACK OFF — lost to late write
            end
        else our ETag doesn't match
            C-->>C: BACK OFF — competitor won
        end
    end
```
**Why `csprng_verify` is safe:**
Two consumers always write different bytes because each generates a unique 256-bit CSPRNG nonce (collision probability ≈ 2⁻²⁵⁶). S3 last-write-wins semantics mean exactly one set of bytes survives. Because Hetzner provides read-after-write consistency, the HEAD response always reflects the surviving write.

**The late-write edge case (and how it is closed):** In steps 1–3, a consumer whose PUT arrives at S3 after a competitor has already passed all verify rounds can silently overwrite the lease, giving two consumers the impression that their own bytes survived. Step 4 (post-verify confirmation) closes this window: after all retry HEADs succeed, the winning consumer sleeps for `jitter_min_ms` (≥ network one-way latency) and then performs one final HEAD. Any late write that arrived at S3 during the step-3 verification window will now be visible, and the losing consumer backs off.
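The race can be sketched end-to-end against an in-memory stand-in for S3. `FakeS3` and `acquire_lease` are illustrative names, not the library's internals; the stand-in only models the two properties the protocol relies on (last-write-wins and MD5 ETags):

```python
import asyncio
import hashlib
import secrets

class FakeS3:
    """In-memory stand-in for a last-write-wins object store with MD5 ETags."""
    def __init__(self):
        self.objects: dict[str, bytes] = {}

    def put(self, key: str, body: bytes) -> None:
        self.objects[key] = body  # last writer wins

    def head_etag(self, key: str) -> str:
        return hashlib.md5(self.objects[key]).hexdigest()

async def acquire_lease(s3: FakeS3, key: str, jitter_s: float = 0.01) -> bool:
    """Sketch of write -> jitter -> verify -> post-verify. Returns True on grant."""
    nonce = secrets.token_bytes(32)            # 256-bit CSPRNG nonce
    my_etag = hashlib.md5(nonce).hexdigest()
    s3.put(key, nonce)                         # step 1: write
    await asyncio.sleep(jitter_s)              # step 2: jitter
    if s3.head_etag(key) != my_etag:           # step 3: verify
        return False
    await asyncio.sleep(jitter_s)              # step 4: post-verify sleep
    return s3.head_etag(key) == my_etag        # final HEAD: still ours?

async def main() -> list[bool]:
    s3 = FakeS3()
    # Two consumers race for the same lease key.
    return await asyncio.gather(
        acquire_lease(s3, "orders/leases/abc.lease"),
        acquire_lease(s3, "orders/leases/abc.lease"),
    )

results = asyncio.run(main())
assert sum(results) == 1  # exactly one winner; the other backs off
```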
```mermaid
sequenceDiagram
    participant ConsumerX as Consumer X (crashed)
    participant ConsumerY as Consumer Y (healthy)
    participant S3 as S3 Bucket

    Note over ConsumerX,S3: Consumer X claims a message then crashes
    ConsumerX->>S3: PUT leases/{uuid}.lease (expires_at = now + 30s)
    S3-->>ConsumerX: 200 OK
    ConsumerX-xConsumerX: 💥 crash — no ack, lease never refreshed

    Note over ConsumerY,S3: Consumer Y starts next poll — janitor runs first
    ConsumerY->>S3: list_objects_v2(prefix="orders/leases/")
    S3-->>ConsumerY: [{uuid}.lease, ...]
    ConsumerY->>S3: GET orders/leases/{uuid}.lease
    S3-->>ConsumerY: {expires_at: "<past>"}
    ConsumerY->>ConsumerY: expires_at < now → stale lease
    ConsumerY->>S3: DELETE orders/leases/{uuid}.lease
    S3-->>ConsumerY: 204 No Content

    Note over ConsumerY,S3: Message is now visible again — Consumer Y claims it
    ConsumerY->>S3: list_objects_v2(prefix="orders/messages/")
    S3-->>ConsumerY: [{ts}-{uuid}.json]
    ConsumerY->>S3: lease acquire (csprng_verify or conditional_write)
    S3-->>ConsumerY: LEASE GRANTED
    ConsumerY->>ConsumerY: process message …
    ConsumerY->>S3: ack → DELETE message + lease
```
Lease TTL is controlled by `polling.visibility_timeout_seconds` (default: 30 s). Active consumers renew their lease every `polling.lease_refresh_interval_seconds` (default: 10 s) via the background `_lease_refresher` task.
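A minimal sketch of such a background refresher, assuming a caller-supplied `renew` coroutine (the real `_lease_refresher` is internal to the library and re-PUTs the lease object with a fresh `expires_at`):

```python
import asyncio

async def lease_refresher(renew, interval_s: float) -> None:
    """Renew the lease at a fixed interval until the task is cancelled."""
    try:
        while True:
            await asyncio.sleep(interval_s)
            await renew()  # e.g. re-PUT the lease with a new expires_at
    except asyncio.CancelledError:
        pass               # clean shutdown: stop renewing, let the lease expire

async def main() -> int:
    renewals = 0

    async def renew() -> None:
        nonlocal renewals
        renewals += 1

    task = asyncio.create_task(lease_refresher(renew, interval_s=0.01))
    await asyncio.sleep(0.05)  # let a few renewals happen
    task.cancel()
    await task                 # CancelledError is swallowed inside the loop
    return renewals

renewals = asyncio.run(main())
assert renewals >= 1
```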
```mermaid
graph LR
    subgraph asyncio_Event_Loop["asyncio Event Loop"]
        LISTEN["consumer.listen(handler)"]
        POLL["_poll_loop()<br/>polls at interval"]
        REFRESH["_lease_refresher()<br/>refresh leases"]
        HANDLER["handler(msg)<br/>user code"]
    end
    LISTEN -->|gather| POLL
    LISTEN -->|gather| REFRESH
    POLL -->|each message| HANDLER
    HANDLER -->|ack or nack| S3[(S3 Bucket)]
    REFRESH -->|renews| S3
    LISTEN -->|cancel on exit| POLL
    LISTEN -->|cancel on exit| REFRESH
```
`listen()` runs until the task is cancelled (e.g., Ctrl-C). If the handler raises an exception, `nack()` is called automatically to move the message to dead-letter.
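The auto-nack behaviour can be sketched like this. `dispatch` and the `Message` stub are illustrative stand-ins, not the library's internals:

```python
import asyncio

class Message:
    """Minimal stand-in for the library's Message, tracking the outcome."""
    def __init__(self, payload):
        self.payload = payload
        self.outcome = None

    async def ack(self):
        self.outcome = "acked"

    async def nack(self):
        self.outcome = "dead-lettered"

async def dispatch(msg, handler):
    """Per-message dispatch: a raising handler sends the message to dead-letter."""
    try:
        await handler(msg)
    except Exception:
        await msg.nack()

async def main():
    good, bad = Message({"ok": True}), Message({"ok": False})

    async def handler(msg):
        if not msg.payload["ok"]:
            raise ValueError("boom")
        await msg.ack()

    await dispatch(good, handler)
    await dispatch(bad, handler)
    return good.outcome, bad.outcome

outcomes = asyncio.run(main())
assert outcomes == ("acked", "dead-lettered")
```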
Uses the `If-None-Match: *` request header on PutObject.

- Backend support: AWS S3 (≥ Aug 2024), Cloudflare R2. Not Hetzner/Ceph.
- Guarantee: atomic. Exactly one writer succeeds; all others receive `412 PreconditionFailed`.
- No jitter needed.
A cryptographic compare-and-swap protocol using S3 ETags. See the Lease Acquisition diagram.
| Property | Details |
|---|---|
| Nonce entropy | 256 bits, from `secrets.token_bytes(32)` |
| Collision probability | ≈ 2⁻²⁵⁶ (negligible) |
| Required S3 properties | Last-write-wins + read-after-write consistency |
| Works on Hetzner? | ✅ Yes (Ceph RGW provides both) |
| Duplicate risk | Closed by post-verify confirmation (step 4) |
4-step algorithm:

1. **Write**: PUT the lease object (unique nonce bytes). S3 applies last-write-wins; the winner is not yet known locally.
2. **Jitter**: sleep a random interval (`jitter_min_ms`–`jitter_max_ms`). This window must exceed the S3 network one-way latency so all concurrent PUT requests have time to arrive and be applied at S3 before verification begins.
3. **Verify**: HEAD the lease key `verify_retries + 1` times, checking that the stored ETag matches `MD5(our_bytes)`. Any mismatch means a competitor's write survived: back off immediately.
4. **Post-verify confirmation**: sleep `jitter_min_ms` once more, then perform a final HEAD. This closes the late-write window: a competitor whose PUT was still in flight during step 3 has now had time to reach S3. If the ETag changed, back off.

**The late-write edge case:** Without step 4, a slow-network PUT from consumer B could arrive at S3 after consumer A completed all step-3 verifications. B then also passes its own verifications, giving both A and B the impression that they hold the lease and resulting in a duplicate delivery. Step 4 catches exactly this scenario.

**Tuning `jitter_min_ms`:** Set it to at least 2× the expected one-way network latency to S3. The default (100 ms) is appropriate for most European data-centre deployments. Reduce with caution on low-latency networks.
When `concurrency.strategy = auto` (the default), the library runs a probe at startup:

1. Writes a probe object to `_probe/{uuid}.probe`.
2. Attempts a second write of the same key with `If-None-Match: *`.
3. If the second write fails with `412` → `conditional_write` is selected.
4. If the second write succeeds silently → `csprng_verify` is selected.
5. The probe object is deleted.
This selection is logged at INFO level and stored for the session lifetime.
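The probe's decision logic can be sketched as follows; `detect_strategy` and the two callbacks are hypothetical stand-ins for the real S3 calls:

```python
class PreconditionFailed(Exception):
    """Stand-in for the S3 client's 412 PreconditionFailed error."""

def detect_strategy(conditional_overwrite) -> str:
    """Attempt a conditional overwrite of an existing key and classify the backend."""
    try:
        conditional_overwrite()     # PUT with If-None-Match: * on an existing key
    except PreconditionFailed:
        return "conditional_write"  # backend enforces the precondition
    return "csprng_verify"          # silent overwrite: the header was ignored

def aws_like():
    # Backends that honour If-None-Match (AWS S3, Cloudflare R2) reject the PUT.
    raise PreconditionFailed()

def ceph_like():
    # Backends that ignore the header (Hetzner / Ceph RGW) overwrite silently.
    pass

assert detect_strategy(aws_like) == "conditional_write"
assert detect_strategy(ceph_like) == "csprng_verify"
```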
Copy `config.example.yaml` and fill in your credentials:

```yaml
s3:
  endpoint_url: "https://fsn1.your-objectstorage.com"  # Hetzner FSN1
  bucket_name: "my-queue-bucket"
  aws_access_key_id: "..."
  aws_secret_access_key: "..."
  region_name: "eu-central-1"
  use_ssl: true
polling:
  interval_seconds: 5                 # how often consumer.poll() loops in listen() mode
  max_messages_per_poll: 10           # max messages returned per poll() call
  visibility_timeout_seconds: 30      # lease TTL; message reappears if not acked in time
  lease_refresh_interval_seconds: 10  # background task renews lease at this interval
  jitter_max_ms: 200                  # unused in csprng_verify (has its own jitter config)
concurrency:
  strategy: "auto"                    # auto | conditional_write | csprng_verify
  csprng_verify:
    jitter_min_ms: 100                # minimum write→verify sleep
    jitter_max_ms: 400                # maximum write→verify sleep
    verify_retries: 2                 # number of additional HEAD re-checks
    verify_retry_delay_ms: 150        # sleep between re-checks
registry:
  heartbeat_interval_seconds: 60      # how often last_seen is updated in _registry/
```

All settings can be overridden via environment variables (S3, polling, concurrency, registry):
| Setting | Env var |
|---|---|
| **S3** | |
| S3 endpoint | `PAWNQUEUE_S3_ENDPOINT_URL` |
| Bucket name | `PAWNQUEUE_S3_BUCKET_NAME` |
| Access key | `PAWNQUEUE_S3_ACCESS_KEY` |
| Secret key | `PAWNQUEUE_S3_SECRET_KEY` |
| Region | `PAWNQUEUE_S3_REGION` |
| Use SSL | `PAWNQUEUE_S3_USE_SSL` |
| **Polling** | |
| Poll interval | `PAWNQUEUE_POLLING_INTERVAL_SECONDS` |
| Max per poll | `PAWNQUEUE_POLLING_MAX_MESSAGES_PER_POLL` |
| Visibility timeout | `PAWNQUEUE_POLLING_VISIBILITY_TIMEOUT` |
| Lease refresh interval | `PAWNQUEUE_POLLING_LEASE_REFRESH_INTERVAL` |
| Jitter max | `PAWNQUEUE_POLLING_JITTER_MAX_MS` |
| **Concurrency** | |
| Strategy | `PAWNQUEUE_CONCURRENCY_STRATEGY` |
| csprng jitter min | `PAWNQUEUE_CSPRNG_JITTER_MIN_MS` |
| csprng jitter max | `PAWNQUEUE_CSPRNG_JITTER_MAX_MS` |
| csprng verify retries | `PAWNQUEUE_CSPRNG_VERIFY_RETRIES` |
| csprng retry delay | `PAWNQUEUE_CSPRNG_RETRY_DELAY_MS` |
| **Registry** | |
| Heartbeat interval | `PAWNQUEUE_REGISTRY_HEARTBEAT_INTERVAL` |
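As a sketch of how such overrides might be collected, assuming a flat `PAWNQUEUE_` prefix convention (the `load_env_overrides` helper is hypothetical; the real library maps these values onto its Pydantic config model):

```python
import os

def load_env_overrides(prefix: str = "PAWNQUEUE_") -> dict[str, str]:
    """Collect every PAWNQUEUE_* variable into a flat, lower-cased mapping."""
    return {
        key[len(prefix):].lower(): value
        for key, value in os.environ.items()
        if key.startswith(prefix)
    }

os.environ["PAWNQUEUE_S3_BUCKET_NAME"] = "queues"
os.environ["PAWNQUEUE_POLLING_INTERVAL_SECONDS"] = "1"

overrides = load_env_overrides()
assert overrides["s3_bucket_name"] == "queues"
assert overrides["polling_interval_seconds"] == "1"
```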
Use `PawnQueueBuilder` for dynamic configuration or testing:

```python
from pawn_queue import PawnQueueBuilder

pawnqueue = await (
    PawnQueueBuilder()
    .s3(
        endpoint_url="http://localhost:9000",
        bucket_name="my-queue",
        access_key="minioadmin",
        secret_key="minioadmin",
    )
    .polling(interval_seconds=2, max_messages_per_poll=20)
    .concurrency(strategy="conditional_write")
    .build()
)

async with pawnqueue:
    # use pawnqueue
    ...
```

Programmatically set some values, use env vars for others:
```python
from pawn_queue import PawnQueueBuilder

pawnqueue = await (
    PawnQueueBuilder()
    .s3(
        endpoint_url="http://localhost:9000",
        bucket_name="my-queue",
        access_key="minioadmin",
        secret_key="minioadmin",
    )
    .from_env()  # fill remaining values from PAWNQUEUE_* env vars
    .build()
)
```

Full configuration from environment variables:
```python
from pawn_queue import PawnQueueBuilder

# All config comes from PAWNQUEUE_* env vars
pawnqueue = await PawnQueueBuilder().from_env().build()
```

```shell
# Docker environment
export PAWNQUEUE_S3_ENDPOINT_URL=http://minio:9000
export PAWNQUEUE_S3_BUCKET_NAME=queues
export PAWNQUEUE_S3_ACCESS_KEY=minioadmin
export PAWNQUEUE_S3_SECRET_KEY=minioadmin
export PAWNQUEUE_POLLING_INTERVAL_SECONDS=1
export PAWNQUEUE_CONCURRENCY_STRATEGY=csprng_verify
```

```python
# Create from YAML file, dict, or PawnQueueConfig object
pawnqueue = await PawnQueue.from_config("config.yaml")

# Explicit lifecycle
await pawnqueue.setup()     # opens S3 connection, runs compat probe
await pawnqueue.teardown()  # closes connection

# Context manager (recommended)
async with await PawnQueue.from_config("config.yaml") as pawnqueue:
    ...

# Topics
await pawnqueue.create_topic("orders")  # idempotent
topics: list[str] = await pawnqueue.list_topics()

# Producers & consumers
producer = await pawnqueue.register_producer("my-service")  # idempotent by name
consumer = await pawnqueue.register_consumer("my-worker", topics=["orders", "events"])
```

```python
producer.id    # stable UUIDv4
producer.name  # registration name

msg_id: str = await producer.publish("orders", {"key": "value"})
```

Raises `TopicNotFoundError` if the topic has not been registered. Raises `PublishError` on S3 failure.
```python
consumer.id    # stable UUIDv4
consumer.name  # registration name

# Pull mode
messages: list[Message] = await consumer.poll()

# Push mode (blocks until cancelled)
async def handler(msg: Message) -> None:
    await msg.ack()

await consumer.listen(handler)
```

```python
msg.id           # str: UUIDv4
msg.topic        # str
msg.producer_id  # str: the producer's UUID
msg.created_at   # str: ISO-8601 UTC timestamp
msg.payload      # dict: user-provided JSON

await msg.ack()   # deletes message + lease from S3
await msg.nack()  # copies to dead-letter, deletes message + lease
```

Calling `ack()` or `nack()` more than once is safe (subsequent calls are no-ops).
```shell
pip install pawn-queue
```

Runtime dependencies: `aioboto3 >= 13`, `pydantic >= 2`, `pyyaml >= 6`
Python: 3.11+
Uses moto's `ThreadedMotoServer`: no real S3 account needed.

```shell
pip install -e ".[dev]"  # also installs flask, flask-cors for moto server
pytest tests/ -v
```

Set credentials via environment variables or copy `e2e/config.template.yaml` to `e2e/config.yaml`:
```shell
export PAWNQUEUE_S3_ENDPOINT_URL=https://fsn1.your-objectstorage.com
export PAWNQUEUE_S3_BUCKET_NAME=your-existing-bucket
export PAWNQUEUE_S3_ACCESS_KEY=your-access-key
export PAWNQUEUE_S3_SECRET_KEY=your-secret-key
export PAWNQUEUE_S3_REGION=eu-central-1
pip install -e ".[e2e]"
pytest e2e/ -v --timeout=120
```

Isolation: E2E tests write under a unique `pawnqueue-e2e-{uuid}/` prefix per run and delete all created objects on teardown. Your existing bucket data is never touched.
E2E test coverage:
| File | What it tests |
|---|---|
| `test_e2e_compat_probe.py` | Strategy auto-detection; 10-way concurrent csprng_verify with exactly 1 winner |
| `test_e2e_topics.py` | Create, list, idempotency, ghost-topic error |
| `test_e2e_produce_consume.py` | Publish → poll → ack/nack/listen lifecycle end-to-end |
| `test_e2e_concurrency.py` | 20 msgs × 3 concurrent consumers: no duplicates, parametrized over both strategies |
| `test_e2e_registry.py` | 6 registrations, UUID uniqueness, idempotent re-registration |
| `test_e2e_dead_letter.py` | nack → dead-letter payload fidelity; mix of ack + nack |
1. Create a PyPI account at pypi.org if you don't have one.
2. Generate an API token:
   - Go to pypi.org/manage/account/
   - Scroll to the "API tokens" section
   - Click "Create token"
   - Scope: "Entire account" (or project-specific if you prefer)
   - Copy the token (you won't see it again)
3. Add the token to your GitHub repository:
   - Go to your GitHub repo → Settings → Secrets and variables → Actions
   - Click "New repository secret"
   - Name: `PYPI_API_TOKEN`
   - Secret: paste your PyPI API token
   - Click "Add secret"
4. (Optional) For TestPyPI (testing before production):
   - Create an account at test.pypi.org
   - Generate an API token there
   - Add it as a `TEST_PYPI_API_TOKEN` GitHub secret
The easiest method: the workflow triggers automatically on release.

1. Update the version in `pyproject.toml`:

   ```toml
   [project]
   name = "pawn-queue"
   version = "0.2.0"  # increment version
   ```

2. Create a GitHub Release:
   - Go to your GitHub repo → Releases → "Create a new release"
   - Tag version: `v0.2.0` (must match the `pyproject.toml` version, prefixed with `v`)
   - Release title: `Release 0.2.0`
   - Description: summarize changes
   - Click "Publish release"
The workflow will:
- ✅ Verify version matches tag
- ✅ Run unit tests
- ✅ Build distributions (wheel + sdist)
- ✅ Upload to PyPI
- ✅ Attach the built distributions as GitHub Release assets
- Go to GitHub repo → Actions → "Publish to PyPI"
- Click "Run workflow"
- Choose the PyPI environment: `pypi` (production) or `testpypi` (testing)
- Click "Run workflow"
Useful for testing the workflow with TestPyPI before pushing to production.
After a successful publish:
```shell
# Install the latest version from PyPI
pip install --upgrade pawn-queue

# Verify it works
python -c "from pawn_queue import PawnQueue; print('Success!')"
```

Or check on pypi.org/project/pawn-queue.
The csprng_verify strategy provides strong exactly-once delivery guarantees through its 4-step cryptographic protocol (write → jitter → verify → post-verify confirmation). The post-verify confirmation step (Step 4) specifically closes the late-write edge case where a slow-network competitor PUT arrives at S3 after all verification rounds have passed.
However, network conditions are unbounded, and defensive programming is always valuable. Consumers should still be idempotent where the cost is low:
```python
async def handler(msg: Message) -> None:
    order_id = msg.payload["order_id"]

    # Idempotency guard: skip if already processed (e.g., checked in your DB)
    if await already_processed(order_id):
        await msg.ack()  # safe no-op: we already handled it
        return

    await process_order(order_id)
    await mark_processed(order_id)
    await msg.ack()
```

For atomic exactly-once semantics with no probabilistic component, use AWS S3 or Cloudflare R2 (both support `conditional_write`). The library selects this automatically when the backend supports it.
Apache License 2.0 - see LICENSE file for details