Skip to content

feat: Add streaming and concurrent file reads to ArrowScan to reduce memory usage and increase throughput#3046

Open
sumedhsakdeo wants to merge 12 commits intoapache:mainfrom
sumedhsakdeo:fix/arrow-scan-benchmark-3036
Open

feat: Add streaming and concurrent file reads to ArrowScan to reduce memory usage and increase throughput#3046
sumedhsakdeo wants to merge 12 commits intoapache:mainfrom
sumedhsakdeo:fix/arrow-scan-benchmark-3036

Conversation

@sumedhsakdeo
Copy link

@sumedhsakdeo sumedhsakdeo commented Feb 15, 2026

Summary

Addresses #3036 — ArrowScan.to_record_batches() uses executor.map + list() which eagerly materializes all record batches per file into memory, causing OOM on large tables.

This PR adds three parameters to to_arrow_batch_reader() that give users control over memory usage and parallelism:

  • batch_size — Controls the number of rows per batch passed to PyArrow's ds.Scanner. Default is PyArrow's built-in 131,072 rows.
  • streaming — When True, batches are yielded as they are produced without materializing entire files into memory. Uses a bounded queue with backpressure instead of executor.map + list().
  • concurrent_files — Number of files to read concurrently when streaming=True. A semaphore limits active file readers, and a bounded queue (max 16 batches) provides backpressure to cap memory usage.

Problem

The current implementation materializes all batches from each file via list() inside executor.map, which runs up to min(32, cpu_count+4) files in parallel. For large files this means all batches from ~20 files are held in memory simultaneously
before any are yielded to the consumer.

Solution

Before: OOM on large tables

 batches = table.scan().to_arrow_batch_reader()

After: bounded memory, tunable parallelism

batches = table.scan().to_arrow_batch_reader(
    streaming=True,
    concurrent_files=4,
    batch_size=10000,
)

Default behavior is unchanged — streaming=False preserves the existing executor.map + list() path for backwards compatibility.

Architecture

When streaming=True, batches flow through _bounded_concurrent_batches:

  1. All file tasks are submitted to the shared thread pool
  2. A Semaphore(concurrent_files) limits how many files are read simultaneously
  3. Workers push batches into a bounded Queue(maxsize=16) — when full, workers block (backpressure)
  4. The consumer yields batches from the queue via blocking queue.get()
  5. A sentinel value signals completion — no timeout-based polling
  6. On early termination (consumer stops), extra semaphore permits are released to unblock waiting workers, and the queue is drained

Ordering semantics:

Configuration File ordering Within-file ordering
Default (streaming=False) Batches grouped by file, in task submission order Row order
streaming=True Interleaved across files (no grouping guarantee) Row order within each file

PR Stack

Breakdown of this large PR into smaller PRs:

  1. PR 0: batch_size forwarding
  2. PR 1: streaming flag — stop materializing entire files
  3. PR 2: concurrent_files — bounded concurrent streaming
  4. PR 3: benchmark

Benchmark results

32 files × 500K rows, 5 columns (int64, float64, string, bool, timestamp), batch_size=131,072 (PyArrow default):

Config Throughput (rows/s) TTFR (ms) Peak Arrow Memory
default (all threads) 188,873,659 62.9 640.3 MB
streaming, cf=1 59,990,631 28.4 10.3 MB
streaming, cf=2 107,234,805 28.2 42.0 MB
streaming, cf=4 177,983,848 28.2 103.7 MB
streaming, cf=8 212,076,702 32.8 266.4 MB
streaming, cf=16 217,289,396 41.0 491.0 MB

TTFR = Time to First Record

Note on throughput plateau at cf=8: This benchmark runs against local filesystem where Parquet reads are CPU-bound (decompression + decoding). Throughput plateaus once enough threads saturate available cores. On cloud storage (S3/GCS/ADLS), reads are I/O-bound with 50-200ms per-file latency, so higher concurrent_files values (16-64+) would continue to show throughput gains until network bandwidth saturates. The optimal concurrent_files will be higher for remote storage than what this local benchmark suggests.

