Add batch_size parameter to _task_to_record_batches, _record_batches_from_scan_tasks_and_deletes, ArrowScan.to_record_batches, and DataScan.to_arrow_batch_reader so users can control the number of rows per RecordBatch returned by PyArrow's Scanner. Partially closes apache#3036. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
When streaming=True, batches are yielded as they are produced by PyArrow without materializing entire files into memory. Files are still processed sequentially, preserving file ordering. The inner method handles the global limit correctly when called with all tasks, avoiding double-counting. This addresses the OOM issue in apache#3036 for single-file-at-a-time streaming. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
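The sequential streaming path can be pictured as a generator that pulls batches lazily from one file at a time and decrements a single global limit, truncating the final batch rather than counting rows twice. A simplified sketch under those assumptions (plain lists stand in for pyarrow.RecordBatch; stream_batches and read_file are hypothetical names, not the PyIceberg API):

```python
from typing import Callable, Iterable, Iterator, List, Optional

def stream_batches(
    files: List[str],
    read_file: Callable[[str], Iterable[list]],
    limit: Optional[int] = None,
) -> Iterator[list]:
    """Yield batches file by file, lazily, enforcing one global row limit.

    Files are processed sequentially, so ordering is preserved and only one
    file's batches are ever in flight at a time.
    """
    remaining = limit
    for path in files:
        for batch in read_file(path):  # lazily pulls batches from one file
            if remaining is not None:
                if remaining <= 0:
                    return
                if len(batch) > remaining:
                    batch = batch[:remaining]  # truncate the final batch
                remaining -= len(batch)
            yield batch

# Two "files" of one three-row batch each; a global limit of 4 rows.
fake_files = {"a": [[1, 2, 3]], "b": [[4, 5, 6]]}
out = list(stream_batches(["a", "b"], lambda p: iter(fake_files[p]), limit=4))
# out is [[1, 2, 3], [4]]: the second file's batch is truncated to one row.
```

Because the limit is tracked in one place as batches stream through, there is no double-counting between per-file and global accounting.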
Add _bounded_concurrent_batches() with proper lock discipline:
- Queue backpressure caps memory (scan.max-buffered-batches, default 16)
- Semaphore limits concurrent file reads (concurrent_files param)
- Cancel event with timeouts on all blocking ops (no lock over IO)
- Error propagation and early termination support

When streaming=True and concurrent_files > 1, batches are yielded as they arrive from parallel file reads. File ordering is not guaranteed (documented).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
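The mechanism described above can be sketched with stdlib primitives: a bounded queue.Queue provides the backpressure, a semaphore caps in-flight file reads, and one sentinel per file signals completion. This is an illustrative reconstruction under those assumptions, not the actual implementation; the names mirror the commit message, and the cancel-event/timeout machinery is omitted for brevity:

```python
import queue
import threading
from typing import Callable, Iterable, Iterator, List

def bounded_concurrent_batches(
    files: List[str],
    read_file: Callable[[str], Iterable[object]],
    concurrent_files: int = 4,
    max_buffered_batches: int = 16,
) -> Iterator[object]:
    """Yield batches from parallel file reads with bounded buffering.

    Batch ordering across files is NOT guaranteed.
    """
    buf: "queue.Queue" = queue.Queue(maxsize=max_buffered_batches)
    sem = threading.Semaphore(concurrent_files)
    errors: List[BaseException] = []
    _DONE = object()

    def worker(path: str) -> None:
        with sem:  # caps the number of files being read at once
            try:
                for batch in read_file(path):
                    buf.put(batch)  # blocks when the buffer is full: backpressure
            except BaseException as exc:
                errors.append(exc)
            finally:
                buf.put(_DONE)  # one completion marker per file

    threads = [threading.Thread(target=worker, args=(f,), daemon=True) for f in files]
    for t in threads:
        t.start()

    done = 0
    while done < len(files):
        item = buf.get()
        if item is _DONE:
            done += 1
        else:
            yield item
    for t in threads:
        t.join()
    if errors:
        raise errors[0]  # propagate the first reader error

# Five "files", each producing a 3-row and a 2-row batch (lists stand in
# for pyarrow.RecordBatch).
data = {f"f{i}": [[i] * 3, [i] * 2] for i in range(5)}
batches = list(bounded_concurrent_batches(
    list(data), lambda p: iter(data[p]),
    concurrent_files=2, max_buffered_batches=4,
))
total = sum(len(b) for b in batches)  # all 25 rows arrive, in some order
```

The queue's maxsize is what turns buffering into a hard memory cap: a fast reader simply blocks in put() until the consumer drains a batch.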
Measures records/sec and peak memory across streaming, concurrent_files, and batch_size configurations to validate performance characteristics of the new scan modes introduced for apache#3036. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Rationale for this change
Closes #3036
Summary
Adds a read-throughput micro-benchmark to measure records/sec and peak Arrow memory across streaming and concurrent_files configurations introduced in PRs 0-2.

Synthetic Data

- Schema: id (int64), value (float64), label (string), flag (bool), ts (timestamp[us, tz=UTC])
- Peak memory is measured via pa.total_allocated_bytes() (the PyArrow C++ memory pool, not the Python heap via tracemalloc)

Configurations (6 parameterized tests)
All tests use PyArrow's default batch_size=131,072. The variable under test is the concurrency model:

- streaming=False
- streaming=True, concurrent_files=1
- streaming=True, concurrent_files=2
- streaming=True, concurrent_files=4
- streaming=True, concurrent_files=8
- streaming=True, concurrent_files=16

Benchmark Results (local SSD, macOS, 16-core, Python 3.13)
Configurations measured:

- default (executor.map, all files parallel)
- streaming, concurrent_files=1
- streaming, concurrent_files=2
- streaming, concurrent_files=4
- streaming, concurrent_files=8
- streaming, concurrent_files=16

Key observations
- concurrent_files=1 reduces peak memory 63x (637 MB → 10 MB): processes one file at a time, ideal for memory-constrained environments
- concurrent_files=4 matches default throughput (178M vs 196M rows/s) at 82% less memory (114 MB vs 637 MB)
- concurrent_files=8 beats default by 15% (225M vs 196M rows/s) at 58% less memory (269 MB vs 637 MB): the sweet spot on this hardware
- concurrent_files=16 plateaus at the concurrent_files=8 level: on local SSD, GIL contention and memory bandwidth become the bottleneck rather than IO. On network storage (S3/GCS), where IO latency dominates, higher concurrency values are expected to scale further
- concurrent_files gives users a predictable knob to trade memory for throughput

How to run
Are these changes tested?
Yes: this PR is a benchmark test itself (6 parameterized test cases).
Are there any user-facing changes?
No: benchmark infrastructure only.