feat(datafusion): DataFusion metrics query layer#6276
Open
alexanderbianchi wants to merge 9 commits intoquickwit-oss:mainfrom
Open
feat(datafusion): DataFusion metrics query layer#6276alexanderbianchi wants to merge 9 commits intoquickwit-oss:mainfrom
alexanderbianchi wants to merge 9 commits intoquickwit-oss:mainfrom
Conversation
Introduces a generic DataFusion execution layer with a pluggable QuickwitDataSource trait. No data-source-specific code. - QuickwitDataSource trait + DataSourceContributions (contribution-return pattern) - DataFusionSessionBuilder with shared RuntimeEnv, check_invariants - QuickwitSchemaProvider backed by DataFusion MemorySchemaProvider for DDL tables - QuickwitWorkerSessionBuilder + build_quickwit_worker for distributed execution - QuickwitWorkerResolver, QuickwitTaskEstimator - QuickwitObjectStore: quickwit_storage::Storage → object_store::ObjectStore bridge - DataFusionService::execute_sql (streaming Arrow IPC responses) Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Implements QuickwitDataSource for the parquet metrics pipeline from PR quickwit-oss#6237. - MetricsDataSource: production (metastore-backed) and test (SimpleIndexResolver) - MetricsTableProvider: filter pushdown with CAST-unwrapping fix for timestamp - MetastoreSplitProvider: converts MetricsSplitQuery → ListMetricsSplitsQuery - MetastoreIndexResolver: resolves index URI → QuickwitObjectStore per query - MetricsSplitQuery + extract_split_filters: predicate extraction for split pruning - MetricsTableProviderFactory: CREATE EXTERNAL TABLE … STORED AS metrics support - test_utils: make_batch, TestSplitProvider, MetricsTestbed for integration tests Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Routes Substrait ReadRel nodes to registered QuickwitDataSource implementations. Standard NamedTable reads resolve via MetricsDataSource::try_consume_read_rel. ExtensionTable reads (custom protos) can be handled by downstream callers. - QuickwitSubstraitConsumer implementing datafusion-substrait SubstraitConsumer - execute_substrait_plan / execute_substrait_plan_streaming entry points - DataFusionService::execute_substrait (bytes) and execute_substrait_json (dev path) - session.rs: DataFusionSessionBuilder::execute_substrait convenience method Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
…tegration tests - Add datafusion.proto with DataFusionService (ExecuteSubstrait, ExecuteSql RPCs) - Generate codegen and mod.rs for the new proto service - Wire DataFusionService and WorkerService into quickwit-serve gRPC layer - Add DataFusionServiceGrpcImpl handler - Auto-create otel-metrics-v0_9 index on startup alongside logs/traces - Add metrics_datafusion_tests: in-process SQL + Substrait over parquet splits - Add metrics_distributed_tests: multi-node distributed execution - Add rollup_substrait.json fixture for Substrait plan testing Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Remove extra schema argument from ParquetWriter::new; the API only accepts a ParquetWriterConfig. Remove unused ParquetSchema import. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
…test - Remove erroneous ParquetSchema argument from ParquetWriter::new calls in integration tests (API takes only ParquetWriterConfig) - Mark test_rest_ingest_then_in_process_query as #[ignore] until the /ingest-metrics REST endpoint is wired in quickwit-serve Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
- fix: CAST unwrapping in classify_filter — reuse predicate::column_name so time-range predicates are correctly classified as Inexact and passed to scan(); previously CAST-wrapped filters were silently dropped - fix: declare parquet sort order (metric_name, timestamp_secs ASC) on FileScanConfig so DataFusion avoids redundant sort operators - fix: get_opts now respects GetOptions.range — dispatches to get_slice for Bounded/Suffix ranges instead of always downloading the full file - fix: to_object_store_error propagates file path on NotFound - fix: register_for_worker made a no-op; lazy scan-path registration is sufficient and avoids O(indexes) metastore RPCs per worker task; removes stale comment claiming a non-existent object-store cache - fix: extract is_index_not_found helper, removing duplicated downcast block from try_consume_read_rel and create_default_table_provider - fix: sort before dedup in QuickwitSchemaProvider::table_names - fix: empty searcher pool returns Ok(vec![]) for local execution fallback - fix: remove dead builder methods with_udf_batch, with_codec_applier, with_physical_optimizer_rule from DataSourceContributions - feat: add tracing spans to execute_substrait and execute_sql - feat: wire 4 GiB memory limit on DataFusionSessionBuilder in serve - refactor: extract stream_to_receiver helper in gRPC handler Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
fd63a7f to
5c84abc
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Adds a DataFusion-based query execution layer for OSS parquet metrics. Review commit by commit. Everything is disabled by default.
Commits
f8de0eca—quickwit-datafusioncore frameworkQuickwitDataSourcetrait: plugin point for data sources (contributions,create_default_table_provider,list_index_names, optionalinit/register_for_worker/try_consume_read_relhooks)DataSourceContributions: additive value type for optimizer rules, UDFs, codec appliers;check_invariantsdetects name collisions at startupQuickwitSchemaProvider: routes table resolution to DDL-registered tables (viaMemorySchemaProvider) then sourcecreate_default_table_providerDataFusionSessionBuilder: sharedArc<RuntimeEnv>across sessions; installsDistributedPhysicalOptimizerRule+QuickwitTaskEstimatorwhen a resolver is presentbuild_quickwit_worker/QuickwitWorkerSessionBuilder: shares coordinatorRuntimeEnvso registered object stores are visible on workers without re-registrationQuickwitWorkerResolver: mapsSearcherPooladdresses to worker gRPC URLsQuickwitObjectStore: read-onlyquickwit_storage::Storage→object_store::ObjectStorebridgeDataFusionService: streaming SQL execution; handles semicolon-separated DDL+query batchesbce31c87—MetricsDataSourceMetricsDataSourceimplementsQuickwitDataSource; delegates toMetricsIndexResolver, registers object stores on workers concurrentlyMetricsTableProvider:ParquetSource-backed scan with bloom filter, page-index, and filter pushdown enabled; one file group per splitpredicate.rs): extractsMetricsSplitQueryfrom DataFusion filters; unwrapsCAST/TryCastnodes so time-range predicates survive DataFusion's type-coercion passMetastoreSplitProvider: multi-valueINlists on tag columns are not forwarded to the metastore (singleOption<String>per field); parquet-level filter handles correctnessMetastoreIndexResolver:index_metadataRPC +StorageResolver::resolve; no cache (follow-up)MetricsTableProviderFactory:STORED AS metricsDDL support8359a4a4—QuickwitSubstraitConsumerSubstraitConsumerfromdatafusion-substrait; routes eachReadRelto registered sources viatry_consume_read_rel, falls back to catalogWithCustomProvidershim: injects a pre-built provider for one table without rebuilding the sessionExtensionTablerels toNamedTableafter a source claims them sofrom_read_relapplies standard filter/projection handlingexecute_substrait_plan_streamingtakes fullSessionContext(notSessionState) so catalog registrations frombuild_sessionremain visibleexecute_substraitandexecute_substrait_jsontoDataFusionServicefeaceece7— gRPC wiring + integration testsdatafusion.proto:DataFusionServicewithExecuteSubstraitandExecuteSqlserver-streaming RPCs; each response carries oneRecordBatchas Arrow IPC bytesDataFusionServiceGrpcImpl: tonic adapter streaming viampsc+ReceiverStream;InvalidArgumentfor plan/schema errors,Internalotherwisequickwit-servewhendatafusion_session_builderisSome; gRPC reflection registered only when enabledGROUP BY, distributed tasks (PartitionIsolatorExec, no shuffles), NULL fill for missing parquet columns, Substrait named-table queries, rollup from JSON plan🤖 Generated with Claude Code