feat(connectors): restart connector with new config without runtime restart#2781
feat(connectors): restart connector with new config without runtime restart#2781seokjin0414 wants to merge 19 commits intoapache:masterfrom
Conversation
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #2781 +/- ##
============================================
+ Coverage 67.65% 67.89% +0.24%
Complexity 708 708
============================================
Files 1030 1030
Lines 83846 84700 +854
Branches 60704 61558 +854
============================================
+ Hits 56722 57509 +787
- Misses 24771 24823 +52
- Partials 2353 2368 +15
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
mmodzelewski
left a comment
There was a problem hiding this comment.
@seokjin0414 Thanks for the PR. I left 2 comments. As the logic for sinks/sources is similar, they apply to both equally.
Also, would you be able to write an integration test under core/integration/connectors, at least one case where we can see that the connector is using the updated config? I think with the new test harness, it should be straightforward.
Extend RuntimeContext with Arc<IggyClients> and state_path fields to enable individual connector restart without full runtime restart. Signed-off-by: seokjin0414 <sars21@hanmail.net> Signed-off-by: shin <sars21@hanmail.net>
…etails Add shutdown_tx (watch::Sender) and task_handles/handler_tasks (Vec<JoinHandle>) to SinkDetails and SourceDetails for per-connector lifecycle management during restart. Signed-off-by: seokjin0414 <sars21@hanmail.net> Signed-off-by: shin <sars21@hanmail.net>
Add watch::Receiver to consume_messages() with tokio::select! for cooperative cancellation. Store shutdown_tx and task JoinHandles in SinkDetails for per-connector lifecycle management. Signed-off-by: seokjin0414 <sars21@hanmail.net> Signed-off-by: shin <sars21@hanmail.net>
…port - Wrap SinkApi/SourceApi containers in Arc for shared ownership - Store container references in SinkDetails/SourceDetails via Manager - Add manual Debug impl for Details structs (dlopen2 Container lacks Debug) - Make SinkApi/SourceApi pub(crate) for cross-module access Signed-off-by: shin <sars21@hanmail.net>
- SinkManager.stop_connector: shutdown signal → await tasks → FFI close - SinkManager.start_connector: FFI open → create consumers → spawn tasks - SinkManager.restart_connector: stop → load active config → start - Make init_sink and consume_messages pub(crate) for reuse - Add setup_sink_consumers helper for consumer creation Signed-off-by: shin <sars21@hanmail.net>
Signed-off-by: shin <sars21@hanmail.net>
… connectors Signed-off-by: seokjin0414 <sars21@hanmail.net> Signed-off-by: shin <sars21@hanmail.net>
- source::handle() now stores handler_tasks in SourceManager via async spawn - main.rs shutdown uses Manager's stop_connector() for both sources and sinks - Remove SinkWithPlugins and SourceWithPlugins structs (no longer needed) - Remove unused shutdown_tx field from SourceDetails Signed-off-by: shin <sars21@hanmail.net>
Signed-off-by: shin <sars21@hanmail.net>
Cover CRUD operations, status transitions, metrics integration, shutdown signal handling, error clearing, and edge cases for the connector restart lifecycle methods. Signed-off-by: shin <sars21@hanmail.net>
…torage Replace detached tokio::spawn for storing shutdown handles with synchronous return from consume() and handle(), preventing race where restart API could be called before handles are stored. Also deduplicate ~145 lines in source::handle() by reusing source_forwarding_loop(). Signed-off-by: shin <sars21@hanmail.net>
…er is None Move producer check before spawn_blocking and SOURCE_SENDERS insert to match start_connector() pattern. Signed-off-by: shin <sars21@hanmail.net>
…d SourceDetails Signed-off-by: shin <sars21@hanmail.net>
…k_consumers in init Signed-off-by: shin <sars21@hanmail.net>
…urce_producer in init Signed-off-by: shin <sars21@hanmail.net>
04d6714 to
274d03a
Compare
Signed-off-by: shin <sars21@hanmail.net>
…shutdown safety - Add RuntimeContext to spawn_consume_tasks for error state reporting - Track spawn_blocking JoinHandle in spawn_source_handler to prevent zombie tasks - Acquire restart_guard in shutdown path to prevent race with concurrent restarts - Restore transform type info logs in setup_sink_consumers and setup_source_producer Signed-off-by: seokjin0414 <sars21@hanmail.net> Signed-off-by: shin <sars21@hanmail.net>
- Remove #[allow(dead_code)] from set_error() now that it's used - Add source error state reporting when forwarding loop terminates - Add key parameter to setup_source_producer() for consistent logging - Extract stop_connector_with_guard() to deduplicate shutdown guard pattern Signed-off-by: seokjin0414 <sars21@hanmail.net> Signed-off-by: shin <sars21@hanmail.net>
…e race - Remove unconditional set_error() after source forwarding loop exits, as the loop already handles error/stopped status internally - Move API server init after handle storage to eliminate race window where restart API could arrive before handles are stored Signed-off-by: seokjin0414 <sars21@hanmail.net> Signed-off-by: shin <sars21@hanmail.net>
|
@mmodzelewski |
Summary
Closes #2417.
POST /sinks/{key}/restartandPOST /sources/{key}/restartAPI endpointsstop_connector(),start_connector(),restart_connector()on SinkManager and SourceManagertokio::sync::watchshutdown channel to sink consume loop for graceful cancellationArc<Container>and task handles in Manager for lifecycle controliggy_clientsandstate_pathfor restart resource accessstop_connector()callsDesign
iggy_sink_close→ FFIiggy_sink_openwith new config → spawn new consume tasksiggy_source_close→ load state → FFIiggy_source_openwith new config → spawn new handler