51 commits
- `b8d1ee3` Add Kafka stream backend architecture with split KV/stream protocols (claude, Mar 27, 2026)
- `44f570d` Simplify to single-backend architecture with ops layer wired to all c… (claude, Mar 27, 2026)
- `51afd25` Add queue commit/nack semantics and retry support for Kafka resilience (claude, Mar 27, 2026)
- `a5a6584` Concurrent task execution per worker and Kafka consumer heartbeat (claude, Mar 27, 2026)
- `29685ca` Revert "Concurrent task execution per worker and Kafka consumer heart…" (claude, Mar 27, 2026)
- `de16450` Add worker_id to Kafka client IDs for observability (claude, Mar 27, 2026)
- `8af3122` Route activity system through ops layer with Kafka activity topic (claude, Mar 27, 2026)
- `7099247` Split backends into packages with domain-specific modules (claude, Mar 27, 2026)
- `35a6a2a` Go full async and rename backend methods to descriptive names (claude, Mar 27, 2026)
- `84423a9` Add Kafka integration tests and CI workflow (claude, Mar 27, 2026)
- `a4d87cf` Fix CI: use correct Kafka image tag, override addopts, fix readiness … (claude, Mar 27, 2026)
- `659f9fe` Switch to apache/kafka:3.9.2 for CI Kafka service (claude, Mar 27, 2026)
- `969f6f6` CI: disable fail-fast, add verbose test output for debugging (claude, Mar 27, 2026)
- `f763709` Fix Kafka consumer hangs: per-topic group IDs, retry loop, faster hea… (claude, Mar 27, 2026)
- `48c3e2d` Use manual partition assignment instead of consumer groups (claude, Mar 27, 2026)
- `b50156c` Fix force_metadata_update — use partition discovery retry loop (claude, Mar 27, 2026)
- `584994c` Remove consumer group_id to avoid group coordinator hangs (claude, Mar 27, 2026)
- `66082e7` CI: capture Kafka test output in job summary on failure (claude, Mar 27, 2026)
- `918363a` CI: post Kafka test output as PR comment on failure (claude, Mar 27, 2026)
- `896ecdc` Use subscribe() with per-topic group IDs, set rebalance delay to 0 (claude, Mar 27, 2026)
- `a861c9a` Use admin metadata for partition discovery in manual assignment (claude, Mar 27, 2026)
- `1a96fed` CI: use curl for PR comment instead of github-script (claude, Mar 27, 2026)
- `67922c9` CI: upload test output as artifact instead of PR comment (claude, Mar 27, 2026)
- `86fde70` Add debug prints to queue_pop and test_push_and_pop (claude, Mar 27, 2026)
- `686f3d2` Update uv.lock after adding kafka extra dependency (claude, Mar 27, 2026)
- `985d483` CI: emit test output as warning annotations for API access (claude, Mar 27, 2026)
- `5f6384a` Better debug output: print consumer state on timeout, filter annotations (claude, Mar 27, 2026)
- `ce54be7` CI: filter annotations to only show failures and debug output (claude, Mar 27, 2026)
- `e7fb281` CI: re-trigger after transient Docker pull failure (claude, Mar 27, 2026)
- `3f792b1` CI: retry after transient failures (claude, Mar 27, 2026)
- `47c0324` CI: use apache/kafka:latest to avoid Docker pull issues with pinned tag (claude, Mar 27, 2026)
- `d0aa733` CI: switch to confluentinc/cp-kafka:7.7.1 for reliable Docker pulls (claude, Mar 27, 2026)
- `92ab91c` CI: use docker run instead of service containers for Kafka (claude, Mar 27, 2026)
- `e4b3946` Add docker-compose.kafka.yml, clean up debug prints, use docker run i… (claude, Mar 27, 2026)
- `424d41c` Fix queue_pop message buffer and produce() key type handling (tcdent, Mar 27, 2026)
- `cabf769` Kafka consumer groups, full async, producer-side topic creation (tcdent, Mar 27, 2026)
- `a9fdbdd` Class-based backend architecture, eliminate ops passthrough layer (tcdent, Mar 27, 2026)
- `b2e28c5` Flatten backend modules, remove dead code, clean up noise (tcdent, Mar 27, 2026)
- `a3c3f6d` Kafka headers, stateless activity backend, schedule backend, pool sup… (tcdent, Mar 28, 2026)
- `17f9819` Extract activity from backends into producer/consumer pattern (tcdent, Mar 28, 2026)
- `4259527` Typed worker messages, Task as pure data, multiprocessing IPC (tcdent, Mar 28, 2026)
- `2c7ef1f` Partitioned Redis queues with scan-based fair dequeue (tcdent, Mar 28, 2026)
- `b1ef5d5` Remove pubsub, inline dequeue, queue.complete, activity over IPC (tcdent, Mar 29, 2026)
- `b382ef3` Schedule backend, session cleanup, dead code removal, resiliency tests (tcdent, Mar 29, 2026)
- `4f9ba7c` Kafka state: raise NotImplementedError, drop in-memory KV/counter caches (tcdent, Mar 29, 2026)
- `68e1b94` Fix KafkaStateBackend instantiation (no longer takes backend arg) (tcdent, Mar 30, 2026)
- `042e81e` Restore docstrings stripped during refactor (tcdent, Mar 30, 2026)
- `a068772` Update Kafka integration tests and fix queue interface mismatch (tcdent, Mar 30, 2026)
- `78d8f79` Fix Kafka CI: add OFFSETS_TOPIC_REPLICATION_FACTOR, test timeout (tcdent, Mar 30, 2026)
- `d1a5ccf` Remove --timeout flag (pytest-timeout not installed) (tcdent, Mar 30, 2026)
- `084f8ed` Queue fairness benchmark: partition-level metrics, fix stale APIs (tcdent, Mar 30, 2026)
125 changes: 125 additions & 0 deletions .github/workflows/ci.yml
name: CI

on:
push:
branches: [main]
pull_request:
branches: [main]

jobs:
# -----------------------------------------------------------------------
# Unit tests — no external services (fakeredis + SQLite)
# -----------------------------------------------------------------------

test:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
python-version: ["3.12", "3.13"]

steps:
- uses: actions/checkout@v4

- name: Install uv
uses: astral-sh/setup-uv@v6
with:
enable-cache: true

- name: Set up Python ${{ matrix.python-version }}
run: uv python install ${{ matrix.python-version }}

- name: Install dependencies
run: uv sync --dev

- name: Run unit tests
run: |
uv run pytest tests/ \
--ignore=tests/test_kafka_integration.py \
-o "addopts=" \
-v --tb=long

# -----------------------------------------------------------------------
# Kafka integration tests — real broker via docker run
# -----------------------------------------------------------------------
test-kafka:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4

- name: Start Kafka broker
run: |
docker run -d --name kafka \
-p 9092:9092 \
-e KAFKA_NODE_ID=1 \
-e KAFKA_PROCESS_ROLES=broker,controller \
-e KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
-e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT \
-e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
-e KAFKA_LOG_CLEANER_MIN_COMPACTION_LAG_MS=0 \
-e KAFKA_LOG_CLEANER_MIN_CLEANABLE_RATIO=0.01 \
-e KAFKA_LOG_RETENTION_MS=60000 \
-e KAFKA_NUM_PARTITIONS=1 \
-e KAFKA_AUTO_CREATE_TOPICS_ENABLE=true \
-e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
-e CLUSTER_ID=ciTestCluster0001 \
apache/kafka:3.9.0

- name: Install uv
uses: astral-sh/setup-uv@v6
with:
enable-cache: true

- name: Set up Python
run: uv python install 3.12

- name: Install dependencies
run: uv sync --dev --extra kafka

- name: Wait for Kafka to be ready
run: |
echo "Waiting for Kafka..."
for i in $(seq 1 30); do
if nc -z localhost 9092 2>/dev/null; then
echo "Kafka port is open"
sleep 5
echo "Kafka is ready"
exit 0
fi
echo " attempt $i/30..."
sleep 2
done
echo "Kafka failed to start"
docker logs kafka
exit 1

- name: Run Kafka integration tests
timeout-minutes: 2
run: |
uv run pytest tests/test_kafka_integration.py \
-o "addopts=" \
-v --tb=long 2>&1 | tee /tmp/kafka_test_output.txt
exit ${PIPESTATUS[0]}
env:
AGENTEXEC_STATE_BACKEND: agentexec.state.kafka
KAFKA_BOOTSTRAP_SERVERS: localhost:9092
AGENTEXEC_KAFKA_DEFAULT_PARTITIONS: "2"
AGENTEXEC_KAFKA_REPLICATION_FACTOR: "1"

- name: Show Kafka logs on failure
if: failure()
run: docker logs kafka 2>&1 | tail -50

- name: Create failure check annotation with output
if: failure()
run: |
if [ -f /tmp/kafka_test_output.txt ]; then
grep -E '\[queue_|FAILED|ERROR|AssertionError|TIMEOUT|short test summary' /tmp/kafka_test_output.txt | tail -9 | while IFS= read -r line; do
echo "::warning::$line"
done
fi
48 changes: 48 additions & 0 deletions docker-compose.kafka.yml
# Kafka development environment for running integration tests locally.
#
# Usage:
# docker compose -f docker-compose.kafka.yml up -d
#
# KAFKA_BOOTSTRAP_SERVERS=localhost:9092 \
# AGENTEXEC_STATE_BACKEND=agentexec.state.kafka \
# uv run pytest tests/test_kafka_integration.py -v
#
# docker compose -f docker-compose.kafka.yml down
#
# Kafka UI available at http://localhost:8080

services:
kafka:
image: apache/kafka:3.9.0
ports:
- "9092:9092"
environment:
KAFKA_NODE_ID: "1"
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
CLUSTER_ID: "agentexec-dev-cluster-01"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: "1"
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: "1"
healthcheck:
test: /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
interval: 5s
timeout: 10s
retries: 15
start_period: 15s

kafka-ui:
image: provectuslabs/kafka-ui:latest
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: agentexec
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
depends_on:
kafka:
condition: service_healthy
75 changes: 75 additions & 0 deletions examples/queue-fairness/README.md
# Queue Fairness Benchmark

Validates that the scan-based partitioned queue distributes work fairly across both workers and partition keys.

## Background

agentexec uses Redis `SCAN` to iterate partition queues during dequeue. SCAN returns keys in hash-table order, which is effectively random — this gives us pseudo-random partition selection without explicit shuffling or round-robin bookkeeping.

This benchmark measures two dimensions of fairness:

- **Worker fairness**: Are tasks spread evenly across workers?
- **Partition fairness**: Are all partitions served at a similar pace, or do some starve while others get immediate attention?

## Usage

```bash
uv run python examples/queue-fairness/run.py
uv run python examples/queue-fairness/run.py --partitions 1000 --tasks-per-partition 10 --workers 8
```

Requires a running Redis instance (`REDIS_URL` environment variable).

## What it does

1. Enqueues `partitions * tasks_per_partition` tasks, each routed to a named partition queue
2. Spawns N async workers that pop, simulate work, then release the partition lock via `complete()`
3. Records timing data for every task: which worker, which partition, wait time, pickup time
4. Reports fairness metrics at the end
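The steps above can be modeled with a self-contained asyncio sketch. This is an in-memory stand-in, not the real `run.py`: plain lists replace Redis lists, `random.choice` replaces SCAN's pseudo-random iteration order, and `benchmark` and its parameters are illustrative names.

```python
import asyncio
import random
from collections import defaultdict

async def benchmark(partitions: int, tasks_per_partition: int, workers: int):
    # Step 1: enqueue partitions * tasks_per_partition tasks into named
    # partition queues (plain lists stand in for Redis lists).
    queues = {f"part-{i}": list(range(tasks_per_partition)) for i in range(partitions)}
    locks: set[str] = set()              # partitions currently held by a worker
    counts: dict[int, int] = defaultdict(int)

    async def worker(worker_id: int) -> None:
        while True:
            # Step 2: pick an unlocked, non-empty partition. random.choice
            # stands in for SCAN's pseudo-random iteration order.
            candidates = [p for p, q in queues.items() if q and p not in locks]
            if not candidates:
                if not any(queues.values()):
                    return               # every partition is drained
                await asyncio.sleep(0)   # remaining work is all locked; yield
                continue
            part = random.choice(candidates)
            locks.add(part)              # acquire the partition lock
            queues[part].pop()
            await asyncio.sleep(0)       # simulate doing the task
            locks.discard(part)          # release, as complete() would
            counts[worker_id] += 1       # step 3: record who did the work

    await asyncio.gather(*(worker(i) for i in range(workers)))
    return dict(counts)

counts = asyncio.run(benchmark(partitions=20, tasks_per_partition=5, workers=4))
assert sum(counts.values()) == 100
```

Even in this toy model, the per-worker counts come out roughly even, which is the property the real benchmark measures at scale.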

## Results

At 1000 partitions, 10 tasks each (10,000 total), 8 workers:

### Worker fairness

Each worker processed between 1243 and 1257 tasks (ideal: 1250). Standard deviation of 5.2 across 8 workers — essentially uniform distribution.

```
Worker 0: 1257 tasks (12.6%)
Worker 1: 1249 tasks (12.5%)
Worker 2: 1248 tasks (12.5%)
Worker 3: 1257 tasks (12.6%)
Worker 4: 1246 tasks (12.5%)
Worker 5: 1243 tasks (12.4%)
Worker 6: 1247 tasks (12.5%)
Worker 7: 1253 tasks (12.5%)
```
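The 5.2 figure can be reproduced from the counts above with the standard library, assuming it is the sample standard deviation (`statistics.stdev`) rather than the population one:

```python
import statistics

# Per-worker task counts reported above (8 workers, 10,000 tasks total).
counts = [1257, 1249, 1248, 1257, 1246, 1243, 1247, 1253]

assert sum(counts) == 10_000
assert statistics.mean(counts) == 1250       # the ideal even share
print(round(statistics.stdev(counts), 1))    # → 5.2
```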

### Partition fairness

The "first-task pickup time" measures when each partition's first task gets served, relative to the start. A fair system serves all partitions at roughly the same pace — no partition should wait significantly longer than others for its first task.

```
First-task pickup time (seconds after start):
Mean: 15.606s
Median: 15.685s
Stdev: 9.030s
Min: 0.019s
Max: 31.103s
```

The median first pickup (15.7s) lands at almost exactly half the total runtime (31.6s), which is what you'd expect from a uniform distribution. No partitions were flagged as starved (first pickup > 2x the median).

### Throughput

Throughput held steady at ~317 tasks/sec across all partition counts tested (50, 200, 1000). SCAN-based dequeue does not degrade as the number of partitions grows.

## Why it works

Redis `SCAN` iterates the hash table in bucket order, and each key's bucket is determined by its hash. Since partition keys hash to different buckets, the iteration order is effectively random and changes as keys are added or removed. This gives us:

- **No hot spots**: No partition is systematically visited first or last
- **No coordination**: Workers don't need to agree on which partition to try next
- **Free rebalancing**: As partitions drain and their keys disappear, SCAN naturally skips them
- **Lock-aware skipping**: Locked partitions are skipped immediately, so workers don't block on busy partitions — they move on to the next available one
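The skipping rule in the last point can be sketched in a few lines. `pop_fair`, the dict/set stand-ins, and the explicit `scan_order` argument are illustrative, not the library's API — the real implementation scans Redis keys and takes per-partition locks:

```python
def pop_fair(queues: dict[str, list], locks: set[str], scan_order: list[str]):
    """Scan partitions in the given pseudo-random order, skipping locked
    or empty ones; lock the first available partition and pop a task."""
    for part in scan_order:
        if part in locks or not queues.get(part):
            continue                      # busy or drained: move on
        locks.add(part)                   # acquire the partition lock
        return part, queues[part].pop(0)
    return None                           # nothing available right now

queues = {"a": [1], "b": [2], "c": [3]}
locks = {"a"}                             # "a" is held by another worker
print(pop_fair(queues, locks, ["a", "b", "c"]))  # → ('b', 2)
```

The worker never waits on the locked partition "a"; it takes the next available one and leaves "a" for whichever worker reaches it after the lock is released.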