Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ private CloseableIterator<LogRecord> columnRecordIterator(
ArrowReader reader =
ArrowUtils.createArrowReader(
segment, arrowOffset, arrowLength, root, allocator, rowType);
return new ArrowLogRecordIterator(reader, timestamp, outputProjection) {
return new ArrowLogRecordIterator(root, reader, timestamp, outputProjection) {
@Override
protected ChangeType getChangeType(int rowId) {
return ChangeType.APPEND_ONLY;
Expand All @@ -358,7 +358,7 @@ protected ChangeType getChangeType(int rowId) {
ArrowReader reader =
ArrowUtils.createArrowReader(
segment, arrowOffset, arrowLength, root, allocator, rowType);
return new ArrowLogRecordIterator(reader, timestamp, outputProjection) {
return new ArrowLogRecordIterator(root, reader, timestamp, outputProjection) {
@Override
protected ChangeType getChangeType(int rowId) {
return changeTypeVector.getChangeType(rowId);
Expand All @@ -369,13 +369,18 @@ protected ChangeType getChangeType(int rowId) {

/** The basic implementation for Arrow log record iterator. */
private abstract class ArrowLogRecordIterator extends LogRecordIterator {
private final VectorSchemaRoot root;
private final ArrowReader reader;
private final long timestamp;
private int rowId = 0;
@Nullable private final ProjectedRow outputProjection;

private ArrowLogRecordIterator(
ArrowReader reader, long timestamp, @Nullable ProjectedRow outputProjection) {
VectorSchemaRoot root,
ArrowReader reader,
long timestamp,
@Nullable ProjectedRow outputProjection) {
this.root = root;
this.reader = reader;
this.timestamp = timestamp;
this.outputProjection = outputProjection;
Expand Down Expand Up @@ -410,7 +415,9 @@ protected boolean ensureNoneRemaining() {

@Override
public void close() {
// reader has no resources to release
// Clear old buffers before the next batch load to avoid temporary
// duplication of buffers (old + new) during loadFieldBuffers.
root.clear();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,13 @@ interface ReadContext {
* <p>The schema root is used to read the Arrow records in the batch, if this is a {@link
* LogFormat#ARROW} record batch.
*
* <p>Note: DO NOT close the vector schema root because it is shared across multiple
* batches. Use {@link VectorSchemaRoot#slice(int)} to cache the root and close it after
* use.
* <p><b>Lifecycle:</b> DO NOT close the returned {@link VectorSchemaRoot} because it is
* shared across multiple batches. Each new batch load replaces the old buffers inside the
* same root. To avoid temporary buffer duplication (old and new buffers coexisting during
* the load), callers should call {@link VectorSchemaRoot#clear()} after finishing each
* batch to release old buffers immediately before the next batch is loaded. If you need to
* retain the data beyond the current batch, use {@link VectorSchemaRoot#slice(int)} to
* create an independent copy and close it after use.
*
* @param schemaId The schema id of the record batch.
* @return The (maybe projected) schema root of the record batch.
Expand Down