feat(connectors): Delta Lake Sink Connector #2783
kriti-sc wants to merge 19 commits into apache:master from
Conversation
The plugin exports a symbol called "open", which conflicts with the POSIX "open()" call to read a file. So when delta sink calls POSIX open() to read a file, the linker resolves it to the plugin's open instead of libc's open. We rename the symbols plugin exports to iggy_sink_open. Now when object_store calls POSIX open(), there's no local symbol called open to collide with, so it correctly resolves to libc's open().
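A minimal sketch of the renaming fix (the signature and return value are illustrative assumptions, not the real plugin ABI):

```rust
// Sketch: instead of exporting a bare `open` symbol, the plugin exports a
// prefixed name. In the real cdylib this function would additionally carry
// `#[no_mangle]` (spelled `#[unsafe(no_mangle)]` on edition 2024) so the
// loader sees the symbol under exactly this name -- and, crucially, sees
// no local symbol named `open` that could shadow libc's open(2).
pub extern "C" fn iggy_sink_open() -> i32 {
    // Illustrative body: 0 signals successful initialization.
    0
}
```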
Codecov Report ❌ Patch coverage is
Additional details and impacted files

```diff
@@            Coverage Diff             @@
##           master    #2783      +/-   ##
============================================
+ Coverage    68.30%   69.02%    +0.71%
- Complexity     656      739       +83
============================================
  Files          741      760       +19
  Lines        62210    64514     +2304
  Branches     58623    60677     +2054
============================================
+ Hits         42495    44532     +2037
- Misses       17601    17857      +256
- Partials      2114     2125       +11
```
```rust
Err(_) if !self.config.schema.is_empty() => {
    info!("Table does not exist, creating from configured schema...");
    create_table(&self.config.table_uri, storage_options, &self.config.schema)
        .await?
}
```
`Err(_) if !self.config.schema.is_empty()` catches every failure (network errors, auth errors, corrupted metadata) and proceeds to create a new table. With wrong S3 credentials and a schema configured, it will attempt to create a new empty table, diverging from the real one. This must match on the specific "not found" error variant.
Fixed, now matching on deltalake::DeltaTableError::NotATable(_)
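A self-contained toy of the pattern (the error enum below is a stand-in, not the deltalake API): only the specific "not found" variant triggers table creation, and every other error propagates.

```rust
// Stand-in error type; `NotATable` mirrors deltalake::DeltaTableError::NotATable.
#[derive(Debug, PartialEq)]
enum TableError {
    NotATable(String),
    Network(String), // stand-in for auth/network/metadata failures
}

// Only a missing table is recoverable by creating it; real errors propagate.
fn load_or_create(load: Result<String, TableError>) -> Result<String, TableError> {
    match load {
        Ok(table) => Ok(table),
        Err(TableError::NotATable(_)) => Ok(String::from("created-from-schema")),
        Err(other) => Err(other),
    }
}
```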
Ended up removing automatic table creation because we are not doing it in any of the other sinks. Looked around, and best practice is that table creation should be explicit and left to the user for governance reasons.
```rust
let json_bytes = simd_json::to_vec(simd_value).map_err(|e| {
    error!("Failed to serialize JSON payload: {e}");
    Error::InvalidJsonPayload
})?;
let value: serde_json::Value =
    serde_json::from_slice(&json_bytes).map_err(|e| {
        error!("Failed to parse JSON payload: {e}");
        Error::InvalidJsonPayload
    })?;
json_values.push(value);
```
Double allocation: this serializes the value to bytes and immediately re-parses them. The elasticsearch_sink already has a direct owned_value_to_serde_json converter for exactly this reason.
Made this change. Would it make sense to create a module for common utilities?
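A toy model of the fix, with stand-in enums for simd_json's owned value and serde_json::Value (the real converter works on the actual library types): one recursive walk replaces the serialize-then-reparse round trip.

```rust
#[derive(Debug, PartialEq)]
enum OwnedValue { // stand-in for simd_json's owned value
    Null,
    Bool(bool),
    I64(i64),
    Str(String),
    Array(Vec<OwnedValue>),
}

#[derive(Debug, PartialEq)]
enum SerdeValue { // stand-in for serde_json::Value
    Null,
    Bool(bool),
    I64(i64),
    Str(String),
    Array(Vec<SerdeValue>),
}

// Direct conversion: no intermediate byte buffer, no re-parsing.
fn owned_value_to_serde_json(v: OwnedValue) -> SerdeValue {
    match v {
        OwnedValue::Null => SerdeValue::Null,
        OwnedValue::Bool(b) => SerdeValue::Bool(b),
        OwnedValue::I64(n) => SerdeValue::I64(n),
        OwnedValue::Str(s) => SerdeValue::Str(s),
        OwnedValue::Array(items) => {
            SerdeValue::Array(items.into_iter().map(owned_value_to_serde_json).collect())
        }
    }
}
```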
```rust
// Write JSON values to internal Parquet buffers
state.writer.write(json_values).await.map_err(|e| {
    error!("Failed to write to Delta writer: {e}");
    Error::Storage(format!("Failed to write to Delta writer: {e}"))
})?;

// Flush buffers to object store and commit to Delta log
let version = state
    .writer
    .flush_and_commit(&mut state.table)
    .await
    .map_err(|e| {
        error!("Failed to flush and commit to Delta table: {e}");
        Error::Storage(format!("Failed to flush and commit: {e}"))
    })?;
```
when flush_and_commit fails, data is already in the writer's buffer. The runtime exits the consumer task on error (core/connectors/runtime/src/sink.rs:322-329), but the consumer group has already committed the offset. Those messages are permanently lost. Additionally, the writer buffer is left in an undefined state -- no reset() call to clear it.
Two parts to the problem if flush_and_commit fails:
- Losing data because the offset has already been committed. Leaving this gap for now; notes below.
- Undefined state of the writer buffer. Fixed this with reset.

Notes:
- The ideal solution is AutoCommitAfter::ConsumingAllMessages, but that is out of scope of this PR. The ideal solution also adds a DLQ to hold failed payloads; I had been planning to propose this as a runtime improvement separately.
- A stop gap is retry based on the cause of the failure. Intending to implement retry logic across all sinks in another PR.
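A sketch of the stop-gap retry (names and limits are illustrative; a real version would classify the error cause and back off between attempts):

```rust
// Retry a fallible operation up to `max_attempts` times, returning the last
// error if all attempts fail. A production version would sleep with backoff
// and only retry errors classified as transient.
fn with_retries<T, E>(max_attempts: u32, mut op: impl FnMut() -> Result<T, E>) -> Result<T, E> {
    let mut attempt = 1;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            Err(err) if attempt >= max_attempts => return Err(err),
            Err(_) => attempt += 1,
        }
    }
}
```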
```rust
let version = state
    .writer
    .flush_and_commit(&mut state.table)
```
With poll_interval = "5ms" and batch_length = 100, this produces thousands of tiny Parquet files per second. Delta Lake is not designed for that write frequency. The sink should support configurable flush thresholds (size/time/count).
Agreed, 5ms and 100 are not a good config for the Delta sink. These are configurable, though, in the config.toml.
Separately, agree on having a size-based flush threshold too. These configuration values are used by the runtime to configure the Iggy consumer that runs this sink. Right now, it supports only count (batch_length) and time (poll_interval).
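A hypothetical shape for such thresholds in config.toml (the first two keys exist today; the size threshold is the proposed addition and does not exist yet):

```toml
# Existing knobs: how the runtime consumer batches deliveries to the sink.
batch_length = 10000      # count threshold (real key; value illustrative)
poll_interval = "30s"     # time threshold (real key; value illustrative)

# Proposed addition: also flush when the buffered data exceeds a size limit.
# flush_max_bytes = "128MB"   # size threshold (not implemented)
```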
Which issue does this PR close?
Closes #1852
Rationale
Delta Lake is an open table format for data analytics and is very popular in modern streaming analytics architectures.
What changed?
Introduces a Delta Lake Sink Connector that enables writing data from Iggy to Delta Lake.
The Delta Lake writing logic is heavily inspired by the kafka-delta-ingest project, to build on a proven starting point for writing to Delta Lake.
Local Execution
Schema `user_id: String, user_type: u8, email: String, source: String, state: String, created_at: DateTime<Utc>, message: String`, using the sample data producer.

AI Usage
If AI tools were used, please answer: