From a2481a45f51453691b33d994c529f6b340e079fd Mon Sep 17 00:00:00 2001 From: Hongshun Wang Date: Thu, 26 Feb 2026 16:46:54 +0800 Subject: [PATCH] [client] Clear VectorSchemaRoot to release buffer as soon as possible after a batch read finished. --- .../fluss/record/DefaultLogRecordBatch.java | 15 +++++++++++---- .../org/apache/fluss/record/LogRecordBatch.java | 10 +++++++--- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java b/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java index 188a62bbd0..c910df8616 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java @@ -340,7 +340,7 @@ private CloseableIterator columnRecordIterator( ArrowReader reader = ArrowUtils.createArrowReader( segment, arrowOffset, arrowLength, root, allocator, rowType); - return new ArrowLogRecordIterator(reader, timestamp, outputProjection) { + return new ArrowLogRecordIterator(root, reader, timestamp, outputProjection) { @Override protected ChangeType getChangeType(int rowId) { return ChangeType.APPEND_ONLY; @@ -358,7 +358,7 @@ protected ChangeType getChangeType(int rowId) { ArrowReader reader = ArrowUtils.createArrowReader( segment, arrowOffset, arrowLength, root, allocator, rowType); - return new ArrowLogRecordIterator(reader, timestamp, outputProjection) { + return new ArrowLogRecordIterator(root, reader, timestamp, outputProjection) { @Override protected ChangeType getChangeType(int rowId) { return changeTypeVector.getChangeType(rowId); @@ -369,13 +369,18 @@ protected ChangeType getChangeType(int rowId) { /** The basic implementation for Arrow log record iterator. 
*/ private abstract class ArrowLogRecordIterator extends LogRecordIterator { + private final VectorSchemaRoot root; private final ArrowReader reader; private final long timestamp; private int rowId = 0; @Nullable private final ProjectedRow outputProjection; private ArrowLogRecordIterator( - ArrowReader reader, long timestamp, @Nullable ProjectedRow outputProjection) { + VectorSchemaRoot root, + ArrowReader reader, + long timestamp, + @Nullable ProjectedRow outputProjection) { + this.root = root; this.reader = reader; this.timestamp = timestamp; this.outputProjection = outputProjection; @@ -410,7 +415,9 @@ protected boolean ensureNoneRemaining() { @Override public void close() { - // reader has no resources to release + // Clear old buffers before the next batch load to avoid temporary + // duplication of buffers (old + new) during loadFieldBuffers. + root.clear(); } } diff --git a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java index 06536e5821..9df8a7898b 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java @@ -187,9 +187,13 @@ interface ReadContext { *

<p>The schema root is used to read the Arrow records in the batch, if this is a {@link * LogFormat#ARROW} record batch. * - *

<p>Note: DO NOT close the vector schema root because it is shared across multiple - * batches. Use {@link VectorSchemaRoot#slice(int)} to cache the root and close it after - * use. + *

<p>Lifecycle: DO NOT close the returned {@link VectorSchemaRoot} because it is + * shared across multiple batches. Each new batch load replaces the old buffers inside the + * same root. To avoid temporary buffer duplication (old and new buffers coexisting during + * the load), callers should call {@link VectorSchemaRoot#clear()} after finishing each + * batch to release old buffers immediately, before the next batch is loaded. If you need to + * retain the data beyond the current batch, use {@link VectorSchemaRoot#slice(int)} to + * create an independent copy and close it after use. * * @param schemaId The schema id of the record batch. * @return The (maybe projected) schema root of the record batch.