Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Co
List<Field> fields = new ArrayList<>();
// set schema
fields.addAll(
LanceArrowUtils.toArrowSchema(tableDescriptor.getSchema().getRowType())
LanceArrowUtils.toArrowSchema(
tableDescriptor.getSchema().getRowType(),
tableDescriptor.getCustomProperties())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to confirm the return type of getCustomProperties(). If it returns an immutable Map or a Properties type rather than Map<String, String>, will tableProperties.get(fieldName + FIXED_SIZE_LIST_SIZE_SUFFIX) in toArrowField work correctly? Suggest using Map<String, String> in the new method signature and ensuring compatibility at the call site.

.getFields());
try {
LanceDatasetAdapter.createDataset(config.getDatasetUri(), new Schema(fields), params);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.FixedSizeListVector;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.types.pojo.Schema;

Expand Down Expand Up @@ -54,31 +55,43 @@ public static VectorSchemaRoot convertToNonShaded(
VectorSchemaRoot.create(nonShadedSchema, nonShadedAllocator);
nonShadedRoot.allocateNew();

List<org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector> shadedVectors =
shadedRoot.getFieldVectors();
List<FieldVector> nonShadedVectors = nonShadedRoot.getFieldVectors();
try {
List<org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector> shadedVectors =
shadedRoot.getFieldVectors();
List<FieldVector> nonShadedVectors = nonShadedRoot.getFieldVectors();

for (int i = 0; i < shadedVectors.size(); i++) {
copyVectorData(shadedVectors.get(i), nonShadedVectors.get(i));
}
for (int i = 0; i < shadedVectors.size(); i++) {
copyVectorData(shadedVectors.get(i), nonShadedVectors.get(i));
}

nonShadedRoot.setRowCount(shadedRoot.getRowCount());
return nonShadedRoot;
nonShadedRoot.setRowCount(shadedRoot.getRowCount());
return nonShadedRoot;
} catch (Exception e) {
nonShadedRoot.close();
throw e;
}
}

private static void copyVectorData(
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector shadedVector,
FieldVector nonShadedVector) {

if (shadedVector
instanceof
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector
&& nonShadedVector instanceof ListVector) {
copyListVectorData(
(org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector)
shadedVector,
(ListVector) nonShadedVector);
return;
instanceof
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector) {
if (nonShadedVector instanceof FixedSizeListVector) {
copyListToFixedSizeListVectorData(
(org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector)
shadedVector,
(FixedSizeListVector) nonShadedVector);
return;
} else if (nonShadedVector instanceof ListVector) {
copyListVectorData(
(org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector)
shadedVector,
(ListVector) nonShadedVector);
return;
}
}

List<org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf> shadedBuffers =
Expand Down Expand Up @@ -143,4 +156,57 @@ private static void copyListVectorData(
// For ListVector, we need to manually set lastSet to avoid offset buffer recalculation
nonShadedListVector.setLastSet(valueCount - 1);
}

/**
 * Copies the contents of a shaded {@code ListVector} into a non-shaded {@link
 * FixedSizeListVector}.
 *
 * <p>The source list vector must be "rectangular": every list entry must contain exactly
 * {@code nonShadedFixedSizeListVector.getListSize()} child elements, which is validated up
 * front via the backing data vector's element count. NOTE(review): a null list entry
 * contributes 0 child elements in a ListVector, so inputs containing nulls will fail this
 * validation — confirm whether null lists must be supported here.
 *
 * @param shadedListVector source vector (shaded Arrow ListVector)
 * @param nonShadedFixedSizeListVector destination vector (non-shaded Arrow FixedSizeListVector)
 * @throws IllegalArgumentException if the total child element count does not match
 *     {@code valueCount * listSize}
 */
private static void copyListToFixedSizeListVectorData(
        org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector
                shadedListVector,
        FixedSizeListVector nonShadedFixedSizeListVector) {

    int valueCount = shadedListVector.getValueCount();
    int expectedListSize = nonShadedFixedSizeListVector.getListSize();
    int expectedTotalValueCount = valueCount * expectedListSize;

    // Validate that backing data vector element count matches expected fixed-size layout.
    // If every list has exactly expectedListSize elements, the total must be
    // valueCount * expectedListSize.
    int totalValueCount = shadedListVector.getDataVector().getValueCount();
    if (totalValueCount != expectedTotalValueCount) {
        throw new IllegalArgumentException(
                String.format(
                        "Total child elements (%d) does not match expected %d for FixedSizeList conversion.",
                        totalValueCount, expectedTotalValueCount));
    }

    // Copy the child data vector recursively (e.g., the float values)
    org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector shadedDataVector =
            shadedListVector.getDataVector();
    FieldVector nonShadedDataVector = nonShadedFixedSizeListVector.getDataVector();

    if (shadedDataVector != null && nonShadedDataVector != null) {
        copyVectorData(shadedDataVector, nonShadedDataVector);
    }

    // FixedSizeListVector only has a validity buffer (no offset buffer).
    // Copy the validity buffer from the shaded ListVector.
    List<org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf> shadedBuffers =
            shadedListVector.getFieldBuffers();
    List<ArrowBuf> nonShadedBuffers = nonShadedFixedSizeListVector.getFieldBuffers();

    // Both ListVector and FixedSizeListVector have validity as their first buffer
    if (!shadedBuffers.isEmpty() && !nonShadedBuffers.isEmpty()) {
        org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf shadedValidityBuf =
                shadedBuffers.get(0);
        ArrowBuf nonShadedValidityBuf = nonShadedBuffers.get(0);

        long size = Math.min(shadedValidityBuf.capacity(), nonShadedValidityBuf.capacity());
        if (size > 0) {
            // Clamp to int BEFORE casting: the previous code passed (int) size to
            // nioBuffer() and only clamped the limit afterwards, which could overflow
            // for buffers larger than Integer.MAX_VALUE. nioBuffer(0, len) already
            // yields a buffer positioned at 0 with limit len, so no extra
            // position()/limit() adjustment is needed.
            int copyLength = (int) Math.min(size, Integer.MAX_VALUE);
            ByteBuffer srcBuffer = shadedValidityBuf.nioBuffer(0, copyLength);
            nonShadedValidityBuf.setBytes(0, srcBuffer);
        }
    }

    nonShadedFixedSizeListVector.setValueCount(valueCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,31 +47,76 @@

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.fluss.utils.Preconditions.checkArgument;

/**
* Utilities for converting Fluss RowType to non-shaded Arrow Schema. This is needed because Lance
* requires non-shaded Arrow API.
*/
public class LanceArrowUtils {

/**
 * Property suffix for configuring a fixed-size list Arrow type on array columns.
 *
 * <p>The full property key is the column name concatenated with this suffix, e.g. {@code
 * embedding.arrow.fixed-size-list.size}. NOTE(review): because the key is built by plain
 * concatenation, a column name that itself contains {@code '.'} (e.g. {@code a.b}) is
 * ambiguous with this scheme — confirm column names cannot contain dots, or add validation.
 */
public static final String FIXED_SIZE_LIST_SIZE_SUFFIX = ".arrow.fixed-size-list.size";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The property key format is <column_name>.arrow.fixed-size-list.size. If the column name itself contains . (e.g., a.b), it leads to parsing ambiguity. For example, a.b.arrow.fixed-size-list.size — it's unclear whether this refers to column a.b or column a with some unrelated suffix.

Suggestion:

  1. Explicitly document that column names must not contain .; or
  2. Validate in toArrowField that the column name does not contain ., and throw a meaningful error message if it does.


/** Returns the non-shaded Arrow schema of the specified Fluss RowType. */
public static Schema toArrowSchema(RowType rowType) {
    // No table properties supplied: every ARRAY column is emitted as a regular List.
    return toArrowSchema(rowType, Collections.emptyMap());
}

/**
 * Returns the non-shaded Arrow schema of the specified Fluss RowType, using table properties to
 * determine whether array columns should use FixedSizeList instead of List.
 *
 * <p>When a table property {@code <column>.arrow.fixed-size-list.size} is set, the
 * corresponding ARRAY column will be emitted as {@code FixedSizeList<element>(size)} instead of
 * {@code List<element>}.
 */
public static Schema toArrowSchema(RowType rowType, Map<String, String> tableProperties) {
    // Convert each Fluss field to its Arrow counterpart, honoring per-column properties.
    List<Field> arrowFields =
            rowType.getFields().stream()
                    .map(
                            flussField ->
                                    toArrowField(
                                            flussField.getName(),
                                            flussField.getType(),
                                            tableProperties))
                    .collect(Collectors.toList());
    return new Schema(arrowFields);
}

private static Field toArrowField(String fieldName, DataType logicalType) {
FieldType fieldType =
new FieldType(logicalType.isNullable(), toArrowType(logicalType), null);
private static Field toArrowField(
String fieldName, DataType logicalType, Map<String, String> tableProperties) {
ArrowType arrowType;
if (logicalType instanceof ArrayType && tableProperties != null) {
String sizeStr = tableProperties.get(fieldName + FIXED_SIZE_LIST_SIZE_SUFFIX);
if (sizeStr != null) {
int listSize = -1;
try {
listSize = Integer.parseInt(sizeStr);
} catch (NumberFormatException ignored) {
// Not really ignored, IllegalArgumentEx still thrown below.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Validating for a positive integer is correct. However, if the property value is an empty string "" or whitespace " ", Integer.parseInt will throw a NumberFormatException without a user-friendly error message. Suggest catching NumberFormatException and converting it to an IllegalArgumentException that includes the property key name, making it easier for users to locate the configuration issue.

int listSize;
try {
    listSize = Integer.parseInt(sizeStr);
} catch (NumberFormatException e) {
    throw new IllegalArgumentException(
        String.format("Invalid value '%s' for property '%s', expected a positive integer.",
            sizeStr, fieldName + FIXED_SIZE_LIST_SIZE_SUFFIX), e);
}
checkArgument(listSize > 0, "...");

// This removes duplicate boilerplates for throwing IAE
}

checkArgument(
listSize > 0,
"Invalid value '%s' for property '%s'. Expected a positive integer.",
sizeStr,
fieldName + FIXED_SIZE_LIST_SIZE_SUFFIX);
arrowType = new ArrowType.FixedSizeList(listSize);
} else {
arrowType = toArrowType(logicalType);
}
} else {
arrowType = toArrowType(logicalType);
}
FieldType fieldType = new FieldType(logicalType.isNullable(), arrowType, null);
List<Field> children = null;
if (logicalType instanceof ArrayType) {
children =
Collections.singletonList(
toArrowField("element", ((ArrayType) logicalType).getElementType()));
toArrowField(
"element",
((ArrayType) logicalType).getElementType(),
tableProperties));
}
return new Field(fieldName, fieldType, children);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.fluss.record.GenericRecord;
import org.apache.fluss.record.LogRecord;
import org.apache.fluss.row.BinaryString;
import org.apache.fluss.row.GenericArray;
import org.apache.fluss.row.GenericRow;
import org.apache.fluss.types.DataTypes;
import org.apache.fluss.utils.types.Tuple2;
Expand All @@ -46,7 +47,9 @@
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.FixedSizeListVector;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
Expand All @@ -69,6 +72,8 @@

/** The UT for tiering to Lance via {@link LanceLakeTieringFactory}. */
class LanceTieringTest {
private static final int EMBEDDING_LIST_SIZE = 4;

private @TempDir File tempWarehouseDir;
private LanceLakeTieringFactory lanceLakeTieringFactory;
private Configuration configuration;
Expand All @@ -91,13 +96,16 @@ void testTieringWriteTable(boolean isPartitioned) throws Exception {
TablePath tablePath = TablePath.of("lance", "logTable");
Map<String, String> customProperties = new HashMap<>();
customProperties.put("lance.batch_size", "256");
customProperties.put(
"embedding" + LanceArrowUtils.FIXED_SIZE_LIST_SIZE_SUFFIX,
String.valueOf(EMBEDDING_LIST_SIZE));
LanceConfig config =
LanceConfig.from(
configuration.toMap(),
customProperties,
tablePath.getDatabaseName(),
tablePath.getTableName());
Schema schema = createTable(config);
Schema schema = createTable(config, customProperties);

TableDescriptor descriptor =
TableDescriptor.builder()
Expand Down Expand Up @@ -180,6 +188,13 @@ void testTieringWriteTable(boolean isPartitioned) throws Exception {
new RootAllocator(),
config.getDatasetUri(),
LanceConfig.genReadOptionFromConfig(config))) {
// verify the embedding column uses FixedSizeList in the Lance schema
org.apache.arrow.vector.types.pojo.Field embeddingField =
dataset.getSchema().findField("embedding");
assertThat(embeddingField.getType()).isInstanceOf(ArrowType.FixedSizeList.class);
assertThat(((ArrowType.FixedSizeList) embeddingField.getType()).getListSize())
.isEqualTo(EMBEDDING_LIST_SIZE);

ArrowReader reader = dataset.newScan().scanBatches();
VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot();

Expand All @@ -189,8 +204,7 @@ void testTieringWriteTable(boolean isPartitioned) throws Exception {
reader.loadNextBatch();
Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket);
List<LogRecord> expectRecords = recordsByBucket.get(partitionBucket);
verifyLogTableRecords(
readerRoot, expectRecords, bucket, isPartitioned, partition);
verifyLogTableRecords(readerRoot, expectRecords);
}
}
assertThat(reader.loadNextBatch()).isFalse();
Expand All @@ -216,14 +230,13 @@ void testTieringWriteTable(boolean isPartitioned) throws Exception {
}
}

private void verifyLogTableRecords(
VectorSchemaRoot root,
List<LogRecord> expectRecords,
int expectBucket,
boolean isPartitioned,
@Nullable String partition)
throws Exception {
private void verifyLogTableRecords(VectorSchemaRoot root, List<LogRecord> expectRecords) {
assertThat(root.getRowCount()).isEqualTo(expectRecords.size());

// verify the embedding vector is a FixedSizeListVector
assertThat(root.getVector("embedding")).isInstanceOf(FixedSizeListVector.class);
FixedSizeListVector embeddingVector = (FixedSizeListVector) root.getVector("embedding");

for (int i = 0; i < expectRecords.size(); i++) {
LogRecord expectRecord = expectRecords.get(i);
// check business columns:
Expand All @@ -233,6 +246,13 @@ private void verifyLogTableRecords(
.isEqualTo(expectRecord.getRow().getString(1).toString());
assertThat(((VarCharVector) root.getVector(2)).getObject(i).toString())
.isEqualTo(expectRecord.getRow().getString(2).toString());
// check embedding column
java.util.List<?> embeddingValues = embeddingVector.getObject(i);
assertThat(embeddingValues).hasSize(EMBEDDING_LIST_SIZE);
org.apache.fluss.row.InternalArray expectedArray = expectRecord.getRow().getArray(3);
for (int j = 0; j < EMBEDDING_LIST_SIZE; j++) {
assertThat((Float) embeddingValues.get(j)).isEqualTo(expectedArray.getFloat(j));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing null value scenario in test data?

}
}
}

Expand Down Expand Up @@ -296,19 +316,21 @@ private Tuple2<List<LogRecord>, List<LogRecord>> genLogTableRecords(
List<LogRecord> logRecords = new ArrayList<>();
for (int i = 0; i < numRecords; i++) {
GenericRow genericRow;
if (partition != null) {
// Partitioned table: include partition field in data
genericRow = new GenericRow(3); // c1, c2, c3(partition)
genericRow.setField(0, i);
genericRow.setField(1, BinaryString.fromString("bucket" + bucket + "_" + i));
genericRow.setField(2, BinaryString.fromString(partition)); // partition field
} else {
// Non-partitioned table
genericRow = new GenericRow(3);
genericRow.setField(0, i);
genericRow.setField(1, BinaryString.fromString("bucket" + bucket + "_" + i));

// Partitioned table: include partition field in data
genericRow = new GenericRow(4); // c1, c2, c3(partition), embedding
genericRow.setField(0, i);
genericRow.setField(1, BinaryString.fromString("bucket" + bucket + "_" + i));

if (partition == null) {
genericRow.setField(2, BinaryString.fromString("bucket" + bucket));
} else {
genericRow.setField(2, BinaryString.fromString(partition));
}

genericRow.setField(
3, new GenericArray(new float[] {0.1f * i, 0.2f * i, 0.3f * i, 0.4f * i}));

LogRecord logRecord =
new GenericRecord(
i, System.currentTimeMillis(), ChangeType.APPEND_ONLY, genericRow);
Expand All @@ -317,16 +339,19 @@ private Tuple2<List<LogRecord>, List<LogRecord>> genLogTableRecords(
return Tuple2.of(logRecords, logRecords);
}

private Schema createTable(LanceConfig config) {
private Schema createTable(LanceConfig config, Map<String, String> customProperties) {
List<Schema.Column> columns = new ArrayList<>();
columns.add(new Schema.Column("c1", DataTypes.INT()));
columns.add(new Schema.Column("c2", DataTypes.STRING()));
columns.add(new Schema.Column("c3", DataTypes.STRING()));
columns.add(new Schema.Column("embedding", DataTypes.ARRAY(DataTypes.FLOAT())));
Schema.Builder schemaBuilder = Schema.newBuilder().fromColumns(columns);
Schema schema = schemaBuilder.build();
WriteParams params = LanceConfig.genWriteParamsFromConfig(config);
LanceDatasetAdapter.createDataset(
config.getDatasetUri(), LanceArrowUtils.toArrowSchema(schema.getRowType()), params);
config.getDatasetUri(),
LanceArrowUtils.toArrowSchema(schema.getRowType(), customProperties),
params);

return schema;
}
Expand Down
Loading