diff --git a/pkg/exporters/verifier/verifier.go b/pkg/exporters/verifier/verifier.go index 833e628..537797e 100644 --- a/pkg/exporters/verifier/verifier.go +++ b/pkg/exporters/verifier/verifier.go @@ -5,6 +5,7 @@ import ( "sync" "time" + ethereum "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/core/types" "github.com/evstack/ev-metrics/internal/clients/celestia" "github.com/evstack/ev-metrics/internal/clients/evm" @@ -56,7 +57,6 @@ func (e *exporter) ExportMetrics(ctx context.Context, m *metrics.Metrics) error if err != nil { return err } - defer sub.Unsubscribe() // create buffered channel for block queue blockQueue := make(chan *types.Header, e.workers*2) @@ -74,18 +74,46 @@ func (e *exporter) ExportMetrics(ctx context.Context, m *metrics.Metrics) error e.logger.Info().Int("workers", e.workers).Msg("started verification work pool") + // pre-initialize submission metrics so both blob types are always visible + // in Prometheus output, even before any block has been fully processed. + m.InitializeSubmissionMetrics(e.chainID) + // ticker to refresh submission duration metric every 10 seconds refreshTicker := time.NewTicker(10 * time.Second) defer refreshTicker.Stop() + // shutdown cleanly unsubscribes and waits for all workers to finish. + // It captures sub by reference so reassignment during reconnection is reflected. + shutdown := func() { + sub.Unsubscribe() + close(blockQueue) + workerGroup.Wait() + } + // main subscription loop for { select { case <-ctx.Done(): e.logger.Info().Msg("stopping block verification") - close(blockQueue) - workerGroup.Wait() + shutdown() return nil + case subErr := <-sub.Err(): + // WebSocket subscription dropped — reconnect with backoff. 
+ if subErr != nil { + e.logger.Error().Err(subErr).Msg("WebSocket subscription error, reconnecting") + } else { + e.logger.Warn().Msg("WebSocket subscription closed, reconnecting") + } + sub.Unsubscribe() + newSub := e.reconnectSubscription(ctx, headers) + if newSub == nil { + // context was cancelled during reconnection + close(blockQueue) + workerGroup.Wait() + return nil + } + sub = newSub + e.logger.Info().Msg("WebSocket subscription re-established") case <-refreshTicker.C: // ensure that submission duration is always included in the 60 second window. m.RefreshSubmissionDuration() @@ -106,14 +134,40 @@ func (e *exporter) ExportMetrics(ctx context.Context, m *metrics.Metrics) error case blockQueue <- header: // block queued successfully case <-ctx.Done(): - close(blockQueue) - workerGroup.Wait() + shutdown() return nil } } } } +// reconnectSubscription attempts to re-establish the WebSocket block header subscription +// with exponential backoff. Returns nil if the context is cancelled before reconnecting. 
+func (e *exporter) reconnectSubscription(ctx context.Context, headers chan *types.Header) ethereum.Subscription {
+	const initialDelay, maxDelay = 5 * time.Second, 60 * time.Second
+
+	for delay := initialDelay; ; {
+		// sleep before every attempt (the first included) unless cancelled.
+		select {
+		case <-ctx.Done():
+			return nil
+		case <-time.After(delay):
+		}
+
+		// resubscribe onto the headers channel supplied by the caller.
+		newSub, err := e.evmClient.SubscribeNewHead(ctx, headers)
+		if err == nil {
+			return newSub
+		}
+
+		// double the delay for the next round, capped at maxDelay.
+		if delay *= 2; delay > maxDelay {
+			delay = maxDelay
+		}
+		e.logger.Warn().Err(err).Dur("retry_in", delay).Msg("failed to reconnect WebSocket subscription, retrying")
+	}
+}
+
 // processBlocks processes blocks from the queue
 func (e *exporter) processBlocks(ctx context.Context, m *metrics.Metrics, workerID int, blockQueue chan *types.Header) {
 	logger := e.logger.With().Int("worker_id", workerID).Logger()
@@ -153,6 +207,12 @@ func (e *exporter) onVerified(m *metrics.Metrics, namespace string, blockHeight,
 	}
 }
 
+// verifyAttemptTimeout caps how long a single verification attempt (all RPC calls
+// combined) may take. Without this, a slow or hung Celestia/ev-node endpoint can
+// block a worker goroutine indefinitely, eventually filling the block queue and
+// freezing metrics.
+const verifyAttemptTimeout = 30 * time.Second
+
 // verifyBlock attempts to verify a DA height for a given block status.
func (e *exporter) verifyBlock(ctx context.Context, m *metrics.Metrics, header *types.Header) bool { blockHeight := header.Number.Uint64() @@ -199,89 +259,103 @@ func (e *exporter) verifyBlock(ctx context.Context, m *metrics.Metrics, header * // proceed with retry } - blockResult, err := e.evnodeClient.GetBlock(ctx, blockHeight) - if err != nil { - logger.Warn().Err(err).Int("attempt", retries).Msg("failed to re-query block from ev-node") - continue - } - - daHeight := blockResult.HeaderDaHeight - if namespace == "data" { - daHeight = blockResult.DataDaHeight - } - - if daHeight == 0 { - logger.Debug().Int("attempt", retries).Msg("block still not submitted to DA, will retry") - continue + if e.verifyAttempt(ctx, m, logger, retries, blockHeight, namespace, blockTime, startTime) { + return false } + } - blockResultWithBlobs, err := e.evnodeClient.GetBlockWithBlobs(ctx, blockHeight) - if err != nil { - logger.Warn().Err(err).Int("attempt", retries).Msg("failed to query block from ev-node") - continue - } + // if loop completes without success, log final error + logger.Error().Msg("max retries exhausted: failed to verify block") + e.onVerified(m, namespace, blockHeight, 0, false, 0) + return true +} - daBlockTime, err := e.celestiaClient.GetBlockTimestamp(ctx, daHeight) - if err != nil { - logger.Warn().Err(err).Uint64("da_height", daHeight).Msg("failed to get da block timestamp") - continue - } +// verifyAttempt performs one bounded RPC attempt to verify a block against Celestia DA. +// It returns true when retrying is no longer needed (verified, or permanent failure), +// and false when the caller should retry. +// Each call is bounded by verifyAttemptTimeout so workers cannot hang indefinitely +// on slow or unresponsive ev-node / Celestia endpoints. 
+func (e *exporter) verifyAttempt(ctx context.Context, m *metrics.Metrics, logger zerolog.Logger, retries int, blockHeight uint64, namespace string, blockTime time.Time, startTime time.Time) bool {
+	attemptCtx, cancel := context.WithTimeout(ctx, verifyAttemptTimeout) // shared by every RPC call below
+	defer cancel() // release the timeout's resources when this attempt ends
 
-		// the time taken from block time to DA inclusion time.
-		submissionDuration := daBlockTime.Sub(blockTime)
+	blockResult, err := e.evnodeClient.GetBlock(attemptCtx, blockHeight)
+	if err != nil {
+		logger.Warn().Err(err).Int("attempt", retries).Msg("failed to re-query block from ev-node")
+		return false
+	}
 
-		switch namespace {
-		case "header":
-			verified, err := e.celestiaClient.VerifyBlobAtHeight(ctx, blockResultWithBlobs.HeaderBlob, daHeight, e.headerNS)
+	daHeight := blockResult.HeaderDaHeight
+	if namespace == "data" { // "data" reads DataDaHeight; any other namespace keeps HeaderDaHeight
+		daHeight = blockResult.DataDaHeight
+	}
 
-			if err != nil {
-				logger.Warn().Err(err).Uint64("da_height", daHeight).Msg("verification failed")
-				continue
-			}
+	if daHeight == 0 { // a zero DA height is handled as "not submitted yet": log and retry
+		logger.Debug().Int("attempt", retries).Msg("block still not submitted to DA, will retry")
+		return false
+	}
 
-			if verified {
-				logger.Info().
-					Uint64("da_height", daHeight).
-					Dur("duration", time.Since(startTime)).
-					Msg("header blob verified on Celestia")
-				e.onVerified(m, namespace, blockHeight, daHeight, true, submissionDuration)
-				return false
-			}
+	blockResultWithBlobs, err := e.evnodeClient.GetBlockWithBlobs(attemptCtx, blockHeight)
+	if err != nil {
+		logger.Warn().Err(err).Int("attempt", retries).Msg("failed to query block from ev-node")
+		return false
+	}
 
-		case "data":
-			if len(blockResultWithBlobs.DataBlob) == 0 {
-				logger.Info().
-					Dur("duration", time.Since(startTime)).
-					Msg("empty data block - no verification needed")
-				e.onVerified(m, namespace, blockHeight, daHeight, true, submissionDuration)
-				return false
-			}
+	daBlockTime, err := e.celestiaClient.GetBlockTimestamp(attemptCtx, daHeight)
+	if err != nil {
+		logger.Warn().Err(err).Uint64("da_height", daHeight).Msg("failed to get da block timestamp")
+		return false
+	}
 
-			// perform actual verification between bytes from ev-node and Celestia.
-			verified, err := e.celestiaClient.VerifyDataBlobAtHeight(ctx, blockResultWithBlobs.DataBlob, daHeight, e.dataNS)
-			if err != nil {
-				logger.Warn().Err(err).Uint64("da_height", daHeight).Msg("verification failed")
-				continue
-			}
+	// DA block timestamp minus blockTime, i.e. the time taken from block time to DA inclusion time.
+	submissionDuration := daBlockTime.Sub(blockTime)
 
-			if verified {
-				logger.Info().
-					Uint64("da_height", daHeight).
-					Dur("duration", time.Since(startTime)).
-					Msg("data blob verified on Celestia")
-				e.onVerified(m, namespace, blockHeight, daHeight, true, submissionDuration)
-				return false
-			}
-			logger.Warn().Uint64("da_height", daHeight).Int("attempt", retries).Msg("verification failed, will retry")
+	switch namespace {
+	case "header":
+		verified, err := e.celestiaClient.VerifyBlobAtHeight(attemptCtx, blockResultWithBlobs.HeaderBlob, daHeight, e.headerNS)
+		if err != nil {
+			logger.Warn().Err(err).Uint64("da_height", daHeight).Msg("verification failed")
+			return false
+		}
+		if verified { // success is recorded through onVerified; true tells the caller to stop retrying
+			logger.Info().
+				Uint64("da_height", daHeight).
+				Dur("duration", time.Since(startTime)).
+				Msg("header blob verified on Celestia")
+			e.onVerified(m, namespace, blockHeight, daHeight, true, submissionDuration)
+			return true
+		}
+		logger.Warn().Uint64("da_height", daHeight).Int("attempt", retries).Msg("header verification failed, will retry")
+
+	case "data":
+		if len(blockResultWithBlobs.DataBlob) == 0 { // recorded as verified without a VerifyDataBlobAtHeight call
+			logger.Info().
+				Dur("duration", time.Since(startTime)).
+				Msg("empty data block - no verification needed")
+			e.onVerified(m, namespace, blockHeight, daHeight, true, submissionDuration)
+			return true
+		}
 
-		default:
-			logger.Error().Str("namespace", namespace).Msg("unknown namespace type")
+		// perform actual verification between bytes from ev-node and Celestia, at daHeight under e.dataNS.
+		verified, err := e.celestiaClient.VerifyDataBlobAtHeight(attemptCtx, blockResultWithBlobs.DataBlob, daHeight, e.dataNS)
+		if err != nil {
+			logger.Warn().Err(err).Uint64("da_height", daHeight).Msg("verification failed")
 			return false
 		}
+		if verified { // success is recorded through onVerified; true tells the caller to stop retrying
+			logger.Info().
+				Uint64("da_height", daHeight).
+				Dur("duration", time.Since(startTime)).
+				Msg("data blob verified on Celestia")
+			e.onVerified(m, namespace, blockHeight, daHeight, true, submissionDuration)
+			return true
+		}
+		logger.Warn().Uint64("da_height", daHeight).Int("attempt", retries).Msg("verification failed, will retry")
+
+	default: // unknown namespace: returning true ends the caller's retry loop
+		logger.Error().Str("namespace", namespace).Msg("unknown namespace type")
+		return true
 	}
 
-	// if loop completes without success, log final error
-	logger.Error().Msg("max retries exhausted: failed to verify block")
-	e.onVerified(m, namespace, blockHeight, 0, false, 0)
-	return true
+	return false // blob not verified in this attempt: the caller should retry
 }
diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go
index 8f2773f..be78992 100644
--- a/pkg/metrics/metrics.go
+++ b/pkg/metrics/metrics.go
@@ -701,6 +701,25 @@ func (m *Metrics) RecordJsonRpcRequestDuration(chainID string, duration time.Dur
 	m.JsonRpcRequestDurationSummary.WithLabelValues(chainID).Observe(duration.Seconds())
 }
 
+// InitializeSubmissionMetrics pre-initializes submission-related metrics for all
+// known blob types so they are always visible in Prometheus output from startup,
+// regardless of whether any blocks have been processed yet.
+//
+// For Gauges and Counters, the label combination is registered at zero.
+// For the SubmissionDuration Summary, GetMetricWithLabelValues registers the
+// metric without making a fake observation — quantiles will show NaN and
+// count/sum will show 0, which accurately represents "no data yet".
+func (m *Metrics) InitializeSubmissionMetrics(chainID string) {
+	for _, kind := range [...]string{"header", "data"} {
+		m.UnsubmittedBlocksTotal.WithLabelValues(chainID, kind).Set(0)
+		m.SubmissionAttemptsTotal.WithLabelValues(chainID, kind).Add(0)
+		m.SubmissionFailuresTotal.WithLabelValues(chainID, kind).Add(0)
+		// Only look the Summary child up: nothing is observed here, and the
+		// returned metric and error are both discarded on purpose.
+		_, _ = m.SubmissionDuration.GetMetricWithLabelValues(chainID, kind)
+	}
+}
+
 // InitializeJsonRpcSloThresholds initializes the constant SLO threshold gauges for JSON-RPC requests
 func (m *Metrics) InitializeJsonRpcSloThresholds(chainID string) {
 	m.JsonRpcRequestSloSeconds.WithLabelValues(chainID, "0.5").Set(0.2)