From 41b447529fcc94e2bcf19702856bc94b6179527e Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Mon, 6 Apr 2026 15:14:17 -0400 Subject: [PATCH 1/4] Create compaction planner service stub; wire up serving layer --- quickwit/Cargo.lock | 9 + quickwit/Cargo.toml | 3 + quickwit/quickwit-compaction/Cargo.toml | 15 + quickwit/quickwit-compaction/src/lib.rs | 18 + .../src/planner/compaction_service.rs | 28 + .../quickwit-compaction/src/planner/mod.rs | 17 + .../quickwit-compaction/src/worker/mod.rs | 13 + quickwit/quickwit-config/src/service.rs | 3 + .../src/actors/indexing_service.rs | 81 ++ .../src/tests/basic_tests.rs | 96 +++ quickwit/quickwit-proto/build.rs | 14 + .../protos/quickwit/compaction.proto | 24 + .../codegen/quickwit/quickwit.compaction.rs | 797 ++++++++++++++++++ quickwit/quickwit-proto/src/compaction/mod.rs | 67 ++ quickwit/quickwit-proto/src/lib.rs | 1 + quickwit/quickwit-serve/Cargo.toml | 1 + quickwit/quickwit-serve/src/grpc.rs | 17 + quickwit/quickwit-serve/src/lib.rs | 102 ++- quickwit/quickwit-serve/src/rest.rs | 1 + 19 files changed, 1303 insertions(+), 4 deletions(-) create mode 100644 quickwit/quickwit-compaction/Cargo.toml create mode 100644 quickwit/quickwit-compaction/src/lib.rs create mode 100644 quickwit/quickwit-compaction/src/planner/compaction_service.rs create mode 100644 quickwit/quickwit-compaction/src/planner/mod.rs create mode 100644 quickwit/quickwit-compaction/src/worker/mod.rs create mode 100644 quickwit/quickwit-proto/protos/quickwit/compaction.proto create mode 100644 quickwit/quickwit-proto/src/codegen/quickwit/quickwit.compaction.rs create mode 100644 quickwit/quickwit-proto/src/compaction/mod.rs diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 89eefe2bed7..544b882d332 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -7144,6 +7144,14 @@ dependencies = [ "tracing", ] +[[package]] +name = "quickwit-compaction" +version = "0.8.0" +dependencies = [ + "async-trait", + "quickwit-proto", +] + [[package]] 
name = "quickwit-config" version = "0.8.0" @@ -7808,6 +7816,7 @@ dependencies = [ "quickwit-actors", "quickwit-cluster", "quickwit-common", + "quickwit-compaction", "quickwit-config", "quickwit-control-plane", "quickwit-doc-mapper", diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index 9242390d898..9d8cf833dc3 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -12,6 +12,7 @@ members = [ "quickwit-control-plane", "quickwit-datetime", "quickwit-directories", + "quickwit-compaction", "quickwit-doc-mapper", "quickwit-index-management", "quickwit-indexing", @@ -52,6 +53,7 @@ default-members = [ "quickwit-control-plane", "quickwit-datetime", "quickwit-directories", + "quickwit-compaction", "quickwit-doc-mapper", "quickwit-index-management", "quickwit-indexing", @@ -352,6 +354,7 @@ quickwit-control-plane = { path = "quickwit-control-plane" } quickwit-datetime = { path = "quickwit-datetime" } quickwit-directories = { path = "quickwit-directories" } quickwit-doc-mapper = { path = "quickwit-doc-mapper" } +quickwit-compaction = { path = "quickwit-compaction" } quickwit-index-management = { path = "quickwit-index-management" } quickwit-indexing = { path = "quickwit-indexing" } quickwit-ingest = { path = "quickwit-ingest" } diff --git a/quickwit/quickwit-compaction/Cargo.toml b/quickwit/quickwit-compaction/Cargo.toml new file mode 100644 index 00000000000..07925537dce --- /dev/null +++ b/quickwit/quickwit-compaction/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "quickwit-compaction" +description = "Merge planner and merge worker services for split compaction" + +version.workspace = true +edition.workspace = true +homepage.workspace = true +documentation.workspace = true +repository.workspace = true +authors.workspace = true +license.workspace = true + +[dependencies] +async-trait = { workspace = true } +quickwit-proto = { workspace = true } \ No newline at end of file diff --git a/quickwit/quickwit-compaction/src/lib.rs 
b/quickwit/quickwit-compaction/src/lib.rs
new file mode 100644
index 00000000000..2580ff03673
--- /dev/null
+++ b/quickwit/quickwit-compaction/src/lib.rs
@@ -0,0 +1,22 @@
+// Copyright 2021-Present Datadog, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#![deny(clippy::disallowed_methods)]
+
+//! Services for split compaction.
+//!
+//! [`planner`] hosts the compaction service implementation; [`worker`] is still empty.
+
+pub mod planner;
+pub mod worker;
diff --git a/quickwit/quickwit-compaction/src/planner/compaction_service.rs b/quickwit/quickwit-compaction/src/planner/compaction_service.rs
new file mode 100644
index 00000000000..77fe77b950c
--- /dev/null
+++ b/quickwit/quickwit-compaction/src/planner/compaction_service.rs
@@ -0,0 +1,31 @@
+// Copyright 2021-Present Datadog, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use async_trait::async_trait;
+use quickwit_proto::compaction::{
+    CompactionResult, CompactionService, PingRequest, PingResponse,
+};
+
+/// Placeholder [`CompactionService`] implementation.
+///
+/// It answers every ping with an empty [`PingResponse`] and does nothing else.
+#[derive(Debug, Clone)]
+pub struct StubCompactionService;
+
+#[async_trait]
+impl CompactionService for StubCompactionService {
+    async fn ping(&self, _request: PingRequest) -> CompactionResult<PingResponse> {
+        Ok(PingResponse {})
+    }
+}
diff --git a/quickwit/quickwit-compaction/src/planner/mod.rs b/quickwit/quickwit-compaction/src/planner/mod.rs
new file mode 100644
index 00000000000..0f77fe4e01a
--- /dev/null
+++ b/quickwit/quickwit-compaction/src/planner/mod.rs
@@ -0,0 +1,17 @@
+// Copyright 2021-Present Datadog, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+mod compaction_service;
+
+pub use compaction_service::StubCompactionService;
diff --git a/quickwit/quickwit-compaction/src/worker/mod.rs b/quickwit/quickwit-compaction/src/worker/mod.rs
new file mode 100644
index 00000000000..ac4defdb972
--- /dev/null
+++ b/quickwit/quickwit-compaction/src/worker/mod.rs
@@ -0,0 +1,13 @@
+// Copyright 2021-Present Datadog, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. \ No newline at end of file diff --git a/quickwit/quickwit-config/src/service.rs b/quickwit/quickwit-config/src/service.rs index d323331b510..03446576ed8 100644 --- a/quickwit/quickwit-config/src/service.rs +++ b/quickwit/quickwit-config/src/service.rs @@ -29,6 +29,7 @@ pub enum QuickwitService { Searcher, Janitor, Metastore, + MergeWorker, } #[allow(clippy::from_over_into)] @@ -46,6 +47,7 @@ impl QuickwitService { QuickwitService::Searcher => "searcher", QuickwitService::Janitor => "janitor", QuickwitService::Metastore => "metastore", + QuickwitService::MergeWorker => "merge_worker", } } @@ -70,6 +72,7 @@ impl FromStr for QuickwitService { "searcher" => Ok(QuickwitService::Searcher), "janitor" => Ok(QuickwitService::Janitor), "metastore" => Ok(QuickwitService::Metastore), + "merge-worker" | "merge_worker" => Ok(QuickwitService::MergeWorker), _ => { bail!( "failed to parse service `{service_str}`. 
supported services are: `{}`",
diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs
index 247dc53c41a..f4e4d78a7c9 100644
--- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs
+++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs
@@ -1702,6 +1702,91 @@ mod tests {
         universe.quit().await;
     }
 
+    /// Checks that spawning an indexing pipeline also starts a merge pipeline when the indexing
+    /// service is built with a merge scheduler mailbox.
+    #[tokio::test]
+    async fn test_indexing_service_spawns_merge_pipeline_with_merge_scheduler() {
+        quickwit_common::setup_logging_for_tests();
+        let transport = ChannelTransport::default();
+        let cluster = create_cluster_for_test(Vec::new(), &["indexer"], &transport, true)
+            .await
+            .unwrap();
+        let metastore = metastore_for_test();
+
+        let index_id = append_random_suffix("test-indexing-service-with-merge");
+        let index_uri = format!("ram:///indexes/{index_id}");
+        let index_config = IndexConfig::for_test(&index_id, &index_uri);
+
+        let source_config = SourceConfig {
+            source_id: "test-source".to_string(),
+            num_pipelines: NonZeroUsize::MIN,
+            enabled: true,
+            source_params: SourceParams::void(),
+            transform_config: None,
+            input_format: SourceInputFormat::Json,
+        };
+        let create_index_request =
+            CreateIndexRequest::try_from_index_config(&index_config).unwrap();
+        let index_uid: IndexUid = metastore
+            .create_index(create_index_request)
+            .await
+            .unwrap()
+            .index_uid()
+            .clone();
+        let add_source_request =
+            AddSourceRequest::try_from_source_config(index_uid.clone(), &source_config).unwrap();
+        metastore.add_source(add_source_request).await.unwrap();
+
+        let temp_dir = tempfile::tempdir().unwrap();
+        let data_dir_path = temp_dir.path().to_path_buf();
+        let indexer_config = IndexerConfig::for_test().unwrap();
+        let num_blocking_threads = 1;
+        let storage_resolver = StorageResolver::unconfigured();
+        let universe = Universe::with_accelerated_time();
+        let queues_dir_path = data_dir_path.join(QUEUES_DIR_NAME);
+        let ingest_api_service =
+            init_ingest_api(&universe, &queues_dir_path, &IngestApiConfig::default())
+                .await
+                .unwrap();
+        // NOTE(review): the type argument here and in the final `get_one` call was missing from
+        // this copy of the patch; `MergeSchedulerService` is a best guess to be confirmed.
+        let merge_scheduler_mailbox: Mailbox<MergeSchedulerService> =
+            universe.get_or_spawn_one();
+        let indexing_server = IndexingService::new(
+            NodeId::from("test-node"),
+            data_dir_path,
+            indexer_config,
+            num_blocking_threads,
+            cluster.clone(),
+            metastore.clone(),
+            Some(ingest_api_service),
+            Some(merge_scheduler_mailbox),
+            IngesterPool::default(),
+            storage_resolver.clone(),
+            EventBroker::default(),
+        )
+        .await
+        .unwrap();
+        let (indexing_server_mailbox, indexing_server_handle) =
+            universe.spawn_builder().spawn(indexing_server);
+
+        indexing_server_mailbox
+            .ask_for_res(SpawnPipeline {
+                index_id: index_id.clone(),
+                source_config,
+                pipeline_uid: PipelineUid::default(),
+            })
+            .await
+            .unwrap();
+
+        let observation = indexing_server_handle.observe().await;
+        assert_eq!(observation.num_running_pipelines, 1);
+        assert_eq!(observation.num_running_merge_pipelines, 1);
+        assert!(universe.get_one::<MergeSchedulerService>().is_some());
+
+        universe.quit().await;
+    }
+
     #[derive(Debug)]
     struct FreezePipeline;
     #[async_trait]
diff --git a/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs b/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs
index 91c6b0ebeb1..2715e310a97 100644
--- a/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs
+++ b/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs
@@ -162,3 +162,135 @@ async fn test_multi_nodes_cluster() {
 
     sandbox.shutdown().await.unwrap();
 }
+
+/// Environment variable toggled by the compaction service tests below.
+const COMPACTION_SERVICE_ENV_KEY: &str = "QW_ENABLE_COMPACTION_SERVICE";
+
+/// Serializes the tests that write [`COMPACTION_SERVICE_ENV_KEY`]. The environment is shared
+/// by the whole process, so two tests setting opposite values in parallel would race.
+static COMPACTION_SERVICE_ENV_LOCK: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(());
+
+/// Keeps [`COMPACTION_SERVICE_ENV_KEY`] set, and [`COMPACTION_SERVICE_ENV_LOCK`] held, until
+/// dropped. The variable is removed on drop, including when the test panics.
+struct CompactionServiceEnvGuard {
+    _lock_guard: tokio::sync::MutexGuard<'static, ()>,
+}
+
+impl CompactionServiceEnvGuard {
+    /// Waits for exclusive access to the variable, then sets it to `value`.
+    async fn set(value: &str) -> Self {
+        let lock_guard = COMPACTION_SERVICE_ENV_LOCK.lock().await;
+        // SAFETY: the lock excludes the other test writing this variable. Threads outside of
+        // these two tests are still assumed not to touch the environment concurrently.
+        unsafe { std::env::set_var(COMPACTION_SERVICE_ENV_KEY, value) };
+        Self {
+            _lock_guard: lock_guard,
+        }
+    }
+}
+
+impl Drop for CompactionServiceEnvGuard {
+    fn drop(&mut self) {
+        // Runs before `_lock_guard` is released, so the next test starts from a clean state.
+        // SAFETY: same reasoning as in `set`; the lock is still held at this point.
+        unsafe { std::env::remove_var(COMPACTION_SERVICE_ENV_KEY) };
+    }
+}
+
+/// With the compaction service enabled, the indexer must not run any merge pipeline.
+#[tokio::test]
+async fn test_no_merge_pipelines_when_compaction_service_enabled() {
+    quickwit_common::setup_logging_for_tests();
+    // Sets the variable for the whole test; it is removed when the guard is dropped.
+    let _env_guard = CompactionServiceEnvGuard::set("true").await;
+
+    let sandbox = ClusterSandboxBuilder::default()
+        .add_node([QuickwitService::Searcher])
+        .add_node([QuickwitService::Metastore])
+        .add_node([QuickwitService::Indexer])
+        .add_node([QuickwitService::ControlPlane])
+        .add_node([QuickwitService::Janitor])
+        .build_and_start()
+        .await;
+
+    sandbox
+        .rest_client(QuickwitService::Indexer)
+        .indexes()
+        .create(
+            r#"
+            version: 0.8
+            index_id: test-no-merge-pipelines
+            doc_mapping:
+              field_mappings:
+              - name: body
+                type: text
+            indexing_settings:
+              commit_timeout_secs: 1
+            "#,
+            quickwit_config::ConfigFormat::Yaml,
+            false,
+        )
+        .await
+        .unwrap();
+
+    sandbox.wait_for_indexing_pipelines(1).await.unwrap();
+
+    let stats = sandbox
+        .rest_client(QuickwitService::Indexer)
+        .node_stats()
+        .indexing()
+        .await
+        .unwrap();
+    assert_eq!(stats.num_running_merge_pipelines, 0);
+
+    sandbox.shutdown().await.unwrap();
+}
+
+/// With the compaction service disabled, the indexer must keep running merge pipelines.
+#[tokio::test]
+async fn test_merge_pipelines_present_without_compaction_service() {
+    quickwit_common::setup_logging_for_tests();
+    // Sets the variable for the whole test; it is removed when the guard is dropped.
+    let _env_guard = CompactionServiceEnvGuard::set("false").await;
+
+    let sandbox = ClusterSandboxBuilder::default()
+        .add_node([QuickwitService::Searcher])
+        .add_node([QuickwitService::Metastore])
+        .add_node([QuickwitService::Indexer])
+        .add_node([QuickwitService::ControlPlane])
+        .add_node([QuickwitService::Janitor])
+        .build_and_start()
+        .await;
+
+    sandbox
+        .rest_client(QuickwitService::Indexer)
+        .indexes()
+        .create(
+            r#"
+            version: 0.8
+            index_id: test-with-merge-pipelines
+            doc_mapping:
+              field_mappings:
+              - name: body
+                type: text
+            indexing_settings:
+              commit_timeout_secs: 1
+            "#,
+            quickwit_config::ConfigFormat::Yaml,
+            false,
+        )
+        .await
+        .unwrap();
+
+    sandbox.wait_for_indexing_pipelines(1).await.unwrap();
+
+    let stats = sandbox
+        .rest_client(QuickwitService::Indexer)
+        .node_stats()
+        .indexing()
+        .await
+        .unwrap();
+    assert!(stats.num_running_merge_pipelines > 0);
+
+    sandbox.shutdown().await.unwrap();
+}
diff --git a/quickwit/quickwit-proto/build.rs b/quickwit/quickwit-proto/build.rs
index 569d9b5315b..a83e8723bd2 100644
--- a/quickwit/quickwit-proto/build.rs
+++ b/quickwit/quickwit-proto/build.rs
@@ 
-35,6 +35,20 @@ fn main() -> Result<(), Box> { .run() .unwrap(); + // Compaction service. + let mut prost_config = prost_build::Config::default(); + prost_config.file_descriptor_set_path("src/codegen/quickwit/compaction_descriptor.bin"); + + Codegen::builder() + .with_prost_config(prost_config) + .with_protos(&["protos/quickwit/compaction.proto"]) + .with_output_dir("src/codegen/quickwit") + .with_result_type_path("crate::compaction::CompactionResult") + .with_error_type_path("crate::compaction::CompactionError") + .generate_rpc_name_impls() + .run() + .unwrap(); + // Control plane. let mut prost_config = prost_build::Config::default(); prost_config.file_descriptor_set_path("src/codegen/quickwit/control_plane_descriptor.bin"); diff --git a/quickwit/quickwit-proto/protos/quickwit/compaction.proto b/quickwit/quickwit-proto/protos/quickwit/compaction.proto new file mode 100644 index 00000000000..82979c3b4ee --- /dev/null +++ b/quickwit/quickwit-proto/protos/quickwit/compaction.proto @@ -0,0 +1,24 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +syntax = "proto3"; + +package quickwit.compaction; + +service CompactionService { + rpc Ping(PingRequest) returns (PingResponse); +} + +message PingRequest {} +message PingResponse {} \ No newline at end of file diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.compaction.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.compaction.rs new file mode 100644 index 00000000000..eec6be161ca --- /dev/null +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.compaction.rs @@ -0,0 +1,797 @@ +// This file is @generated by prost-build. +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct PingRequest {} +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct PingResponse {} +/// BEGIN quickwit-codegen +#[allow(unused_imports)] +use std::str::FromStr; +use tower::{Layer, Service, ServiceExt}; +use quickwit_common::tower::RpcName; +impl RpcName for PingRequest { + fn rpc_name() -> &'static str { + "ping" + } +} +#[cfg_attr(any(test, feature = "testsuite"), mockall::automock)] +#[async_trait::async_trait] +pub trait CompactionService: std::fmt::Debug + Send + Sync + 'static { + async fn ping( + &self, + request: PingRequest, + ) -> crate::compaction::CompactionResult; +} +#[derive(Debug, Clone)] +pub struct CompactionServiceClient { + inner: InnerCompactionServiceClient, +} +#[derive(Debug, Clone)] +struct InnerCompactionServiceClient(std::sync::Arc); +impl CompactionServiceClient { + pub fn new(instance: T) -> Self + where + T: CompactionService, + { + #[cfg(any(test, feature = "testsuite"))] + assert!( + std::any::TypeId::of:: < T > () != std::any::TypeId::of:: < + MockCompactionService > (), + "`MockCompactionService` must be wrapped in a `MockCompactionServiceWrapper`: use `CompactionServiceClient::from_mock(mock)` to instantiate the client" + ); + Self { + inner: 
InnerCompactionServiceClient(std::sync::Arc::new(instance)), + } + } + pub fn as_grpc_service( + &self, + max_message_size: bytesize::ByteSize, + ) -> compaction_service_grpc_server::CompactionServiceGrpcServer< + CompactionServiceGrpcServerAdapter, + > { + let adapter = CompactionServiceGrpcServerAdapter::new(self.clone()); + compaction_service_grpc_server::CompactionServiceGrpcServer::new(adapter) + .accept_compressed(tonic::codec::CompressionEncoding::Gzip) + .accept_compressed(tonic::codec::CompressionEncoding::Zstd) + .send_compressed(tonic::codec::CompressionEncoding::Gzip) + .send_compressed(tonic::codec::CompressionEncoding::Zstd) + .max_decoding_message_size(max_message_size.0 as usize) + .max_encoding_message_size(max_message_size.0 as usize) + } + pub fn from_channel( + addr: std::net::SocketAddr, + channel: tonic::transport::Channel, + max_message_size: bytesize::ByteSize, + compression_encoding_opt: Option, + ) -> Self { + let (_, connection_keys_watcher) = tokio::sync::watch::channel( + std::collections::HashSet::from_iter([addr]), + ); + let mut client = compaction_service_grpc_client::CompactionServiceGrpcClient::new( + channel, + ) + .max_decoding_message_size(max_message_size.0 as usize) + .max_encoding_message_size(max_message_size.0 as usize); + if let Some(compression_encoding) = compression_encoding_opt { + client = client + .accept_compressed(compression_encoding) + .send_compressed(compression_encoding); + } + let adapter = CompactionServiceGrpcClientAdapter::new( + client, + connection_keys_watcher, + ); + Self::new(adapter) + } + pub fn from_balance_channel( + balance_channel: quickwit_common::tower::BalanceChannel, + max_message_size: bytesize::ByteSize, + compression_encoding_opt: Option, + ) -> CompactionServiceClient { + let connection_keys_watcher = balance_channel.connection_keys_watcher(); + let mut client = compaction_service_grpc_client::CompactionServiceGrpcClient::new( + balance_channel, + ) + 
.max_decoding_message_size(max_message_size.0 as usize) + .max_encoding_message_size(max_message_size.0 as usize); + if let Some(compression_encoding) = compression_encoding_opt { + client = client + .accept_compressed(compression_encoding) + .send_compressed(compression_encoding); + } + let adapter = CompactionServiceGrpcClientAdapter::new( + client, + connection_keys_watcher, + ); + Self::new(adapter) + } + pub fn from_mailbox(mailbox: quickwit_actors::Mailbox) -> Self + where + A: quickwit_actors::Actor + std::fmt::Debug + Send + 'static, + CompactionServiceMailbox: CompactionService, + { + CompactionServiceClient::new(CompactionServiceMailbox::new(mailbox)) + } + pub fn tower() -> CompactionServiceTowerLayerStack { + CompactionServiceTowerLayerStack::default() + } + #[cfg(any(test, feature = "testsuite"))] + pub fn from_mock(mock: MockCompactionService) -> Self { + let mock_wrapper = mock_compaction_service::MockCompactionServiceWrapper { + inner: tokio::sync::Mutex::new(mock), + }; + Self::new(mock_wrapper) + } + #[cfg(any(test, feature = "testsuite"))] + pub fn mocked() -> Self { + Self::from_mock(MockCompactionService::new()) + } +} +#[async_trait::async_trait] +impl CompactionService for CompactionServiceClient { + async fn ping( + &self, + request: PingRequest, + ) -> crate::compaction::CompactionResult { + self.inner.0.ping(request).await + } +} +#[cfg(any(test, feature = "testsuite"))] +pub mod mock_compaction_service { + use super::*; + #[derive(Debug)] + pub struct MockCompactionServiceWrapper { + pub(super) inner: tokio::sync::Mutex, + } + #[async_trait::async_trait] + impl CompactionService for MockCompactionServiceWrapper { + async fn ping( + &self, + request: super::PingRequest, + ) -> crate::compaction::CompactionResult { + self.inner.lock().await.ping(request).await + } + } +} +pub type BoxFuture = std::pin::Pin< + Box> + Send + 'static>, +>; +impl tower::Service for InnerCompactionServiceClient { + type Response = PingResponse; + type Error = 
crate::compaction::CompactionError; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } + fn call(&mut self, request: PingRequest) -> Self::Future { + let svc = self.clone(); + let fut = async move { svc.0.ping(request).await }; + Box::pin(fut) + } +} +/// A tower service stack is a set of tower services. +#[derive(Debug)] +struct CompactionServiceTowerServiceStack { + #[allow(dead_code)] + inner: InnerCompactionServiceClient, + ping_svc: quickwit_common::tower::BoxService< + PingRequest, + PingResponse, + crate::compaction::CompactionError, + >, +} +#[async_trait::async_trait] +impl CompactionService for CompactionServiceTowerServiceStack { + async fn ping( + &self, + request: PingRequest, + ) -> crate::compaction::CompactionResult { + self.ping_svc.clone().ready().await?.call(request).await + } +} +type PingLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + PingRequest, + PingResponse, + crate::compaction::CompactionError, + >, + PingRequest, + PingResponse, + crate::compaction::CompactionError, +>; +#[derive(Debug, Default)] +pub struct CompactionServiceTowerLayerStack { + ping_layers: Vec, +} +impl CompactionServiceTowerLayerStack { + pub fn stack_layer(mut self, layer: L) -> Self + where + L: tower::Layer< + quickwit_common::tower::BoxService< + PingRequest, + PingResponse, + crate::compaction::CompactionError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< + PingRequest, + Response = PingResponse, + Error = crate::compaction::CompactionError, + > + Clone + Send + Sync + 'static, + <, + >>::Service as tower::Service>::Future: Send + 'static, + { + self.ping_layers.push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self + } + pub fn stack_ping_layer(mut self, layer: L) -> Self + where + L: tower::Layer< + quickwit_common::tower::BoxService< + PingRequest, + PingResponse, + 
crate::compaction::CompactionError, + >, + > + Send + Sync + 'static, + L::Service: tower::Service< + PingRequest, + Response = PingResponse, + Error = crate::compaction::CompactionError, + > + Clone + Send + Sync + 'static, + >::Future: Send + 'static, + { + self.ping_layers.push(quickwit_common::tower::BoxLayer::new(layer)); + self + } + pub fn build(self, instance: T) -> CompactionServiceClient + where + T: CompactionService, + { + let inner_client = InnerCompactionServiceClient(std::sync::Arc::new(instance)); + self.build_from_inner_client(inner_client) + } + pub fn build_from_channel( + self, + addr: std::net::SocketAddr, + channel: tonic::transport::Channel, + max_message_size: bytesize::ByteSize, + compression_encoding_opt: Option, + ) -> CompactionServiceClient { + let client = CompactionServiceClient::from_channel( + addr, + channel, + max_message_size, + compression_encoding_opt, + ); + let inner_client = client.inner; + self.build_from_inner_client(inner_client) + } + pub fn build_from_balance_channel( + self, + balance_channel: quickwit_common::tower::BalanceChannel, + max_message_size: bytesize::ByteSize, + compression_encoding_opt: Option, + ) -> CompactionServiceClient { + let client = CompactionServiceClient::from_balance_channel( + balance_channel, + max_message_size, + compression_encoding_opt, + ); + let inner_client = client.inner; + self.build_from_inner_client(inner_client) + } + pub fn build_from_mailbox( + self, + mailbox: quickwit_actors::Mailbox, + ) -> CompactionServiceClient + where + A: quickwit_actors::Actor + std::fmt::Debug + Send + 'static, + CompactionServiceMailbox: CompactionService, + { + let inner_client = InnerCompactionServiceClient( + std::sync::Arc::new(CompactionServiceMailbox::new(mailbox)), + ); + self.build_from_inner_client(inner_client) + } + #[cfg(any(test, feature = "testsuite"))] + pub fn build_from_mock( + self, + mock: MockCompactionService, + ) -> CompactionServiceClient { + let client = 
CompactionServiceClient::from_mock(mock); + let inner_client = client.inner; + self.build_from_inner_client(inner_client) + } + fn build_from_inner_client( + self, + inner_client: InnerCompactionServiceClient, + ) -> CompactionServiceClient { + let ping_svc = self + .ping_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(inner_client.clone()), + |svc, layer| layer.layer(svc), + ); + let tower_svc_stack = CompactionServiceTowerServiceStack { + inner: inner_client, + ping_svc, + }; + CompactionServiceClient::new(tower_svc_stack) + } +} +#[derive(Debug, Clone)] +struct MailboxAdapter { + inner: quickwit_actors::Mailbox, + phantom: std::marker::PhantomData, +} +impl std::ops::Deref for MailboxAdapter +where + A: quickwit_actors::Actor, +{ + type Target = quickwit_actors::Mailbox; + fn deref(&self) -> &Self::Target { + &self.inner + } +} +#[derive(Debug)] +pub struct CompactionServiceMailbox { + inner: MailboxAdapter, +} +impl CompactionServiceMailbox { + pub fn new(instance: quickwit_actors::Mailbox) -> Self { + let inner = MailboxAdapter { + inner: instance, + phantom: std::marker::PhantomData, + }; + Self { inner } + } +} +impl Clone for CompactionServiceMailbox { + fn clone(&self) -> Self { + let inner = MailboxAdapter { + inner: self.inner.clone(), + phantom: std::marker::PhantomData, + }; + Self { inner } + } +} +impl tower::Service for CompactionServiceMailbox +where + A: quickwit_actors::Actor + + quickwit_actors::DeferableReplyHandler> + Send + + 'static, + M: std::fmt::Debug + Send + 'static, + T: Send + 'static, + E: std::fmt::Debug + Send + 'static, + crate::compaction::CompactionError: From>, +{ + type Response = T; + type Error = crate::compaction::CompactionError; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + //! This does not work with balance middlewares such as `tower::balance::pool::Pool` because + //! this always returns `Poll::Ready`. 
The fix is to acquire a permit from the + //! mailbox in `poll_ready` and consume it in `call`. + std::task::Poll::Ready(Ok(())) + } + fn call(&mut self, message: M) -> Self::Future { + let mailbox = self.inner.clone(); + let fut = async move { + mailbox.ask_for_res(message).await.map_err(|error| error.into()) + }; + Box::pin(fut) + } +} +#[async_trait::async_trait] +impl CompactionService for CompactionServiceMailbox +where + A: quickwit_actors::Actor + std::fmt::Debug, + CompactionServiceMailbox< + A, + >: tower::Service< + PingRequest, + Response = PingResponse, + Error = crate::compaction::CompactionError, + Future = BoxFuture, + >, +{ + async fn ping( + &self, + request: PingRequest, + ) -> crate::compaction::CompactionResult { + self.clone().call(request).await + } +} +#[derive(Debug, Clone)] +pub struct CompactionServiceGrpcClientAdapter { + inner: T, + #[allow(dead_code)] + connection_addrs_rx: tokio::sync::watch::Receiver< + std::collections::HashSet, + >, +} +impl CompactionServiceGrpcClientAdapter { + pub fn new( + instance: T, + connection_addrs_rx: tokio::sync::watch::Receiver< + std::collections::HashSet, + >, + ) -> Self { + Self { + inner: instance, + connection_addrs_rx, + } + } +} +#[async_trait::async_trait] +impl CompactionService +for CompactionServiceGrpcClientAdapter< + compaction_service_grpc_client::CompactionServiceGrpcClient, +> +where + T: tonic::client::GrpcService + std::fmt::Debug + Clone + Send + + Sync + 'static, + T::ResponseBody: tonic::codegen::Body + Send + 'static, + ::Error: Into + + Send, + T::Future: Send, +{ + async fn ping( + &self, + request: PingRequest, + ) -> crate::compaction::CompactionResult { + self.inner + .clone() + .ping(request) + .await + .map(|response| response.into_inner()) + .map_err(|status| crate::error::grpc_status_to_service_error( + status, + PingRequest::rpc_name(), + )) + } +} +#[derive(Debug)] +pub struct CompactionServiceGrpcServerAdapter { + inner: InnerCompactionServiceClient, +} +impl 
CompactionServiceGrpcServerAdapter { + pub fn new(instance: T) -> Self + where + T: CompactionService, + { + Self { + inner: InnerCompactionServiceClient(std::sync::Arc::new(instance)), + } + } +} +#[async_trait::async_trait] +impl compaction_service_grpc_server::CompactionServiceGrpc +for CompactionServiceGrpcServerAdapter { + async fn ping( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + self.inner + .0 + .ping(request.into_inner()) + .await + .map(tonic::Response::new) + .map_err(crate::error::grpc_error_to_grpc_status) + } +} +/// Generated client implementations. +pub mod compaction_service_grpc_client { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + #[derive(Debug, Clone)] + pub struct CompactionServiceGrpcClient { + inner: tonic::client::Grpc, + } + impl CompactionServiceGrpcClient { + /// Attempt to create a new client by connecting to a given endpoint. 
+ pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl CompactionServiceGrpcClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + std::marker::Send + 'static, + ::Error: Into + std::marker::Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> CompactionServiceGrpcClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + std::marker::Send + std::marker::Sync, + { + CompactionServiceGrpcClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. 
+ /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + pub async fn ping( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/quickwit.compaction.CompactionService/Ping", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new("quickwit.compaction.CompactionService", "Ping"), + ); + self.inner.unary(req, path, codec).await + } + } +} +/// Generated server implementations. +pub mod compaction_service_grpc_server { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + /// Generated trait containing gRPC methods that should be implemented for use with CompactionServiceGrpcServer. 
+ #[async_trait] + pub trait CompactionServiceGrpc: std::marker::Send + std::marker::Sync + 'static { + async fn ping( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + } + #[derive(Debug)] + pub struct CompactionServiceGrpcServer { + inner: Arc, + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, + } + impl CompactionServiceGrpcServer { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, + } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) + } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. 
+ /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } + } + impl tonic::codegen::Service> + for CompactionServiceGrpcServer + where + T: CompactionServiceGrpc, + B: Body + std::marker::Send + 'static, + B::Error: Into + std::marker::Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + match req.uri().path() { + "/quickwit.compaction.CompactionService/Ping" => { + #[allow(non_camel_case_types)] + struct PingSvc(pub Arc); + impl< + T: CompactionServiceGrpc, + > tonic::server::UnaryService for PingSvc { + type Response = super::PingResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::ping(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = PingSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + _ => { + Box::pin(async move { + let mut response = http::Response::new( + tonic::body::Body::default(), + ); + 
let headers = response.headers_mut(); + headers + .insert( + tonic::Status::GRPC_STATUS, + (tonic::Code::Unimplemented as i32).into(), + ); + headers + .insert( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ); + Ok(response) + }) + } + } + } + } + impl Clone for CompactionServiceGrpcServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, + send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, + } + } + } + /// Generated gRPC service name + pub const SERVICE_NAME: &str = "quickwit.compaction.CompactionService"; + impl tonic::server::NamedService for CompactionServiceGrpcServer { + const NAME: &'static str = SERVICE_NAME; + } +} diff --git a/quickwit/quickwit-proto/src/compaction/mod.rs b/quickwit/quickwit-proto/src/compaction/mod.rs new file mode 100644 index 00000000000..0220466217c --- /dev/null +++ b/quickwit/quickwit-proto/src/compaction/mod.rs @@ -0,0 +1,67 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use quickwit_common::rate_limited_error; +use quickwit_common::tower::MakeLoadShedError; +use serde::{Deserialize, Serialize}; + +use crate::GrpcServiceError; +use crate::error::{ServiceError, ServiceErrorCode}; + +include!("../codegen/quickwit/quickwit.compaction.rs"); + +pub const COMPACTION_FILE_DESCRIPTOR_SET: &[u8] = + include_bytes!("../codegen/quickwit/compaction_descriptor.bin"); + +pub type CompactionResult = std::result::Result; + +#[derive(Debug, thiserror::Error, Eq, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum CompactionError { + #[error("{0}")] + Internal(String), +} + +impl ServiceError for CompactionError { + fn error_code(&self) -> ServiceErrorCode { + match self { + Self::Internal(err_msg) => { + rate_limited_error!(limit_per_min = 6, "compaction error: {err_msg}"); + ServiceErrorCode::Internal + } + } + } +} + +// Required by the codegen tower layers. All four constructors are mandatory. +impl GrpcServiceError for CompactionError { + fn new_internal(message: String) -> Self { + Self::Internal(message) + } + fn new_timeout(message: String) -> Self { + Self::Internal(message) + } + fn new_too_many_requests() -> Self { + Self::Internal("too many requests".to_string()) + } + fn new_unavailable(message: String) -> Self { + Self::Internal(message) + } +} + +impl MakeLoadShedError for CompactionError { + fn make_load_shed_error() -> Self { + CompactionError::Internal("too many requests".to_string()) + } +} \ No newline at end of file diff --git a/quickwit/quickwit-proto/src/lib.rs b/quickwit/quickwit-proto/src/lib.rs index dbe850b55b7..da39dc11711 100644 --- a/quickwit/quickwit-proto/src/lib.rs +++ b/quickwit/quickwit-proto/src/lib.rs @@ -27,6 +27,7 @@ use tracing::Span; use tracing_opentelemetry::OpenTelemetrySpanExt; pub mod cluster; +pub mod compaction; pub mod control_plane; pub use bytes; pub use tonic; diff --git a/quickwit/quickwit-serve/Cargo.toml b/quickwit/quickwit-serve/Cargo.toml index 
2721aa719f3..7b5aa303f26 100644 --- a/quickwit/quickwit-serve/Cargo.toml +++ b/quickwit/quickwit-serve/Cargo.toml @@ -65,6 +65,7 @@ quickwit-control-plane = { workspace = true } quickwit-doc-mapper = { workspace = true } quickwit-index-management = { workspace = true } quickwit-indexing = { workspace = true } +quickwit-compaction = { workspace = true } quickwit-ingest = { workspace = true } quickwit-jaeger = { workspace = true } quickwit-janitor = { workspace = true } diff --git a/quickwit/quickwit-serve/src/grpc.rs b/quickwit/quickwit-serve/src/grpc.rs index 698c9e07d71..379788062fe 100644 --- a/quickwit/quickwit-serve/src/grpc.rs +++ b/quickwit/quickwit-serve/src/grpc.rs @@ -141,6 +141,22 @@ pub(crate) async fn start_grpc_server( None }; + // Mount gRPC compaction service if this node is a janitor with the compaction service enabled. + let compaction_grpc_service = if services + .node_config + .is_service_enabled(QuickwitService::Janitor) + { + if let Some(compaction_service) = &services.compaction_service_client_opt { + enabled_grpc_services.insert("compaction"); + file_descriptor_sets + .push(quickwit_proto::compaction::COMPACTION_FILE_DESCRIPTOR_SET); + Some(compaction_service.as_grpc_service(grpc_config.max_message_size)) + } else { + None + } + } else { + None + }; // Mount gRPC control plane service if `QuickwitService::ControlPlane` is enabled on node. 
let control_plane_grpc_service = if services .node_config @@ -238,6 +254,7 @@ pub(crate) async fn start_grpc_server( .add_service(developer_grpc_service) .add_service(health_service) .add_service(reflection_service) + .add_optional_service(compaction_grpc_service) .add_optional_service(control_plane_grpc_service) .add_optional_service(indexing_grpc_service) .add_optional_service(ingest_api_grpc_service) diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 4d674d9cb7b..69f10c6a117 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -75,6 +75,7 @@ use quickwit_common::{get_bool_from_env, spawn_named_task}; use quickwit_config::service::QuickwitService; use quickwit_config::{ClusterConfig, IngestApiConfig, NodeConfig}; use quickwit_control_plane::control_plane::{ControlPlane, ControlPlaneEventSubscriber}; +use quickwit_compaction::planner::StubCompactionService; use quickwit_control_plane::{IndexerNodeInfo, IndexerPool}; use quickwit_index_management::{IndexService as IndexManager, IndexServiceError}; use quickwit_indexing::actors::{IndexingService, MergeSchedulerService}; @@ -93,6 +94,7 @@ use quickwit_metastore::{ ControlPlaneMetastore, ListIndexesMetadataResponseExt, MetastoreResolver, }; use quickwit_opentelemetry::otlp::{OtlpGrpcLogsService, OtlpGrpcTracesService}; +use quickwit_proto::compaction::CompactionServiceClient; use quickwit_proto::control_plane::ControlPlaneServiceClient; use quickwit_proto::indexing::{IndexingServiceClient, ShardPositionsUpdate}; use quickwit_proto::ingest::ingester::{ @@ -140,6 +142,7 @@ const READINESS_REPORTING_INTERVAL: Duration = if cfg!(any(test, feature = "test const METASTORE_CLIENT_MAX_CONCURRENCY_ENV_KEY: &str = "QW_METASTORE_CLIENT_MAX_CONCURRENCY"; const DEFAULT_METASTORE_CLIENT_MAX_CONCURRENCY: usize = 6; const DISABLE_DELETE_TASK_SERVICE_ENV_KEY: &str = "QW_DISABLE_DELETE_TASK_SERVICE"; +const ENABLE_COMPACTION_SERVICE_ENV_KEY: &str = 
"QW_ENABLE_COMPACTION_SERVICE"; pub type EnvFilterReloadFn = Arc anyhow::Result<()> + Send + Sync>; @@ -195,6 +198,7 @@ struct QuickwitServices { pub ingest_router_service: IngestRouterServiceClient, ingester_opt: Option, + pub compaction_service_client_opt: Option, pub janitor_service_opt: Option>, pub jaeger_service_opt: Option, pub otlp_logs_service_opt: Option, @@ -261,6 +265,61 @@ async fn balance_channel_for_service( BalanceChannel::from_stream(service_change_stream) } +/// Builds a `CompactionServiceClient` if the compaction service is available. +/// +/// On janitor nodes with `QW_ENABLE_COMPACTION_SERVICE=true`, wraps a local stub. +/// On non-janitor nodes with the flag set, waits up to 10s for a remote janitor +/// exposing the gRPC endpoint and logs an error if none is found. +async fn start_compaction_service_if_needed( + node_config: &NodeConfig, + cluster: &Cluster, +) -> Option { + if !get_bool_from_env(ENABLE_COMPACTION_SERVICE_ENV_KEY, false) { + return None; + } + // Only janitor nodes (which host the planner) and indexer nodes (which need + // to know whether to spawn local merge pipelines) care about this service. 
+ if !node_config.is_service_enabled(QuickwitService::Janitor) + && !node_config.is_service_enabled(QuickwitService::Indexer) + { + return None; + } + if node_config.is_service_enabled(QuickwitService::Janitor) { + info!("compaction service enabled on this node"); + return Some(CompactionServiceClient::new(StubCompactionService)); + } + let balance_channel = + balance_channel_for_service(cluster, QuickwitService::Janitor).await; + let found = balance_channel + .wait_for(Duration::from_secs(300), |connections| { + !connections.is_empty() + }) + .await; + if !found { + error!( + "compaction service is enabled but no janitor node was found in the cluster, \ + falling back to local merge pipelines" + ); + return None; + } + info!("remote compaction service detected on janitor node"); + Some(CompactionServiceClient::from_balance_channel( + balance_channel, + node_config.grpc_config.max_message_size, + None, + )) +} + +fn spawn_merge_scheduler_service( + universe: &Universe, + node_config: &NodeConfig, +) -> Mailbox { + let (mailbox, _) = universe.spawn_builder().spawn( + MergeSchedulerService::new(node_config.indexer_config.merge_concurrency.get()), + ); + mailbox +} + async fn start_ingest_client_if_needed( node_config: &NodeConfig, universe: &Universe, @@ -539,10 +598,15 @@ pub async fn serve_quickwit( .await .context("failed to start ingest v1 service")?; + let compaction_service_client_opt = + start_compaction_service_if_needed(&node_config, &cluster).await; + let indexing_service_opt = if node_config.is_service_enabled(QuickwitService::Indexer) { - let (merge_scheduler_mailbox, _) = universe.spawn_builder().spawn( - MergeSchedulerService::new(node_config.indexer_config.merge_concurrency.get()), - ); + let merge_scheduler_mailbox_opt = if compaction_service_client_opt.is_none() { + Some(spawn_merge_scheduler_service(&universe, &node_config)) + } else { + None + }; let indexing_service = start_indexing_service( &universe, &node_config, @@ -552,7 +616,7 @@ pub async 
fn serve_quickwit( ingester_pool.clone(), storage_resolver.clone(), event_broker.clone(), - Some(merge_scheduler_mailbox), + merge_scheduler_mailbox_opt, ) .await .context("failed to start indexing service")?; @@ -766,6 +830,7 @@ pub async fn serve_quickwit( ingest_router_service, ingest_service, ingester_opt: ingester_opt.clone(), + compaction_service_client_opt, janitor_service_opt, jaeger_service_opt, otlp_logs_service_opt, @@ -1472,6 +1537,7 @@ mod tests { use quickwit_cluster::{ChannelTransport, ClusterNode, create_cluster_for_test}; use quickwit_common::uri::Uri; use quickwit_common::{ServiceStream, assert_eventually}; + use quickwit_proto::compaction::CompactionService; use quickwit_config::SearcherConfig; use quickwit_metastore::{IndexMetadata, metastore_for_test}; use quickwit_proto::ingest::ingester::{MockIngesterService, ObservationMessage}; @@ -1814,4 +1880,32 @@ mod tests { assert!(ingester_pool.is_empty()); } + + #[tokio::test] + async fn test_compaction_service_on_janitor_node() { + let transport = ChannelTransport::default(); + let cluster = create_cluster_for_test(Vec::new(), &["janitor"], &transport, false) + .await + .unwrap(); + + // Without the env var, no compaction service. + let mut node_config = NodeConfig::for_test(); + node_config.enabled_services = HashSet::from([QuickwitService::Janitor]); + let result = start_compaction_service_if_needed(&node_config, &cluster).await; + assert!(result.is_none()); + + // With the env var, compaction service is returned. + unsafe { std::env::set_var(ENABLE_COMPACTION_SERVICE_ENV_KEY, "true") }; + let result = start_compaction_service_if_needed(&node_config, &cluster).await; + assert!(result.is_some()); + + // Ping the stub to confirm it works. 
+ let client = result.unwrap(); + let response = client + .ping(quickwit_proto::compaction::PingRequest {}) + .await; + assert!(response.is_ok()); + + unsafe { std::env::remove_var(ENABLE_COMPACTION_SERVICE_ENV_KEY) }; + } } diff --git a/quickwit/quickwit-serve/src/rest.rs b/quickwit/quickwit-serve/src/rest.rs index 3f193783b04..c52d790167d 100644 --- a/quickwit/quickwit-serve/src/rest.rs +++ b/quickwit/quickwit-serve/src/rest.rs @@ -843,6 +843,7 @@ mod tests { _report_splits_subscription_handle_opt: None, _local_shards_update_listener_handle_opt: None, cluster, + compaction_service_client_opt: None, control_plane_server_opt: None, control_plane_client, indexing_service_opt: None, From 33e3cd9cb5c5a71b9f1dc9bb81d9e53d46294f9a Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Mon, 6 Apr 2026 15:23:41 -0400 Subject: [PATCH 2/4] lints --- quickwit/quickwit-compaction/src/lib.rs | 1 - .../src/planner/compaction_service.rs | 6 ++---- quickwit/quickwit-compaction/src/planner/mod.rs | 2 +- quickwit/quickwit-compaction/src/worker/mod.rs | 13 ------------- .../src/actors/indexing_service.rs | 3 +-- .../quickwit-indexing/src/actors/publisher.rs | 12 ++++-------- quickwit/quickwit-proto/src/compaction/mod.rs | 2 +- quickwit/quickwit-serve/src/grpc.rs | 3 +-- quickwit/quickwit-serve/src/lib.rs | 17 ++++++++--------- 9 files changed, 18 insertions(+), 41 deletions(-) delete mode 100644 quickwit/quickwit-compaction/src/worker/mod.rs diff --git a/quickwit/quickwit-compaction/src/lib.rs b/quickwit/quickwit-compaction/src/lib.rs index 2580ff03673..d59933e227e 100644 --- a/quickwit/quickwit-compaction/src/lib.rs +++ b/quickwit/quickwit-compaction/src/lib.rs @@ -15,4 +15,3 @@ #![deny(clippy::disallowed_methods)] pub mod planner; -pub mod worker; \ No newline at end of file diff --git a/quickwit/quickwit-compaction/src/planner/compaction_service.rs b/quickwit/quickwit-compaction/src/planner/compaction_service.rs index 77fe77b950c..0e3104a2129 100644 --- 
a/quickwit/quickwit-compaction/src/planner/compaction_service.rs +++ b/quickwit/quickwit-compaction/src/planner/compaction_service.rs @@ -13,9 +13,7 @@ // limitations under the License. use async_trait::async_trait; -use quickwit_proto::compaction::{ - CompactionResult, CompactionService, PingRequest, PingResponse, -}; +use quickwit_proto::compaction::{CompactionResult, CompactionService, PingRequest, PingResponse}; #[derive(Debug, Clone)] pub struct StubCompactionService; @@ -25,4 +23,4 @@ impl CompactionService for StubCompactionService { async fn ping(&self, _request: PingRequest) -> CompactionResult { Ok(PingResponse {}) } -} \ No newline at end of file +} diff --git a/quickwit/quickwit-compaction/src/planner/mod.rs b/quickwit/quickwit-compaction/src/planner/mod.rs index 0f77fe4e01a..3b531a439f2 100644 --- a/quickwit/quickwit-compaction/src/planner/mod.rs +++ b/quickwit/quickwit-compaction/src/planner/mod.rs @@ -14,4 +14,4 @@ mod compaction_service; -pub use compaction_service::StubCompactionService; \ No newline at end of file +pub use compaction_service::StubCompactionService; diff --git a/quickwit/quickwit-compaction/src/worker/mod.rs b/quickwit/quickwit-compaction/src/worker/mod.rs deleted file mode 100644 index ac4defdb972..00000000000 --- a/quickwit/quickwit-compaction/src/worker/mod.rs +++ /dev/null @@ -1,13 +0,0 @@ -// Copyright 2021-Present Datadog, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
\ No newline at end of file diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index f4e4d78a7c9..5442a98589e 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -1746,8 +1746,7 @@ mod tests { init_ingest_api(&universe, &queues_dir_path, &IngestApiConfig::default()) .await .unwrap(); - let merge_scheduler_mailbox: Mailbox = - universe.get_or_spawn_one(); + let merge_scheduler_mailbox: Mailbox = universe.get_or_spawn_one(); let indexing_server = IndexingService::new( NodeId::from("test-node"), data_dir_path, diff --git a/quickwit/quickwit-indexing/src/actors/publisher.rs b/quickwit/quickwit-indexing/src/actors/publisher.rs index 4ed7baec043..e0b4b103a4b 100644 --- a/quickwit/quickwit-indexing/src/actors/publisher.rs +++ b/quickwit/quickwit-indexing/src/actors/publisher.rs @@ -213,16 +213,12 @@ impl Handler for Publisher { // When merging is handled locally, notify the merge planner about new // splits. The mailbox is None when an external merge service is active, // or when the planner has already shut down (e.g. source reached its end). 
- if let Some(merge_planner_mailbox) = &self.merge_planner_mailbox_opt { - match ctx + if let Some(merge_planner_mailbox) = &self.merge_planner_mailbox_opt + && let Err(error) = ctx .send_message(merge_planner_mailbox, NewSplits { new_splits }) .await - { - Ok(_) => {} - Err(error) => { - error!(error=?error, "failed to send new splits to merge planner"); - } - } + { + error!(error=?error, "failed to send new splits to merge planner"); } if replaced_split_ids.is_empty() { diff --git a/quickwit/quickwit-proto/src/compaction/mod.rs b/quickwit/quickwit-proto/src/compaction/mod.rs index 0220466217c..66e931446ae 100644 --- a/quickwit/quickwit-proto/src/compaction/mod.rs +++ b/quickwit/quickwit-proto/src/compaction/mod.rs @@ -64,4 +64,4 @@ impl MakeLoadShedError for CompactionError { fn make_load_shed_error() -> Self { CompactionError::Internal("too many requests".to_string()) } -} \ No newline at end of file +} diff --git a/quickwit/quickwit-serve/src/grpc.rs b/quickwit/quickwit-serve/src/grpc.rs index 379788062fe..382b026a6a8 100644 --- a/quickwit/quickwit-serve/src/grpc.rs +++ b/quickwit/quickwit-serve/src/grpc.rs @@ -148,8 +148,7 @@ pub(crate) async fn start_grpc_server( { if let Some(compaction_service) = &services.compaction_service_client_opt { enabled_grpc_services.insert("compaction"); - file_descriptor_sets - .push(quickwit_proto::compaction::COMPACTION_FILE_DESCRIPTOR_SET); + file_descriptor_sets.push(quickwit_proto::compaction::COMPACTION_FILE_DESCRIPTOR_SET); Some(compaction_service.as_grpc_service(grpc_config.max_message_size)) } else { None diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 69f10c6a117..902541fc9ca 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -72,10 +72,10 @@ use quickwit_common::tower::{ }; use quickwit_common::uri::Uri; use quickwit_common::{get_bool_from_env, spawn_named_task}; +use quickwit_compaction::planner::StubCompactionService; use 
quickwit_config::service::QuickwitService; use quickwit_config::{ClusterConfig, IngestApiConfig, NodeConfig}; use quickwit_control_plane::control_plane::{ControlPlane, ControlPlaneEventSubscriber}; -use quickwit_compaction::planner::StubCompactionService; use quickwit_control_plane::{IndexerNodeInfo, IndexerPool}; use quickwit_index_management::{IndexService as IndexManager, IndexServiceError}; use quickwit_indexing::actors::{IndexingService, MergeSchedulerService}; @@ -288,8 +288,7 @@ async fn start_compaction_service_if_needed( info!("compaction service enabled on this node"); return Some(CompactionServiceClient::new(StubCompactionService)); } - let balance_channel = - balance_channel_for_service(cluster, QuickwitService::Janitor).await; + let balance_channel = balance_channel_for_service(cluster, QuickwitService::Janitor).await; let found = balance_channel .wait_for(Duration::from_secs(300), |connections| { !connections.is_empty() @@ -297,8 +296,8 @@ async fn start_compaction_service_if_needed( .await; if !found { error!( - "compaction service is enabled but no janitor node was found in the cluster, \ - falling back to local merge pipelines" + "compaction service is enabled but no janitor node was found in the cluster, falling \ + back to local merge pipelines" ); return None; } @@ -314,9 +313,9 @@ fn spawn_merge_scheduler_service( universe: &Universe, node_config: &NodeConfig, ) -> Mailbox { - let (mailbox, _) = universe.spawn_builder().spawn( - MergeSchedulerService::new(node_config.indexer_config.merge_concurrency.get()), - ); + let (mailbox, _) = universe.spawn_builder().spawn(MergeSchedulerService::new( + node_config.indexer_config.merge_concurrency.get(), + )); mailbox } @@ -1537,9 +1536,9 @@ mod tests { use quickwit_cluster::{ChannelTransport, ClusterNode, create_cluster_for_test}; use quickwit_common::uri::Uri; use quickwit_common::{ServiceStream, assert_eventually}; - use quickwit_proto::compaction::CompactionService; use 
quickwit_config::SearcherConfig; use quickwit_metastore::{IndexMetadata, metastore_for_test}; + use quickwit_proto::compaction::CompactionService; use quickwit_proto::ingest::ingester::{MockIngesterService, ObservationMessage}; use quickwit_proto::metastore::{ListIndexesMetadataResponse, MockMetastoreService}; use quickwit_search::Job; From d661e216073542574af2b804c8819f89a40f32ed Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Mon, 6 Apr 2026 15:39:44 -0400 Subject: [PATCH 3/4] add another test, lint --- .../src/tests/basic_tests.rs | 49 +++++++++++++++++++ quickwit/quickwit-serve/src/lib.rs | 25 +++++++++- 2 files changed, 73 insertions(+), 1 deletion(-) diff --git a/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs b/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs index 2715e310a97..4e4ac88e9ec 100644 --- a/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs @@ -258,3 +258,52 @@ async fn test_merge_pipelines_present_without_compaction_service() { unsafe { std::env::remove_var("QW_ENABLE_COMPACTION_SERVICE") }; sandbox.shutdown().await.unwrap(); } + +#[tokio::test] +async fn test_merge_pipelines_fallback_when_compaction_enabled_but_no_janitor() { + quickwit_common::setup_logging_for_tests(); + unsafe { std::env::set_var("QW_ENABLE_COMPACTION_SERVICE", "true") }; + + // No janitor node — indexer should fail to find the compaction service + // and fall back to local merge pipelines. 
+ let sandbox = ClusterSandboxBuilder::default() + .add_node([QuickwitService::Searcher]) + .add_node([QuickwitService::Metastore]) + .add_node([QuickwitService::Indexer]) + .add_node([QuickwitService::ControlPlane]) + .build_and_start() + .await; + + sandbox + .rest_client(QuickwitService::Indexer) + .indexes() + .create( + r#" + version: 0.8 + index_id: test-merge-fallback + doc_mapping: + field_mappings: + - name: body + type: text + indexing_settings: + commit_timeout_secs: 1 + "#, + quickwit_config::ConfigFormat::Yaml, + false, + ) + .await + .unwrap(); + + sandbox.wait_for_indexing_pipelines(1).await.unwrap(); + + let stats = sandbox + .rest_client(QuickwitService::Indexer) + .node_stats() + .indexing() + .await + .unwrap(); + assert!(stats.num_running_merge_pipelines > 0); + + unsafe { std::env::remove_var("QW_ENABLE_COMPACTION_SERVICE") }; + sandbox.shutdown().await.unwrap(); +} diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 902541fc9ca..e2be1ccbc05 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -139,6 +139,12 @@ const READINESS_REPORTING_INTERVAL: Duration = if cfg!(any(test, feature = "test Duration::from_secs(10) }; +const COMPACTION_SERVICE_DISCOVERY_TIMEOUT: Duration = if cfg!(any(test, feature = "testsuite")) { + Duration::from_millis(100) +} else { + Duration::from_secs(300) +}; + const METASTORE_CLIENT_MAX_CONCURRENCY_ENV_KEY: &str = "QW_METASTORE_CLIENT_MAX_CONCURRENCY"; const DEFAULT_METASTORE_CLIENT_MAX_CONCURRENCY: usize = 6; const DISABLE_DELETE_TASK_SERVICE_ENV_KEY: &str = "QW_DISABLE_DELETE_TASK_SERVICE"; @@ -290,7 +296,7 @@ async fn start_compaction_service_if_needed( } let balance_channel = balance_channel_for_service(cluster, QuickwitService::Janitor).await; let found = balance_channel - .wait_for(Duration::from_secs(300), |connections| { + .wait_for(COMPACTION_SERVICE_DISCOVERY_TIMEOUT, |connections| { !connections.is_empty() }) .await; @@ -1907,4 
+1913,21 @@ mod tests { unsafe { std::env::remove_var(ENABLE_COMPACTION_SERVICE_ENV_KEY) }; } + + #[tokio::test] + async fn test_compaction_service_returns_none_when_no_janitor() { + let transport = ChannelTransport::default(); + let cluster = create_cluster_for_test(Vec::new(), &["indexer"], &transport, false) + .await + .unwrap(); + + unsafe { std::env::set_var(ENABLE_COMPACTION_SERVICE_ENV_KEY, "true") }; + + let mut node_config = NodeConfig::for_test(); + node_config.enabled_services = HashSet::from([QuickwitService::Indexer]); + let result = start_compaction_service_if_needed(&node_config, &cluster).await; + assert!(result.is_none()); + + unsafe { std::env::remove_var(ENABLE_COMPACTION_SERVICE_ENV_KEY) }; + } } From cab018f57655f1be620ac1d27d35748174d66c06 Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Wed, 8 Apr 2026 14:06:40 -0400 Subject: [PATCH 4/4] PR comments --- quickwit/quickwit-config/src/service.rs | 6 +- .../src/tests/basic_tests.rs | 49 -------------- quickwit/quickwit-proto/src/compaction/mod.rs | 19 ++++-- quickwit/quickwit-serve/src/lib.rs | 66 +++++++++---------- 4 files changed, 48 insertions(+), 92 deletions(-) diff --git a/quickwit/quickwit-config/src/service.rs b/quickwit/quickwit-config/src/service.rs index 03446576ed8..0382231330d 100644 --- a/quickwit/quickwit-config/src/service.rs +++ b/quickwit/quickwit-config/src/service.rs @@ -29,7 +29,7 @@ pub enum QuickwitService { Searcher, Janitor, Metastore, - MergeWorker, + Compactor, } #[allow(clippy::from_over_into)] @@ -47,7 +47,7 @@ impl QuickwitService { QuickwitService::Searcher => "searcher", QuickwitService::Janitor => "janitor", QuickwitService::Metastore => "metastore", - QuickwitService::MergeWorker => "merge_worker", + QuickwitService::Compactor => "compactor", } } @@ -72,7 +72,7 @@ impl FromStr for QuickwitService { "searcher" => Ok(QuickwitService::Searcher), "janitor" => Ok(QuickwitService::Janitor), "metastore" => Ok(QuickwitService::Metastore), - "merge-worker" | 
"merge_worker" => Ok(QuickwitService::MergeWorker), + "compactor" => Ok(QuickwitService::Compactor), _ => { bail!( "failed to parse service `{service_str}`. supported services are: `{}`", diff --git a/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs b/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs index 4e4ac88e9ec..2715e310a97 100644 --- a/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs @@ -258,52 +258,3 @@ async fn test_merge_pipelines_present_without_compaction_service() { unsafe { std::env::remove_var("QW_ENABLE_COMPACTION_SERVICE") }; sandbox.shutdown().await.unwrap(); } - -#[tokio::test] -async fn test_merge_pipelines_fallback_when_compaction_enabled_but_no_janitor() { - quickwit_common::setup_logging_for_tests(); - unsafe { std::env::set_var("QW_ENABLE_COMPACTION_SERVICE", "true") }; - - // No janitor node — indexer should fail to find the compaction service - // and fall back to local merge pipelines. 
- let sandbox = ClusterSandboxBuilder::default() - .add_node([QuickwitService::Searcher]) - .add_node([QuickwitService::Metastore]) - .add_node([QuickwitService::Indexer]) - .add_node([QuickwitService::ControlPlane]) - .build_and_start() - .await; - - sandbox - .rest_client(QuickwitService::Indexer) - .indexes() - .create( - r#" - version: 0.8 - index_id: test-merge-fallback - doc_mapping: - field_mappings: - - name: body - type: text - indexing_settings: - commit_timeout_secs: 1 - "#, - quickwit_config::ConfigFormat::Yaml, - false, - ) - .await - .unwrap(); - - sandbox.wait_for_indexing_pipelines(1).await.unwrap(); - - let stats = sandbox - .rest_client(QuickwitService::Indexer) - .node_stats() - .indexing() - .await - .unwrap(); - assert!(stats.num_running_merge_pipelines > 0); - - unsafe { std::env::remove_var("QW_ENABLE_COMPACTION_SERVICE") }; - sandbox.shutdown().await.unwrap(); -} diff --git a/quickwit/quickwit-proto/src/compaction/mod.rs b/quickwit/quickwit-proto/src/compaction/mod.rs index 66e931446ae..1e27855add3 100644 --- a/quickwit/quickwit-proto/src/compaction/mod.rs +++ b/quickwit/quickwit-proto/src/compaction/mod.rs @@ -29,8 +29,14 @@ pub type CompactionResult = std::result::Result; #[derive(Debug, thiserror::Error, Eq, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum CompactionError { - #[error("{0}")] + #[error("internal error: {0}")] Internal(String), + #[error("request timed out: {0}")] + Timeout(String), + #[error("too many requests")] + TooManyRequests, + #[error("service unavailable: {0}")] + Unavailable(String), } impl ServiceError for CompactionError { @@ -40,6 +46,9 @@ impl ServiceError for CompactionError { rate_limited_error!(limit_per_min = 6, "compaction error: {err_msg}"); ServiceErrorCode::Internal } + Self::Timeout(_) => ServiceErrorCode::Timeout, + Self::TooManyRequests => ServiceErrorCode::TooManyRequests, + Self::Unavailable(_) => ServiceErrorCode::Unavailable, } } } @@ -50,18 +59,18 @@ impl 
GrpcServiceError for CompactionError { Self::Internal(message) } fn new_timeout(message: String) -> Self { - Self::Internal(message) + Self::Timeout(message) } fn new_too_many_requests() -> Self { - Self::Internal("too many requests".to_string()) + Self::TooManyRequests } fn new_unavailable(message: String) -> Self { - Self::Internal(message) + Self::Unavailable(message) } } impl MakeLoadShedError for CompactionError { fn make_load_shed_error() -> Self { - CompactionError::Internal("too many requests".to_string()) + CompactionError::TooManyRequests } } diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index e2be1ccbc05..e288161393e 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -276,23 +276,23 @@ async fn balance_channel_for_service( /// On janitor nodes with `QW_ENABLE_COMPACTION_SERVICE=true`, wraps a local stub. /// On non-janitor nodes with the flag set, waits up to 10s for a remote janitor /// exposing the gRPC endpoint and logs an error if none is found. -async fn start_compaction_service_if_needed( +async fn get_compaction_service_client_if_needed( node_config: &NodeConfig, cluster: &Cluster, -) -> Option { +) -> anyhow::Result, anyhow::Error> { if !get_bool_from_env(ENABLE_COMPACTION_SERVICE_ENV_KEY, false) { - return None; + return Ok(None); } // Only janitor nodes (which host the planner) and indexer nodes (which need // to know whether to spawn local merge pipelines) care about this service. 
- if !node_config.is_service_enabled(QuickwitService::Janitor) - && !node_config.is_service_enabled(QuickwitService::Indexer) - { - return None; + if !node_config.is_service_enabled(QuickwitService::Indexer) { + return Ok(None); } - if node_config.is_service_enabled(QuickwitService::Janitor) { + if node_config.is_service_enabled(QuickwitService::Janitor) + && node_config.is_service_enabled(QuickwitService::Indexer) + { info!("compaction service enabled on this node"); - return Some(CompactionServiceClient::new(StubCompactionService)); + return Ok(Some(CompactionServiceClient::new(StubCompactionService))); } let balance_channel = balance_channel_for_service(cluster, QuickwitService::Janitor).await; let found = balance_channel @@ -301,18 +301,14 @@ async fn start_compaction_service_if_needed( }) .await; if !found { - error!( - "compaction service is enabled but no janitor node was found in the cluster, falling \ - back to local merge pipelines" - ); - return None; + bail!("compaction service is enabled but no janitor node was found in the cluster") } info!("remote compaction service detected on janitor node"); - Some(CompactionServiceClient::from_balance_channel( + Ok(Some(CompactionServiceClient::from_balance_channel( balance_channel, node_config.grpc_config.max_message_size, None, - )) + ))) } fn spawn_merge_scheduler_service( @@ -604,7 +600,9 @@ pub async fn serve_quickwit( .context("failed to start ingest v1 service")?; let compaction_service_client_opt = - start_compaction_service_if_needed(&node_config, &cluster).await; + get_compaction_service_client_if_needed(&node_config, &cluster) + .await + .context("failed to initialize compaction service client")?; let indexing_service_opt = if node_config.is_service_enabled(QuickwitService::Indexer) { let merge_scheduler_mailbox_opt = if compaction_service_client_opt.is_none() { @@ -1544,7 +1542,6 @@ mod tests { use quickwit_common::{ServiceStream, assert_eventually}; use quickwit_config::SearcherConfig; use 
quickwit_metastore::{IndexMetadata, metastore_for_test}; - use quickwit_proto::compaction::CompactionService; use quickwit_proto::ingest::ingester::{MockIngesterService, ObservationMessage}; use quickwit_proto::metastore::{ListIndexesMetadataResponse, MockMetastoreService}; use quickwit_search::Job; @@ -1889,33 +1886,32 @@ mod tests { #[tokio::test] async fn test_compaction_service_on_janitor_node() { let transport = ChannelTransport::default(); - let cluster = create_cluster_for_test(Vec::new(), &["janitor"], &transport, false) - .await - .unwrap(); + let cluster = + create_cluster_for_test(Vec::new(), &["janitor", "indexer"], &transport, true) + .await + .unwrap(); // Without the env var, no compaction service. let mut node_config = NodeConfig::for_test(); - node_config.enabled_services = HashSet::from([QuickwitService::Janitor]); - let result = start_compaction_service_if_needed(&node_config, &cluster).await; + node_config.enabled_services = + HashSet::from([QuickwitService::Janitor, QuickwitService::Indexer]); + let result = get_compaction_service_client_if_needed(&node_config, &cluster) + .await + .unwrap(); assert!(result.is_none()); - // With the env var, compaction service is returned. + // With the env var, compaction service client is returned. unsafe { std::env::set_var(ENABLE_COMPACTION_SERVICE_ENV_KEY, "true") }; - let result = start_compaction_service_if_needed(&node_config, &cluster).await; + let result = get_compaction_service_client_if_needed(&node_config, &cluster) + .await + .unwrap(); assert!(result.is_some()); - // Ping the stub to confirm it works. 
- let client = result.unwrap(); - let response = client - .ping(quickwit_proto::compaction::PingRequest {}) - .await; - assert!(response.is_ok()); - unsafe { std::env::remove_var(ENABLE_COMPACTION_SERVICE_ENV_KEY) }; } #[tokio::test] - async fn test_compaction_service_returns_none_when_no_janitor() { + async fn test_compaction_service_returns_error_when_no_janitor() { let transport = ChannelTransport::default(); let cluster = create_cluster_for_test(Vec::new(), &["indexer"], &transport, false) .await @@ -1925,8 +1921,8 @@ mod tests { let mut node_config = NodeConfig::for_test(); node_config.enabled_services = HashSet::from([QuickwitService::Indexer]); - let result = start_compaction_service_if_needed(&node_config, &cluster).await; - assert!(result.is_none()); + let result = get_compaction_service_client_if_needed(&node_config, &cluster).await; + assert!(result.is_err()); unsafe { std::env::remove_var(ENABLE_COMPACTION_SERVICE_ENV_KEY) }; }