Positional deletes, row filters, and limit are handled correctly in all modes.

Are these changes tested?

Yes. 23 new unit tests across two test files, plus a micro-benchmark:

  • tests/io/test_pyarrow.py (14 tests): batch_size controls rows per batch, streaming yields all rows correctly, streaming respects limit, within-file ordering preserved, positional deletes applied correctly in all three modes (default,
    streaming, concurrent), positional deletes with limit, concurrent_files < 1 raises ValueError
  • tests/io/test_bounded_concurrent_batches.py (9 tests): single/multi-file correctness, incremental streaming, backpressure blocks producers when queue is full, error propagation from workers to consumer, early termination cancels workers
    cleanly, concurrency limit enforced, empty task list, ArrowScan integration with limit
  • tests/benchmark/test_read_benchmark.py: read throughput micro-benchmark across 6 configurations measuring rows/sec, TTFR, and peak Arrow memory

Are there any user-facing changes?

Yes. Three new optional parameters on DataScan.to_arrow_batch_reader():

  • batch_size: int | None — number of rows per batch (default: PyArrow's 131,072)
  • streaming: bool — yield batches without materializing entire files (default: False)
  • concurrent_files: int — number of files to read concurrently when streaming (default: 1)

All parameters are optional with backwards-compatible defaults. Existing code is unaffected.

Documentation updated in mkdocs/docs/api.md with usage examples and ordering semantics.

@sumedhsakdeo sumedhsakdeo force-pushed the fix/arrow-scan-benchmark-3036 branch 3 times, most recently from ab8c31b to 7ad9910 Compare February 15, 2026 01:47
Add batch_size parameter to _task_to_record_batches,
_record_batches_from_scan_tasks_and_deletes, ArrowScan.to_record_batches,
and DataScan.to_arrow_batch_reader so users can control the number of
rows per RecordBatch returned by PyArrow's Scanner.

Closes partially apache#3036

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@sumedhsakdeo sumedhsakdeo force-pushed the fix/arrow-scan-benchmark-3036 branch from 7ad9910 to c86f0be Compare February 15, 2026 02:10
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@sumedhsakdeo sumedhsakdeo force-pushed the fix/arrow-scan-benchmark-3036 branch from c86f0be to 05e07d1 Compare February 15, 2026 02:27
sumedhsakdeo and others added 3 commits February 14, 2026 18:28
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
When streaming=True, batches are yielded as they are produced by PyArrow
without materializing entire files into memory. Files are still processed
sequentially, preserving file ordering. The inner method handles the
global limit correctly when called with all tasks, avoiding double-counting.

This addresses the OOM issue in apache#3036 for single-file-at-a-time streaming.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add _bounded_concurrent_batches() with proper lock discipline:
- Queue backpressure caps memory (scan.max-buffered-batches, default 16)
- Semaphore limits concurrent file reads (concurrent_files param)
- Cancel event with timeouts on all blocking ops (no lock over IO)
- Error propagation and early termination support

When streaming=True and concurrent_files > 1, batches are yielded
as they arrive from parallel file reads. File ordering is not
guaranteed (documented).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@sumedhsakdeo sumedhsakdeo force-pushed the fix/arrow-scan-benchmark-3036 branch from 05e07d1 to 05f330d Compare February 15, 2026 02:29
@sumedhsakdeo sumedhsakdeo force-pushed the fix/arrow-scan-benchmark-3036 branch from 05f330d to 240a860 Compare February 15, 2026 02:32
sumedhsakdeo and others added 2 commits February 14, 2026 18:45
Setting `mock.call_count = 0` does not actually reset the mock's
internal call tracking, causing the second assertion to see accumulated
calls from both test phases. Use `reset_mock()` instead.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
sumedhsakdeo and others added 2 commits February 15, 2026 05:06
Add a parametrized benchmark case for default (executor.map) with
max_workers=4 to compare memory/throughput against unbounded threading.
Add TTFR (time to first record) measurement across all configurations.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant