
feat(connectors): Delta Lake Sink Connector #2783

Open

kriti-sc wants to merge 19 commits into apache:master from kriti-sc:delta_sink

Conversation

Contributor

@kriti-sc kriti-sc commented Feb 19, 2026

Which issue does this PR close?

Closes #1852

Rationale

Delta Lake is a data analytics engine that is very popular in modern streaming analytics architectures.

What changed?

Introduces a Delta Lake Sink Connector that enables writing data from Iggy to Delta Lake.

The Delta Lake writing logic is heavily inspired by the kafka-delta-ingest project, to have a proven starting ground for writing to Delta Lake.

Local Execution

  1. Produced 32632 messages with schema user_id: String, user_type: u8, email: String, source: String, state: String, created_at: DateTime<Utc>, message: String using sample data producer.
  2. Consumed messages using the Delta Lake sink and created a Delta table on filesystem.
  3. Verified number of rows in delta table and the schema.
  4. Added unit tests and e2e tests, both passing.
[Image] Left: messages produced; Right (top): messages consumed by the Delta sink; Right (bottom): inspecting the Delta table in Python

AI Usage

If AI tools were used, please answer:

  1. Which tools? Claude Code
  2. Scope of usage? generated functions
  3. How did you verify the generated code works correctly? Manual testing by producing data into Iggy and then running the sink and verifying local Delta Lake creation, unit tests and e2e tests for local Delta Lake and Delta Lake on S3.
  4. Can you explain every line of the code if asked? Yes

kriti-sc and others added 13 commits February 18, 2026 13:51
The plugin exports a symbol called "open", which conflicts with the POSIX open() call used to read a file. So when the delta sink calls POSIX open() to read a file, the linker resolves it to the plugin's open instead of libc's open.

We rename the symbols plugin exports to iggy_sink_open. Now when object_store calls POSIX open(), there's no local symbol called open to collide with, so it correctly resolves to libc's open().
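The rename described in the commit message can be sketched as follows. Only the exported symbol name iggy_sink_open comes from the commit itself; the signature and body are illustrative (the real export would also carry #[no_mangle] so the symbol is emitted under exactly this name, omitted here so the sketch compiles on any edition):

```rust
// BAD (sketch): an unprefixed C export named "open" shadows libc's
// open(2) at link time, so unrelated file I/O in the same process
// resolves to the plugin instead of libc:
//
//   pub extern "C" fn open(...) -> i32 { ... }
//
// GOOD: a plugin-specific prefix keeps the export out of libc's way.
pub extern "C" fn iggy_sink_open() -> i32 {
    // Illustrative body: return 0 for success.
    0
}

fn main() {
    println!("{}", iggy_sink_open());
}
```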

codecov bot commented Feb 19, 2026

Codecov Report

❌ Patch coverage is 95.72271% with 29 lines in your changes missing coverage. Please review.
✅ Project coverage is 69.02%. Comparing base (b7818db) to head (dcb24c4).
⚠️ Report is 23 commits behind head on master.

Files with missing lines Patch % Lines
core/connectors/sinks/delta_sink/src/sink.rs 79.48% 24 Missing ⚠️
core/connectors/sinks/delta_sink/src/coercions.rs 98.59% 5 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #2783      +/-   ##
============================================
+ Coverage     68.30%   69.02%   +0.71%     
- Complexity      656      739      +83     
============================================
  Files           741      760      +19     
  Lines         62210    64514    +2304     
  Branches      58623    60677    +2054     
============================================
+ Hits          42495    44532    +2037     
- Misses        17601    17857     +256     
- Partials       2114     2125      +11     
Flag Coverage Δ
java 54.83% <ø> (+2.64%) ⬆️
python 0.00% <ø> (?)
rust 69.99% <95.72%> (+0.61%) ⬆️

Flags with carried forward coverage won't be shown.

Files with missing lines Coverage Δ
core/connectors/sinks/delta_sink/src/lib.rs 100.00% <100.00%> (ø)
core/connectors/sinks/delta_sink/src/storage.rs 100.00% <100.00%> (ø)
core/connectors/sinks/delta_sink/src/coercions.rs 98.59% <98.59%> (ø)
core/connectors/sinks/delta_sink/src/sink.rs 79.48% <79.48%> (ø)

... and 64 files with indirect coverage changes


@kriti-sc kriti-sc marked this pull request as ready for review February 20, 2026 08:36
Comment on lines 57 to 61
Err(_) if !self.config.schema.is_empty() => {
info!("Table does not exist, creating from configured schema...");
create_table(&self.config.table_uri, storage_options, &self.config.schema)
.await?
}
Contributor
Err(_) if !self.config.schema.is_empty() catches network failures, auth errors, corrupted metadata, anything, and proceeds to create a new table. With wrong S3 credentials plus a configured schema, it will attempt to create a new empty table, diverging from the real one. This must match on the specific "not found" error variant.

Contributor Author
Fixed, now matching on deltalake::DeltaTableError::NotATable(_)

Contributor Author
Ended up removing automatic table creation because we are not doing it in any of the other sinks. Looked around and best practices dictate that table creation should be explicit and left to the user for governance reasons.
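Although auto-creation was ultimately removed, the pattern from the interim fix (match only the specific "missing table" variant, propagate everything else) generalizes. A self-contained sketch with stand-in types — TableError and resolve_table are hypothetical names, not the deltalake crate's API:

```rust
// Stand-in for deltalake::DeltaTableError; only NotATable means the
// table genuinely does not exist.
#[derive(Debug)]
#[allow(dead_code)]
enum TableError {
    NotATable(String),
    Auth(String),
    Network(String),
}

// Decide whether to create a table from the configured schema.
fn resolve_table(
    load: Result<&'static str, TableError>,
    schema_configured: bool,
) -> Result<&'static str, TableError> {
    match load {
        Ok(table) => Ok(table),
        // Only the specific "not a table" variant triggers creation.
        Err(TableError::NotATable(_)) if schema_configured => Ok("created-from-schema"),
        // Auth/network/metadata errors propagate instead of silently
        // creating a new empty table that diverges from the real one.
        Err(e) => Err(e),
    }
}

fn main() {
    assert!(resolve_table(Err(TableError::Auth("denied".into())), true).is_err());
}
```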

Comment on lines 114 to 123
let json_bytes = simd_json::to_vec(simd_value).map_err(|e| {
error!("Failed to serialize JSON payload: {e}");
Error::InvalidJsonPayload
})?;
let value: serde_json::Value =
serde_json::from_slice(&json_bytes).map_err(|e| {
error!("Failed to parse JSON payload: {e}");
Error::InvalidJsonPayload
})?;
json_values.push(value);
Contributor
Double allocation: the elasticsearch_sink already has a direct owned_value_to_serde_json converter for exactly this reason.

Contributor Author
made this change. would it make sense to create a module for common utilities?
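The direct-conversion idea is a single walk of the value tree, which avoids serializing to a byte buffer and re-parsing it (two extra allocations per message). A self-contained sketch — SimdValue and JsonValue here are stand-ins for simd_json::OwnedValue and serde_json::Value:

```rust
use std::collections::BTreeMap;

// Stand-in for simd_json::OwnedValue.
enum SimdValue {
    Null,
    Bool(bool),
    I64(i64),
    String(String),
    Array(Vec<SimdValue>),
    Object(BTreeMap<String, SimdValue>),
}

// Stand-in for serde_json::Value.
#[derive(Debug, PartialEq)]
enum JsonValue {
    Null,
    Bool(bool),
    I64(i64),
    String(String),
    Array(Vec<JsonValue>),
    Object(BTreeMap<String, JsonValue>),
}

// One recursive pass: no intermediate byte buffer, no re-parse.
fn to_json(v: SimdValue) -> JsonValue {
    match v {
        SimdValue::Null => JsonValue::Null,
        SimdValue::Bool(b) => JsonValue::Bool(b),
        SimdValue::I64(n) => JsonValue::I64(n),
        SimdValue::String(s) => JsonValue::String(s),
        SimdValue::Array(xs) => JsonValue::Array(xs.into_iter().map(to_json).collect()),
        SimdValue::Object(m) => {
            JsonValue::Object(m.into_iter().map(|(k, v)| (k, to_json(v))).collect())
        }
    }
}

fn main() {
    assert_eq!(to_json(SimdValue::I64(3)), JsonValue::I64(3));
}
```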

Comment on lines 150 to 164
// Write JSON values to internal Parquet buffers
state.writer.write(json_values).await.map_err(|e| {
error!("Failed to write to Delta writer: {e}");
Error::Storage(format!("Failed to write to Delta writer: {e}"))
})?;

// Flush buffers to object store and commit to Delta log
let version = state
.writer
.flush_and_commit(&mut state.table)
.await
.map_err(|e| {
error!("Failed to flush and commit to Delta table: {e}");
Error::Storage(format!("Failed to flush and commit: {e}"))
})?;
Contributor
when flush_and_commit fails, data is already in the writer's buffer. The runtime exits the consumer task on error (core/connectors/runtime/src/sink.rs:322-329), but the consumer group has already committed the offset. Those messages are permanently lost. Additionally, the writer buffer is left in an undefined state -- no reset() call to clear it.

Contributor Author

@kriti-sc kriti-sc Feb 28, 2026
two parts to the problem if flush_and_commit fails:

  1. losing data because the offset has already been committed -- leaving this gap for now, notes below
  2. undefined state of the writer buffer -- fixed this with reset

notes:

  • the ideal solution is AutoCommitAfter::ConsumingAllMessages but that is out of scope of this PR
  • the ideal solution also adds a DLQ to hold failed payloads. had been planning to propose this as an improvement in the runtime separately.
  • a stop gap is retry based on the cause of failure. intending to implement retry logic across all sinks in another PR.
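The reset-on-failed-commit part of the fix can be sketched with a hypothetical in-memory writer (the real writer comes from the deltalake crate; all names here are illustrative):

```rust
// Toy stand-in for the Delta writer: buffers rows, commits on flush.
struct Writer {
    buffer: Vec<String>,
    fail_commit: bool, // simulate a commit failure for the sketch
}

impl Writer {
    fn write(&mut self, batch: Vec<String>) {
        self.buffer.extend(batch);
    }

    fn flush_and_commit(&mut self) -> Result<u64, String> {
        if self.fail_commit {
            return Err("commit failed".into());
        }
        self.buffer.clear();
        Ok(1) // new table version
    }

    // Drop buffered rows so a failed commit does not leave the writer
    // in an undefined state (and cannot double-write on a later batch).
    fn reset(&mut self) {
        self.buffer.clear();
    }
}

fn consume(writer: &mut Writer, batch: Vec<String>) -> Result<u64, String> {
    writer.write(batch);
    match writer.flush_and_commit() {
        Ok(version) => Ok(version),
        Err(e) => {
            writer.reset(); // clear the buffer before surfacing the error
            Err(e)
        }
    }
}

fn main() {
    let mut w = Writer { buffer: vec![], fail_commit: false };
    let _ = consume(&mut w, vec!["row".into()]);
}
```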

Comment on lines 157 to 159
let version = state
.writer
.flush_and_commit(&mut state.table)
Contributor
with poll_interval = "5ms" and batch_length = 100, this produces thousands of tiny Parquet files per second. Delta Lake is not designed for this write frequency. Should support configurable flush thresholds (size/time/count).

Contributor Author
Agreed - 5ms and 100 are not the best config for the Delta sink. These are configurable, though, in the config.toml.

Separately, agree on having a size-based flush threshold too. These configurations are used by the runtime to configure the Iggy consumer that runs this sink. Right now, it supports only count (batch_length) and time (poll_interval).
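As a rough illustration, a Delta-friendlier configuration could batch far more aggressively than 5ms/100. Only the batch_length and poll_interval keys come from the discussion above; the section name and values are hypothetical:

```toml
# Hypothetical excerpt of config.toml for this sink.
[sinks.delta]
# Larger batches and a longer poll interval produce fewer, bigger
# Parquet files per commit, which suits Delta Lake far better than
# thousands of tiny files per second.
batch_length = 10000
poll_interval = "30s"
```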

@kriti-sc kriti-sc requested a review from hubcio March 1, 2026 12:03


Development

Successfully merging this pull request may close these issues.

Implement Delta Lake connectors

2 participants