Skip to content

Make MergeSchedulerService optional#6266

Merged
nadav-govari merged 4 commits intonadav/feature-split-mergesfrom
nadav/pr1
Apr 6, 2026
Merged

Make MergeSchedulerService optional#6266
nadav-govari merged 4 commits intonadav/feature-split-mergesfrom
nadav/pr1

Conversation

@nadav-govari
Copy link
Copy Markdown
Collaborator

Description

This is step 1 of the plan to create a new merge service to split merging out of indexing pipelines.
In this plan, a new quickwit service called merge-planner will be introduced to handle all merges.
Similar to how the control plane is optionally started, an indexer node will try to connect to the merge planner on startup. If it does, it'll pass None to the IndexingService, which will not spawn merge pipelines anymore. If it can't connect, the IndexingService will be started up with a MergeSchedulerService, which is the signal to performs merges locally (as it's done today).

In this PR, the merge scheduler service is made optional on indexing pipelines and propagated through as needed.

How was this PR tested?

Unit tests. Nothing intrusive has been done yet.

metastore: MetastoreServiceClient,
ingest_api_service_opt: Option<Mailbox<IngestApiService>>,
merge_scheduler_service: Mailbox<MergeSchedulerService>,
merge_scheduler_service: Option<Mailbox<MergeSchedulerService>>,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
merge_scheduler_service: Option<Mailbox<MergeSchedulerService>>,
merge_scheduler_service_opt: Option<Mailbox<MergeSchedulerService>>,

merge_io_throughput_limiter_opt: self.merge_io_throughput_limiter_opt.clone(),
max_concurrent_split_uploads: self.max_concurrent_split_uploads,
event_broker: self.event_broker.clone(),
let merge_planner_mailbox = if let Some(merge_scheduler) =
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let merge_planner_mailbox = if let Some(merge_scheduler) =
let merge_planner_mailbox = if let Some(merge_scheduler_service) =

// When merging is handled locally, notify the merge planner about new
// splits. The mailbox is None when an external merge service is active,
// or when the planner has already shut down (e.g. source reached its end).
if let Some(merge_planner_mailbox) = self.merge_planner_mailbox_opt.as_ref() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if let Some(merge_planner_mailbox) = self.merge_planner_mailbox_opt.as_ref() {
if let Some(merge_planner_mailbox) = &self.merge_planner_mailbox_opt {

// splits. The mailbox is None when an external merge service is active,
// or when the planner has already shut down (e.g. source reached its end).
if let Some(merge_planner_mailbox) = self.merge_planner_mailbox_opt.as_ref() {
let _ = ctx
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not your code but I'd like the see an error! log statement error when we can't send the message to the merge planner.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

@guilload
Copy link
Copy Markdown
Member

guilload commented Apr 6, 2026

Let's make sure you spin this up locally and merges are running.

@nadav-govari
Copy link
Copy Markdown
Collaborator Author

Verified on a local quickwit instance that merges start up and work correctly.

@nadav-govari nadav-govari merged commit 7a7207b into nadav/feature-split-merges Apr 6, 2026
1 check passed
@nadav-govari nadav-govari deleted the nadav/pr1 branch April 6, 2026 17:38
Comment on lines +217 to +225
match ctx
.send_message(merge_planner_mailbox, NewSplits { new_splits })
.await;
.await
{
Ok(_) => {}
Err(error) => {
error!(error=?error, "failed to send new splits to merge planner");
}
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
match ctx
.send_message(merge_planner_mailbox, NewSplits { new_splits })
.await;
.await
{
Ok(_) => {}
Err(error) => {
error!(error=?error, "failed to send new splits to merge planner");
}
}
if let Err(error) = ... {
error!(error, "failed to send splits to merge planner");
}

nit: for next time

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants