feat(connectors): add MongoDB sink connector #2815
amuldotexe wants to merge 1 commit into apache:master
Conversation
Docker proof of working (sink-only demo artifact, outside PR code scope):
Quick pull: docker pull ghcr.io/amuldotexe/iggy-mongodb-sink-demo:issue-2739-05fbec16
05fbec1 to e2cf5fe
```toml
readme = "../../README.md"

[package.metadata.cargo-machete]
ignored = ["dashmap", "once_cell", "futures", "simd-json"]
```
If these dependencies are not needed, they should be removed instead of being suppressed at the cargo-machete level.
```rust
if consume_result != 0 {
    return Err(RuntimeError::ConnectorSdkError(
        iggy_connector_sdk::Error::CannotStoreData(format!(
            "Sink consume callback returned non-zero status ({consume_result}) for plugin ID: {plugin_id}"
        )),
    ));
}
```
Previously these errors were silently dropped, so this change could now cause some connectors to crash. It is better split into a separate PR that handles the consequences properly, because it affects other connectors' behavior.
```rust
pub id: u32,
client: Option<Client>,
config: MongoDbSinkConfig,
state: Mutex<State>,
```
A Mutex is not necessary here. Since State contains only two u64 members, they can be converted to AtomicU64.
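A minimal sketch of that suggestion. The actual members of `State` are not shown in the diff, so the counter names below are hypothetical:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Hypothetical field names; the diff only shows that State has two u64 members.
#[derive(Default)]
struct State {
    messages_consumed: AtomicU64,
    batches_consumed: AtomicU64,
}

impl State {
    fn record_batch(&self, messages: u64) {
        // Relaxed ordering is sufficient for independent counters that have
        // no ordering relationship with other memory operations.
        self.messages_consumed.fetch_add(messages, Ordering::Relaxed);
        self.batches_consumed.fetch_add(1, Ordering::Relaxed);
    }
}

fn main() {
    let state = State::default();
    state.record_batch(10);
    state.record_batch(5);
    assert_eq!(state.messages_consumed.load(Ordering::Relaxed), 15);
    assert_eq!(state.batches_consumed.load(Ordering::Relaxed), 2);
    println!("ok");
}
```

This also lets the sink update its counters through a shared reference without locking.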
```rust
match s.map(|s| s.to_lowercase()).as_deref() {
    Some("json") => PayloadFormat::Json,
    Some("string") | Some("text") => PayloadFormat::String,
    _ => PayloadFormat::Binary,
```
It's better to log a warning here when an unrecognized format has been passed in the config.
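One way to make the fallback visible, sketched below. `eprintln!` stands in for whatever logging facility the connector runtime actually uses (e.g. `tracing::warn!`):

```rust
#[derive(Debug, PartialEq)]
enum PayloadFormat {
    Json,
    String,
    Binary,
}

// Same fallback behavior as the diff, but an unrecognized value is reported
// instead of being silently mapped to Binary.
fn parse_payload_format(s: Option<&str>) -> PayloadFormat {
    match s.map(|s| s.to_lowercase()).as_deref() {
        Some("json") => PayloadFormat::Json,
        Some("string") | Some("text") => PayloadFormat::String,
        Some(other) => {
            eprintln!("warning: unrecognized payload format '{other}', defaulting to binary");
            PayloadFormat::Binary
        }
        None => PayloadFormat::Binary,
    }
}

fn main() {
    assert_eq!(parse_payload_format(Some("JSON")), PayloadFormat::Json);
    assert_eq!(parse_payload_format(Some("bogus")), PayloadFormat::Binary);
    assert_eq!(parse_payload_format(None), PayloadFormat::Binary);
    println!("ok");
}
```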
```rust
doc.insert("_id", message.id.to_string());

if include_metadata {
    doc.insert("iggy_offset", message.offset as i64);
```
message.offset is u64. Casting it directly to i64 could wrap to a negative value when the offset exceeds i64::MAX.
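A sketch of a safe conversion. BSON has no unsigned 64-bit integer type, so an offset above i64::MAX must either be rejected or stored in another representation (e.g. a string); rejecting is shown here:

```rust
// Convert a u64 offset to a BSON-storable i64 without silent wrapping.
fn offset_to_bson_i64(offset: u64) -> Result<i64, String> {
    i64::try_from(offset).map_err(|_| {
        format!("offset {offset} exceeds i64::MAX and cannot be stored as BSON int64")
    })
}

fn main() {
    assert_eq!(offset_to_bson_i64(42), Ok(42));
    assert!(offset_to_bson_i64(u64::MAX).is_err());
    println!("ok");
}
```

The same treatment applies to the other u64 metadata fields written as BSON integers.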
```rust
doc.insert("iggy_timestamp", bson_timestamp);
doc.insert("iggy_stream", &topic_metadata.stream);
doc.insert("iggy_topic", &topic_metadata.topic);
doc.insert("iggy_partition_id", messages_metadata.partition_id as i32);
```
Same comment as for message.offset.
```rust
let mut attempts = 0u32;

loop {
    let result = collection.insert_many(docs.to_vec()).await;
```
When insert_many (with ordered=true, the default) hits a duplicate key at position 100, documents 0..99 are already committed to MongoDB. On retry, docs.to_vec() sends the entire batch again.
The already-committed documents then trigger duplicate key errors, which is_transient_error classifies as non-retryable, so the retry fails immediately with a misleading duplicate-key error instead of retrying only the remaining documents. In other words, this currently behaves incorrectly for partial writes.
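The fix the comment implies can be sketched as a pure helper: with an ordered insert, everything before the first failing index is committed, so only the suffix should be retried. `first_error_index` stands in for the index the driver reports in the write error of the failed insert_many:

```rust
/// Given the original batch and the index of the first write error reported
/// by an ordered insert_many, return the suffix that still needs inserting.
/// (Hypothetical helper; with ordered = true, documents 0..first_error_index
/// were already committed before the error.)
fn remaining_after_partial_insert<T: Clone>(docs: &[T], first_error_index: usize) -> Vec<T> {
    docs[first_error_index..].to_vec()
}

fn main() {
    let batch = vec!["d0", "d1", "d2", "d3", "d4"];
    // Suppose a duplicate key was hit at position 3: d0..d2 are committed.
    let retry = remaining_after_partial_insert(&batch, 3);
    assert_eq!(retry, vec!["d3", "d4"]);
    println!("ok");
}
```

The retry loop would then feed only `retry` (not the full `docs`) back into insert_many, so already-committed documents never produce spurious duplicate-key failures.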
```rust
let include_metadata = self.config.include_metadata.unwrap_or(true);
let include_checksum = self.config.include_checksum.unwrap_or(true);
let include_origin_timestamp = self.config.include_origin_timestamp.unwrap_or(true);
```
Config is validated on every batch. These can be moved to new().
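A sketch of that refactor: resolve the Option-typed flags once at construction instead of calling unwrap_or per batch. The struct layout below is hypothetical; only the field names mirror the diff:

```rust
// Hypothetical config shape, mirroring the Option-typed flags in the diff.
struct MongoDbSinkConfig {
    include_metadata: Option<bool>,
    include_checksum: Option<bool>,
    include_origin_timestamp: Option<bool>,
}

// Flags resolved once in new() and reused for every batch.
struct ResolvedFlags {
    include_metadata: bool,
    include_checksum: bool,
    include_origin_timestamp: bool,
}

impl ResolvedFlags {
    fn new(config: &MongoDbSinkConfig) -> Self {
        Self {
            include_metadata: config.include_metadata.unwrap_or(true),
            include_checksum: config.include_checksum.unwrap_or(true),
            include_origin_timestamp: config.include_origin_timestamp.unwrap_or(true),
        }
    }
}

fn main() {
    let config = MongoDbSinkConfig {
        include_metadata: None,
        include_checksum: Some(false),
        include_origin_timestamp: None,
    };
    let flags = ResolvedFlags::new(&config);
    assert!(flags.include_metadata);
    assert!(!flags.include_checksum);
    assert!(flags.include_origin_timestamp);
    println!("ok");
}
```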
krishvishal left a comment:
@amuldotexe I've added a few comments.
It seems that inserts in MongoDB can fail after partial writes. Check whether other places where insert is used are affected by this and handle them accordingly.
Thanks @krishvishal for the detailed feedback. I will work on this and get back with an updated PR.
Codecov Report
❌ Patch coverage is
Additional details and impacted files

```
@@             Coverage Diff              @@
##            master    #2815       +/-  ##
============================================
+ Coverage    68.47%   68.64%    +0.16%
  Complexity     656      656
============================================
  Files          852      853        +1
  Lines        68221    68622      +401
  Branches     59682    60093      +411
============================================
+ Hits         46714    47103      +389
+ Misses       19177    19165       -12
- Partials      2330     2354       +24
```
Flags with carried forward coverage won't be shown.
Adding to @krishvishal's points, which already cover a lot of ground. Two more things I think are blockers here. First, there's a liveness issue with how partial failures interact with offset commits. When any batch inside
Second,
Which issue does this PR close?
Partially addresses #2739 (MongoDB sink only).
MongoDB source support will follow in a separate PR.
Rationale
We need a MongoDB sink connector with explicit failure behavior so writes are never reported as successful when they are not.
What changed?
Before this change, MongoDB sink support was missing for connector runtime users.
This PR adds the MongoDB sink connector, including insert/write logic, retry handling for transient failures, explicit duplicate-key failure behavior, metadata mapping, and delivery-semantics documentation.
It also adds sink-focused integration and unit tests to validate payload formats, batch behavior, auto-create collection behavior, and non-silent failure paths.
Local Execution
- Passed: cargo fmt --all -- --check
- Passed: cargo clippy -p iggy_connector_mongodb_sink --all-targets -- -D warnings
- Passed: cargo test -p iggy_connector_mongodb_sink (13 passed)
- Passed: cargo test -p integration --test mod -- mongodb_sink (10 passed)
- Pre-commit hooks ran
Docker E2E proof image: ghcr.io/amuldotexe/iggy-mongodb-sink-demo:issue-2739-05fbec16
Docker smoke result: SMOKE_OK ... docs=3
GHCR package: https://github.com/users/amuldotexe/packages/container/package/iggy-mongodb-sink-demo
Key sink tests:
- duplicate_key_is_explicit_failure_and_not_silent_success
- ordered_duplicate_partial_insert_has_exact_accounting
- schema_validation_mid_batch_surfaces_hard_error_and_partial_prefix
- write_concern_timeout_does_not_report_full_success
- retryable_write_failover_keeps_single_doc_per_id
- no_writes_performed_label_path_preserves_state_accuracy
- json_messages_sink_to_mongodb
- binary_messages_sink_as_bson_binary
- large_batch_processed_correctly
- auto_create_collection_on_open
- given_no_client_should_return_error_not_silent_ok
AI Usage
Codex, Claude Code, Rust Rover
PRD, connector precedent analysis, TDD, implementation, and PR prep
Ran all local format, clippy, unit, and integration checks listed above
Yes