-
Notifications
You must be signed in to change notification settings - Fork 508
Fix Lance writer to emit Arrow FixedSizeList for array columns to enable native vector search #2707
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
35cb755
a6acf1e
029e204
4164ceb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -47,31 +47,76 @@ | |
|
|
||
| import java.util.Collections; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| import static org.apache.fluss.utils.Preconditions.checkArgument; | ||
|
|
||
| /** | ||
| * Utilities for converting Fluss RowType to non-shaded Arrow Schema. This is needed because Lance | ||
| * requires non-shaded Arrow API. | ||
| */ | ||
| public class LanceArrowUtils { | ||
|
|
||
| /** Property suffix for configuring a fixed-size list Arrow type on array columns. */ | ||
| public static final String FIXED_SIZE_LIST_SIZE_SUFFIX = ".arrow.fixed-size-list.size"; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The property key format is `<column_name>.arrow.fixed-size-list.size`. If the column name itself contains a `.` (e.g., `a.b`), parsing the key becomes ambiguous. For example, for `a.b.arrow.fixed-size-list.size` it is unclear whether the key refers to column `a.b`, or to column `a` followed by some unrelated suffix. Suggestion:
|
||
|
|
||
| /** Returns the non-shaded Arrow schema of the specified Fluss RowType. */ | ||
| public static Schema toArrowSchema(RowType rowType) { | ||
| return toArrowSchema(rowType, Collections.emptyMap()); | ||
| } | ||
|
|
||
| /** | ||
| * Returns the non-shaded Arrow schema of the specified Fluss RowType, using table properties to | ||
| * determine whether array columns should use FixedSizeList instead of List. | ||
| * | ||
| * <p>When a table property {@code <column>.arrow.fixed-size-list.size} is set, the | ||
| * corresponding ARRAY column will be emitted as {@code FixedSizeList<element>(size)} instead of | ||
| * {@code List<element>}. | ||
| */ | ||
| public static Schema toArrowSchema(RowType rowType, Map<String, String> tableProperties) { | ||
| List<Field> fields = | ||
| rowType.getFields().stream() | ||
| .map(f -> toArrowField(f.getName(), f.getType())) | ||
| .map(f -> toArrowField(f.getName(), f.getType(), tableProperties)) | ||
| .collect(Collectors.toList()); | ||
| return new Schema(fields); | ||
| } | ||
|
|
||
| private static Field toArrowField(String fieldName, DataType logicalType) { | ||
| FieldType fieldType = | ||
| new FieldType(logicalType.isNullable(), toArrowType(logicalType), null); | ||
| private static Field toArrowField( | ||
| String fieldName, DataType logicalType, Map<String, String> tableProperties) { | ||
| ArrowType arrowType; | ||
| if (logicalType instanceof ArrayType && tableProperties != null) { | ||
| String sizeStr = tableProperties.get(fieldName + FIXED_SIZE_LIST_SIZE_SUFFIX); | ||
| if (sizeStr != null) { | ||
| int listSize = -1; | ||
| try { | ||
| listSize = Integer.parseInt(sizeStr); | ||
| } catch (NumberFormatException ignored) { | ||
| // Not really ignored, IllegalArgumentEx still thrown below. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Validating for a positive integer is correct. However, if the property value is an empty string `""` or whitespace `" "`, `Integer.parseInt` will throw a `NumberFormatException` without a user-friendly error message. I suggest catching the `NumberFormatException` and converting it to an `IllegalArgumentException` that includes the property key name, so that users can easily locate the configuration issue. |
||
| // This removes duplicate boilerplates for throwing IAE | ||
| } | ||
|
|
||
| checkArgument( | ||
| listSize > 0, | ||
| "Invalid value '%s' for property '%s'. Expected a positive integer.", | ||
| sizeStr, | ||
| fieldName + FIXED_SIZE_LIST_SIZE_SUFFIX); | ||
| arrowType = new ArrowType.FixedSizeList(listSize); | ||
| } else { | ||
| arrowType = toArrowType(logicalType); | ||
| } | ||
| } else { | ||
| arrowType = toArrowType(logicalType); | ||
| } | ||
| FieldType fieldType = new FieldType(logicalType.isNullable(), arrowType, null); | ||
| List<Field> children = null; | ||
| if (logicalType instanceof ArrayType) { | ||
| children = | ||
| Collections.singletonList( | ||
| toArrowField("element", ((ArrayType) logicalType).getElementType())); | ||
| toArrowField( | ||
| "element", | ||
| ((ArrayType) logicalType).getElementType(), | ||
| tableProperties)); | ||
| } | ||
| return new Field(fieldName, fieldType, children); | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -37,6 +37,7 @@ | |
| import org.apache.fluss.record.GenericRecord; | ||
| import org.apache.fluss.record.LogRecord; | ||
| import org.apache.fluss.row.BinaryString; | ||
| import org.apache.fluss.row.GenericArray; | ||
| import org.apache.fluss.row.GenericRow; | ||
| import org.apache.fluss.types.DataTypes; | ||
| import org.apache.fluss.utils.types.Tuple2; | ||
|
|
@@ -46,7 +47,9 @@ | |
| import org.apache.arrow.memory.RootAllocator; | ||
| import org.apache.arrow.vector.VarCharVector; | ||
| import org.apache.arrow.vector.VectorSchemaRoot; | ||
| import org.apache.arrow.vector.complex.FixedSizeListVector; | ||
| import org.apache.arrow.vector.ipc.ArrowReader; | ||
| import org.apache.arrow.vector.types.pojo.ArrowType; | ||
| import org.junit.jupiter.api.BeforeEach; | ||
| import org.junit.jupiter.api.io.TempDir; | ||
| import org.junit.jupiter.params.ParameterizedTest; | ||
|
|
@@ -69,6 +72,8 @@ | |
|
|
||
| /** The UT for tiering to Lance via {@link LanceLakeTieringFactory}. */ | ||
| class LanceTieringTest { | ||
| private static final int EMBEDDING_LIST_SIZE = 4; | ||
|
|
||
| private @TempDir File tempWarehouseDir; | ||
| private LanceLakeTieringFactory lanceLakeTieringFactory; | ||
| private Configuration configuration; | ||
|
|
@@ -91,13 +96,16 @@ void testTieringWriteTable(boolean isPartitioned) throws Exception { | |
| TablePath tablePath = TablePath.of("lance", "logTable"); | ||
| Map<String, String> customProperties = new HashMap<>(); | ||
| customProperties.put("lance.batch_size", "256"); | ||
| customProperties.put( | ||
| "embedding" + LanceArrowUtils.FIXED_SIZE_LIST_SIZE_SUFFIX, | ||
| String.valueOf(EMBEDDING_LIST_SIZE)); | ||
| LanceConfig config = | ||
| LanceConfig.from( | ||
| configuration.toMap(), | ||
| customProperties, | ||
| tablePath.getDatabaseName(), | ||
| tablePath.getTableName()); | ||
| Schema schema = createTable(config); | ||
| Schema schema = createTable(config, customProperties); | ||
|
|
||
| TableDescriptor descriptor = | ||
| TableDescriptor.builder() | ||
|
|
@@ -180,6 +188,13 @@ void testTieringWriteTable(boolean isPartitioned) throws Exception { | |
| new RootAllocator(), | ||
| config.getDatasetUri(), | ||
| LanceConfig.genReadOptionFromConfig(config))) { | ||
| // verify the embedding column uses FixedSizeList in the Lance schema | ||
| org.apache.arrow.vector.types.pojo.Field embeddingField = | ||
| dataset.getSchema().findField("embedding"); | ||
| assertThat(embeddingField.getType()).isInstanceOf(ArrowType.FixedSizeList.class); | ||
| assertThat(((ArrowType.FixedSizeList) embeddingField.getType()).getListSize()) | ||
| .isEqualTo(EMBEDDING_LIST_SIZE); | ||
|
|
||
| ArrowReader reader = dataset.newScan().scanBatches(); | ||
| VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot(); | ||
|
|
||
|
|
@@ -189,8 +204,7 @@ void testTieringWriteTable(boolean isPartitioned) throws Exception { | |
| reader.loadNextBatch(); | ||
| Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket); | ||
| List<LogRecord> expectRecords = recordsByBucket.get(partitionBucket); | ||
| verifyLogTableRecords( | ||
| readerRoot, expectRecords, bucket, isPartitioned, partition); | ||
| verifyLogTableRecords(readerRoot, expectRecords); | ||
| } | ||
| } | ||
| assertThat(reader.loadNextBatch()).isFalse(); | ||
|
|
@@ -216,14 +230,13 @@ void testTieringWriteTable(boolean isPartitioned) throws Exception { | |
| } | ||
| } | ||
|
|
||
| private void verifyLogTableRecords( | ||
| VectorSchemaRoot root, | ||
| List<LogRecord> expectRecords, | ||
| int expectBucket, | ||
| boolean isPartitioned, | ||
| @Nullable String partition) | ||
| throws Exception { | ||
| private void verifyLogTableRecords(VectorSchemaRoot root, List<LogRecord> expectRecords) { | ||
| assertThat(root.getRowCount()).isEqualTo(expectRecords.size()); | ||
|
|
||
| // verify the embedding vector is a FixedSizeListVector | ||
| assertThat(root.getVector("embedding")).isInstanceOf(FixedSizeListVector.class); | ||
| FixedSizeListVector embeddingVector = (FixedSizeListVector) root.getVector("embedding"); | ||
|
|
||
| for (int i = 0; i < expectRecords.size(); i++) { | ||
| LogRecord expectRecord = expectRecords.get(i); | ||
| // check business columns: | ||
|
|
@@ -233,6 +246,13 @@ private void verifyLogTableRecords( | |
| .isEqualTo(expectRecord.getRow().getString(1).toString()); | ||
| assertThat(((VarCharVector) root.getVector(2)).getObject(i).toString()) | ||
| .isEqualTo(expectRecord.getRow().getString(2).toString()); | ||
| // check embedding column | ||
| java.util.List<?> embeddingValues = embeddingVector.getObject(i); | ||
| assertThat(embeddingValues).hasSize(EMBEDDING_LIST_SIZE); | ||
| org.apache.fluss.row.InternalArray expectedArray = expectRecord.getRow().getArray(3); | ||
| for (int j = 0; j < EMBEDDING_LIST_SIZE; j++) { | ||
| assertThat((Float) embeddingValues.get(j)).isEqualTo(expectedArray.getFloat(j)); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is a null-value scenario missing from the test data? |
||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -296,19 +316,21 @@ private Tuple2<List<LogRecord>, List<LogRecord>> genLogTableRecords( | |
| List<LogRecord> logRecords = new ArrayList<>(); | ||
| for (int i = 0; i < numRecords; i++) { | ||
| GenericRow genericRow; | ||
| if (partition != null) { | ||
| // Partitioned table: include partition field in data | ||
| genericRow = new GenericRow(3); // c1, c2, c3(partition) | ||
| genericRow.setField(0, i); | ||
| genericRow.setField(1, BinaryString.fromString("bucket" + bucket + "_" + i)); | ||
| genericRow.setField(2, BinaryString.fromString(partition)); // partition field | ||
| } else { | ||
| // Non-partitioned table | ||
| genericRow = new GenericRow(3); | ||
| genericRow.setField(0, i); | ||
| genericRow.setField(1, BinaryString.fromString("bucket" + bucket + "_" + i)); | ||
|
|
||
| // Partitioned table: include partition field in data | ||
| genericRow = new GenericRow(4); // c1, c2, c3(partition), embedding | ||
| genericRow.setField(0, i); | ||
| genericRow.setField(1, BinaryString.fromString("bucket" + bucket + "_" + i)); | ||
|
|
||
| if (partition == null) { | ||
| genericRow.setField(2, BinaryString.fromString("bucket" + bucket)); | ||
| } else { | ||
| genericRow.setField(2, BinaryString.fromString(partition)); | ||
| } | ||
|
|
||
| genericRow.setField( | ||
| 3, new GenericArray(new float[] {0.1f * i, 0.2f * i, 0.3f * i, 0.4f * i})); | ||
|
|
||
| LogRecord logRecord = | ||
| new GenericRecord( | ||
| i, System.currentTimeMillis(), ChangeType.APPEND_ONLY, genericRow); | ||
|
|
@@ -317,16 +339,19 @@ private Tuple2<List<LogRecord>, List<LogRecord>> genLogTableRecords( | |
| return Tuple2.of(logRecords, logRecords); | ||
| } | ||
|
|
||
| private Schema createTable(LanceConfig config) { | ||
| private Schema createTable(LanceConfig config, Map<String, String> customProperties) { | ||
| List<Schema.Column> columns = new ArrayList<>(); | ||
| columns.add(new Schema.Column("c1", DataTypes.INT())); | ||
| columns.add(new Schema.Column("c2", DataTypes.STRING())); | ||
| columns.add(new Schema.Column("c3", DataTypes.STRING())); | ||
| columns.add(new Schema.Column("embedding", DataTypes.ARRAY(DataTypes.FLOAT()))); | ||
| Schema.Builder schemaBuilder = Schema.newBuilder().fromColumns(columns); | ||
| Schema schema = schemaBuilder.build(); | ||
| WriteParams params = LanceConfig.genWriteParamsFromConfig(config); | ||
| LanceDatasetAdapter.createDataset( | ||
| config.getDatasetUri(), LanceArrowUtils.toArrowSchema(schema.getRowType()), params); | ||
| config.getDatasetUri(), | ||
| LanceArrowUtils.toArrowSchema(schema.getRowType(), customProperties), | ||
| params); | ||
|
|
||
| return schema; | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to confirm the return type of
`getCustomProperties()`. If it returns an immutable `Map` or a `Properties` type rather than `Map<String, String>`, will `tableProperties.get(fieldName + FIXED_SIZE_LIST_SIZE_SUFFIX)` in `toArrowField` work correctly? I suggest using `Map<String, String>` in the new method signature and ensuring compatibility at the call site.