From eb79dc6e960a91b96091815113f056adf731ff5e Mon Sep 17 00:00:00 2001 From: Larry Li Date: Thu, 26 Mar 2026 16:56:59 -0700 Subject: [PATCH 1/6] use metrics.RPCClientMetrics in multinode --- multinode/go.mod | 2 +- multinode/go.sum | 4 +- multinode/rpc_client_base.go | 47 +++++++++- multinode/rpc_client_base_test.go | 139 +++++++++++++++++++++++++++++- 4 files changed, 185 insertions(+), 7 deletions(-) diff --git a/multinode/go.mod b/multinode/go.mod index c18efc1..fb1b203 100644 --- a/multinode/go.mod +++ b/multinode/go.mod @@ -7,7 +7,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/prometheus/client_model v0.6.2 github.com/smartcontractkit/chainlink-common v0.10.1-0.20260305114348-b8bbac30bfc7 - github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20250717121125-2350c82883e2 + github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260326180413-c69f27e37a13 github.com/stretchr/testify v1.11.1 go.uber.org/zap v1.27.1 ) diff --git a/multinode/go.sum b/multinode/go.sum index 459485f..ee25ebf 100644 --- a/multinode/go.sum +++ b/multinode/go.sum @@ -80,8 +80,8 @@ github.com/smartcontractkit/chainlink-common v0.10.1-0.20260305114348-b8bbac30bf github.com/smartcontractkit/chainlink-common v0.10.1-0.20260305114348-b8bbac30bfc7/go.mod h1:0ghbAr7tRO0tT5ZqBXhOyzgUO37tNNe33Yn0hskauVM= github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10 h1:FJAFgXS9oqASnkS03RE1HQwYQQxrO4l46O5JSzxqLgg= github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10/go.mod h1:oiDa54M0FwxevWwyAX773lwdWvFYYlYHHQV1LQ5HpWY= -github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20250717121125-2350c82883e2 h1:ysZjKH+BpWlQhF93kr/Lc668UlCvT9NjfcsGdZT19I8= -github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20250717121125-2350c82883e2/go.mod h1:jo+cUqNcHwN8IF7SInQNXDZ8qzBsyMpnLdYbDswviFc= +github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260326180413-c69f27e37a13 h1:Homq1KxVUoL1rEtEv1N+BL0JJdMdQcDBnJw53vn+/qY= +github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260326180413-c69f27e37a13/go.mod h1:HG/aei0MgBOpsyRLexdKGtOUO8yjSJO3iUu0Uu8KBm4= github.com/smartcontractkit/freeport v0.1.3-0.20250716200817-cb5dfd0e369e h1:Hv9Mww35LrufCdM9wtS9yVi/rEWGI1UnjHbcKKU0nVY= github.com/smartcontractkit/freeport v0.1.3-0.20250716200817-cb5dfd0e369e/go.mod h1:T4zH9R8R8lVWKfU7tUvYz2o2jMv1OpGCdpY2j2QZXzU= github.com/smartcontractkit/libocr v0.0.0-20250912173940-f3ab0246e23d h1:LokA9PoCNb8mm8mDT52c3RECPMRsGz1eCQORq+J3n74= diff --git a/multinode/rpc_client_base.go b/multinode/rpc_client_base.go index b4a886c..20eb978 100644 --- a/multinode/rpc_client_base.go +++ b/multinode/rpc_client_base.go @@ -9,6 +9,14 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" + frameworkmetrics "github.com/smartcontractkit/chainlink-framework/metrics" +) + +var errInvalidHead = errors.New("invalid head") + +const ( + rpcCallNameLatestBlock = "latest_block" + rpcCallNameLatestFinalizedBlock = "latest_finalized_block" ) type RPCClientBaseConfig interface { @@ -16,6 +24,12 @@ type RPCClientBaseConfig interface { FinalizedBlockPollInterval() time.Duration } +type RPCClientBaseMetricsConfig struct { + RPCClientMetrics frameworkmetrics.RPCClientMetrics + RPCURL string + IsSendOnly bool +} + // RPCClientBase is used to integrate multinode into chain-specific clients. // For new MultiNode integrations, we wrap the RPC client and inherit from the RPCClientBase // to get the required RPCClient methods and enable the use of MultiNode. @@ -46,14 +60,19 @@ type RPCClientBase[HEAD Head] struct { highestUserObservations ChainInfo // most recent chain info observed during current lifecycle latestChainInfo ChainInfo + + rpcMetrics frameworkmetrics.RPCClientMetrics + rpcURL string + isSendOnly bool } func NewRPCClientBase[HEAD Head]( cfg RPCClientBaseConfig, ctxTimeout time.Duration, log logger.Logger, latestBlock func(ctx context.Context) (HEAD, error), latestFinalizedBlock func(ctx context.Context) (HEAD, error), + rpcMetrics *RPCClientBaseMetricsConfig, ) *RPCClientBase[HEAD] { - return &RPCClientBase[HEAD]{ + base := &RPCClientBase[HEAD]{ cfg: cfg, log: log, ctxTimeout: ctxTimeout, @@ -62,6 +81,12 @@ func NewRPCClientBase[HEAD Head]( subs: make(map[Subscription]struct{}), lifeCycleCh: make(chan struct{}), } + if rpcMetrics != nil { + base.rpcMetrics = rpcMetrics.RPCClientMetrics + base.rpcURL = rpcMetrics.RPCURL + base.isSendOnly = rpcMetrics.IsSendOnly + } + return base } func (m *RPCClientBase[HEAD]) lenSubs() int { @@ -155,16 +180,20 @@ func (m *RPCClientBase[HEAD]) LatestBlock(ctx context.Context) (HEAD, error) { // capture lifeCycleCh to ensure we are not updating chainInfo with observations related to previous life cycle ctx, cancel, lifeCycleCh := m.AcquireQueryCtx(ctx, m.ctxTimeout) defer cancel() + start := time.Now() head, err := m.latestBlock(ctx) if err != nil { + m.recordRPCRequest(ctx, rpcCallNameLatestBlock, start, err) return head, err } if !head.IsValid() { - return head, errors.New("invalid head") + m.recordRPCRequest(ctx, rpcCallNameLatestBlock, start, errInvalidHead) + return head, errInvalidHead } + m.recordRPCRequest(ctx, rpcCallNameLatestBlock, start, nil) m.OnNewHead(ctx, lifeCycleCh, head) return head, nil } @@ -172,20 +201,32 @@ func (m *RPCClientBase[HEAD]) LatestBlock(ctx context.Context) (HEAD, error) { func (m *RPCClientBase[HEAD]) LatestFinalizedBlock(ctx context.Context) (HEAD, error) { ctx, cancel, lifeCycleCh := m.AcquireQueryCtx(ctx, m.ctxTimeout) defer cancel() + start := time.Now() head, err := m.latestFinalizedBlock(ctx) if err != nil { + m.recordRPCRequest(ctx, rpcCallNameLatestFinalizedBlock, start, err) return head, err } if !head.IsValid() { - return head, errors.New("invalid head") + m.recordRPCRequest(ctx, rpcCallNameLatestFinalizedBlock, start, errInvalidHead) + return head, errInvalidHead } + m.recordRPCRequest(ctx, rpcCallNameLatestFinalizedBlock, start, nil) m.OnNewFinalizedHead(ctx, lifeCycleCh, head) return head, nil } +func (m *RPCClientBase[HEAD]) recordRPCRequest(ctx context.Context, callName string, startedAt time.Time, err error) { + if m.rpcMetrics == nil { + return + } + + m.rpcMetrics.RecordRequest(ctx, m.rpcURL, m.isSendOnly, callName, time.Since(startedAt), err) +} + func (m *RPCClientBase[HEAD]) OnNewHead(ctx context.Context, requestCh <-chan struct{}, head HEAD) { if !head.IsValid() { return diff --git a/multinode/rpc_client_base_test.go b/multinode/rpc_client_base_test.go index 25afd4d..8bd3aa0 100644 --- a/multinode/rpc_client_base_test.go +++ b/multinode/rpc_client_base_test.go @@ -2,6 +2,7 @@ package multinode import ( "context" + "errors" "math/big" "testing" "time" @@ -11,6 +12,7 @@ import ( common "github.com/smartcontractkit/chainlink-common/pkg/config" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" + frameworkmetrics "github.com/smartcontractkit/chainlink-framework/metrics" "github.com/smartcontractkit/chainlink-framework/multinode/config" ) @@ -67,11 +69,35 @@ func newTestRPC(t *testing.T) *testRPC { } rpc := &testRPC{} - rpc.RPCClientBase = NewRPCClientBase[*testHead](cfg, requestTimeout, lggr, rpc.latestBlock, rpc.latestBlock) + rpc.RPCClientBase = NewRPCClientBase[*testHead](cfg, requestTimeout, lggr, rpc.latestBlock, rpc.latestBlock, nil) t.Cleanup(rpc.Close) return rpc } +type recordedRPCRequest struct { + rpcURL string + isSendOnly bool + callName string + latency time.Duration + err error +} + +type spyRPCClientMetrics struct { + requests []recordedRPCRequest +} + +var _ frameworkmetrics.RPCClientMetrics = (*spyRPCClientMetrics)(nil) + +func (s *spyRPCClientMetrics) RecordRequest(_ context.Context, rpcURL string, isSendOnly bool, callName string, latency time.Duration, err error) { + s.requests = append(s.requests, recordedRPCRequest{ + rpcURL: rpcURL, + isSendOnly: isSendOnly, + callName: callName, + latency: latency, + err: err, + }) +} + func TestAdapter_LatestBlock(t *testing.T) { t.Run("LatestBlock", func(t *testing.T) { rpc := newTestRPC(t) @@ -100,6 +126,117 @@ func TestAdapter_LatestBlock(t *testing.T) { }) } +func TestRPCClientBase_RecordsRPCMetrics(t *testing.T) { + requestTimeout := 5 * time.Second + lggr := logger.Test(t) + cfg := &config.MultiNodeConfig{ + MultiNode: config.MultiNode{ + Enabled: ptr(true), + PollFailureThreshold: ptr(uint32(5)), + PollInterval: common.MustNewDuration(15 * time.Second), + SelectionMode: ptr(NodeSelectionModePriorityLevel), + SyncThreshold: ptr(uint32(10)), + LeaseDuration: common.MustNewDuration(time.Minute), + NodeIsSyncingEnabled: ptr(false), + NewHeadsPollInterval: common.MustNewDuration(5 * time.Second), + FinalizedBlockPollInterval: common.MustNewDuration(5 * time.Second), + EnforceRepeatableRead: ptr(true), + DeathDeclarationDelay: common.MustNewDuration(20 * time.Second), + NodeNoNewHeadsThreshold: common.MustNewDuration(20 * time.Second), + NoNewFinalizedHeadsThreshold: common.MustNewDuration(20 * time.Second), + FinalityTagEnabled: ptr(true), + FinalityDepth: ptr(uint32(0)), + FinalizedBlockOffset: ptr(uint32(50)), + }, + } + + t.Run("records successful latest block requests", func(t *testing.T) { + spy := &spyRPCClientMetrics{} + rpc := NewRPCClientBase[*testHead]( + cfg, + requestTimeout, + lggr, + func(context.Context) (*testHead, error) { + return &testHead{blockNumber: 7}, nil + }, + func(context.Context) (*testHead, error) { + return &testHead{blockNumber: 8}, nil + }, + &RPCClientBaseMetricsConfig{ + RPCClientMetrics: spy, + RPCURL: "http://primary.test", + IsSendOnly: false, + }, + ) + + head, err := rpc.LatestBlock(tests.Context(t)) + require.NoError(t, err) + require.Equal(t, int64(7), head.BlockNumber()) + require.Len(t, spy.requests, 1) + require.Equal(t, "http://primary.test", spy.requests[0].rpcURL) + require.False(t, spy.requests[0].isSendOnly) + require.Equal(t, rpcCallNameLatestBlock, spy.requests[0].callName) + require.NoError(t, spy.requests[0].err) + require.Positive(t, spy.requests[0].latency) + }) + + t.Run("records failed finalized block requests", func(t *testing.T) { + spy := &spyRPCClientMetrics{} + expectedErr := errors.New("boom") + rpc := NewRPCClientBase[*testHead]( + cfg, + requestTimeout, + lggr, + func(context.Context) (*testHead, error) { + return &testHead{blockNumber: 7}, nil + }, + func(context.Context) (*testHead, error) { + return nil, expectedErr + }, + &RPCClientBaseMetricsConfig{ + RPCClientMetrics: spy, + RPCURL: "http://sendonly.test", + IsSendOnly: true, + }, + ) + + _, err := rpc.LatestFinalizedBlock(tests.Context(t)) + require.ErrorIs(t, err, expectedErr) + require.Len(t, spy.requests, 1) + require.Equal(t, "http://sendonly.test", spy.requests[0].rpcURL) + require.True(t, spy.requests[0].isSendOnly) + require.Equal(t, rpcCallNameLatestFinalizedBlock, spy.requests[0].callName) + require.ErrorIs(t, spy.requests[0].err, expectedErr) + require.Positive(t, spy.requests[0].latency) + }) + + t.Run("records invalid heads as failed requests", func(t *testing.T) { + spy := &spyRPCClientMetrics{} + rpc := NewRPCClientBase[*testHead]( + cfg, + requestTimeout, + lggr, + func(context.Context) (*testHead, error) { + return &testHead{}, nil + }, + func(context.Context) (*testHead, error) { + return &testHead{blockNumber: 8}, nil + }, + &RPCClientBaseMetricsConfig{ + RPCClientMetrics: spy, + RPCURL: "http://invalid.test", + IsSendOnly: false, + }, + ) + + _, err := rpc.LatestBlock(tests.Context(t)) + require.ErrorIs(t, err, errInvalidHead) + require.Len(t, spy.requests, 1) + require.Equal(t, rpcCallNameLatestBlock, spy.requests[0].callName) + require.ErrorIs(t, spy.requests[0].err, errInvalidHead) + }) +} + func TestAdapter_OnNewHeadFunctions(t *testing.T) { timeout := 10 * time.Second t.Run("OnNewHead and OnNewFinalizedHead updates chain info", func(t *testing.T) { From 77245f905242f63fed9122341fe77edcde869d41 Mon Sep 17 00:00:00 2001 From: Larry Li Date: Thu, 26 Mar 2026 17:06:54 -0700 Subject: [PATCH 2/6] update --- multinode/rpc_client_base_test.go | 33 +++++++++++++++---------------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/multinode/rpc_client_base_test.go b/multinode/rpc_client_base_test.go index 8bd3aa0..02e38f7 100644 --- a/multinode/rpc_client_base_test.go +++ b/multinode/rpc_client_base_test.go @@ -11,7 +11,6 @@ import ( common "github.com/smartcontractkit/chainlink-common/pkg/config" "github.com/smartcontractkit/chainlink-common/pkg/logger" - "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" frameworkmetrics "github.com/smartcontractkit/chainlink-framework/metrics" "github.com/smartcontractkit/chainlink-framework/multinode/config" ) @@ -104,7 +103,7 @@ func TestAdapter_LatestBlock(t *testing.T) { latestChainInfo, highestChainInfo := rpc.GetInterceptedChainInfo() require.Equal(t, int64(0), latestChainInfo.BlockNumber) require.Equal(t, int64(0), highestChainInfo.BlockNumber) - head, err := rpc.LatestBlock(tests.Context(t)) + head, err := rpc.LatestBlock(t.Context()) require.NoError(t, err) require.True(t, head.IsValid()) latestChainInfo, highestChainInfo = rpc.GetInterceptedChainInfo() @@ -117,7 +116,7 @@ func TestAdapter_LatestBlock(t *testing.T) { latestChainInfo, highestChainInfo := rpc.GetInterceptedChainInfo() require.Equal(t, int64(0), latestChainInfo.FinalizedBlockNumber) require.Equal(t, int64(0), highestChainInfo.FinalizedBlockNumber) - finalizedHead, err := rpc.LatestFinalizedBlock(tests.Context(t)) + finalizedHead, err := rpc.LatestFinalizedBlock(t.Context()) require.NoError(t, err) require.True(t, finalizedHead.IsValid()) latestChainInfo, highestChainInfo = rpc.GetInterceptedChainInfo() @@ -169,7 +168,7 @@ func TestRPCClientBase_RecordsRPCMetrics(t *testing.T) { }, ) - head, err := rpc.LatestBlock(tests.Context(t)) + head, err := rpc.LatestBlock(t.Context()) require.NoError(t, err) require.Equal(t, int64(7), head.BlockNumber()) require.Len(t, spy.requests, 1) @@ -200,7 +199,7 @@ func TestRPCClientBase_RecordsRPCMetrics(t *testing.T) { }, ) - _, err := rpc.LatestFinalizedBlock(tests.Context(t)) + _, err := rpc.LatestFinalizedBlock(t.Context()) require.ErrorIs(t, err, expectedErr) require.Len(t, spy.requests, 1) require.Equal(t, "http://sendonly.test", spy.requests[0].rpcURL) @@ -229,7 +228,7 @@ func TestRPCClientBase_RecordsRPCMetrics(t *testing.T) { }, ) - _, err := rpc.LatestBlock(tests.Context(t)) + _, err := rpc.LatestBlock(t.Context()) require.ErrorIs(t, err, errInvalidHead) require.Len(t, spy.requests, 1) require.Equal(t, rpcCallNameLatestBlock, spy.requests[0].callName) @@ -247,7 +246,7 @@ func TestAdapter_OnNewHeadFunctions(t *testing.T) { require.Equal(t, int64(0), highestChainInfo.BlockNumber) require.Equal(t, int64(0), highestChainInfo.FinalizedBlockNumber) - ctx, cancel, lifeCycleCh := rpc.AcquireQueryCtx(tests.Context(t), timeout) + ctx, cancel, lifeCycleCh := rpc.AcquireQueryCtx(t.Context(), timeout) defer cancel() rpc.OnNewHead(ctx, lifeCycleCh, &testHead{blockNumber: 10}) rpc.OnNewFinalizedHead(ctx, lifeCycleCh, &testHead{blockNumber: 3}) @@ -269,7 +268,7 @@ func TestAdapter_OnNewHeadFunctions(t *testing.T) { require.Equal(t, int64(0), highestChainInfo.BlockNumber) require.Equal(t, int64(0), highestChainInfo.FinalizedBlockNumber) - healthCheckCtx := CtxAddHealthCheckFlag(tests.Context(t)) + healthCheckCtx := CtxAddHealthCheckFlag(t.Context()) ctx, cancel, lifeCycleCh := rpc.AcquireQueryCtx(healthCheckCtx, timeout) defer cancel() @@ -295,7 +294,7 @@ func TestAdapter_OnNewHeadFunctions(t *testing.T) { require.Equal(t, int64(0), highestChainInfo.BlockNumber) require.Equal(t, int64(0), highestChainInfo.FinalizedBlockNumber) - ctx, cancel, lifeCycleCh := rpc.AcquireQueryCtx(tests.Context(t), timeout) + ctx, cancel, lifeCycleCh := rpc.AcquireQueryCtx(t.Context(), timeout) defer cancel() rpc.CancelLifeCycle() @@ -317,11 +316,11 @@ func TestAdapter_OnNewHeadFunctions(t *testing.T) { func TestAdapter_HeadSubscriptions(t *testing.T) { t.Run("SubscribeToHeads", func(t *testing.T) { rpc := newTestRPC(t) - ch, sub, err := rpc.SubscribeToHeads(tests.Context(t)) + ch, sub, err := rpc.SubscribeToHeads(t.Context()) require.NoError(t, err) defer sub.Unsubscribe() - ctx, cancel := context.WithTimeout(tests.Context(t), time.Minute) + ctx, cancel := context.WithTimeout(t.Context(), time.Minute) defer cancel() select { case head := <-ch: @@ -334,11 +333,11 @@ func TestAdapter_HeadSubscriptions(t *testing.T) { t.Run("SubscribeToFinalizedHeads", func(t *testing.T) { rpc := newTestRPC(t) - finalizedCh, finalizedSub, err := rpc.SubscribeToFinalizedHeads(tests.Context(t)) + finalizedCh, finalizedSub, err := rpc.SubscribeToFinalizedHeads(t.Context()) require.NoError(t, err) defer finalizedSub.Unsubscribe() - ctx, cancel := context.WithTimeout(tests.Context(t), time.Minute) + ctx, cancel := context.WithTimeout(t.Context(), time.Minute) defer cancel() select { case finalizedHead := <-finalizedCh: @@ -351,10 +350,10 @@ func TestAdapter_HeadSubscriptions(t *testing.T) { t.Run("Remove Subscription on Unsubscribe", func(t *testing.T) { rpc := newTestRPC(t) - _, sub1, err := rpc.SubscribeToHeads(tests.Context(t)) + _, sub1, err := rpc.SubscribeToHeads(t.Context()) require.NoError(t, err) require.Equal(t, 1, rpc.lenSubs()) - _, sub2, err := rpc.SubscribeToFinalizedHeads(tests.Context(t)) + _, sub2, err := rpc.SubscribeToFinalizedHeads(t.Context()) require.NoError(t, err) require.Equal(t, 2, rpc.lenSubs()) @@ -366,10 +365,10 @@ func TestAdapter_HeadSubscriptions(t *testing.T) { t.Run("Ensure no deadlock on UnsubscribeAll", func(t *testing.T) { rpc := newTestRPC(t) - _, _, err := rpc.SubscribeToHeads(tests.Context(t)) + _, _, err := rpc.SubscribeToHeads(t.Context()) require.NoError(t, err) require.Equal(t, 1, rpc.lenSubs()) - _, _, err = rpc.SubscribeToFinalizedHeads(tests.Context(t)) + _, _, err = rpc.SubscribeToFinalizedHeads(t.Context()) require.NoError(t, err) require.Equal(t, 2, rpc.lenSubs()) rpc.UnsubscribeAllExcept() From 3bc35104e5a6fd96fb98da26314acac3df3585be Mon Sep 17 00:00:00 2001 From: Larry Li Date: Thu, 26 Mar 2026 22:08:59 -0700 Subject: [PATCH 3/6] update --- multinode/rpc_client_base_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/multinode/rpc_client_base_test.go b/multinode/rpc_client_base_test.go index 02e38f7..a81979a 100644 --- a/multinode/rpc_client_base_test.go +++ b/multinode/rpc_client_base_test.go @@ -37,7 +37,7 @@ type testHead struct { func (t *testHead) BlockNumber() int64 { return t.blockNumber } func (t *testHead) BlockDifficulty() *big.Int { return nil } func (t *testHead) GetTotalDifficulty() *big.Int { return nil } -func (t *testHead) IsValid() bool { return true } +func (t *testHead) IsValid() bool { return t != nil && t.blockNumber > 0 } func ptr[T any](t T) *T { return &t From ddabde5f3064b6cc235774047f5f5860734a381d Mon Sep 17 00:00:00 2001 From: Larry Li Date: Fri, 3 Apr 2026 00:54:54 -0700 Subject: [PATCH 4/6] address PR comments --- metrics/client.go | 23 ++++++++++++++--------- metrics/client_test.go | 11 ++++++----- multinode/rpc_client_base.go | 18 +++--------------- multinode/rpc_client_base_test.go | 28 ++++------------------------ 4 files changed, 27 insertions(+), 53 deletions(-) diff --git a/metrics/client.go b/metrics/client.go index 74d823b..06f8a5f 100644 --- a/metrics/client.go +++ b/metrics/client.go @@ -34,12 +34,11 @@ var ( const rpcCallLatencyBeholder = "rpc_call_latency" // RPCClientMetrics records RPC call latency to Prometheus and Beholder (failures: success="false"; same pattern as multinode metrics). -// Construct once per chain (or process) with ChainFamily and ChainID; pass rpcUrl and isSendOnly on each call -// when they vary by node or request. +// Construct once per client with the full stable label set and record requests by call name. type RPCClientMetrics interface { // RecordRequest records latency for an RPC call (observed in nanoseconds for Prometheus and Beholder). // Failures use success="false"; derive error rate from rpc_call_latency_count{success="false"} (or equivalent). - RecordRequest(ctx context.Context, rpcURL string, isSendOnly bool, callName string, latency time.Duration, err error) + RecordRequest(ctx context.Context, callName string, latency time.Duration, err error) } var _ RPCClientMetrics = (*rpcClientMetrics)(nil) @@ -47,13 +46,17 @@ var _ RPCClientMetrics = (*rpcClientMetrics)(nil) type rpcClientMetrics struct { chainFamily string chainID string + rpcURL string + isSendOnly bool latencyHis metric.Float64Histogram } -// RPCClientMetricsConfig holds labels that are fixed for the lifetime of the metrics handle (e.g. one per chain). +// RPCClientMetricsConfig holds labels that are fixed for the lifetime of the metrics handle (e.g. one per client). type RPCClientMetricsConfig struct { ChainFamily string ChainID string + RPCURL string + IsSendOnly bool } // NewRPCClientMetrics creates RPC client metrics that publish to Prometheus and Beholder. @@ -65,24 +68,26 @@ func NewRPCClientMetrics(cfg RPCClientMetricsConfig) (RPCClientMetrics, error) { return &rpcClientMetrics{ chainFamily: cfg.ChainFamily, chainID: cfg.ChainID, + rpcURL: cfg.RPCURL, + isSendOnly: cfg.IsSendOnly, latencyHis: latency, }, nil } -func (m *rpcClientMetrics) RecordRequest(ctx context.Context, rpcURL string, isSendOnly bool, callName string, latency time.Duration, err error) { +func (m *rpcClientMetrics) RecordRequest(ctx context.Context, callName string, latency time.Duration, err error) { successStr := "true" if err != nil { successStr = "false" } - sendStr := strconv.FormatBool(isSendOnly) + sendStr := strconv.FormatBool(m.isSendOnly) latencyNs := float64(latency) - RPCCallLatency.WithLabelValues(m.chainFamily, m.chainID, rpcURL, sendStr, successStr, callName).Observe(latencyNs) + RPCCallLatency.WithLabelValues(m.chainFamily, m.chainID, m.rpcURL, sendStr, successStr, callName).Observe(latencyNs) latAttrs := metric.WithAttributes( attribute.String("chainFamily", m.chainFamily), attribute.String("chainID", m.chainID), - attribute.String("rpcUrl", rpcURL), + attribute.String("rpcUrl", m.rpcURL), attribute.String("isSendOnly", sendStr), attribute.String("success", successStr), attribute.String("rpcCallName", callName), @@ -93,7 +98,7 @@ func (m *rpcClientMetrics) RecordRequest(ctx context.Context, rpcURL string, isS // NoopRPCClientMetrics is a no-op implementation for when metrics are disabled. type NoopRPCClientMetrics struct{} -func (NoopRPCClientMetrics) RecordRequest(context.Context, string, bool, string, time.Duration, error) { +func (NoopRPCClientMetrics) RecordRequest(context.Context, string, time.Duration, error) { } var _ RPCClientMetrics = NoopRPCClientMetrics{} diff --git a/metrics/client_test.go b/metrics/client_test.go index ba407a0..5eb5427 100644 --- a/metrics/client_test.go +++ b/metrics/client_test.go @@ -13,19 +13,20 @@ func TestNewRPCClientMetrics(t *testing.T) { m, err := NewRPCClientMetrics(RPCClientMetricsConfig{ ChainFamily: "evm", ChainID: "1", + RPCURL: "http://localhost:8545", + IsSendOnly: false, }) require.NoError(t, err) require.NotNil(t, m) ctx := context.Background() - const url = "http://localhost:8545" - m.RecordRequest(ctx, url, false, "latest_block", 100*time.Millisecond, nil) - m.RecordRequest(ctx, url, true, "latest_block", 50*time.Millisecond, errors.New("rpc error")) + m.RecordRequest(ctx, "latest_block", 100*time.Millisecond, nil) + m.RecordRequest(ctx, "latest_block", 50*time.Millisecond, errors.New("rpc error")) } func TestNoopRPCClientMetrics_RecordRequest(t *testing.T) { var m NoopRPCClientMetrics ctx := context.Background() - m.RecordRequest(ctx, "http://localhost:8545", false, "latest_block", 100*time.Millisecond, nil) - m.RecordRequest(ctx, "http://localhost:8545", false, "latest_block", 50*time.Millisecond, errors.New("rpc error")) + m.RecordRequest(ctx, "latest_block", 100*time.Millisecond, nil) + m.RecordRequest(ctx, "latest_block", 50*time.Millisecond, errors.New("rpc error")) } diff --git a/multinode/rpc_client_base.go b/multinode/rpc_client_base.go index 20eb978..e27072c 100644 --- a/multinode/rpc_client_base.go +++ b/multinode/rpc_client_base.go @@ -24,12 +24,6 @@ type RPCClientBaseConfig interface { FinalizedBlockPollInterval() time.Duration } -type RPCClientBaseMetricsConfig struct { - RPCClientMetrics frameworkmetrics.RPCClientMetrics - RPCURL string - IsSendOnly bool -} - // RPCClientBase is used to integrate multinode into chain-specific clients. // For new MultiNode integrations, we wrap the RPC client and inherit from the RPCClientBase // to get the required RPCClient methods and enable the use of MultiNode. @@ -62,15 +56,13 @@ type RPCClientBase[HEAD Head] struct { latestChainInfo ChainInfo rpcMetrics frameworkmetrics.RPCClientMetrics - rpcURL string - isSendOnly bool } func NewRPCClientBase[HEAD Head]( cfg RPCClientBaseConfig, ctxTimeout time.Duration, log logger.Logger, latestBlock func(ctx context.Context) (HEAD, error), latestFinalizedBlock func(ctx context.Context) (HEAD, error), - rpcMetrics *RPCClientBaseMetricsConfig, + rpcMetrics frameworkmetrics.RPCClientMetrics, ) *RPCClientBase[HEAD] { base := &RPCClientBase[HEAD]{ cfg: cfg, @@ -80,11 +72,7 @@ func NewRPCClientBase[HEAD Head]( latestFinalizedBlock: latestFinalizedBlock, subs: make(map[Subscription]struct{}), lifeCycleCh: make(chan struct{}), - } - if rpcMetrics != nil { - base.rpcMetrics = rpcMetrics.RPCClientMetrics - base.rpcURL = rpcMetrics.RPCURL - base.isSendOnly = rpcMetrics.IsSendOnly + rpcMetrics: rpcMetrics, } return base } @@ -224,7 +212,7 @@ func (m *RPCClientBase[HEAD]) recordRPCRequest(ctx context.Context, callName str return } - m.rpcMetrics.RecordRequest(ctx, m.rpcURL, m.isSendOnly, callName, time.Since(startedAt), err) + m.rpcMetrics.RecordRequest(ctx, callName, time.Since(startedAt), err) } func (m *RPCClientBase[HEAD]) OnNewHead(ctx context.Context, requestCh <-chan struct{}, head HEAD) { diff --git a/multinode/rpc_client_base_test.go b/multinode/rpc_client_base_test.go index a81979a..2a4fdf6 100644 --- a/multinode/rpc_client_base_test.go +++ b/multinode/rpc_client_base_test.go @@ -74,8 +74,6 @@ func newTestRPC(t *testing.T) *testRPC { } type recordedRPCRequest struct { - rpcURL string - isSendOnly bool callName string latency time.Duration err error @@ -87,10 +85,8 @@ type spyRPCClientMetrics struct { var _ frameworkmetrics.RPCClientMetrics = (*spyRPCClientMetrics)(nil) -func (s *spyRPCClientMetrics) RecordRequest(_ context.Context, rpcURL string, isSendOnly bool, callName string, latency time.Duration, err error) { +func (s *spyRPCClientMetrics) RecordRequest(_ context.Context, callName string, latency time.Duration, err error) { s.requests = append(s.requests, recordedRPCRequest{ - rpcURL: rpcURL, - isSendOnly: isSendOnly, callName: callName, latency: latency, err: err, @@ -161,19 +157,13 @@ func TestRPCClientBase_RecordsRPCMetrics(t *testing.T) { func(context.Context) (*testHead, error) { return &testHead{blockNumber: 8}, nil }, - &RPCClientBaseMetricsConfig{ - RPCClientMetrics: spy, - RPCURL: "http://primary.test", - IsSendOnly: false, - }, + spy, ) head, err := rpc.LatestBlock(t.Context()) require.NoError(t, err) require.Equal(t, int64(7), head.BlockNumber()) require.Len(t, spy.requests, 1) - require.Equal(t, "http://primary.test", spy.requests[0].rpcURL) - require.False(t, spy.requests[0].isSendOnly) require.Equal(t, rpcCallNameLatestBlock, spy.requests[0].callName) require.NoError(t, spy.requests[0].err) require.Positive(t, spy.requests[0].latency) @@ -192,18 +182,12 @@ func TestRPCClientBase_RecordsRPCMetrics(t *testing.T) { func(context.Context) (*testHead, error) { return nil, expectedErr }, - &RPCClientBaseMetricsConfig{ - RPCClientMetrics: spy, - RPCURL: "http://sendonly.test", - IsSendOnly: true, - }, + spy, ) _, err := rpc.LatestFinalizedBlock(t.Context()) require.ErrorIs(t, err, expectedErr) require.Len(t, spy.requests, 1) - require.Equal(t, "http://sendonly.test", spy.requests[0].rpcURL) - require.True(t, spy.requests[0].isSendOnly) require.Equal(t, rpcCallNameLatestFinalizedBlock, spy.requests[0].callName) require.ErrorIs(t, spy.requests[0].err, expectedErr) require.Positive(t, spy.requests[0].latency) @@ -221,11 +205,7 @@ func TestRPCClientBase_RecordsRPCMetrics(t *testing.T) { func(context.Context) (*testHead, error) { return &testHead{blockNumber: 8}, nil }, - &RPCClientBaseMetricsConfig{ - RPCClientMetrics: spy, - RPCURL: "http://invalid.test", - IsSendOnly: false, - }, + spy, ) _, err := rpc.LatestBlock(t.Context()) From f11333c8b36e39273254e1729036c6cd66d75211 Mon Sep 17 00:00:00 2001 From: Larry Li Date: Fri, 3 Apr 2026 01:12:58 -0700 Subject: [PATCH 5/6] update --- multinode/go.mod | 2 +- multinode/go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/multinode/go.mod b/multinode/go.mod index 902ee85..fba3b1a 100644 --- a/multinode/go.mod +++ b/multinode/go.mod @@ -7,7 +7,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/prometheus/client_model v0.6.2 github.com/smartcontractkit/chainlink-common v0.10.1-0.20260305114348-b8bbac30bfc7 - github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260310180305-3ee91a6d9ae9 + github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260401162955-be2bc6b5264b github.com/stretchr/testify v1.11.1 go.uber.org/zap v1.27.1 ) diff --git a/multinode/go.sum b/multinode/go.sum index 5c9709e..3ae4ac2 100644 --- a/multinode/go.sum +++ b/multinode/go.sum @@ -80,8 +80,8 @@ github.com/smartcontractkit/chainlink-common v0.10.1-0.20260305114348-b8bbac30bf github.com/smartcontractkit/chainlink-common v0.10.1-0.20260305114348-b8bbac30bfc7/go.mod h1:0ghbAr7tRO0tT5ZqBXhOyzgUO37tNNe33Yn0hskauVM= github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10 h1:FJAFgXS9oqASnkS03RE1HQwYQQxrO4l46O5JSzxqLgg= github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10/go.mod h1:oiDa54M0FwxevWwyAX773lwdWvFYYlYHHQV1LQ5HpWY= -github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260310180305-3ee91a6d9ae9 h1:GK+2aFpW/Z5ZnMGCa9NU6o7LKHQ/9xJVZx2yMAMudnc= -github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260310180305-3ee91a6d9ae9/go.mod h1:HG/aei0MgBOpsyRLexdKGtOUO8yjSJO3iUu0Uu8KBm4= +github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260401162955-be2bc6b5264b h1:L1So1EDBDRET3j/TdV1Gjv3qWARoa/NPRaU7k4r30yA= +github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260401162955-be2bc6b5264b/go.mod h1:HG/aei0MgBOpsyRLexdKGtOUO8yjSJO3iUu0Uu8KBm4= github.com/smartcontractkit/freeport v0.1.3-0.20250716200817-cb5dfd0e369e h1:Hv9Mww35LrufCdM9wtS9yVi/rEWGI1UnjHbcKKU0nVY= github.com/smartcontractkit/freeport v0.1.3-0.20250716200817-cb5dfd0e369e/go.mod h1:T4zH9R8R8lVWKfU7tUvYz2o2jMv1OpGCdpY2j2QZXzU= github.com/smartcontractkit/libocr v0.0.0-20250912173940-f3ab0246e23d h1:LokA9PoCNb8mm8mDT52c3RECPMRsGz1eCQORq+J3n74= From e17c920b21feccebb46cd1a1f15a583fb0dd5e91 Mon Sep 17 00:00:00 2001 From: Larry Li Date: Fri, 3 Apr 2026 01:25:59 -0700 Subject: [PATCH 6/6] update --- metrics/client.go | 23 +++++++++-------------- metrics/client_test.go | 10 ++++------ multinode/rpc_client_base.go | 4 +++- multinode/rpc_client_base_test.go | 14 +++++++------- 4 files changed, 23 insertions(+), 28 deletions(-) diff --git a/metrics/client.go b/metrics/client.go index 06f8a5f..74d823b 100644 --- a/metrics/client.go +++ b/metrics/client.go @@ -34,11 +34,12 @@ var ( const rpcCallLatencyBeholder = "rpc_call_latency" // RPCClientMetrics records RPC call latency to Prometheus and Beholder (failures: success="false"; same pattern as multinode metrics). -// Construct once per client with the full stable label set and record requests by call name. +// Construct once per chain (or process) with ChainFamily and ChainID; pass rpcUrl and isSendOnly on each call +// when they vary by node or request. type RPCClientMetrics interface { // RecordRequest records latency for an RPC call (observed in nanoseconds for Prometheus and Beholder). // Failures use success="false"; derive error rate from rpc_call_latency_count{success="false"} (or equivalent). - RecordRequest(ctx context.Context, callName string, latency time.Duration, err error) + RecordRequest(ctx context.Context, rpcURL string, isSendOnly bool, callName string, latency time.Duration, err error) } var _ RPCClientMetrics = (*rpcClientMetrics)(nil) @@ -46,17 +47,13 @@ var _ RPCClientMetrics = (*rpcClientMetrics)(nil) type rpcClientMetrics struct { chainFamily string chainID string - rpcURL string - isSendOnly bool latencyHis metric.Float64Histogram } -// RPCClientMetricsConfig holds labels that are fixed for the lifetime of the metrics handle (e.g. one per client). +// RPCClientMetricsConfig holds labels that are fixed for the lifetime of the metrics handle (e.g. one per chain). type RPCClientMetricsConfig struct { ChainFamily string ChainID string - RPCURL string - IsSendOnly bool } // NewRPCClientMetrics creates RPC client metrics that publish to Prometheus and Beholder. @@ -68,26 +65,24 @@ func NewRPCClientMetrics(cfg RPCClientMetricsConfig) (RPCClientMetrics, error) { return &rpcClientMetrics{ chainFamily: cfg.ChainFamily, chainID: cfg.ChainID, - rpcURL: cfg.RPCURL, - isSendOnly: cfg.IsSendOnly, latencyHis: latency, }, nil } -func (m *rpcClientMetrics) RecordRequest(ctx context.Context, callName string, latency time.Duration, err error) { +func (m *rpcClientMetrics) RecordRequest(ctx context.Context, rpcURL string, isSendOnly bool, callName string, latency time.Duration, err error) { successStr := "true" if err != nil { successStr = "false" } - sendStr := strconv.FormatBool(m.isSendOnly) + sendStr := strconv.FormatBool(isSendOnly) latencyNs := float64(latency) - RPCCallLatency.WithLabelValues(m.chainFamily, m.chainID, m.rpcURL, sendStr, successStr, callName).Observe(latencyNs) + RPCCallLatency.WithLabelValues(m.chainFamily, m.chainID, rpcURL, sendStr, successStr, callName).Observe(latencyNs) latAttrs := metric.WithAttributes( attribute.String("chainFamily", m.chainFamily), attribute.String("chainID", m.chainID), - attribute.String("rpcUrl", m.rpcURL), + attribute.String("rpcUrl", rpcURL), attribute.String("isSendOnly", sendStr), attribute.String("success", successStr), attribute.String("rpcCallName", callName), @@ -98,7 +93,7 @@ func (m *rpcClientMetrics) RecordRequest(ctx context.Context, callName string, l // NoopRPCClientMetrics is a no-op implementation for when metrics are disabled. type NoopRPCClientMetrics struct{} -func (NoopRPCClientMetrics) RecordRequest(context.Context, string, time.Duration, error) { +func (NoopRPCClientMetrics) RecordRequest(context.Context, string, bool, string, time.Duration, error) { } var _ RPCClientMetrics = NoopRPCClientMetrics{} diff --git a/metrics/client_test.go b/metrics/client_test.go index 5eb5427..0e368b9 100644 --- a/metrics/client_test.go +++ b/metrics/client_test.go @@ -13,20 +13,18 @@ func TestNewRPCClientMetrics(t *testing.T) { m, err := NewRPCClientMetrics(RPCClientMetricsConfig{ ChainFamily: "evm", ChainID: "1", - RPCURL: "http://localhost:8545", - IsSendOnly: false, }) require.NoError(t, err) require.NotNil(t, m) ctx := context.Background() - m.RecordRequest(ctx, "latest_block", 100*time.Millisecond, nil) - m.RecordRequest(ctx, "latest_block", 50*time.Millisecond, errors.New("rpc error")) + m.RecordRequest(ctx, "http://localhost:8545", false, "latest_block", 100*time.Millisecond, nil) + m.RecordRequest(ctx, "http://localhost:8545", false, "latest_block", 50*time.Millisecond, errors.New("rpc error")) } func TestNoopRPCClientMetrics_RecordRequest(t *testing.T) { var m NoopRPCClientMetrics ctx := context.Background() - m.RecordRequest(ctx, "latest_block", 100*time.Millisecond, nil) - m.RecordRequest(ctx, "latest_block", 50*time.Millisecond, errors.New("rpc error")) + m.RecordRequest(ctx, "http://localhost:8545", false, "latest_block", 100*time.Millisecond, nil) + m.RecordRequest(ctx, "http://localhost:8545", false, "latest_block", 50*time.Millisecond, errors.New("rpc error")) } diff --git a/multinode/rpc_client_base.go b/multinode/rpc_client_base.go index c05bc14..1794647 100644 --- a/multinode/rpc_client_base.go +++ b/multinode/rpc_client_base.go @@ -17,6 +17,8 @@ var errInvalidHead = errors.New("invalid head") const ( rpcCallNameLatestBlock = "latest_block" rpcCallNameLatestFinalizedBlock = "latest_finalized_block" + rpcMetricsDefaultURL = "" + rpcMetricsDefaultIsSendOnly = false ) type RPCClientBaseConfig interface { @@ -212,7 +214,7 @@ func (m *RPCClientBase[HEAD]) recordRPCRequest(ctx context.Context, callName str return } - m.rpcMetrics.RecordRequest(ctx, callName, time.Since(startedAt), err) + m.rpcMetrics.RecordRequest(ctx, rpcMetricsDefaultURL, rpcMetricsDefaultIsSendOnly, callName, time.Since(startedAt), err) } func (m *RPCClientBase[HEAD]) OnNewHead(ctx context.Context, requestCh <-chan struct{}, head HEAD) { diff --git a/multinode/rpc_client_base_test.go b/multinode/rpc_client_base_test.go index 2a4fdf6..713a2cc 100644 --- a/multinode/rpc_client_base_test.go +++ b/multinode/rpc_client_base_test.go @@ -74,9 +74,9 @@ func newTestRPC(t *testing.T) *testRPC { } type recordedRPCRequest struct { - callName string - latency time.Duration - err error + callName string + latency time.Duration + err error } type spyRPCClientMetrics struct { @@ -85,11 +85,11 @@ type spyRPCClientMetrics struct { var _ frameworkmetrics.RPCClientMetrics = (*spyRPCClientMetrics)(nil) -func (s *spyRPCClientMetrics) RecordRequest(_ context.Context, callName string, latency time.Duration, err error) { +func (s *spyRPCClientMetrics) RecordRequest(_ context.Context, _ string, _ bool, callName string, latency time.Duration, err error) { s.requests = append(s.requests, recordedRPCRequest{ - callName: callName, - latency: latency, - err: err, + callName: callName, + latency: latency, + err: err, }) }