From 339450977317f2687f2bf2f53c285fb2c53ede52 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Fri, 20 Feb 2026 15:09:56 +0100 Subject: [PATCH 1/3] fix(verifier): reconnect WebSocket on drop and add per-attempt RPC timeouts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two issues caused submission metrics to silently stop updating, requiring an application restart to recover: 1. WebSocket subscription drop was never detected. The main select loop never listened on sub.Err(). When the go-ethereum WebSocket connection to the EVM node dropped, the error was silently swallowed and the headers channel stopped delivering blocks forever. Fix: handle sub.Err() in the loop and reconnect via reconnectSubscription() with exponential backoff (5s → 10s → 20s → 40s → 60s cap). The worker pool continues running throughout reconnection. 2. Worker goroutines could hang indefinitely on Celestia/ev-node RPC calls. All calls inside verifyBlock (GetBlock, GetBlockWithBlobs, GetBlockTimestamp, VerifyBlobAtHeight, VerifyDataBlobAtHeight) used the root application context, which is only cancelled on shutdown. With no per-call timeout, a slow or hung Celestia node locked a worker goroutine permanently. Once all workers were stuck the block queue filled (capacity = workers*2), the main loop blocked on the inner queue send, and the refresh ticker stopped firing — metrics froze. Fix: extract each retry's RPC calls into verifyAttempt() which creates a context.WithTimeout(ctx, 30s) and defers cancel(). Workers are now guaranteed to be freed within 30 seconds per attempt regardless of endpoint availability. 
Co-Authored-By: Claude Sonnet 4.6 --- pkg/exporters/verifier/verifier.go | 205 +++++++++++++++++++---------- 1 file changed, 135 insertions(+), 70 deletions(-) diff --git a/pkg/exporters/verifier/verifier.go b/pkg/exporters/verifier/verifier.go index 833e628..0e2caf2 100644 --- a/pkg/exporters/verifier/verifier.go +++ b/pkg/exporters/verifier/verifier.go @@ -5,6 +5,7 @@ import ( "sync" "time" + ethereum "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/core/types" "github.com/evstack/ev-metrics/internal/clients/celestia" "github.com/evstack/ev-metrics/internal/clients/evm" @@ -56,7 +57,6 @@ func (e *exporter) ExportMetrics(ctx context.Context, m *metrics.Metrics) error if err != nil { return err } - defer sub.Unsubscribe() // create buffered channel for block queue blockQueue := make(chan *types.Header, e.workers*2) @@ -83,9 +83,27 @@ func (e *exporter) ExportMetrics(ctx context.Context, m *metrics.Metrics) error select { case <-ctx.Done(): e.logger.Info().Msg("stopping block verification") + sub.Unsubscribe() close(blockQueue) workerGroup.Wait() return nil + case subErr := <-sub.Err(): + // WebSocket subscription dropped — reconnect with backoff. + if subErr != nil { + e.logger.Error().Err(subErr).Msg("WebSocket subscription error, reconnecting") + } else { + e.logger.Warn().Msg("WebSocket subscription closed, reconnecting") + } + sub.Unsubscribe() + newSub := e.reconnectSubscription(ctx, headers) + if newSub == nil { + // context was cancelled during reconnection + close(blockQueue) + workerGroup.Wait() + return nil + } + sub = newSub + e.logger.Info().Msg("WebSocket subscription re-established") case <-refreshTicker.C: // ensure that submission duration is always included in the 60 second window. 
m.RefreshSubmissionDuration() @@ -106,6 +124,7 @@ func (e *exporter) ExportMetrics(ctx context.Context, m *metrics.Metrics) error case blockQueue <- header: // block queued successfully case <-ctx.Done(): + sub.Unsubscribe() close(blockQueue) workerGroup.Wait() return nil @@ -114,6 +133,33 @@ func (e *exporter) ExportMetrics(ctx context.Context, m *metrics.Metrics) error } } +// reconnectSubscription attempts to re-establish the WebSocket block header subscription +// with exponential backoff. Returns nil if the context is cancelled before reconnecting. +func (e *exporter) reconnectSubscription(ctx context.Context, headers chan *types.Header) ethereum.Subscription { + backoff := 5 * time.Second + const maxBackoff = 60 * time.Second + + for { + select { + case <-ctx.Done(): + return nil + case <-time.After(backoff): + } + + sub, err := e.evmClient.SubscribeNewHead(ctx, headers) + if err != nil { + e.logger.Warn().Err(err).Dur("retry_in", backoff).Msg("failed to reconnect WebSocket subscription, retrying") + if backoff*2 < maxBackoff { + backoff *= 2 + } else { + backoff = maxBackoff + } + continue + } + return sub + } +} + // processBlocks processes blocks from the queue func (e *exporter) processBlocks(ctx context.Context, m *metrics.Metrics, workerID int, blockQueue chan *types.Header) { logger := e.logger.With().Int("worker_id", workerID).Logger() @@ -153,6 +199,12 @@ func (e *exporter) onVerified(m *metrics.Metrics, namespace string, blockHeight, } } +// verifyAttemptTimeout caps how long a single verification attempt (all RPC calls +// combined) may take. Without this, a slow or hung Celestia/ev-node endpoint can +// block a worker goroutine indefinitely, eventually filling the block queue and +// freezing metrics. +const verifyAttemptTimeout = 30 * time.Second + // verifyBlock attempts to verify a DA height for a given block status. 
func (e *exporter) verifyBlock(ctx context.Context, m *metrics.Metrics, header *types.Header) bool { blockHeight := header.Number.Uint64() @@ -199,89 +251,102 @@ func (e *exporter) verifyBlock(ctx context.Context, m *metrics.Metrics, header * // proceed with retry } - blockResult, err := e.evnodeClient.GetBlock(ctx, blockHeight) - if err != nil { - logger.Warn().Err(err).Int("attempt", retries).Msg("failed to re-query block from ev-node") - continue - } - - daHeight := blockResult.HeaderDaHeight - if namespace == "data" { - daHeight = blockResult.DataDaHeight + if e.verifyAttempt(ctx, m, logger, retries, blockHeight, namespace, blockTime, startTime) { + return false } + } - if daHeight == 0 { - logger.Debug().Int("attempt", retries).Msg("block still not submitted to DA, will retry") - continue - } + // if loop completes without success, log final error + logger.Error().Msg("max retries exhausted: failed to verify block") + e.onVerified(m, namespace, blockHeight, 0, false, 0) + return true +} - blockResultWithBlobs, err := e.evnodeClient.GetBlockWithBlobs(ctx, blockHeight) - if err != nil { - logger.Warn().Err(err).Int("attempt", retries).Msg("failed to query block from ev-node") - continue - } +// verifyAttempt performs one bounded RPC attempt to verify a block against Celestia DA. +// It returns true when retrying is no longer needed (verified, or permanent failure), +// and false when the caller should retry. +// Each call is bounded by verifyAttemptTimeout so workers cannot hang indefinitely +// on slow or unresponsive ev-node / Celestia endpoints. 
+func (e *exporter) verifyAttempt(ctx context.Context, m *metrics.Metrics, logger zerolog.Logger, retries int, blockHeight uint64, namespace string, blockTime time.Time, startTime time.Time) bool { + attemptCtx, cancel := context.WithTimeout(ctx, verifyAttemptTimeout) + defer cancel() - daBlockTime, err := e.celestiaClient.GetBlockTimestamp(ctx, daHeight) - if err != nil { - logger.Warn().Err(err).Uint64("da_height", daHeight).Msg("failed to get da block timestamp") - continue - } + blockResult, err := e.evnodeClient.GetBlock(attemptCtx, blockHeight) + if err != nil { + logger.Warn().Err(err).Int("attempt", retries).Msg("failed to re-query block from ev-node") + return false + } - // the time taken from block time to DA inclusion time. - submissionDuration := daBlockTime.Sub(blockTime) + daHeight := blockResult.HeaderDaHeight + if namespace == "data" { + daHeight = blockResult.DataDaHeight + } - switch namespace { - case "header": - verified, err := e.celestiaClient.VerifyBlobAtHeight(ctx, blockResultWithBlobs.HeaderBlob, daHeight, e.headerNS) + if daHeight == 0 { + logger.Debug().Int("attempt", retries).Msg("block still not submitted to DA, will retry") + return false + } - if err != nil { - logger.Warn().Err(err).Uint64("da_height", daHeight).Msg("verification failed") - continue - } + blockResultWithBlobs, err := e.evnodeClient.GetBlockWithBlobs(attemptCtx, blockHeight) + if err != nil { + logger.Warn().Err(err).Int("attempt", retries).Msg("failed to query block from ev-node") + return false + } - if verified { - logger.Info(). - Uint64("da_height", daHeight). - Dur("duration", time.Since(startTime)). 
- Msg("header blob verified on Celestia") - e.onVerified(m, namespace, blockHeight, daHeight, true, submissionDuration) - return false - } + daBlockTime, err := e.celestiaClient.GetBlockTimestamp(attemptCtx, daHeight) + if err != nil { + logger.Warn().Err(err).Uint64("da_height", daHeight).Msg("failed to get da block timestamp") + return false + } - case "data": - if len(blockResultWithBlobs.DataBlob) == 0 { - logger.Info(). - Dur("duration", time.Since(startTime)). - Msg("empty data block - no verification needed") - e.onVerified(m, namespace, blockHeight, daHeight, true, submissionDuration) - return false - } + // the time taken from block time to DA inclusion time. + submissionDuration := daBlockTime.Sub(blockTime) - // perform actual verification between bytes from ev-node and Celestia. - verified, err := e.celestiaClient.VerifyDataBlobAtHeight(ctx, blockResultWithBlobs.DataBlob, daHeight, e.dataNS) - if err != nil { - logger.Warn().Err(err).Uint64("da_height", daHeight).Msg("verification failed") - continue - } + switch namespace { + case "header": + verified, err := e.celestiaClient.VerifyBlobAtHeight(attemptCtx, blockResultWithBlobs.HeaderBlob, daHeight, e.headerNS) + if err != nil { + logger.Warn().Err(err).Uint64("da_height", daHeight).Msg("verification failed") + return false + } + if verified { + logger.Info(). + Uint64("da_height", daHeight). + Dur("duration", time.Since(startTime)). + Msg("header blob verified on Celestia") + e.onVerified(m, namespace, blockHeight, daHeight, true, submissionDuration) + return true + } - if verified { - logger.Info(). - Uint64("da_height", daHeight). - Dur("duration", time.Since(startTime)). - Msg("data blob verified on Celestia") - e.onVerified(m, namespace, blockHeight, daHeight, true, submissionDuration) - return false - } - logger.Warn().Uint64("da_height", daHeight).Int("attempt", retries).Msg("verification failed, will retry") + case "data": + if len(blockResultWithBlobs.DataBlob) == 0 { + logger.Info(). 
+ Dur("duration", time.Since(startTime)). + Msg("empty data block - no verification needed") + e.onVerified(m, namespace, blockHeight, daHeight, true, submissionDuration) + return true + } - default: - logger.Error().Str("namespace", namespace).Msg("unknown namespace type") + // perform actual verification between bytes from ev-node and Celestia. + verified, err := e.celestiaClient.VerifyDataBlobAtHeight(attemptCtx, blockResultWithBlobs.DataBlob, daHeight, e.dataNS) + if err != nil { + logger.Warn().Err(err).Uint64("da_height", daHeight).Msg("verification failed") return false } + if verified { + logger.Info(). + Uint64("da_height", daHeight). + Dur("duration", time.Since(startTime)). + Msg("data blob verified on Celestia") + e.onVerified(m, namespace, blockHeight, daHeight, true, submissionDuration) + return true + } + logger.Warn().Uint64("da_height", daHeight).Int("attempt", retries).Msg("verification failed, will retry") + + default: + logger.Error().Str("namespace", namespace).Msg("unknown namespace type") + return true } - // if loop completes without success, log final error - logger.Error().Msg("max retries exhausted: failed to verify block") - e.onVerified(m, namespace, blockHeight, 0, false, 0) - return true + return false } From 583c45dc783df4f9965aa8f064af45173f631e20 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Fri, 20 Feb 2026 15:24:24 +0100 Subject: [PATCH 2/3] fix(verifier): address PR review feedback - Extract repeated shutdown sequence into a closure to eliminate duplication across three select branches - Fix stale retry_in log in reconnectSubscription: increment backoff before logging so the logged duration reflects the actual next wait - Add missing warn log in header verification branch when VerifyBlobAtHeight returns verified=false, matching the existing log in the data branch for observability during retries Co-Authored-By: Claude Sonnet 4.6 --- pkg/exporters/verifier/verifier.go | 19 
++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/pkg/exporters/verifier/verifier.go b/pkg/exporters/verifier/verifier.go index 0e2caf2..1698784 100644 --- a/pkg/exporters/verifier/verifier.go +++ b/pkg/exporters/verifier/verifier.go @@ -78,14 +78,20 @@ func (e *exporter) ExportMetrics(ctx context.Context, m *metrics.Metrics) error refreshTicker := time.NewTicker(10 * time.Second) defer refreshTicker.Stop() + // shutdown cleanly unsubscribes and waits for all workers to finish. + // It captures sub by reference so reassignment during reconnection is reflected. + shutdown := func() { + sub.Unsubscribe() + close(blockQueue) + workerGroup.Wait() + } + // main subscription loop for { select { case <-ctx.Done(): e.logger.Info().Msg("stopping block verification") - sub.Unsubscribe() - close(blockQueue) - workerGroup.Wait() + shutdown() return nil case subErr := <-sub.Err(): // WebSocket subscription dropped — reconnect with backoff. @@ -124,9 +130,7 @@ func (e *exporter) ExportMetrics(ctx context.Context, m *metrics.Metrics) error case blockQueue <- header: // block queued successfully case <-ctx.Done(): - sub.Unsubscribe() - close(blockQueue) - workerGroup.Wait() + shutdown() return nil } } @@ -148,12 +152,12 @@ func (e *exporter) reconnectSubscription(ctx context.Context, headers chan *type sub, err := e.evmClient.SubscribeNewHead(ctx, headers) if err != nil { - e.logger.Warn().Err(err).Dur("retry_in", backoff).Msg("failed to reconnect WebSocket subscription, retrying") if backoff*2 < maxBackoff { backoff *= 2 } else { backoff = maxBackoff } + e.logger.Warn().Err(err).Dur("retry_in", backoff).Msg("failed to reconnect WebSocket subscription, retrying") continue } return sub @@ -317,6 +321,7 @@ func (e *exporter) verifyAttempt(ctx context.Context, m *metrics.Metrics, logger e.onVerified(m, namespace, blockHeight, daHeight, true, submissionDuration) return true } + logger.Warn().Uint64("da_height", daHeight).Int("attempt", 
retries).Msg("header verification failed, will retry") case "data": if len(blockResultWithBlobs.DataBlob) == 0 { From ff9ac2cd2066d08df489e187c7f312cefc9da6be Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Fri, 20 Feb 2026 16:29:32 +0100 Subject: [PATCH 3/3] fix(metrics): pre-initialize submission metrics for both blob types at startup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ev_metrics_unsubmitted_blocks_total, submission_duration_seconds, submission_attempts_total, and submission_failures_total only appeared in Prometheus output after onVerified() was first called for a given blob_type. If no data block had completed yet (idle chain, blocks still retrying, or no transaction blocks), the blob_type=data label combination was entirely absent — indistinguishable from a real bug. Add InitializeSubmissionMetrics() called from ExportMetrics at startup: - UnsubmittedBlocksTotal: Set(0) — "0 unsubmitted blocks" is meaningful - SubmissionAttemptsTotal/FailuresTotal: Add(0) — registers the counter - SubmissionDuration: GetMetricWithLabelValues() — registers the Summary without a fake observation; quantiles show NaN and count/sum show 0, accurately representing "metric exists, no data observed yet" Co-Authored-By: Claude Sonnet 4.6 --- pkg/exporters/verifier/verifier.go | 4 ++++ pkg/metrics/metrics.go | 19 +++++++++++++++++++ 2 files changed, 23 insertions(+) diff --git a/pkg/exporters/verifier/verifier.go b/pkg/exporters/verifier/verifier.go index 1698784..537797e 100644 --- a/pkg/exporters/verifier/verifier.go +++ b/pkg/exporters/verifier/verifier.go @@ -74,6 +74,10 @@ func (e *exporter) ExportMetrics(ctx context.Context, m *metrics.Metrics) error e.logger.Info().Int("workers", e.workers).Msg("started verification work pool") + // pre-initialize submission metrics so both blob types are always visible + // in Prometheus output, even before any block has been fully processed. 
+ m.InitializeSubmissionMetrics(e.chainID) + // ticker to refresh submission duration metric every 10 seconds refreshTicker := time.NewTicker(10 * time.Second) defer refreshTicker.Stop() diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 8f2773f..be78992 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -701,6 +701,25 @@ func (m *Metrics) RecordJsonRpcRequestDuration(chainID string, duration time.Dur m.JsonRpcRequestDurationSummary.WithLabelValues(chainID).Observe(duration.Seconds()) } +// InitializeSubmissionMetrics pre-initializes submission-related metrics for all +// known blob types so they are always visible in Prometheus output from startup, +// regardless of whether any blocks have been processed yet. +// +// For Gauges and Counters, the label combination is registered at zero. +// For the SubmissionDuration Summary, GetMetricWithLabelValues registers the +// metric without making a fake observation — quantiles will show NaN and +// count/sum will show 0, which accurately represents "no data yet". +func (m *Metrics) InitializeSubmissionMetrics(chainID string) { + for _, blobType := range []string{"header", "data"} { + m.UnsubmittedBlocksTotal.WithLabelValues(chainID, blobType).Set(0) + m.SubmissionAttemptsTotal.WithLabelValues(chainID, blobType).Add(0) + m.SubmissionFailuresTotal.WithLabelValues(chainID, blobType).Add(0) + // Register the Summary without a fake observation so it is visible + // from startup while keeping quantile values accurate. + _, _ = m.SubmissionDuration.GetMetricWithLabelValues(chainID, blobType) + } +} + // InitializeJsonRpcSloThresholds initializes the constant SLO threshold gauges for JSON-RPC requests func (m *Metrics) InitializeJsonRpcSloThresholds(chainID string) { m.JsonRpcRequestSloSeconds.WithLabelValues(chainID, "0.5").Set(0.2)