
feat(connectors): add MongoDB sink connector#2815

Open
amuldotexe wants to merge 1 commit into apache:master from amuldotexe:codex/2739-sink-sync

Conversation

@amuldotexe

Which issue does this PR close?

Partially addresses #2739 (MongoDB sink only).
MongoDB source support will follow in a separate PR.

Rationale

We need a MongoDB sink connector with explicit failure behavior so writes are never reported as successful when they are not.

What changed?

Before this change, MongoDB sink support was missing for connector runtime users.
This PR adds the MongoDB sink connector, including insert/write logic, retry handling for transient failures, explicit duplicate-key failure behavior, metadata mapping, and delivery-semantics documentation.
It also adds sink-focused integration and unit tests to validate payload formats, batch behavior, auto-create collection behavior, and non-silent failure paths.

Local Execution

  • Passed: cargo fmt --all -- --check

  • Passed: cargo clippy -p iggy_connector_mongodb_sink --all-targets -- -D warnings

  • Passed: cargo test -p iggy_connector_mongodb_sink (13 passed)

  • Passed: cargo test -p integration --test mod -- mongodb_sink (10 passed)

  • Pre-commit hooks ran

  • Docker E2E proof image: ghcr.io/amuldotexe/iggy-mongodb-sink-demo:issue-2739-05fbec16

  • Docker smoke result: SMOKE_OK ... docs=3

  • GHCR package: https://github.com/users/amuldotexe/packages/container/package/iggy-mongodb-sink-demo

  • Key sink tests:
    • duplicate_key_is_explicit_failure_and_not_silent_success
    • ordered_duplicate_partial_insert_has_exact_accounting
    • schema_validation_mid_batch_surfaces_hard_error_and_partial_prefix
    • write_concern_timeout_does_not_report_full_success
    • retryable_write_failover_keeps_single_doc_per_id
    • no_writes_performed_label_path_preserves_state_accuracy
    • json_messages_sink_to_mongodb
    • binary_messages_sink_as_bson_binary
    • large_batch_processed_correctly
    • auto_create_collection_on_open
    • given_no_client_should_return_error_not_silent_ok

AI Usage

  1. Which tools?
    Codex, Claude Code, Rust Rover
  2. Scope of usage?
    PRD, connector precedent analysis, TDD, implementation, and PR prep
  3. How did you verify the generated code works correctly?
    Ran all local format, clippy, unit, and integration checks listed above
  4. Can you explain every line of the code if asked?
    Yes

@amuldotexe
Author

amuldotexe commented Feb 25, 2026

Docker proof of working (sink-only demo artifact, outside PR code scope):

Quick pull:

docker pull ghcr.io/amuldotexe/iggy-mongodb-sink-demo:issue-2739-05fbec16

readme = "../../README.md"

[package.metadata.cargo-machete]
ignored = ["dashmap", "once_cell", "futures", "simd-json"]

Contributor

If these dependencies are not needed then they should be removed instead of being suppressed at cargo-machete level.

Comment on lines +534 to +541
if consume_result != 0 {
    return Err(RuntimeError::ConnectorSdkError(
        iggy_connector_sdk::Error::CannotStoreData(format!(
            "Sink consume callback returned non-zero status ({consume_result}) for plugin ID: {plugin_id}"
        )),
    ));
}

Contributor

Previously these errors were silently dropped, but this change could now cause some connectors to crash.

This change is better split into a separate PR that handles the consequences properly, because it affects other connectors' behavior.

pub id: u32,
client: Option<Client>,
config: MongoDbSinkConfig,
state: Mutex<State>,

Contributor

Mutex is not necessary here. Since State contains only two u64 members, we can convert them into AtomicU64.
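
A minimal sketch of the suggested change. The field names here are assumptions; the real State may track different counters:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Hypothetical counters; the PR's actual State fields may differ.
struct State {
    messages_written: AtomicU64,
    batches_processed: AtomicU64,
}

impl State {
    fn record_batch(&self, message_count: u64) {
        // Relaxed ordering is sufficient for independent statistics counters,
        // and &self means no Mutex (or &mut) is needed to update them.
        self.messages_written
            .fetch_add(message_count, Ordering::Relaxed);
        self.batches_processed.fetch_add(1, Ordering::Relaxed);
    }
}
```

This also removes a potential contention point on the hot write path, since each counter update is a single atomic instruction instead of a lock acquisition.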

match s.map(|s| s.to_lowercase()).as_deref() {
    Some("json") => PayloadFormat::Json,
    Some("string") | Some("text") => PayloadFormat::String,
    _ => PayloadFormat::Binary,

Contributor

It's better to log a warning here when an unrecognized format has been passed in the config.
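
One way to do that, as a sketch. Only an unrecognized *present* value warns; an absent value still falls back to Binary silently. `eprintln!` stands in for whatever logger the runtime uses:

```rust
#[derive(Debug, PartialEq)]
enum PayloadFormat {
    Json,
    String,
    Binary,
}

fn parse_payload_format(s: Option<&str>) -> PayloadFormat {
    match s.map(str::to_lowercase).as_deref() {
        Some("json") => PayloadFormat::Json,
        Some("string") | Some("text") => PayloadFormat::String,
        None => PayloadFormat::Binary,
        Some(other) => {
            // Surface the misconfiguration instead of silently defaulting.
            // In the connector this would go through the runtime's logger.
            eprintln!("warning: unrecognized payload format '{other}', defaulting to binary");
            PayloadFormat::Binary
        }
    }
}
```

Splitting the `None` arm from the catch-all is what makes the distinction possible: a missing key is a legitimate default, while a typo like "jsn" is almost certainly a user error worth flagging.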

doc.insert("_id", message.id.to_string());

if include_metadata {
    doc.insert("iggy_offset", message.offset as i64);

Contributor

message.offset is a u64. Casting it directly to i64 wraps to a negative value when the offset exceeds i64::MAX.
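
A sketch of a checked conversion. Refusing the value is only one option; offsets above i64::MAX could alternatively be stored as a string or Decimal128:

```rust
// Checked u64 -> i64 conversion: fails loudly instead of wrapping negative.
fn offset_to_bson_i64(offset: u64) -> Result<i64, String> {
    i64::try_from(offset)
        .map_err(|_| format!("offset {offset} exceeds i64::MAX and cannot be stored as BSON int64"))
}
```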

doc.insert("iggy_timestamp", bson_timestamp);
doc.insert("iggy_stream", &topic_metadata.stream);
doc.insert("iggy_topic", &topic_metadata.topic);
doc.insert("iggy_partition_id", messages_metadata.partition_id as i32);

Contributor

Same comment as for message.offset: the unchecked as cast can wrap.

let mut attempts = 0u32;

loop {
    let result = collection.insert_many(docs.to_vec()).await;

Contributor

When insert_many (with ordered=true, the default) hits a duplicate key at position 100, documents 0..99 are already committed to MongoDB. On retry, docs.to_vec() sends the entire batch again.

The already-committed documents trigger duplicate-key errors, which is_transient_error classifies as non-retryable, so the retry fails immediately with a misleading duplicate-key error instead of retrying only the remaining documents. For partial writes this currently behaves incorrectly.
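
One driver-agnostic way to make the retry account for the committed prefix, as a sketch. Here `first_failed_index` is a hypothetical value: the real code would need to extract the failing document's index from the driver's bulk-write error, which this sketch leaves abstract:

```rust
// With ordered=true, documents before the first failing index are already
// committed; only the suffix starting at that index still needs inserting.
// Obtaining `first_failed_index` from the driver error is left abstract here.
fn remaining_after_partial_failure<T>(docs: &[T], first_failed_index: usize) -> &[T] {
    &docs[first_failed_index..]
}
```

If the failure during a retry of a re-delivered batch is itself a duplicate key, the failing document may already exist in the collection, in which case skipping past it (or switching to upsert semantics keyed on _id) is what actually breaks the loop.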

Comment on lines +283 to +285
let include_metadata = self.config.include_metadata.unwrap_or(true);
let include_checksum = self.config.include_checksum.unwrap_or(true);
let include_origin_timestamp = self.config.include_origin_timestamp.unwrap_or(true);

Contributor

Config is validated every batch. These can be moved to new().
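
A sketch of hoisting the resolution into construction. The flag names come from the quoted snippet; the surrounding structs are assumptions, not the PR's actual types:

```rust
// Config struct sketched from the quoted snippet; real fields may differ.
struct MongoDbSinkConfig {
    include_metadata: Option<bool>,
    include_checksum: Option<bool>,
    include_origin_timestamp: Option<bool>,
}

// Flags resolved once, then read on every batch without touching the Options.
struct ResolvedFlags {
    include_metadata: bool,
    include_checksum: bool,
    include_origin_timestamp: bool,
}

impl ResolvedFlags {
    fn new(config: &MongoDbSinkConfig) -> Self {
        Self {
            include_metadata: config.include_metadata.unwrap_or(true),
            include_checksum: config.include_checksum.unwrap_or(true),
            include_origin_timestamp: config.include_origin_timestamp.unwrap_or(true),
        }
    }
}
```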

Contributor

@krishvishal krishvishal left a comment

@amuldotexe I've added a few comments.

It seems that inserts in MongoDB can fail after partial writes. Check whether other places where insert is used are affected by this and handle them accordingly.

@amuldotexe
Author

Thanks @krishvishal for the detailed feedback.

I will work on this and get back with an updated PR.

@codecov

codecov bot commented Feb 26, 2026

Codecov Report

❌ Patch coverage is 87.59305% with 50 lines in your changes missing coverage. Please review.
✅ Project coverage is 68.64%. Comparing base (cca86fe) to head (e2cf5fe).
⚠️ Report is 10 commits behind head on master.

Files with missing lines Patch % Lines
core/connectors/sinks/mongodb_sink/src/lib.rs 87.34% 34 Missing and 16 partials ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #2815      +/-   ##
============================================
+ Coverage     68.47%   68.64%   +0.16%     
  Complexity      656      656              
============================================
  Files           852      853       +1     
  Lines         68221    68622     +401     
  Branches      59682    60093     +411     
============================================
+ Hits          46714    47103     +389     
+ Misses        19177    19165      -12     
- Partials       2330     2354      +24     
Flag Coverage Δ
csharp 67.06% <ø> (-0.19%) ⬇️
java 52.19% <ø> (ø)
rust 69.84% <87.59%> (+0.20%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
core/connectors/runtime/src/sink.rs 73.62% <100.00%> (+4.35%) ⬆️
core/connectors/sinks/mongodb_sink/src/lib.rs 87.34% <87.34%> (ø)

... and 16 files with indirect coverage changes


@atharvalade
Contributor

Adding to @krishvishal points which already cover a lot of ground. Two more things I think are blockers here.

First, there's a liveness issue with how partial failures interact with offset commits. When any batch inside process_messages fails, the whole call returns Err, so the consumer offset never advances. But ordered insert_many already committed the documents before the failure point. On the next poll the upstream re-sends everything, those already committed docs hit duplicate _id errors, which are non-transient so retries fail immediately, offset still doesn't commit, and you're stuck in a permanent loop. The connector can never recover from even a single transient mid-batch failure.

Second, message.id.to_string() as the MongoDB _id isn't safe when you're consuming multiple topics into the same collection. Two unrelated messages from different topics can easily share the same id, and whichever one lands first wins while the other silently fails. The _id should be a composite of stream, topic, partition, and message id to avoid collisions and quiet data loss.
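
A minimal sketch of such a composite key. The separator and field order are assumptions; any unambiguous composition works as long as two messages from different streams, topics, or partitions can never collide:

```rust
// Composite _id: stream, topic, partition, and message id joined together so
// that identical message ids on different topics map to distinct documents.
fn composite_id(stream: &str, topic: &str, partition_id: u32, message_id: u128) -> String {
    format!("{stream}:{topic}:{partition_id}:{message_id}")
}
```

If stream or topic names may themselves contain the separator character, a different delimiter, length-prefixed encoding, or a hash of the tuple would be needed to keep the key unambiguous.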
