[metrics] Materialize all metrics tags into top level columns #6237
Conversation
@codex review
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 5eae799d9a
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
  let empty_batch =
-     RecordBatch::new_empty(self.processor.schema().arrow_schema().clone());
+     RecordBatch::new_empty(std::sync::Arc::new(arrow::datatypes::Schema::empty()));
Preserve required schema on checkpoint-only flush batches
Constructing the fallback checkpoint batch with Schema::empty() causes a hard failure when force_commit is true and no valid docs were forwarded (for example, all docs in the raw batch failed parsing): the indexer still flushes this batch, and ParquetWriter::write_to_file now rejects it because required columns are missing. In that path, the packager exits with an error instead of forwarding the checkpoint, which can stall ingestion progress for that shard.
Fixed an issue in this PR - we need to send a None batch to the packager so that it skips the write. We were sending an empty batch with an empty schema, so the packager was still attempting to write.
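A minimal sketch of the fix described above. `FlushBatch` and `build_flush_batch` are illustrative stand-ins, not the PR's actual indexer/packager types:

```rust
// Stand-in for the real record batch type from this PR.
#[derive(Debug, PartialEq)]
struct FlushBatch {
    num_rows: usize,
}

// Returns `None` when no valid docs were forwarded (e.g. every doc in the
// raw batch failed parsing), so the packager can forward the checkpoint
// without attempting a parquet write. Previously an empty batch with an
// empty schema was sent, and the writer rejected it for missing columns.
fn build_flush_batch(num_valid_docs: usize) -> Option<FlushBatch> {
    if num_valid_docs == 0 {
        None
    } else {
        Some(FlushBatch {
            num_rows: num_valid_docs,
        })
    }
}
```

The packager would then match on the `Option` and skip the write entirely on `None`, while still committing the checkpoint.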
  for scope_metrics in resource_metrics.scope_metrics {
      for metric in scope_metrics.metrics {
-         parse_metric(
-             &metric,
-             &service_name,
-             &resource_attributes,
-             &mut data_points,
-         )?;
+         parse_metric(&metric, &service_name, &mut data_points, &mut num_rejected);
      }
Materialize resource-level metric attributes into tags
Resource attributes are parsed but no longer propagated into per-point tags: after extracting service.name, metrics are parsed with only service_name, so dimensions that commonly live at resource scope (like env, region, host, custom resource tags) are dropped entirely. This is a data-loss regression versus the previous behavior and breaks filtering/grouping on those tags for OTLP senders that attach dimensions at the resource level.
yeah. fwiw we're not using the OTLP endpoint, but fixing this.
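A hedged sketch of what restoring resource-attribute propagation could look like: seed each data point's tags with the resource-level attributes, then let point-level attributes win on key collisions. Names here are illustrative, not the PR's actual API:

```rust
use std::collections::BTreeMap;

// Merge resource-scope attributes (env, region, host, custom resource
// tags) into a point's tag map so they survive into the per-point
// columns. Point-level values override resource-level ones.
fn merge_tags(
    resource_attributes: &BTreeMap<String, String>,
    point_attributes: &BTreeMap<String, String>,
) -> BTreeMap<String, String> {
    let mut tags = resource_attributes.clone();
    for (key, value) in point_attributes {
        tags.insert(key.clone(), value.clone());
    }
    tags
}
```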
g-talbot left a comment
This looks OK to me, modulo fix to the O(m*n) algorithm that should be O(n) that I talk about in there. I'll mark this as LGTM, but fix that, please.
- pub fn append(&mut self, data_point: &MetricDataPoint) {
+ pub fn append(&mut self, data_point: MetricDataPoint) {
      self.metric_name.append_value(&data_point.metric_name);
      self.metric_type.append_value(data_point.metric_type as u8);
Why just one point? Don't points come in batches?
Yeah, we could just append a Vec of points. I'll change this later, since future PRs depend on this call and I don't want to deal with rebase issues.
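The batched variant discussed above might look like the following sketch; `MetricDataPoint` and the builder are simplified stand-ins for the PR's types:

```rust
// Simplified stand-in for the PR's data point type.
struct MetricDataPoint {
    metric_name: String,
}

#[derive(Default)]
struct MetricsBuilder {
    metric_names: Vec<String>, // stand-in for an arrow StringBuilder
}

impl MetricsBuilder {
    // Current per-point entry point.
    fn append(&mut self, data_point: MetricDataPoint) {
        self.metric_names.push(data_point.metric_name);
    }

    // Batched entry point so callers hand over a whole Vec of points.
    fn append_batch(&mut self, data_points: Vec<MetricDataPoint>) {
        for dp in data_points {
            self.append(dp);
        }
    }
}
```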
  timestamp_secs_builder.append_value(dp.timestamp_secs);
  value_builder.append_value(dp.value);

  for (tag_idx, tag_key) in sorted_tag_keys.iter().enumerate() {
Whoa this double nested loop could be quite inefficient. It's O(m*n) where m is the number of columns and n is number of points. Why not just go through dp.tags and append the point to each column. You'll have to keep track of "last row since I appended a value" so you know how many nulls to append "in front" and "at the end", but those can be appended in a single operation ("append X nulls") which is much more efficient. That's ~O(n), instead. (This was a very important optimization in the Husky column handling, as a data point.)
👍 for each point, we'll only iterate through the tags that exist on that point, and bulk-append nulls where we can.
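A minimal sketch of the ~O(n) approach the reviewer describes: walk each point's own tags only, and pad each column with nulls in bulk rather than scanning every column for every point. `Vec<Option<String>>` stands in for an arrow string builder with bulk-null support; names are illustrative:

```rust
use std::collections::{BTreeMap, HashMap};

#[derive(Default)]
struct TagColumn {
    values: Vec<Option<String>>,
}

impl TagColumn {
    // One bulk "append X nulls" operation, up to the given row count.
    fn pad_nulls_to(&mut self, row: usize) {
        self.values.resize(row, None);
    }
}

fn build_tag_columns(points: &[BTreeMap<String, String>]) -> HashMap<String, TagColumn> {
    let mut columns: HashMap<String, TagColumn> = HashMap::new();
    for (row, tags) in points.iter().enumerate() {
        // Only the tags present on this point are touched.
        for (key, value) in tags {
            let column = columns.entry(key.clone()).or_default();
            column.pad_nulls_to(row); // nulls "in front"
            column.values.push(Some(value.clone()));
        }
    }
    for column in columns.values_mut() {
        column.pad_nulls_to(points.len()); // nulls "at the end"
    }
    columns
}
```

Each column tracks "last row since I appended a value" implicitly via its current length, so the per-point work is proportional to that point's tag count, not the total column count.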
  for scope_metrics in resource_metrics.scope_metrics {
      for metric in scope_metrics.metrics {
-         parse_metric(
-             &metric,
-             &service_name,
-             &resource_attributes,
-             &mut data_points,
-         )?;
+         parse_metric(&metric, &service_name, &mut data_points, &mut num_rejected);
      }
💡 Codex Review
Reviewed commit: 7593bc24e5
… crate) Adds a DataFusion-based query execution layer on top of the wide-schema parquet metrics pipeline from PR quickwit-oss#6237.
- New crate `quickwit-datafusion` with pluggable `QuickwitDataSource` trait, `DataFusionSessionBuilder`, `QuickwitSchemaProvider`, and distributed worker session setup via datafusion-distributed WorkerService
- `MetricsDataSource` implements `QuickwitDataSource` for OSS parquet splits: metastore-backed split discovery, object-store caching, filter pushdown with CAST unwrapping fix, Substrait `ReadRel` consumption
- `DataFusionService` gRPC (ExecuteSql + ExecuteSubstrait streaming) wired into quickwit-serve alongside the existing searcher and OTLP services
- Distributed execution: DistributedPhysicalOptimizerRule produces PartitionIsolatorExec tasks (not shuffles) across multiple searcher nodes
- Integration tests covering: pruning, aggregation, time range, GROUP BY, distributed tasks, NULL column fill for missing parquet fields, Substrait named-table queries, rollup from file, partial schema projection
- dev/ local cluster setup with start-cluster, ingest-metrics, query-metrics

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
💡 Codex Review
Reviewed commit: 6e2f0c999c
  let attributes = extract_attributes(dp.attributes.clone());
  for (key, json_val) in attributes {
      tags.insert(key, json_value_to_string(json_val));
  }
Filter reserved metric columns from dynamic tag keys
This path inserts every OTLP attribute key into tags without excluding reserved names (metric_name, metric_type, timestamp_secs, value), and those keys are later materialized as top-level columns. If a sender includes one of these attribute names, the batch can end up with duplicate column names, which makes downstream name-based lookups (index_of/schema validation/split metadata extraction) ambiguous and can cause rejected batches or incorrect field resolution. Please strip or rename reserved keys before adding them to tags.
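A minimal sketch of the suggested guard. The reserved list mirrors the required fields named in this PR's description; `insert_tag` is an illustrative helper, not the PR's actual code:

```rust
use std::collections::BTreeMap;

// Top-level columns that OTLP attribute keys must not collide with.
const RESERVED_COLUMNS: [&str; 4] =
    ["metric_name", "metric_type", "timestamp_secs", "value"];

// Skips attribute keys that would shadow a reserved column; renaming
// (e.g. prefixing the key) would be an alternative to dropping.
fn insert_tag(tags: &mut BTreeMap<String, String>, key: String, value: String) {
    if RESERVED_COLUMNS.contains(&key.as_str()) {
        return;
    }
    tags.insert(key, value);
}
```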
…quired fields in otel metrics
💡 Codex Review
Reviewed commit: 24204fada3
💡 Codex Review
Reviewed commit: f6a9163b83
Description
This PR can be reviewed commit by commit.
Currently, we define a static schema for metrics data in parquet. This PR makes the schema dynamic: every tag is materialized into its own column during ingestion. Incoming metrics points must have "metric_name", "metric_type", "timestamp_secs", and "value" as fields.

Again, a lot of metrics parsing/arrow logic lives in quickwit-opentelemetry, when it should not. We will refactor this, eventually :)

How was this PR tested?
Describe how you tested this PR.