feat(connectors): restart connector with new config without runtime restart #2781

Open
seokjin0414 wants to merge 19 commits into apache:master from seokjin0414:2417-connector-restart-with-new-config
Conversation

@seokjin0414
Contributor

Summary

Closes #2417.

  • Add POST /sinks/{key}/restart and POST /sources/{key}/restart API endpoints
  • Implement stop_connector(), start_connector(), restart_connector() on SinkManager and SourceManager
  • Add tokio::sync::watch shutdown channel to sink consume loop for graceful cancellation
  • Store Arc<Container> and task handles in Manager for lifecycle control
  • Extend RuntimeContext with iggy_clients and state_path for restart resource access
  • Replace direct shutdown in main.rs with Manager-based stop_connector() calls

Design

  • Sink restart: shutdown signal via watch channel → await task handles (5s timeout) → FFI iggy_sink_close → FFI iggy_sink_open with new config → spawn new consume tasks
  • Source restart: cleanup flume sender → await forwarding tasks (5s timeout) → FFI iggy_source_close → load state → FFI iggy_source_open with new config → spawn new handler
  • Container (shared library) is NOT reloaded — only closed/reopened with new config and plugin ID
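
The sink restart sequence above can be sketched roughly as follows. This is a minimal, std-only illustration and not the PR's code: `Plugin`, `Connector`, and their methods are hypothetical names, an `AtomicBool` and OS threads stand in for the tokio watch channel and async tasks, and `open`/`close` only model the `iggy_sink_open`/`iggy_sink_close` FFI calls.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};

/// Hypothetical stand-in for the plugin behind the FFI boundary.
#[derive(Default)]
struct Plugin {
    config: Mutex<Option<String>>,
}

impl Plugin {
    /// Models the FFI open call: the plugin now runs with `config`.
    fn open(&self, config: &str) {
        *self.config.lock().unwrap() = Some(config.to_owned());
    }
    /// Models the FFI close call.
    fn close(&self) {
        *self.config.lock().unwrap() = None;
    }
    fn current_config(&self) -> Option<String> {
        self.config.lock().unwrap().clone()
    }
}

struct Connector {
    plugin: Arc<Plugin>,        // loaded once, reused across restarts
    shutdown: Arc<AtomicBool>,  // stand-in for the watch shutdown channel
    tasks: Vec<JoinHandle<()>>, // stand-in for the consume task handles
}

impl Connector {
    fn new(plugin: Arc<Plugin>) -> Self {
        Self { plugin, shutdown: Arc::new(AtomicBool::new(false)), tasks: Vec::new() }
    }

    /// Open the plugin with `config`, then spawn a fresh worker.
    fn start(&mut self, config: &str) {
        self.plugin.open(config);
        self.shutdown = Arc::new(AtomicBool::new(false));
        let shutdown = Arc::clone(&self.shutdown);
        self.tasks.push(thread::spawn(move || {
            while !shutdown.load(Ordering::Acquire) {
                thread::sleep(Duration::from_millis(5)); // placeholder for consuming a batch
            }
        }));
    }

    /// Signal shutdown, wait up to `timeout` for the workers, then close the plugin.
    /// Returns true if every worker finished before the deadline.
    fn stop(&mut self, timeout: Duration) -> bool {
        self.shutdown.store(true, Ordering::Release);
        let deadline = Instant::now() + timeout;
        while self.tasks.iter().any(|t| !t.is_finished()) && Instant::now() < deadline {
            thread::sleep(Duration::from_millis(1));
        }
        let clean = self.tasks.iter().all(|t| t.is_finished());
        for task in self.tasks.drain(..) {
            if task.is_finished() {
                let _ = task.join(); // finished workers are joined; stragglers are detached
            }
        }
        self.plugin.close();
        clean
    }

    /// Stop, then start again with the new config; the plugin itself is not reloaded.
    fn restart(&mut self, new_config: &str, timeout: Duration) -> bool {
        let clean = self.stop(timeout);
        self.start(new_config);
        clean
    }
}
```

Calling `restart("batch=50", Duration::from_secs(5))` on a started connector would leave the same `Plugin` instance open with the new config, which mirrors the point that the container is closed and reopened rather than reloaded. How the real runtime treats workers that miss the 5s deadline is not shown in this description, so the detach-on-timeout behavior here is an assumption.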

@codecov

codecov bot commented Feb 19, 2026

Codecov Report

❌ Patch coverage is 81.50745% with 211 lines in your changes missing coverage. Please review.
✅ Project coverage is 67.89%. Comparing base (7ebce1b) to head (48ae540).
⚠️ Report is 1 commit behind head on master.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| core/connectors/runtime/src/manager/source.rs | 72.75% | 89 Missing and 2 partials ⚠️ |
| core/connectors/runtime/src/source.rs | 74.88% | 43 Missing and 10 partials ⚠️ |
| core/connectors/runtime/src/manager/sink.rs | 94.04% | 10 Missing and 10 partials ⚠️ |
| core/connectors/runtime/src/sink.rs | 87.05% | 11 Missing and 7 partials ⚠️ |
| core/connectors/runtime/src/api/source.rs | 5.55% | 17 Missing ⚠️ |
| core/connectors/runtime/src/main.rs | 85.33% | 8 Missing and 3 partials ⚠️ |
| core/connectors/runtime/src/api/sink.rs | 94.11% | 0 Missing and 1 partial ⚠️ |
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #2781      +/-   ##
============================================
+ Coverage     67.65%   67.89%   +0.24%     
  Complexity      708      708              
============================================
  Files          1030     1030              
  Lines         83846    84700     +854     
  Branches      60704    61558     +854     
============================================
+ Hits          56722    57509     +787     
- Misses        24771    24823      +52     
- Partials       2353     2368      +15     
| Flag | Coverage Δ |
|---|---|
| rust | 70.01% <81.50%> (+0.31%) ⬆️ |

Flags with carried forward coverage won't be shown. Click here to find out more.

| Files with missing lines | Coverage Δ |
|---|---|
| core/connectors/runtime/src/context.rs | 89.47% <100.00%> (+1.37%) ⬆️ |
| core/connectors/runtime/src/api/sink.rs | 56.14% <94.11%> (+10.26%) ⬆️ |
| core/connectors/runtime/src/main.rs | 84.46% <85.33%> (-1.64%) ⬇️ |
| core/connectors/runtime/src/api/source.rs | 42.10% <5.55%> (-3.83%) ⬇️ |
| core/connectors/runtime/src/sink.rs | 74.34% <87.05%> (+5.62%) ⬆️ |
| core/connectors/runtime/src/manager/sink.rs | 94.11% <94.04%> (+30.48%) ⬆️ |
| core/connectors/runtime/src/source.rs | 69.49% <74.88%> (+3.19%) ⬆️ |
| core/connectors/runtime/src/manager/source.rs | 75.83% <72.75%> (+10.38%) ⬆️ |

... and 13 files with indirect coverage changes

Member

@mmodzelewski mmodzelewski left a comment

@seokjin0414 Thanks for the PR. I left 2 comments. As the logic for sinks/sources is similar, they apply to both equally.
Also, would you be able to write an integration test under core/integration/connectors, at least one case where we can see that the connector is using the updated config? I think with the new test harness, it should be straightforward.

Extend RuntimeContext with Arc<IggyClients> and state_path fields
to enable individual connector restart without full runtime restart.

Signed-off-by: seokjin0414 <sars21@hanmail.net>
Signed-off-by: shin <sars21@hanmail.net>
…etails

Add shutdown_tx (watch::Sender) and task_handles/handler_tasks
(Vec<JoinHandle>) to SinkDetails and SourceDetails for per-connector
lifecycle management during restart.

Signed-off-by: seokjin0414 <sars21@hanmail.net>
Signed-off-by: shin <sars21@hanmail.net>
Add watch::Receiver to consume_messages() with tokio::select! for
cooperative cancellation. Store shutdown_tx and task JoinHandles in
SinkDetails for per-connector lifecycle management.

Signed-off-by: seokjin0414 <sars21@hanmail.net>
Signed-off-by: shin <sars21@hanmail.net>
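
The cooperative cancellation described in the commit above can be illustrated with a small sketch. The PR uses a `tokio::sync::watch` receiver inside `tokio::select!`; the version below deliberately swaps in std primitives (an `AtomicBool` plus `recv_timeout`) so that it runs without an async runtime. The function signature and the `deliver` callback are assumptions made for illustration and do not match the real `consume_messages()`.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::sync::Arc;
use std::time::Duration;

/// Consume messages until shutdown is requested or the sender goes away.
/// Returns the number of messages handed to `deliver`.
fn consume_messages(
    messages: Receiver<String>,
    shutdown: Arc<AtomicBool>,
    mut deliver: impl FnMut(String),
) -> usize {
    let mut delivered = 0;
    loop {
        // Check the shutdown signal between waits, so the loop exits at a
        // message boundary instead of being aborted mid-delivery.
        if shutdown.load(Ordering::Acquire) {
            break;
        }
        // A bounded wait plays the role of the second select! branch:
        // the loop never blocks indefinitely on an idle topic.
        match messages.recv_timeout(Duration::from_millis(10)) {
            Ok(message) => {
                deliver(message);
                delivered += 1;
            }
            Err(RecvTimeoutError::Timeout) => continue,
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    delivered
}
```

The property the sketch tries to capture is that the loop notices the shutdown request on its own, which lets the caller await the task handle and then close the plugin, rather than aborting the task from outside.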
…port

- Wrap SinkApi/SourceApi containers in Arc for shared ownership
- Store container references in SinkDetails/SourceDetails via Manager
- Add manual Debug impl for Details structs (dlopen2 Container lacks Debug)
- Make SinkApi/SourceApi pub(crate) for cross-module access

Signed-off-by: shin <sars21@hanmail.net>
- SinkManager.stop_connector: shutdown signal → await tasks → FFI close
- SinkManager.start_connector: FFI open → create consumers → spawn tasks
- SinkManager.restart_connector: stop → load active config → start
- Make init_sink and consume_messages pub(crate) for reuse
- Add setup_sink_consumers helper for consumer creation

Signed-off-by: shin <sars21@hanmail.net>
… connectors

Signed-off-by: seokjin0414 <sars21@hanmail.net>
Signed-off-by: shin <sars21@hanmail.net>
- source::handle() now stores handler_tasks in SourceManager via async spawn
- main.rs shutdown uses Manager's stop_connector() for both sources and sinks
- Remove SinkWithPlugins and SourceWithPlugins structs (no longer needed)
- Remove unused shutdown_tx field from SourceDetails

Signed-off-by: shin <sars21@hanmail.net>
Signed-off-by: shin <sars21@hanmail.net>
Cover CRUD operations, status transitions, metrics integration,
shutdown signal handling, error clearing, and edge cases for
the connector restart lifecycle methods.

Signed-off-by: shin <sars21@hanmail.net>
…torage

Replace detached tokio::spawn for storing shutdown handles with
synchronous return from consume() and handle(), preventing race
where restart API could be called before handles are stored.

Also deduplicate ~145 lines in source::handle() by reusing
source_forwarding_loop().

Signed-off-by: shin <sars21@hanmail.net>
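
The race fixed in the commit above comes down to who stores the task handles and when. A rough sketch of the "return synchronously, then store" shape, using std threads and hypothetical names (`Manager`, `store_handles`, `take_handles`, `init_connector`, and this simplified `consume`), might look like this:

```rust
use std::collections::HashMap;
use std::sync::Mutex;
use std::thread::{self, JoinHandle};

/// Hypothetical manager that owns the task handles per connector key.
#[derive(Default)]
struct Manager {
    handles: Mutex<HashMap<String, Vec<JoinHandle<usize>>>>,
}

impl Manager {
    fn store_handles(&self, key: &str, handles: Vec<JoinHandle<usize>>) {
        self.handles.lock().unwrap().insert(key.to_owned(), handles);
    }

    /// What a restart request would do first: take ownership of the handles.
    fn take_handles(&self, key: &str) -> Option<Vec<JoinHandle<usize>>> {
        self.handles.lock().unwrap().remove(key)
    }
}

/// Spawns the workers and returns their handles to the caller directly,
/// instead of storing them from inside a detached task.
fn consume(workers: usize) -> Vec<JoinHandle<usize>> {
    (0..workers).map(|id| thread::spawn(move || id)).collect()
}

/// Handles are stored before this function returns, so anything that runs
/// afterwards (for example starting the API server) always finds them.
fn init_connector(manager: &Manager, key: &str, workers: usize) {
    let handles = consume(workers);
    manager.store_handles(key, handles);
}
```

With a detached task doing the storing, a restart request arriving early could find no handles for the key; with the synchronous return, the ordering is enforced by ordinary control flow.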
…er is None

Move producer check before spawn_blocking and SOURCE_SENDERS insert
to match start_connector() pattern.

Signed-off-by: shin <sars21@hanmail.net>
…d SourceDetails

Signed-off-by: shin <sars21@hanmail.net>
…k_consumers in init

Signed-off-by: shin <sars21@hanmail.net>
…urce_producer in init

Signed-off-by: shin <sars21@hanmail.net>
@seokjin0414 seokjin0414 force-pushed the 2417-connector-restart-with-new-config branch from 04d6714 to 274d03a Compare March 1, 2026 12:42
…shutdown safety

- Add RuntimeContext to spawn_consume_tasks for error state reporting
- Track spawn_blocking JoinHandle in spawn_source_handler to prevent zombie tasks
- Acquire restart_guard in shutdown path to prevent race with concurrent restarts
- Restore transform type info logs in setup_sink_consumers and setup_source_producer

Signed-off-by: seokjin0414 <sars21@hanmail.net>
Signed-off-by: shin <sars21@hanmail.net>
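
Acquiring the same guard in both the restart and shutdown paths serializes the two operations. The sketch below shows the idea with a `std::sync::Mutex<()>`; the PR's `restart_guard` may well be an async mutex, and the counters here are instrumentation added only to make the mutual exclusion observable. All names other than `restart_guard` are hypothetical.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

#[derive(Default)]
struct Connector {
    restart_guard: Mutex<()>, // held for the whole stop/start or shutdown sequence
    active_ops: AtomicUsize,  // lifecycle operations currently in progress
    max_overlap: AtomicUsize, // highest value `active_ops` ever reached
    restarts: AtomicUsize,
}

impl Connector {
    /// Placeholder for the real stop/close/open/spawn work.
    fn lifecycle_step(&self) {
        let active = self.active_ops.fetch_add(1, Ordering::SeqCst) + 1;
        self.max_overlap.fetch_max(active, Ordering::SeqCst);
        thread::sleep(Duration::from_millis(2));
        self.active_ops.fetch_sub(1, Ordering::SeqCst);
    }

    fn restart(&self) {
        let _guard = self.restart_guard.lock().unwrap();
        self.lifecycle_step();
        self.restarts.fetch_add(1, Ordering::SeqCst);
    }

    fn shutdown(&self) {
        // Same guard as restart, so shutdown waits for an in-flight restart.
        let _guard = self.restart_guard.lock().unwrap();
        self.lifecycle_step();
    }
}

/// Fire several restarts and one shutdown at the same connector concurrently.
fn run_concurrently(connector: Arc<Connector>, restarts: usize) {
    let mut threads = Vec::new();
    for _ in 0..restarts {
        let c = Arc::clone(&connector);
        threads.push(thread::spawn(move || c.restart()));
    }
    let c = Arc::clone(&connector);
    threads.push(thread::spawn(move || c.shutdown()));
    for t in threads {
        t.join().unwrap();
    }
}
```

The sketch does not model what should happen to a restart that acquires the guard after shutdown has completed; presumably the runtime rejects it, but that is not stated in this thread.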
- Remove #[allow(dead_code)] from set_error() now that it's used
- Add source error state reporting when forwarding loop terminates
- Add key parameter to setup_source_producer() for consistent logging
- Extract stop_connector_with_guard() to deduplicate shutdown guard pattern

Signed-off-by: seokjin0414 <sars21@hanmail.net>
Signed-off-by: shin <sars21@hanmail.net>
…e race

- Remove unconditional set_error() after source forwarding loop exits,
  as the loop already handles error/stopped status internally
- Move API server init after handle storage to eliminate race window
  where restart API could arrive before handles are stored

Signed-off-by: seokjin0414 <sars21@hanmail.net>
Signed-off-by: shin <sars21@hanmail.net>
@seokjin0414
Contributor Author

@mmodzelewski
Thanks for the thorough review!
I added a restart integration test that uses the Postgres sink fixture, in core/integration/tests/connectors/postgres/restart.rs (2ddb9f9).
It verifies the following flow:
send messages → confirm they are in the DB → restart via the API → send more messages → confirm all messages are in the DB

@seokjin0414 seokjin0414 requested a review from mmodzelewski March 1, 2026 14:02
Development

Successfully merging this pull request may close these issues.

Connectors Runtime - restart connector with new configuration without restarting the whole runtime

2 participants