62 changes: 62 additions & 0 deletions mkdocs/docs/api.md
@@ -355,6 +355,39 @@ for buf in tbl.scan().to_arrow_batch_reader():
print(f"Buffer contains {len(buf)} rows")
```

You can control the number of rows per batch using the `batch_size` parameter:

```python
for buf in tbl.scan().to_arrow_batch_reader(batch_size=1000):
print(f"Buffer contains {len(buf)} rows")
```

By default, each file's batches are materialized in memory before being yielded. For large files that may exceed available memory, use `streaming=True` to yield batches as they are produced without materializing entire files:

```python
for buf in tbl.scan().to_arrow_batch_reader(streaming=True, batch_size=1000):
print(f"Buffer contains {len(buf)} rows")
```

For maximum throughput, use `concurrent_files` to read multiple files in parallel while streaming. Batches are yielded as they arrive from any file — ordering across files is not guaranteed:

```python
for buf in tbl.scan().to_arrow_batch_reader(streaming=True, concurrent_files=4, batch_size=1000):
print(f"Buffer contains {len(buf)} rows")
```

When streaming with `concurrent_files > 1`, the maximum number of buffered batches can be tuned via the `scan.max-buffered-batches` table property (default 16).
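
For example, the property can be set like any other table property through a transaction (a minimal sketch; the value `"32"` is only illustrative):

```python
with tbl.transaction() as txn:
    # Table properties are stored as strings.
    txn.set_properties({"scan.max-buffered-batches": "32"})
```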

**Ordering semantics:**

| Configuration | File ordering | Within-file ordering |
|---|---|---|
| Default (`streaming=False`) | Batches grouped by file, in task submission order | Row order |
| `streaming=True` | Batches grouped by file, sequential | Row order |
| `streaming=True, concurrent_files>1` | Interleaved across files (no grouping guarantee) | Row order within each file |

In all modes, within-file batch ordering follows row order. The `limit` parameter is enforced correctly regardless of configuration.
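
For example, a scan-level limit caps the total number of rows yielded even when batches arrive interleaved from several files (a sketch; the limit of 10,000 is arbitrary):

```python
# At most 10,000 rows are yielded in total, regardless of which file's
# batches happen to arrive first.
for buf in tbl.scan(limit=10_000).to_arrow_batch_reader(streaming=True, concurrent_files=4, batch_size=1000):
    print(f"Buffer contains {len(buf)} rows")
```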

To avoid any type inconsistencies during writing, you can convert the Iceberg table schema to Arrow:

```python
@@ -1619,6 +1652,35 @@ table.scan(
).to_arrow_batch_reader()
```

The `batch_size` parameter controls the maximum number of rows per RecordBatch (default is PyArrow's 131,072 rows):

```python
table.scan(
row_filter=GreaterThanOrEqual("trip_distance", 10.0),
selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_arrow_batch_reader(batch_size=1000)
```

Use `streaming=True` to avoid materializing entire files in memory. This yields batches as they are produced by PyArrow, one file at a time:

```python
table.scan(
row_filter=GreaterThanOrEqual("trip_distance", 10.0),
selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_arrow_batch_reader(streaming=True)
```

For concurrent file reads with streaming, use `concurrent_files`. Note that batch ordering across files is not guaranteed:

```python
table.scan(
row_filter=GreaterThanOrEqual("trip_distance", 10.0),
selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_arrow_batch_reader(streaming=True, concurrent_files=4)
```

When using `concurrent_files > 1`, batches from different files may be interleaved. Within each file, batches are always in row order. See the ordering semantics table in the [Arrow Batch Reader section](#arrow-batch-reader) above for details.
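
If your downstream logic needs a deterministic row order, one option is to read the stream fully and sort the collected table (a sketch; `read_all` and `sort_by` are standard PyArrow methods, and the sort column is only an example):

```python
reader = table.scan(
    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_arrow_batch_reader(streaming=True, concurrent_files=4)

# Batches may arrive interleaved across files; sorting the collected
# table restores a stable order.
ordered = reader.read_all().sort_by("tpep_pickup_datetime")
```

Note that collecting the full stream trades the memory benefit of streaming for a deterministic order.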

### Pandas

<!-- prettier-ignore-start -->
191 changes: 167 additions & 24 deletions pyiceberg/io/pyarrow.py
@@ -33,7 +33,9 @@
import logging
import operator
import os
import queue
import re
import threading
import uuid
import warnings
from abc import ABC, abstractmethod
@@ -1581,6 +1583,7 @@ def _task_to_record_batches(
partition_spec: PartitionSpec | None = None,
format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
downcast_ns_timestamp_to_us: bool | None = None,
batch_size: int | None = None,
) -> Iterator[pa.RecordBatch]:
arrow_format = _get_file_format(task.file.file_format, pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
with io.new_input(task.file.file_path).open() as fin:
@@ -1612,14 +1615,18 @@

file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False)

fragment_scanner = ds.Scanner.from_fragment(
fragment=fragment,
schema=physical_schema,
scanner_kwargs: dict[str, Any] = {
"fragment": fragment,
"schema": physical_schema,
# This will push down the query to Arrow.
# But in case there are positional deletes, we have to apply them first
filter=pyarrow_filter if not positional_deletes else None,
columns=[col.name for col in file_project_schema.columns],
)
"filter": pyarrow_filter if not positional_deletes else None,
"columns": [col.name for col in file_project_schema.columns],
}
if batch_size is not None:
scanner_kwargs["batch_size"] = batch_size

fragment_scanner = ds.Scanner.from_fragment(**scanner_kwargs)

next_index = 0
batches = fragment_scanner.to_batches()
@@ -1677,6 +1684,89 @@ def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> dict[st
return deletes_per_file


_QUEUE_SENTINEL = object()


def _bounded_concurrent_batches(
tasks: list[FileScanTask],
batch_fn: Callable[[FileScanTask], Iterator[pa.RecordBatch]],
concurrent_files: int,
max_buffered_batches: int = 16,
) -> Iterator[pa.RecordBatch]:
"""Read batches from multiple files concurrently with bounded memory.

Workers read from files in parallel (up to concurrent_files at a time) and push
batches into a shared queue. The consumer yields batches from the queue.
A sentinel value signals completion, avoiding timeout-based polling.

Args:
tasks: The file scan tasks to process.
batch_fn: A callable that takes a FileScanTask and returns an iterator of RecordBatches.
concurrent_files: Maximum number of files to read concurrently.
max_buffered_batches: Maximum number of batches to buffer in the queue.
"""
if not tasks:
return

batch_queue: queue.Queue[pa.RecordBatch | BaseException | object] = queue.Queue(maxsize=max_buffered_batches)
cancel_event = threading.Event()
pending_count = len(tasks)
pending_lock = threading.Lock()
file_semaphore = threading.Semaphore(concurrent_files)

def worker(task: FileScanTask) -> None:
nonlocal pending_count
acquired = False
try:
# Acquire semaphore — blocks until a slot is available or cancelled
while not cancel_event.is_set():
if file_semaphore.acquire(timeout=0.5):
acquired = True
break
if cancel_event.is_set():
return

for batch in batch_fn(task):
if cancel_event.is_set():
return
batch_queue.put(batch)
except BaseException as e:
if not cancel_event.is_set():
batch_queue.put(e)
finally:
if acquired:
file_semaphore.release()
with pending_lock:
pending_count -= 1
if pending_count == 0:
batch_queue.put(_QUEUE_SENTINEL)

executor = ExecutorFactory.get_or_create()
futures = [executor.submit(worker, task) for task in tasks]

try:
while True:
item = batch_queue.get()

if item is _QUEUE_SENTINEL:
break

if isinstance(item, BaseException):
raise item

yield item
finally:
cancel_event.set()
# Drain the queue to unblock any workers stuck on put()
while not batch_queue.empty():
try:
batch_queue.get_nowait()
except queue.Empty:
break
for future in futures:
future.cancel()


class ArrowScan:
_table_metadata: TableMetadata
_io: FileIO
@@ -1756,15 +1846,34 @@ def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:

return result

def to_record_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.RecordBatch]:
def to_record_batches(
self,
tasks: Iterable[FileScanTask],
batch_size: int | None = None,
streaming: bool = False,
concurrent_files: int = 1,
) -> Iterator[pa.RecordBatch]:
"""Scan the Iceberg table and return an Iterator[pa.RecordBatch].

Returns an Iterator of pa.RecordBatch with data from the Iceberg table
by resolving the right columns that match the current table schema.
Only data that matches the provided row_filter expression is returned.

Ordering semantics:
- Default (streaming=False): Batches are grouped by file in task submission order.
- streaming=True, concurrent_files=1: Batches are grouped by file, processed sequentially.
- streaming=True, concurrent_files>1: Batches may be interleaved across files.
In all modes, within-file batch ordering follows row order.

Args:
tasks: FileScanTasks representing the data files and delete files to read from.
batch_size: The number of rows per batch. If None, PyArrow's default is used.
streaming: If True, yield batches as they are produced without materializing
entire files into memory. Files are still processed sequentially when
concurrent_files=1.
concurrent_files: Number of files to read concurrently when streaming=True.
When > 1, batches may arrive interleaved across files. Ignored when
streaming=False.

Returns:
An Iterator of PyArrow RecordBatches.
@@ -1776,34 +1885,67 @@ def to_record_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.Record
"""
deletes_per_file = _read_all_delete_files(self._io, tasks)

total_row_count = 0
executor = ExecutorFactory.get_or_create()
if streaming and concurrent_files > 1:
# Concurrent streaming path: read multiple files in parallel with bounded queue.
# Ordering is NOT guaranteed across files — batches arrive as produced.
task_list = list(tasks)

def batches_for_task(task: FileScanTask) -> list[pa.RecordBatch]:
# Materialize the iterator here to ensure execution happens within the executor.
# Otherwise, the iterator would be lazily consumed later (in the main thread),
# defeating the purpose of using executor.map.
return list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file))
def batch_fn(task: FileScanTask) -> Iterator[pa.RecordBatch]:
return self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file, batch_size)

limit_reached = False
for batches in executor.map(batches_for_task, tasks):
for batch in batches:
from pyiceberg.table import TableProperties

max_buffered = int(
self._table_metadata.properties.get(
TableProperties.SCAN_MAX_BUFFERED_BATCHES,
TableProperties.SCAN_MAX_BUFFERED_BATCHES_DEFAULT,
)
)

total_row_count = 0
for batch in _bounded_concurrent_batches(task_list, batch_fn, concurrent_files, max_buffered):
current_batch_size = len(batch)
if self._limit is not None and total_row_count + current_batch_size >= self._limit:
yield batch.slice(0, self._limit - total_row_count)

limit_reached = True
break
return
else:
yield batch
total_row_count += current_batch_size
elif streaming:
# Streaming path: process all tasks sequentially, yielding batches as produced.
# _record_batches_from_scan_tasks_and_deletes handles the limit internally
# when called with all tasks, so no outer limit check is needed.
yield from self._record_batches_from_scan_tasks_and_deletes(tasks, deletes_per_file, batch_size)
else:
# Non-streaming path: existing behavior with executor.map + list()
total_row_count = 0
executor = ExecutorFactory.get_or_create()

def batches_for_task(task: FileScanTask) -> list[pa.RecordBatch]:
# Materialize the iterator here to ensure execution happens within the executor.
# Otherwise, the iterator would be lazily consumed later (in the main thread),
# defeating the purpose of using executor.map.
return list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file, batch_size))

limit_reached = False
for batches in executor.map(batches_for_task, tasks):
for batch in batches:
current_batch_size = len(batch)
if self._limit is not None and total_row_count + current_batch_size >= self._limit:
yield batch.slice(0, self._limit - total_row_count)

limit_reached = True
break
else:
yield batch
total_row_count += current_batch_size

if limit_reached:
# This break will also cancel all running tasks in the executor
break
if limit_reached:
# This break will also cancel all running tasks in the executor
break

def _record_batches_from_scan_tasks_and_deletes(
self, tasks: Iterable[FileScanTask], deletes_per_file: dict[str, list[ChunkedArray]]
self, tasks: Iterable[FileScanTask], deletes_per_file: dict[str, list[ChunkedArray]], batch_size: int | None = None
) -> Iterator[pa.RecordBatch]:
total_row_count = 0
for task in tasks:
Expand All @@ -1822,6 +1964,7 @@ def _record_batches_from_scan_tasks_and_deletes(
self._table_metadata.specs().get(task.file.spec_id),
self._table_metadata.format_version,
self._downcast_ns_timestamp_to_us,
batch_size,
)
for batch in batches:
if self._limit is not None:
23 changes: 21 additions & 2 deletions pyiceberg/table/__init__.py
@@ -247,6 +247,9 @@ class TableProperties:
MIN_SNAPSHOTS_TO_KEEP = "history.expire.min-snapshots-to-keep"
MIN_SNAPSHOTS_TO_KEEP_DEFAULT = 1

SCAN_MAX_BUFFERED_BATCHES = "scan.max-buffered-batches"
SCAN_MAX_BUFFERED_BATCHES_DEFAULT = 16


class Transaction:
_table: Table
@@ -2157,13 +2160,29 @@ def to_arrow(self) -> pa.Table:
self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
).to_table(self.plan_files())

def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
def to_arrow_batch_reader(
self, batch_size: int | None = None, streaming: bool = False, concurrent_files: int = 1
) -> pa.RecordBatchReader:
"""Return an Arrow RecordBatchReader from this DataScan.

For large results, using a RecordBatchReader requires less memory than
loading an Arrow Table for the same DataScan, because a RecordBatch
is read one at a time.

Ordering semantics:
- Default (streaming=False): Batches are grouped by file in task submission order.
- streaming=True, concurrent_files=1: Batches are grouped by file, processed sequentially.
- streaming=True, concurrent_files>1: Batches may be interleaved across files.
In all modes, within-file batch ordering follows row order.

Args:
batch_size: The number of rows per batch. If None, PyArrow's default is used.
streaming: If True, yield batches as they are produced without materializing
entire files into memory. Files are still processed sequentially when
concurrent_files=1.
concurrent_files: Number of files to read concurrently when streaming=True.
When > 1, batches may arrive interleaved across files.

Returns:
pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table's DataScan
which can be used to read a stream of record batches one by one.
@@ -2175,7 +2194,7 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
target_schema = schema_to_pyarrow(self.projection())
batches = ArrowScan(
self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
).to_record_batches(self.plan_files())
).to_record_batches(self.plan_files(), batch_size=batch_size, streaming=streaming, concurrent_files=concurrent_files)

return pa.RecordBatchReader.from_batches(
target_schema,