diff --git a/test/e2e/benchmark/config.go b/test/e2e/benchmark/config.go index 8cb6df38f..a79c2c888 100644 --- a/test/e2e/benchmark/config.go +++ b/test/e2e/benchmark/config.go @@ -27,6 +27,10 @@ type benchConfig struct { WarmupTxs int GasUnitsToBurn int MaxWallets int + MaxPending int + Rebroadcast int + BaseFee int + TipFee int WaitTimeout time.Duration } @@ -43,6 +47,10 @@ func newBenchConfig(serviceName string) benchConfig { WarmupTxs: envInt("BENCH_WARMUP_TXS", 200), GasUnitsToBurn: envInt("BENCH_GAS_UNITS_TO_BURN", 1_000_000), MaxWallets: envInt("BENCH_MAX_WALLETS", 500), + MaxPending: envInt("BENCH_MAX_PENDING", 50_000), + Rebroadcast: envInt("BENCH_REBROADCAST", 0), + BaseFee: envInt("BENCH_BASE_FEE", 20), + TipFee: envInt("BENCH_TIP_FEE", 2), WaitTimeout: envDuration("BENCH_WAIT_TIMEOUT", 10*time.Minute), } } diff --git a/test/e2e/benchmark/gasburner_test.go b/test/e2e/benchmark/gasburner_test.go index 14b346b2c..8abef62a2 100644 --- a/test/e2e/benchmark/gasburner_test.go +++ b/test/e2e/benchmark/gasburner_test.go @@ -23,6 +23,15 @@ func (s *SpamoorSuite) TestGasBurner() { cfg.log(t) + var result *benchmarkResult + var wallClock time.Duration + var spamoorStats *runSpamoorStats + defer func() { + if result != nil { + emitRunResult(t, cfg, result, wallClock, spamoorStats) + } + }() + e := s.setupEnv(cfg) api := e.spamoorAPI @@ -32,11 +41,11 @@ func (s *SpamoorSuite) TestGasBurner() { "gas_units_to_burn": cfg.GasUnitsToBurn, "total_count": cfg.CountPerSpammer, "throughput": cfg.Throughput, - "max_pending": 50000, + "max_pending": cfg.MaxPending, "max_wallets": cfg.MaxWallets, - "rebroadcast": 5, - "base_fee": 100, - "tip_fee": 50, + "rebroadcast": cfg.Rebroadcast, + "base_fee": cfg.BaseFee, + "tip_fee": cfg.TipFee, "refill_amount": "500000000000000000000", "refill_balance": "200000000000000000000", "refill_interval": 300, @@ -83,7 +92,7 @@ func (s *SpamoorSuite) TestGasBurner() { if err := waitForDrain(drainCtx, t.Logf, e.ethClient, 10); err != nil { t.Logf("warning: %v", err) } - wallClock := time.Since(loadStart) + wallClock = time.Since(loadStart) endHeader, err := e.ethClient.HeaderByNumber(ctx, nil) s.Require().NoError(err, "failed to get end block header") @@ -94,10 +103,16 @@ func (s *SpamoorSuite) TestGasBurner() { bm, err := collectBlockMetrics(ctx, e.ethClient, startBlock, endBlock) s.Require().NoError(err, "failed to collect block metrics") - traces := s.collectTraces(e, cfg.ServiceName) + traces := s.collectTraces(e) - result := newBenchmarkResult("GasBurner", bm, traces) + result = newBenchmarkResult("GasBurner", bm, traces) s.Require().Greater(result.summary.SteadyState, time.Duration(0), "expected non-zero steady-state duration") result.log(t, wallClock) w.addEntries(result.entries()) + + metrics, mErr := api.GetMetrics() + s.Require().NoError(mErr, "failed to get final metrics") + sent := sumCounter(metrics["spamoor_transactions_sent_total"]) + failed := sumCounter(metrics["spamoor_transactions_failed_total"]) + spamoorStats = &runSpamoorStats{Sent: sent, Failed: failed} } diff --git a/test/e2e/benchmark/helpers.go b/test/e2e/benchmark/helpers.go index 4d4241262..1f5e1349c 100644 --- a/test/e2e/benchmark/helpers.go +++ b/test/e2e/benchmark/helpers.go @@ -382,23 +382,16 @@ func (s *blockMetricsSummary) entries(prefix string) []entry { } } -// evNodeOverhead computes the fraction of block production time spent outside -// EVM execution. It looks up the average durations of BlockExecutor.ProduceBlock -// (the outer span covering the full block lifecycle) and Executor.ExecuteTxs -// (the inner span covering only EVM tx execution), then returns: +// overheadFromStats computes ev-node overhead from pre-aggregated span stats. // // overhead% = (avgProduce - avgExecute) / avgProduce * 100 -// -// This captures time spent on sequencing, DA submission, header construction, -// and other ev-node orchestration work. Returns false if either span is missing. -func evNodeOverhead(spans []e2e.TraceSpan) (float64, bool) { - stats := e2e.AggregateSpanStats(spans) +func overheadFromStats(stats map[string]*e2e.SpanStats) (float64, bool) { produce, ok := stats[spanProduceBlock] - if !ok { + if !ok || produce.Count == 0 { return 0, false } execute, ok := stats[spanExecuteTxs] - if !ok { + if !ok || execute.Count == 0 { return 0, false } produceAvg := float64(produce.Total.Microseconds()) / float64(produce.Count) @@ -409,20 +402,25 @@ func evNodeOverhead(spans []e2e.TraceSpan) (float64, bool) { return (produceAvg - executeAvg) / produceAvg * 100, true } -// rethExecutionRate computes ev-reth's effective execution throughput in GGas/s -// based on the total gas processed and the cumulative Engine.NewPayload duration. -// NewPayload is the engine API call where reth validates and executes all state -// transitions for a block (EVM execution + state root + disk commit). -func rethExecutionRate(spans []e2e.TraceSpan, totalGasUsed uint64) (float64, bool) { - stats := e2e.AggregateSpanStats(spans) +// evNodeOverhead aggregates spans then computes overhead. +func evNodeOverhead(spans []e2e.TraceSpan) (float64, bool) { + return overheadFromStats(e2e.AggregateSpanStats(spans)) +} + +// rethRateFromStats computes ev-reth GGas/s from pre-aggregated span stats. +func rethRateFromStats(stats map[string]*e2e.SpanStats, totalGasUsed uint64) (float64, bool) { np, ok := stats[spanNewPayload] if !ok || np.Total <= 0 || totalGasUsed == 0 { return 0, false } - // GGas/s = totalGas / newPayloadSeconds / 1e9 return float64(totalGasUsed) / np.Total.Seconds() / 1e9, true } +// rethExecutionRate aggregates spans then computes GGas/s. +func rethExecutionRate(spans []e2e.TraceSpan, totalGasUsed uint64) (float64, bool) { + return rethRateFromStats(e2e.AggregateSpanStats(spans), totalGasUsed) +} + // engineSpanEntries extracts ProduceBlock, Engine.GetPayload, and // Engine.NewPayload timing stats from ev-node spans and returns them as // result writer entries. these are the key metrics for answering "does block @@ -443,11 +441,13 @@ func engineSpanEntries(prefix string, spans []e2e.TraceSpan) []entry { if !ok || s.Count == 0 { continue } - avg := s.Total / time.Duration(s.Count) + avg := float64(s.Total.Microseconds()) / float64(s.Count) / 1000.0 + min := float64(s.Min.Microseconds()) / 1000.0 + max := float64(s.Max.Microseconds()) / 1000.0 entries = append(entries, - entry{Name: prefix + " - " + k.label + " avg", Unit: "ms", Value: float64(avg.Milliseconds())}, - entry{Name: prefix + " - " + k.label + " min", Unit: "ms", Value: float64(s.Min.Milliseconds())}, - entry{Name: prefix + " - " + k.label + " max", Unit: "ms", Value: float64(s.Max.Milliseconds())}, + entry{Name: prefix + " - " + k.label + " avg", Unit: "ms", Value: avg}, + entry{Name: prefix + " - " + k.label + " min", Unit: "ms", Value: min}, + entry{Name: prefix + " - " + k.label + " max", Unit: "ms", Value: max}, ) } return entries diff --git a/test/e2e/benchmark/result.go b/test/e2e/benchmark/result.go index 995896374..de5ff3764 100644 --- a/test/e2e/benchmark/result.go +++ b/test/e2e/benchmark/result.go @@ -20,6 +20,10 @@ type traceResult struct { // empty when the trace provider doesn't support rich span collection. evNodeRich []richSpan evRethRich []richSpan + + // resource attributes extracted from trace spans (OTEL_RESOURCE_ATTRIBUTES). + evNodeAttrs *resourceAttrs + evRethAttrs *resourceAttrs } // displayFlowcharts renders ASCII flowcharts from rich spans. Falls back to diff --git a/test/e2e/benchmark/run_result.go b/test/e2e/benchmark/run_result.go new file mode 100644 index 000000000..e5bd04f85 --- /dev/null +++ b/test/e2e/benchmark/run_result.go @@ -0,0 +1,303 @@ +//go:build evm + +package benchmark + +import ( + "encoding/json" + "os" + "runtime" + "runtime/debug" + "testing" + "time" + + e2e "github.com/evstack/ev-node/test/e2e" +) + +// runResult is the structured output of a single benchmark run. +// written to the path specified by BENCH_RESULT_OUTPUT. +type runResult struct { + SchemaVersion int `json:"schema_version"` + Test string `json:"test"` + Objective string `json:"objective,omitempty"` + Timestamp string `json:"timestamp"` + Platform string `json:"platform"` + Config runConfig `json:"config"` + Tags runTags `json:"tags"` + Host *resourceAttrs `json:"host,omitempty"` + Metrics runMetrics `json:"metrics"` + BlockRange runBlockRange `json:"block_range"` + Spamoor *runSpamoorStats `json:"spamoor,omitempty"` + FieldDescriptions map[string]string `json:"field_descriptions"` +} + +type runConfig struct { + BlockTime string `json:"block_time"` + SlotDuration string `json:"slot_duration"` + GasLimit string `json:"gas_limit"` + ScrapeInterval string `json:"scrape_interval"` + NumSpammers int `json:"num_spammers"` + CountPerSpammer int `json:"count_per_spammer"` + Throughput int `json:"throughput"` + WarmupTxs int `json:"warmup_txs"` + GasUnitsToBurn int `json:"gas_units_to_burn"` + MaxWallets int `json:"max_wallets"` + WaitTimeout string `json:"wait_timeout"` +} + +type runTags struct { + EvReth string `json:"ev_reth"` + EvNode string `json:"ev_node"` +} + +type runMetrics struct { + // throughput + MGasPerSec float64 `json:"mgas_per_sec"` + TPS float64 `json:"tps"` + BlocksPerSec float64 `json:"blocks_per_sec"` + + // block fill + NonEmptyRatioPct float64 `json:"non_empty_ratio_pct"` + AvgGasPerBlock float64 `json:"avg_gas_per_block"` + AvgTxPerBlock float64 `json:"avg_tx_per_block"` + GasBlockP50 float64 `json:"gas_block_p50"` + GasBlockP99 float64 `json:"gas_block_p99"` + TxBlockP50 float64 `json:"tx_block_p50"` + TxBlockP99 float64 `json:"tx_block_p99"` + + // block intervals + AvgBlockIntervalMs float64 `json:"avg_block_interval_ms"` + BlockIntervalP50Ms float64 `json:"block_interval_p50_ms"` + BlockIntervalP99Ms float64 `json:"block_interval_p99_ms"` + + // duration + SteadyStateSec float64 `json:"steady_state_sec"` + WallClockSec float64 `json:"wall_clock_sec"` + + // trace-derived (optional, omitted when tracing is unavailable) + OverheadPct *float64 `json:"overhead_pct,omitempty"` + EvRethGGasPerSec *float64 `json:"ev_reth_ggas_per_sec,omitempty"` + SecsPerGigagas *float64 `json:"secs_per_gigagas,omitempty"` + + // engine span timings (optional) + ProduceBlockAvgMs *float64 `json:"produce_block_avg_ms,omitempty"` + ProduceBlockMinMs *float64 `json:"produce_block_min_ms,omitempty"` + ProduceBlockMaxMs *float64 `json:"produce_block_max_ms,omitempty"` + GetPayloadAvgMs *float64 `json:"get_payload_avg_ms,omitempty"` + GetPayloadMinMs *float64 `json:"get_payload_min_ms,omitempty"` + GetPayloadMaxMs *float64 `json:"get_payload_max_ms,omitempty"` + NewPayloadAvgMs *float64 `json:"new_payload_avg_ms,omitempty"` + NewPayloadMinMs *float64 `json:"new_payload_min_ms,omitempty"` + NewPayloadMaxMs *float64 `json:"new_payload_max_ms,omitempty"` +} + +type runBlockRange struct { + Start uint64 `json:"start"` + End uint64 `json:"end"` + Total int `json:"total"` + NonEmpty int `json:"non_empty"` +} + +type runSpamoorStats struct { + Sent float64 `json:"sent"` + Failed float64 `json:"failed"` +} + +// emitRunResult builds a structured result from the benchmark data and writes it +// to the path in BENCH_RESULT_OUTPUT. no-op when the env var is unset. +func emitRunResult(t testing.TB, cfg benchConfig, br *benchmarkResult, wallClock time.Duration, spamoor *runSpamoorStats) { + outputPath := os.Getenv("BENCH_RESULT_OUTPUT") + if outputPath == "" { + return + } + + rr := buildRunResult(cfg, br, wallClock, spamoor) + + data, err := json.MarshalIndent(rr, "", " ") + if err != nil { + t.Logf("WARNING: failed to marshal run result: %v", err) + return + } + if err := os.WriteFile(outputPath, data, 0644); err != nil { + t.Logf("WARNING: failed to write run result to %s: %v", outputPath, err) + return + } + t.Logf("wrote structured result to %s", outputPath) +} + +func buildRunResult(cfg benchConfig, br *benchmarkResult, wallClock time.Duration, spamoor *runSpamoorStats) *runResult { + s := br.summary + + // compute span stats once for all trace-derived metrics + stats := e2e.AggregateSpanStats(br.traces.evNode) + + m := runMetrics{ + MGasPerSec: s.AchievedMGas, + TPS: s.AchievedTPS, + BlocksPerSec: s.BlocksPerSec, + NonEmptyRatioPct: s.NonEmptyRatio, + AvgGasPerBlock: s.AvgGas, + AvgTxPerBlock: s.AvgTx, + GasBlockP50: s.GasP50, + GasBlockP99: s.GasP99, + TxBlockP50: s.TxP50, + TxBlockP99: s.TxP99, + AvgBlockIntervalMs: float64(s.AvgBlockInterval.Milliseconds()), + BlockIntervalP50Ms: float64(s.IntervalP50.Milliseconds()), + BlockIntervalP99Ms: float64(s.IntervalP99.Milliseconds()), + SteadyStateSec: s.SteadyState.Seconds(), + WallClockSec: wallClock.Seconds(), + } + + if overhead, ok := overheadFromStats(stats); ok { + m.OverheadPct = &overhead + } + if ggas, ok := rethRateFromStats(stats, br.bm.TotalGasUsed); ok { + m.EvRethGGasPerSec = &ggas + } + if s.AchievedMGas > 0 { + v := 1000.0 / s.AchievedMGas + m.SecsPerGigagas = &v + } + + setEngineSpanTimings(&m, stats) + + return &runResult{ + SchemaVersion: 1, + Test: br.prefix, + Objective: os.Getenv("BENCH_OBJECTIVE"), + Timestamp: time.Now().UTC().Format(time.RFC3339), + Platform: envOrDefault("BENCH_PLATFORM", runtime.GOOS+"/"+runtime.GOARCH), + Host: br.traces.evNodeAttrs, + Config: runConfig{ + BlockTime: cfg.BlockTime, + SlotDuration: cfg.SlotDuration, + GasLimit: cfg.GasLimit, + ScrapeInterval: cfg.ScrapeInterval, + NumSpammers: cfg.NumSpammers, + CountPerSpammer: cfg.CountPerSpammer, + Throughput: cfg.Throughput, + WarmupTxs: cfg.WarmupTxs, + GasUnitsToBurn: cfg.GasUnitsToBurn, + MaxWallets: cfg.MaxWallets, + WaitTimeout: cfg.WaitTimeout.String(), + }, + Tags: runTags{ + EvReth: envOrDefault("EV_RETH_TAG", "latest"), + EvNode: evNodeTag(), + }, + Metrics: m, + BlockRange: runBlockRange{ + Start: br.bm.StartBlock, + End: br.bm.EndBlock, + Total: br.bm.TotalBlockCount, + NonEmpty: br.bm.BlockCount, + }, + Spamoor: spamoor, + FieldDescriptions: fieldDescriptions(), + } +} + +func setEngineSpanTimings(m *runMetrics, stats map[string]*e2e.SpanStats) { + type spanTarget struct { + name string + avg, min, max **float64 + } + targets := []spanTarget{ + {spanProduceBlock, &m.ProduceBlockAvgMs, &m.ProduceBlockMinMs, &m.ProduceBlockMaxMs}, + {spanGetPayload, &m.GetPayloadAvgMs, &m.GetPayloadMinMs, &m.GetPayloadMaxMs}, + {spanNewPayload, &m.NewPayloadAvgMs, &m.NewPayloadMinMs, &m.NewPayloadMaxMs}, + } + for _, target := range targets { + s, ok := stats[target.name] + if !ok || s.Count == 0 { + continue + } + avg := float64(s.Total.Microseconds()) / float64(s.Count) / 1000.0 + min := float64(s.Min.Microseconds()) / 1000.0 + max := float64(s.Max.Microseconds()) / 1000.0 + *target.avg = &avg + *target.min = &min + *target.max = &max + } +} + +func evNodeTag() string { + if tag := os.Getenv("EV_NODE_TAG"); tag != "" { + return tag + } + info, ok := debug.ReadBuildInfo() + if ok { + for _, s := range info.Settings { + if s.Key == "vcs.revision" { + if len(s.Value) > 8 { + return s.Value[:8] + } + return s.Value + } + } + } + return "unknown" +} + +func fieldDescriptions() map[string]string { + return map[string]string{ + "config.block_time": "target block production interval", + "config.slot_duration": "spamoor slot cadence for tx injection", + "config.gas_limit": "max gas per block (genesis config, hex string)", + "config.scrape_interval": "sequencer metrics scrape interval", + "config.num_spammers": "parallel spamoor instances injecting load", + "config.count_per_spammer": "total txs each spammer sends", + "config.throughput": "target tx/s per spammer", + "config.warmup_txs": "txs sent before measurement window starts", + "config.gas_units_to_burn": "gas consumed per tx (gasburner/state-pressure only)", + "config.max_wallets": "sender wallets per spammer", + "config.wait_timeout": "max time to wait for all txs to be sent", + + "tags.ev_reth": "ev-reth docker image tag", + "tags.ev_node": "ev-node git commit or tag", + + "host.host_name": "hostname of the machine running ev-node (from OTEL_RESOURCE_ATTRIBUTES)", + "host.host_cpu": "vCPU count (from OTEL_RESOURCE_ATTRIBUTES)", + "host.host_memory": "total memory (from OTEL_RESOURCE_ATTRIBUTES)", + "host.host_type": "CPU architecture (from OTEL_RESOURCE_ATTRIBUTES)", + "host.os_name": "operating system name (from OTEL_RESOURCE_ATTRIBUTES)", + "host.os_version": "kernel version (from OTEL_RESOURCE_ATTRIBUTES)", + "host.service_type": "node role: sequencer or fullnode (from OTEL_RESOURCE_ATTRIBUTES)", + + "metrics.mgas_per_sec": "total gas / steady-state seconds / 1e6", + "metrics.tps": "total tx count / steady-state seconds", + "metrics.blocks_per_sec": "non-empty blocks / steady-state seconds", + "metrics.non_empty_ratio_pct": "percentage of blocks containing >= 1 tx", + "metrics.avg_gas_per_block": "mean gas per non-empty block", + "metrics.avg_tx_per_block": "mean tx count per non-empty block", + "metrics.gas_block_p50": "50th percentile gas per non-empty block", + "metrics.gas_block_p99": "99th percentile gas per non-empty block", + "metrics.tx_block_p50": "50th percentile tx count per non-empty block", + "metrics.tx_block_p99": "99th percentile tx count per non-empty block", + "metrics.avg_block_interval_ms": "mean time between consecutive blocks (all blocks)", + "metrics.block_interval_p50_ms": "50th percentile block interval", + "metrics.block_interval_p99_ms": "99th percentile block interval", + "metrics.steady_state_sec": "wall-clock seconds between first and last non-empty block", + "metrics.wall_clock_sec": "total elapsed seconds including warmup and drain", + "metrics.overhead_pct": "(ProduceBlock - ExecuteTxs) / ProduceBlock — ev-node orchestration cost", + "metrics.ev_reth_ggas_per_sec": "total gas / cumulative NewPayload time / 1e9", + "metrics.secs_per_gigagas": "inverse of mgas_per_sec scaled to gigagas", + "metrics.produce_block_avg_ms": "avg BlockExecutor.ProduceBlock span (full block lifecycle)", + "metrics.produce_block_min_ms": "min ProduceBlock span", + "metrics.produce_block_max_ms": "max ProduceBlock span", + "metrics.get_payload_avg_ms": "avg Engine.GetPayload span (reth block building)", + "metrics.get_payload_min_ms": "min GetPayload span", + "metrics.get_payload_max_ms": "max GetPayload span", + "metrics.new_payload_avg_ms": "avg Engine.NewPayload span (reth validation + state commit)", + "metrics.new_payload_min_ms": "min NewPayload span", + "metrics.new_payload_max_ms": "max NewPayload span", + + "block_range.start": "first block in measurement window", + "block_range.end": "last block in measurement window", + "block_range.total": "all blocks in range including empty", + "block_range.non_empty": "blocks containing at least one tx", + + "spamoor.sent": "total txs successfully sent by spamoor", + "spamoor.failed": "total txs that failed", + } +} diff --git a/test/e2e/benchmark/spamoor_defi_test.go b/test/e2e/benchmark/spamoor_defi_test.go index 0e9e755a2..be6b96f9c 100644 --- a/test/e2e/benchmark/spamoor_defi_test.go +++ b/test/e2e/benchmark/spamoor_defi_test.go @@ -29,17 +29,26 @@ func (s *SpamoorSuite) TestDeFiSimulation() { w := newResultWriter(t, "DeFiSimulation") defer w.flush() + var result *benchmarkResult + var wallClock time.Duration + var spamoorStats *runSpamoorStats + defer func() { + if result != nil { + emitRunResult(t, cfg, result, wallClock, spamoorStats) + } + }() + e := s.setupEnv(cfg) uniswapConfig := map[string]any{ "throughput": cfg.Throughput, "total_count": cfg.CountPerSpammer, - "max_pending": 50000, + "max_pending": cfg.MaxPending, "max_wallets": cfg.MaxWallets, "pair_count": envInt("BENCH_PAIR_COUNT", 1), - "rebroadcast": envInt("BENCH_REBROADCAST", 0), - "base_fee": 20, - "tip_fee": 2, + "rebroadcast": cfg.Rebroadcast, + "base_fee": cfg.BaseFee, + "tip_fee": cfg.TipFee, "refill_amount": "10000000000000000000", // 10 ETH (swaps need ETH for WETH wrapping and router approvals) "refill_balance": "5000000000000000000", // 5 ETH "refill_interval": 600, @@ -93,7 +102,7 @@ func (s *SpamoorSuite) TestDeFiSimulation() { if err := waitForDrain(drainCtx, t.Logf, e.ethClient, 10); err != nil { t.Logf("warning: %v", err) } - wallClock := time.Since(loadStart) + wallClock = time.Since(loadStart) endHeader, err := e.ethClient.HeaderByNumber(ctx, nil) s.Require().NoError(err, "failed to get end block header") @@ -104,10 +113,16 @@ func (s *SpamoorSuite) TestDeFiSimulation() { bm, err := collectBlockMetrics(ctx, e.ethClient, startBlock, endBlock) s.Require().NoError(err, "failed to collect block metrics") - traces := s.collectTraces(e, cfg.ServiceName) + traces := s.collectTraces(e) - result := newBenchmarkResult("DeFiSimulation", bm, traces) + result = newBenchmarkResult("DeFiSimulation", bm, traces) s.Require().Greater(result.summary.SteadyState, time.Duration(0), "expected non-zero steady-state duration") result.log(t, wallClock) w.addEntries(result.entries()) + + metrics, mErr := e.spamoorAPI.GetMetrics() + s.Require().NoError(mErr, "failed to get final metrics") + sent := sumCounter(metrics["spamoor_transactions_sent_total"]) + failed := sumCounter(metrics["spamoor_transactions_failed_total"]) + spamoorStats = &runSpamoorStats{Sent: sent, Failed: failed} } diff --git a/test/e2e/benchmark/spamoor_erc20_test.go b/test/e2e/benchmark/spamoor_erc20_test.go index a1f8b96e0..1069a9a77 100644 --- a/test/e2e/benchmark/spamoor_erc20_test.go +++ b/test/e2e/benchmark/spamoor_erc20_test.go @@ -23,15 +23,24 @@ func (s *SpamoorSuite) TestERC20Throughput() { w := newResultWriter(t, "ERC20Throughput") defer w.flush() + var result *benchmarkResult + var wallClock time.Duration + var spamoorStats *runSpamoorStats + defer func() { + if result != nil { + emitRunResult(t, cfg, result, wallClock, spamoorStats) + } + }() + e := s.setupEnv(cfg) erc20Config := map[string]any{ "throughput": cfg.Throughput, "total_count": cfg.CountPerSpammer, - "max_pending": 50000, - "max_wallets": 200, - "base_fee": 20, - "tip_fee": 3, + "max_pending": cfg.MaxPending, + "max_wallets": cfg.MaxWallets, + "base_fee": cfg.BaseFee, + "tip_fee": cfg.TipFee, "refill_amount": "5000000000000000000", "refill_balance": "2000000000000000000", "refill_interval": 600, @@ -68,7 +77,7 @@ func (s *SpamoorSuite) TestERC20Throughput() { if err := waitForDrain(drainCtx, t.Logf, e.ethClient, 10); err != nil { t.Logf("warning: %v", err) } - wallClock := time.Since(loadStart) + wallClock = time.Since(loadStart) endHeader, err := e.ethClient.HeaderByNumber(ctx, nil) s.Require().NoError(err, "failed to get end block header") @@ -78,13 +87,15 @@ func (s *SpamoorSuite) TestERC20Throughput() { bm, err := collectBlockMetrics(ctx, e.ethClient, startBlock, endBlock) s.Require().NoError(err, "failed to collect block metrics") - traces := s.collectTraces(e, cfg.ServiceName) + traces := s.collectTraces(e) - result := newBenchmarkResult("ERC20Throughput", bm, traces) + result = newBenchmarkResult("ERC20Throughput", bm, traces) s.Require().Greater(result.summary.SteadyState, time.Duration(0), "expected non-zero steady-state duration") result.log(t, wallClock) w.addEntries(result.entries()) + spamoorStats = &runSpamoorStats{Sent: sent, Failed: failed} + s.Require().Greater(sent, float64(0), "at least one transaction should have been sent") s.Require().Zero(failed, "no transactions should have failed") } diff --git a/test/e2e/benchmark/spamoor_smoke_test.go b/test/e2e/benchmark/spamoor_smoke_test.go index d1168f9f7..df6a9955e 100644 --- a/test/e2e/benchmark/spamoor_smoke_test.go +++ b/test/e2e/benchmark/spamoor_smoke_test.go @@ -74,7 +74,7 @@ func (s *SpamoorSuite) TestSpamoorSmoke() { fail := sumCounter(metrics["spamoor_transactions_failed_total"]) // collect traces - traces := s.collectTraces(e, "ev-node-smoke") + traces := s.collectTraces(e) w.addSpans(traces.allSpans()) s.Require().Greater(sent, float64(0), "at least one transaction should have been sent") diff --git a/test/e2e/benchmark/spamoor_state_pressure_test.go b/test/e2e/benchmark/spamoor_state_pressure_test.go index 6e3bd9cae..57a3f7469 100644 --- a/test/e2e/benchmark/spamoor_state_pressure_test.go +++ b/test/e2e/benchmark/spamoor_state_pressure_test.go @@ -29,16 +29,26 @@ func (s *SpamoorSuite) TestStatePressure() { w := newResultWriter(t, "StatePressure") defer w.flush() + var result *benchmarkResult + var wallClock time.Duration + var spamoorStats *runSpamoorStats + defer func() { + if result != nil { + emitRunResult(t, cfg, result, wallClock, spamoorStats) + } + }() + e := s.setupEnv(cfg) storageSpamConfig := map[string]any{ "throughput": cfg.Throughput, "total_count": cfg.CountPerSpammer, "gas_units_to_burn": cfg.GasUnitsToBurn, - "max_pending": 50000, + "max_pending": cfg.MaxPending, "max_wallets": cfg.MaxWallets, - "base_fee": 20, - "tip_fee": 2, + "rebroadcast": cfg.Rebroadcast, + "base_fee": cfg.BaseFee, + "tip_fee": cfg.TipFee, "refill_amount": "5000000000000000000", // 5 ETH "refill_balance": "2000000000000000000", // 2 ETH "refill_interval": 600, @@ -55,8 +65,6 @@ func (s *SpamoorSuite) TestStatePressure() { t.Cleanup(func() { _ = e.spamoorAPI.DeleteSpammer(id) }) } - // allow spamoor time to initialise spammer goroutines before polling status - time.Sleep(3 * time.Second) requireSpammersRunning(t, e.spamoorAPI, spammerIDs) // wait for wallet funding to finish before recording start block @@ -87,7 +95,7 @@ func (s *SpamoorSuite) TestStatePressure() { if err := waitForDrain(drainCtx, t.Logf, e.ethClient, 10); err != nil { t.Logf("warning: %v", err) } - wallClock := time.Since(loadStart) + wallClock = time.Since(loadStart) endHeader, err := e.ethClient.HeaderByNumber(ctx, nil) s.Require().NoError(err, "failed to get end block header") @@ -98,9 +106,9 @@ func (s *SpamoorSuite) TestStatePressure() { bm, err := collectBlockMetrics(ctx, e.ethClient, startBlock, endBlock) s.Require().NoError(err, "failed to collect block metrics") - traces := s.collectTraces(e, cfg.ServiceName) + traces := s.collectTraces(e) - result := newBenchmarkResult("StatePressure", bm, traces) + result = newBenchmarkResult("StatePressure", bm, traces) s.Require().Greater(result.summary.SteadyState, time.Duration(0), "expected non-zero steady-state duration") result.log(t, wallClock) w.addEntries(result.entries()) @@ -109,6 +117,8 @@ func (s *SpamoorSuite) TestStatePressure() { s.Require().NoError(mErr, "failed to get final metrics") sent := sumCounter(metrics["spamoor_transactions_sent_total"]) failed := sumCounter(metrics["spamoor_transactions_failed_total"]) + spamoorStats = &runSpamoorStats{Sent: sent, Failed: failed} + s.Require().Greater(sent, float64(0), "at least one transaction should have been sent") s.Require().Zero(failed, "no transactions should have failed") } diff --git a/test/e2e/benchmark/suite_test.go b/test/e2e/benchmark/suite_test.go index 9fae77620..b7e4a48d8 100644 --- a/test/e2e/benchmark/suite_test.go +++ b/test/e2e/benchmark/suite_test.go @@ -40,9 +40,11 @@ func (s *SpamoorSuite) SetupTest() { // env holds a fully-wired environment created by setupEnv. type env struct { - traces traceProvider - spamoorAPI *spamoor.API - ethClient *ethclient.Client + traces traceProvider + spamoorAPI *spamoor.API + ethClient *ethclient.Client + evNodeServiceName string + evRethServiceName string } // TODO: temporary hardcoded tag, will be replaced with a proper release tag @@ -151,8 +153,10 @@ func (s *SpamoorSuite) setupLocalEnv(cfg benchConfig) *env { t: t, startTime: time.Now(), }, - spamoorAPI: spNode.API(), - ethClient: ethClient, + spamoorAPI: spNode.API(), + ethClient: ethClient, + evNodeServiceName: cfg.ServiceName, + evRethServiceName: "ev-reth", } } @@ -205,39 +209,46 @@ func (s *SpamoorSuite) setupExternalEnv(cfg benchConfig, rpcURL string) *env { t: t, startTime: time.Now(), }, - spamoorAPI: spNode.API(), - ethClient: ethClient, + spamoorAPI: spNode.API(), + ethClient: ethClient, + evNodeServiceName: envOrDefault("BENCH_EVNODE_SERVICE_NAME", "ev-node"), + evRethServiceName: envOrDefault("BENCH_EVRETH_SERVICE_NAME", "ev-reth"), } } // collectTraces fetches ev-node traces (required) and ev-reth traces (optional) // from the configured trace provider, then displays flowcharts. -func (s *SpamoorSuite) collectTraces(e *env, serviceName string) *traceResult { +func (s *SpamoorSuite) collectTraces(e *env) *traceResult { t := s.T() ctx := t.Context() - evNodeSpans, err := e.traces.collectSpans(ctx, serviceName) - s.Require().NoError(err, "failed to collect %s traces", serviceName) + evNodeSpans, err := e.traces.collectSpans(ctx, e.evNodeServiceName) + s.Require().NoError(err, "failed to collect %s traces", e.evNodeServiceName) tr := &traceResult{ evNode: evNodeSpans, - evReth: e.traces.tryCollectSpans(ctx, "ev-reth"), + evReth: e.traces.tryCollectSpans(ctx, e.evRethServiceName), } - if link := e.traces.uiURL(serviceName); link != "" { + if link := e.traces.uiURL(e.evNodeServiceName); link != "" { t.Logf("traces UI: %s", link) } if rc, ok := e.traces.(richSpanCollector); ok { - if spans, err := rc.collectRichSpans(ctx, serviceName); err == nil { + if spans, err := rc.collectRichSpans(ctx, e.evNodeServiceName); err == nil { tr.evNodeRich = spans } - if spans, err := rc.collectRichSpans(ctx, "ev-reth"); err == nil { + if spans, err := rc.collectRichSpans(ctx, e.evRethServiceName); err == nil { tr.evRethRich = spans } } - tr.displayFlowcharts(t, serviceName) + if rac, ok := e.traces.(resourceAttrCollector); ok { + tr.evNodeAttrs = rac.fetchResourceAttrs(ctx, e.evNodeServiceName) + tr.evRethAttrs = rac.fetchResourceAttrs(ctx, e.evRethServiceName) + } + + tr.displayFlowcharts(t, e.evNodeServiceName) return tr } diff --git a/test/e2e/benchmark/traces.go b/test/e2e/benchmark/traces.go index bdfeb658d..5e462bac6 100644 --- a/test/e2e/benchmark/traces.go +++ b/test/e2e/benchmark/traces.go @@ -46,6 +46,12 @@ type richSpanCollector interface { collectRichSpans(ctx context.Context, serviceName string) ([]richSpan, error) } +// resourceAttrCollector is an optional interface for providers that can +// extract OTEL resource attributes from trace spans. +type resourceAttrCollector interface { + fetchResourceAttrs(ctx context.Context, serviceName string) *resourceAttrs +} + // victoriaTraceProvider collects spans from a VictoriaTraces instance via its // LogsQL streaming API. type victoriaTraceProvider struct { @@ -254,6 +260,50 @@ func (v *victoriaTraceProvider) fetchAllSpans(ctx context.Context, serviceName s return spans, nil } +// fetchResourceAttrs queries a single span and extracts OTEL resource attributes +// from it. Uses limit=1 to avoid streaming the full span set on long-lived +// instances. Returns nil if no spans are available. +func (v *victoriaTraceProvider) fetchResourceAttrs(ctx context.Context, serviceName string) *resourceAttrs { + end := time.Now() + query := fmt.Sprintf(`_stream:{resource_attr:service.name="%s"}`, serviceName) + baseURL := strings.TrimRight(v.queryURL, "/") + url := fmt.Sprintf("%s/select/logsql/query?query=%s&start=%s&end=%s&limit=1", + baseURL, + neturl.QueryEscape(query), + v.startTime.Format(time.RFC3339Nano), + end.Format(time.RFC3339Nano)) + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + v.t.Logf("warning: failed to create resource attrs request: %v", err) + return nil + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + v.t.Logf("warning: failed to fetch resource attrs: %v", err) + return nil + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + v.t.Logf("warning: unexpected status %d fetching resource attrs", resp.StatusCode) + return nil + } + + scanner := bufio.NewScanner(resp.Body) + scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) + for scanner.Scan() { + line := scanner.Bytes() + if len(line) == 0 { + continue + } + attrs := extractResourceAttrs(line) + return &attrs + } + return nil +} + // logsqlSpan maps the fields returned by VictoriaTraces' LogsQL endpoint. type logsqlSpan struct { Name string `json:"name"` @@ -273,18 +323,49 @@ type logsqlRichSpan struct { // VictoriaTraces encodes it as resource_attr:'host.name which Go's // struct tags can't match due to the single quote. func extractHostName(line []byte) string { + return extractResourceAttr(line, "host.name") +} + +// extractResourceAttr pulls a specific resource attribute from a raw LogsQL JSON line. +// VictoriaTraces encodes resource attributes with keys like resource_attr:'host.name +// which can't be mapped via struct tags. +func extractResourceAttr(line []byte, attr string) string { var raw map[string]string if err := json.Unmarshal(line, &raw); err != nil { return "" } for k, v := range raw { - if strings.Contains(k, "host.name") { + if strings.Contains(k, attr) { return v } } return "" } +// resourceAttrs holds OTEL resource attributes extracted from trace spans. +type resourceAttrs struct { + HostName string `json:"host_name,omitempty"` + HostCPU string `json:"host_cpu,omitempty"` + HostMemory string `json:"host_memory,omitempty"` + HostType string `json:"host_type,omitempty"` + OSName string `json:"os_name,omitempty"` + OSVersion string `json:"os_version,omitempty"` + ServiceType string `json:"service_type,omitempty"` +} + +// extractResourceAttrs pulls all known OTEL resource attributes from a raw LogsQL JSON line. +func extractResourceAttrs(line []byte) resourceAttrs { + return resourceAttrs{ + HostName: extractResourceAttr(line, "host.name"), + HostCPU: extractResourceAttr(line, "host.cpu"), + HostMemory: extractResourceAttr(line, "host.memory"), + HostType: extractResourceAttr(line, "host.type"), + OSName: extractResourceAttr(line, "os.name"), + OSVersion: extractResourceAttr(line, "os.version"), + ServiceType: extractResourceAttr(line, "service.type"), + } +} + func (s logsqlSpan) SpanName() string { return s.Name } func (s logsqlSpan) SpanDuration() time.Duration {