Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,389 changes: 1,348 additions & 41 deletions quickwit/Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ members = [
"quickwit-common",
"quickwit-config",
"quickwit-control-plane",
"quickwit-datafusion",
"quickwit-datetime",
"quickwit-directories",
"quickwit-doc-mapper",
Expand Down Expand Up @@ -50,6 +51,7 @@ default-members = [
"quickwit-common",
"quickwit-config",
"quickwit-control-plane",
"quickwit-datafusion",
"quickwit-datetime",
"quickwit-directories",
"quickwit-doc-mapper",
Expand Down Expand Up @@ -349,6 +351,7 @@ quickwit-codegen-example = { path = "quickwit-codegen/example" }
quickwit-common = { path = "quickwit-common" }
quickwit-config = { path = "quickwit-config" }
quickwit-control-plane = { path = "quickwit-control-plane" }
quickwit-datafusion = { path = "quickwit-datafusion" }
quickwit-datetime = { path = "quickwit-datetime" }
quickwit-directories = { path = "quickwit-directories" }
quickwit-doc-mapper = { path = "quickwit-doc-mapper" }
Expand Down
52 changes: 52 additions & 0 deletions quickwit/quickwit-datafusion/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Manifest for the `quickwit-datafusion` crate. Package metadata is inherited
# from the workspace.
[package]
name = "quickwit-datafusion"
description = "DataFusion-based query execution for Quickwit parquet metrics"

version.workspace = true
edition.workspace = true
homepage.workspace = true
documentation.workspace = true
repository.workspace = true
authors.workspace = true
license.workspace = true

[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
chrono = { workspace = true }
futures = { workspace = true }
prost = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
tonic = { workspace = true }
tracing = { workspace = true }
# Declared with a literal version here instead of being inherited from the
# workspace like the dependencies above.
url = "2"

# Sibling Quickwit crates.
quickwit-common = { workspace = true }
quickwit-metastore = { workspace = true }
quickwit-parquet-engine = { workspace = true }
quickwit-proto = { workspace = true }
quickwit-search = { workspace = true }
quickwit-storage = { workspace = true }

arrow = { workspace = true }
# DataFusion crates: all requested at the same major version ("52") and
# declared locally rather than through the workspace.
datafusion = "52"
datafusion-substrait = "52"
datafusion-datasource = "52"
datafusion-sql = "52"
datafusion-physical-plan = "52"
datafusion-datasource-parquet = "52"
# NOTE(review): git dependency without `rev`, `tag`, or `branch` — the resolved
# commit is only fixed by Cargo.lock. Consider pinning a revision.
datafusion-distributed = { git = "https://github.com/datafusion-contrib/datafusion-distributed" }
object_store = "0.12"

[dev-dependencies]
quickwit-common = { workspace = true, features = ["testsuite"] }
# Self-dependency so this crate's own tests build with the `testsuite` feature.
quickwit-datafusion = { path = ".", features = ["testsuite"] }
tokio = { workspace = true, features = ["test-util", "macros"] }

[features]
# Enables no additional dependencies or features by itself.
testsuite = []
137 changes: 137 additions & 0 deletions quickwit/quickwit-datafusion/src/catalog.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Copyright 2021-Present Datadog, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Generic DataFusion catalog / schema provider.
//!
//! `QuickwitSchemaProvider` routes table resolution to whichever registered
//! `QuickwitDataSource` claims to own the index. It knows nothing about
//! metrics, logs, or traces — those concerns live in each data source.

use std::any::Any;
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::catalog::{MemorySchemaProvider, SchemaProvider};
use datafusion::datasource::TableProvider;
use datafusion::error::Result as DFResult;

use crate::data_source::QuickwitDataSource;

/// DataFusion `SchemaProvider` that delegates table resolution to the
/// registered `QuickwitDataSource` implementations.
///
/// Resolution order for `table(name)`:
/// 1. Explicitly registered tables (from `CREATE EXTERNAL TABLE` DDL) — held
/// in DataFusion's own [`MemorySchemaProvider`], which keeps them in a
/// concurrent map (`DashMap`), so no extra locking is added here.
/// 2. Each source's `create_default_table_provider`, first non-None wins.
///
/// `register_table` / `deregister_table` delegate directly to the inner
/// `MemorySchemaProvider`, so `CREATE OR REPLACE EXTERNAL TABLE` works
/// correctly without any custom locking.
pub struct QuickwitSchemaProvider {
/// Data sources consulted, in order, when a name is not a DDL-registered
/// table.
sources: Vec<Arc<dyn QuickwitDataSource>>,
/// DDL-registered tables (CREATE OR REPLACE EXTERNAL TABLE).
/// Stored in DataFusion's `MemorySchemaProvider` (backed by `DashMap`), so
/// concurrent reads and writes are handled by that type rather than by a
/// lock owned by this struct.
ddl_tables: MemorySchemaProvider,
}

impl QuickwitSchemaProvider {
    /// Creates a schema provider over the given data sources.
    ///
    /// The provider starts with no DDL-registered tables; `sources` is kept
    /// in the order supplied.
    pub fn new(sources: Vec<Arc<dyn QuickwitDataSource>>) -> Self {
        let ddl_tables = MemorySchemaProvider::new();
        Self {
            sources,
            ddl_tables,
        }
    }
}

impl std::fmt::Debug for QuickwitSchemaProvider {
    /// Formats the provider as a summary: the number of registered sources
    /// and the number of DDL-registered tables, not their contents.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let num_sources = self.sources.len();
        let num_ddl_tables = self.ddl_tables.table_names().len();

        let mut summary = f.debug_struct("QuickwitSchemaProvider");
        summary.field("num_sources", &num_sources);
        summary.field("num_ddl_tables", &num_ddl_tables);
        summary.finish()
    }
}

#[async_trait]
impl SchemaProvider for QuickwitSchemaProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Lists every table name this schema can resolve: the DDL-registered
    /// tables plus the index names reported by each source, sorted and
    /// deduplicated.
    ///
    /// `table_names()` is a sync DataFusion API, but enumerating sources is
    /// async. This uses `block_in_place`, which requires a multi-threaded
    /// Tokio runtime. Only called for `SHOW TABLES` / `information_schema`;
    /// not on the query hot path.
    ///
    /// Listing is best effort: a source whose `list_index_names` call fails
    /// is skipped so that one failing source does not hide the others.
    ///
    /// # Panics
    ///
    /// Panics when called outside a Tokio runtime or from a current-thread
    /// runtime, because of `Handle::current` and `block_in_place`.
    fn table_names(&self) -> Vec<String> {
        // Start from the DDL-registered tables so the listing agrees with
        // `table()`, which resolves those first.
        let mut names = self.ddl_tables.table_names();

        let sources = &self.sources;
        let mut source_names = tokio::task::block_in_place(|| {
            tokio::runtime::Handle::current().block_on(async {
                let mut collected = Vec::new();
                for source in sources {
                    // Errors are deliberately ignored (best effort).
                    if let Ok(mut index_names) = source.list_index_names().await {
                        collected.append(&mut index_names);
                    }
                }
                collected
            })
        });
        names.append(&mut source_names);

        // `dedup` only removes *consecutive* duplicates, so sort first;
        // otherwise a name claimed by two different sources would survive.
        names.sort_unstable();
        names.dedup();
        names
    }

    /// Resolves `name` to a table provider, or `Ok(None)` if nothing claims it.
    ///
    /// # Errors
    ///
    /// Propagates the first error returned by the DDL table lookup or by a
    /// source's `create_default_table_provider`.
    async fn table(&self, name: &str) -> DFResult<Option<Arc<dyn TableProvider>>> {
        // Resolution order:
        // 1. DDL-registered tables (CREATE OR REPLACE EXTERNAL TABLE)
        // 2. Each source's create_default_table_provider — first non-None wins.
        // We do not pre-validate via table_names(); sources return None for
        // unknown names and DataFusion emits "table not found". Avoids N+1.
        if let Some(provider) = self.ddl_tables.table(name).await? {
            return Ok(Some(provider));
        }

        for source in &self.sources {
            if let Some(provider) = source.create_default_table_provider(name).await? {
                return Ok(Some(provider));
            }
        }

        Ok(None)
    }

    /// Returns `true` if the table is present in the DDL cache.
    ///
    /// DataFusion's contract: `false` does not prevent `table()` from
    /// returning `Some`; it is a hint only. Checking only DDL tables keeps
    /// this method off the async path (no source is queried).
    fn table_exist(&self, name: &str) -> bool {
        self.ddl_tables.table_exist(name)
    }

    /// Registers `table` under `name` in the DDL table set by delegating to
    /// the inner `MemorySchemaProvider`; its result is returned unchanged.
    fn register_table(
        &self,
        name: String,
        table: Arc<dyn TableProvider>,
    ) -> DFResult<Option<Arc<dyn TableProvider>>> {
        self.ddl_tables.register_table(name, table)
    }

    /// Removes `name` from the DDL table set by delegating to the inner
    /// `MemorySchemaProvider`; its result is returned unchanged. Tables served
    /// by the data sources are not affected.
    fn deregister_table(&self, name: &str) -> DFResult<Option<Arc<dyn TableProvider>>> {
        self.ddl_tables.deregister_table(name)
    }
}
Loading