Skip to content

feat(datafusion): DataFusion metrics query layer#6276

Open
alexanderbianchi wants to merge 9 commits intoquickwit-oss:mainfrom
alexanderbianchi:bianchi/df-4-serve
Open

feat(datafusion): DataFusion metrics query layer#6276
alexanderbianchi wants to merge 9 commits intoquickwit-oss:mainfrom
alexanderbianchi:bianchi/df-4-serve

Conversation

@alexanderbianchi
Copy link
Copy Markdown

@alexanderbianchi alexanderbianchi commented Apr 7, 2026

Adds a DataFusion-based query execution layer for OSS parquet metrics. Review commit by commit. Everything is disabled by default.

Commits

f8de0ecaquickwit-datafusion core framework

  • QuickwitDataSource trait: plugin point for data sources (contributions, create_default_table_provider, list_index_names, optional init/register_for_worker/try_consume_read_rel hooks)
  • DataSourceContributions: additive value type for optimizer rules, UDFs, codec appliers; check_invariants detects name collisions at startup
  • QuickwitSchemaProvider: routes table resolution to DDL-registered tables (via MemorySchemaProvider) then source create_default_table_provider
  • DataFusionSessionBuilder: shared Arc<RuntimeEnv> across sessions; installs DistributedPhysicalOptimizerRule + QuickwitTaskEstimator when a resolver is present
  • build_quickwit_worker / QuickwitWorkerSessionBuilder: shares coordinator RuntimeEnv so registered object stores are visible on workers without re-registration
  • QuickwitWorkerResolver: maps SearcherPool addresses to worker gRPC URLs
  • QuickwitObjectStore: read-only quickwit_storage::Storageobject_store::ObjectStore bridge
  • DataFusionService: streaming SQL execution; handles semicolon-separated DDL+query batches

bce31c87MetricsDataSource

  • MetricsDataSource implements QuickwitDataSource; delegates to MetricsIndexResolver, registers object stores on workers concurrently
  • MetricsTableProvider: ParquetSource-backed scan with bloom filter, page-index, and filter pushdown enabled; one file group per split
  • Predicate extraction (predicate.rs): extracts MetricsSplitQuery from DataFusion filters; unwraps CAST/TryCast nodes so time-range predicates survive DataFusion's type-coercion pass
  • MetastoreSplitProvider: multi-value IN lists on tag columns are not forwarded to the metastore (single Option<String> per field); parquet-level filter handles correctness
  • MetastoreIndexResolver: index_metadata RPC + StorageResolver::resolve; no cache (follow-up)
  • MetricsTableProviderFactory: STORED AS metrics DDL support

8359a4a4QuickwitSubstraitConsumer

  • Implements SubstraitConsumer from datafusion-substrait; routes each ReadRel to registered sources via try_consume_read_rel, falls back to catalog
  • WithCustomProvider shim: injects a pre-built provider for one table without rebuilding the session
  • Rewrites ExtensionTable rels to NamedTable after a source claims them so from_read_rel applies standard filter/projection handling
  • execute_substrait_plan_streaming takes full SessionContext (not SessionState) so catalog registrations from build_session remain visible
  • Adds execute_substrait and execute_substrait_json to DataFusionService

feaceece7 — gRPC wiring + integration tests

  • datafusion.proto: DataFusionService with ExecuteSubstrait and ExecuteSql server-streaming RPCs; each response carries one RecordBatch as Arrow IPC bytes
  • DataFusionServiceGrpcImpl: tonic adapter streaming via mpsc + ReceiverStream; InvalidArgument for plan/schema errors, Internal otherwise
  • Mounted in quickwit-serve when datafusion_session_builder is Some; gRPC reflection registered only when enabled
  • Integration tests: SQL pruning, aggregation, time-range filtering, GROUP BY, distributed tasks (PartitionIsolatorExec, no shuffles), NULL fill for missing parquet columns, Substrait named-table queries, rollup from JSON plan

🤖 Generated with Claude Code

alexanderbianchi and others added 7 commits April 7, 2026 13:36
Introduces a generic DataFusion execution layer with a pluggable
QuickwitDataSource trait. No data-source-specific code.

- QuickwitDataSource trait + DataSourceContributions (contribution-return pattern)
- DataFusionSessionBuilder with shared RuntimeEnv, check_invariants
- QuickwitSchemaProvider backed by DataFusion MemorySchemaProvider for DDL tables
- QuickwitWorkerSessionBuilder + build_quickwit_worker for distributed execution
- QuickwitWorkerResolver, QuickwitTaskEstimator
- QuickwitObjectStore: quickwit_storage::Storage → object_store::ObjectStore bridge
- DataFusionService::execute_sql (streaming Arrow IPC responses)

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Implements QuickwitDataSource for the parquet metrics pipeline from PR quickwit-oss#6237.

- MetricsDataSource: production (metastore-backed) and test (SimpleIndexResolver)
- MetricsTableProvider: filter pushdown with CAST-unwrapping fix for timestamp
- MetastoreSplitProvider: converts MetricsSplitQuery → ListMetricsSplitsQuery
- MetastoreIndexResolver: resolves index URI → QuickwitObjectStore per query
- MetricsSplitQuery + extract_split_filters: predicate extraction for split pruning
- MetricsTableProviderFactory: CREATE EXTERNAL TABLE … STORED AS metrics support
- test_utils: make_batch, TestSplitProvider, MetricsTestbed for integration tests

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Routes Substrait ReadRel nodes to registered QuickwitDataSource implementations.
Standard NamedTable reads resolve via MetricsDataSource::try_consume_read_rel.
ExtensionTable reads (custom protos) can be handled by downstream callers.

- QuickwitSubstraitConsumer implementing datafusion-substrait SubstraitConsumer
- execute_substrait_plan / execute_substrait_plan_streaming entry points
- DataFusionService::execute_substrait (bytes) and execute_substrait_json (dev path)
- session.rs: DataFusionSessionBuilder::execute_substrait convenience method

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
…tegration tests

- Add datafusion.proto with DataFusionService (ExecuteSubstrait, ExecuteSql RPCs)
- Generate codegen and mod.rs for the new proto service
- Wire DataFusionService and WorkerService into quickwit-serve gRPC layer
- Add DataFusionServiceGrpcImpl handler
- Auto-create otel-metrics-v0_9 index on startup alongside logs/traces
- Add metrics_datafusion_tests: in-process SQL + Substrait over parquet splits
- Add metrics_distributed_tests: multi-node distributed execution
- Add rollup_substrait.json fixture for Substrait plan testing

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Remove extra schema argument from ParquetWriter::new; the API only accepts
a ParquetWriterConfig. Remove unused ParquetSchema import.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
…test

- Remove erroneous ParquetSchema argument from ParquetWriter::new calls
  in integration tests (API takes only ParquetWriterConfig)
- Mark test_rest_ingest_then_in_process_query as #[ignore] until the
  /ingest-metrics REST endpoint is wired in quickwit-serve

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
@alexanderbianchi alexanderbianchi changed the title feat(datafusion): wire DataFusion gRPC service + integration tests feat(datafusion): DataFusion metrics query layer Apr 7, 2026
alexanderbianchi and others added 2 commits April 7, 2026 14:26
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
- fix: CAST unwrapping in classify_filter — reuse predicate::column_name
  so time-range predicates are correctly classified as Inexact and
  passed to scan(); previously CAST-wrapped filters were silently dropped
- fix: declare parquet sort order (metric_name, timestamp_secs ASC) on
  FileScanConfig so DataFusion avoids redundant sort operators
- fix: get_opts now respects GetOptions.range — dispatches to get_slice
  for Bounded/Suffix ranges instead of always downloading the full file
- fix: to_object_store_error propagates file path on NotFound
- fix: register_for_worker made a no-op; lazy scan-path registration is
  sufficient and avoids O(indexes) metastore RPCs per worker task;
  removes stale comment claiming a non-existent object-store cache
- fix: extract is_index_not_found helper, removing duplicated downcast
  block from try_consume_read_rel and create_default_table_provider
- fix: sort before dedup in QuickwitSchemaProvider::table_names
- fix: empty searcher pool returns Ok(vec![]) for local execution fallback
- fix: remove dead builder methods with_udf_batch, with_codec_applier,
  with_physical_optimizer_rule from DataSourceContributions
- feat: add tracing spans to execute_substrait and execute_sql
- feat: wire 4 GiB memory limit on DataFusionSessionBuilder in serve
- refactor: extract stream_to_receiver helper in gRPC handler

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant