Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions metrics/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ func TestNewRPCClientMetrics(t *testing.T) {
require.NotNil(t, m)

ctx := context.Background()
const url = "http://localhost:8545"
m.RecordRequest(ctx, url, false, "latest_block", 100*time.Millisecond, nil)
m.RecordRequest(ctx, url, true, "latest_block", 50*time.Millisecond, errors.New("rpc error"))
m.RecordRequest(ctx, "http://localhost:8545", false, "latest_block", 100*time.Millisecond, nil)
m.RecordRequest(ctx, "http://localhost:8545", false, "latest_block", 50*time.Millisecond, errors.New("rpc error"))
}

func TestNoopRPCClientMetrics_RecordRequest(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion multinode/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/prometheus/client_model v0.6.2
github.com/smartcontractkit/chainlink-common v0.10.1-0.20260305114348-b8bbac30bfc7
github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260310180305-3ee91a6d9ae9
github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260401162955-be2bc6b5264b
github.com/stretchr/testify v1.11.1
go.uber.org/zap v1.27.1
)
Expand Down
4 changes: 2 additions & 2 deletions multinode/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ github.com/smartcontractkit/chainlink-common v0.10.1-0.20260305114348-b8bbac30bf
github.com/smartcontractkit/chainlink-common v0.10.1-0.20260305114348-b8bbac30bfc7/go.mod h1:0ghbAr7tRO0tT5ZqBXhOyzgUO37tNNe33Yn0hskauVM=
github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10 h1:FJAFgXS9oqASnkS03RE1HQwYQQxrO4l46O5JSzxqLgg=
github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10/go.mod h1:oiDa54M0FwxevWwyAX773lwdWvFYYlYHHQV1LQ5HpWY=
github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260310180305-3ee91a6d9ae9 h1:GK+2aFpW/Z5ZnMGCa9NU6o7LKHQ/9xJVZx2yMAMudnc=
github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260310180305-3ee91a6d9ae9/go.mod h1:HG/aei0MgBOpsyRLexdKGtOUO8yjSJO3iUu0Uu8KBm4=
github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260401162955-be2bc6b5264b h1:L1So1EDBDRET3j/TdV1Gjv3qWARoa/NPRaU7k4r30yA=
github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260401162955-be2bc6b5264b/go.mod h1:HG/aei0MgBOpsyRLexdKGtOUO8yjSJO3iUu0Uu8KBm4=
github.com/smartcontractkit/freeport v0.1.3-0.20250716200817-cb5dfd0e369e h1:Hv9Mww35LrufCdM9wtS9yVi/rEWGI1UnjHbcKKU0nVY=
github.com/smartcontractkit/freeport v0.1.3-0.20250716200817-cb5dfd0e369e/go.mod h1:T4zH9R8R8lVWKfU7tUvYz2o2jMv1OpGCdpY2j2QZXzU=
github.com/smartcontractkit/libocr v0.0.0-20250912173940-f3ab0246e23d h1:LokA9PoCNb8mm8mDT52c3RECPMRsGz1eCQORq+J3n74=
Expand Down
37 changes: 34 additions & 3 deletions multinode/rpc_client_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,16 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
frameworkmetrics "github.com/smartcontractkit/chainlink-framework/metrics"
)

var errInvalidHead = errors.New("invalid head")

const (
rpcCallNameLatestBlock = "latest_block"
rpcCallNameLatestFinalizedBlock = "latest_finalized_block"
rpcMetricsDefaultURL = ""
rpcMetricsDefaultIsSendOnly = false
)

type RPCClientBaseConfig interface {
Expand Down Expand Up @@ -46,22 +56,27 @@ type RPCClientBase[HEAD Head] struct {
highestUserObservations ChainInfo
// most recent chain info observed during current lifecycle
latestChainInfo ChainInfo

rpcMetrics frameworkmetrics.RPCClientMetrics
}

func NewRPCClientBase[HEAD Head](
cfg RPCClientBaseConfig, ctxTimeout time.Duration, log logger.Logger,
latestBlock func(ctx context.Context) (HEAD, error),
latestFinalizedBlock func(ctx context.Context) (HEAD, error),
rpcMetrics frameworkmetrics.RPCClientMetrics,
) *RPCClientBase[HEAD] {
return &RPCClientBase[HEAD]{
base := &RPCClientBase[HEAD]{
cfg: cfg,
log: log,
ctxTimeout: ctxTimeout,
latestBlock: latestBlock,
latestFinalizedBlock: latestFinalizedBlock,
subs: make(map[Subscription]struct{}),
lifeCycleCh: make(chan struct{}),
rpcMetrics: rpcMetrics,
}
return base
}

func (m *RPCClientBase[HEAD]) lenSubs() int {
Expand Down Expand Up @@ -155,37 +170,53 @@ func (m *RPCClientBase[HEAD]) LatestBlock(ctx context.Context) (HEAD, error) {
// capture lifeCycleCh to ensure we are not updating chainInfo with observations related to previous life cycle
ctx, cancel, lifeCycleCh := m.AcquireQueryCtx(ctx, m.ctxTimeout)
defer cancel()
start := time.Now()

head, err := m.latestBlock(ctx)
if err != nil {
m.recordRPCRequest(ctx, rpcCallNameLatestBlock, start, err)
return head, err
}

if !head.IsValid() {
return head, errors.New("invalid head")
m.recordRPCRequest(ctx, rpcCallNameLatestBlock, start, errInvalidHead)
return head, errInvalidHead
}

m.recordRPCRequest(ctx, rpcCallNameLatestBlock, start, nil)
m.OnNewHead(ctx, lifeCycleCh, head)
return head, nil
}

func (m *RPCClientBase[HEAD]) LatestFinalizedBlock(ctx context.Context) (HEAD, error) {
ctx, cancel, lifeCycleCh := m.AcquireQueryCtx(ctx, m.ctxTimeout)
defer cancel()
start := time.Now()

head, err := m.latestFinalizedBlock(ctx)
if err != nil {
m.recordRPCRequest(ctx, rpcCallNameLatestFinalizedBlock, start, err)
return head, err
}

if !head.IsValid() {
return head, errors.New("invalid head")
m.recordRPCRequest(ctx, rpcCallNameLatestFinalizedBlock, start, errInvalidHead)
return head, errInvalidHead
}

m.recordRPCRequest(ctx, rpcCallNameLatestFinalizedBlock, start, nil)
m.OnNewFinalizedHead(ctx, lifeCycleCh, head)
return head, nil
}

func (m *RPCClientBase[HEAD]) recordRPCRequest(ctx context.Context, callName string, startedAt time.Time, err error) {
if m.rpcMetrics == nil {
return
}

m.rpcMetrics.RecordRequest(ctx, rpcMetricsDefaultURL, rpcMetricsDefaultIsSendOnly, callName, time.Since(startedAt), err)
}

func (m *RPCClientBase[HEAD]) OnNewHead(ctx context.Context, requestCh <-chan struct{}, head HEAD) {
if !head.IsValid() {
return
Expand Down
148 changes: 132 additions & 16 deletions multinode/rpc_client_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package multinode

import (
"context"
"errors"
"math/big"
"testing"
"time"
Expand All @@ -10,7 +11,7 @@ import (

common "github.com/smartcontractkit/chainlink-common/pkg/config"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
frameworkmetrics "github.com/smartcontractkit/chainlink-framework/metrics"
"github.com/smartcontractkit/chainlink-framework/multinode/config"
)

Expand All @@ -36,7 +37,7 @@ type testHead struct {
func (t *testHead) BlockNumber() int64 { return t.blockNumber }
func (t *testHead) BlockDifficulty() *big.Int { return nil }
func (t *testHead) GetTotalDifficulty() *big.Int { return nil }
func (t *testHead) IsValid() bool { return true }
func (t *testHead) IsValid() bool { return t != nil && t.blockNumber > 0 }

func ptr[T any](t T) *T {
return &t
Expand Down Expand Up @@ -67,18 +68,38 @@ func newTestRPC(t *testing.T) *testRPC {
}

rpc := &testRPC{}
rpc.RPCClientBase = NewRPCClientBase[*testHead](cfg, requestTimeout, lggr, rpc.latestBlock, rpc.latestBlock)
rpc.RPCClientBase = NewRPCClientBase[*testHead](cfg, requestTimeout, lggr, rpc.latestBlock, rpc.latestBlock, nil)
t.Cleanup(rpc.Close)
return rpc
}

type recordedRPCRequest struct {
callName string
latency time.Duration
err error
}

type spyRPCClientMetrics struct {
requests []recordedRPCRequest
}

var _ frameworkmetrics.RPCClientMetrics = (*spyRPCClientMetrics)(nil)

func (s *spyRPCClientMetrics) RecordRequest(_ context.Context, _ string, _ bool, callName string, latency time.Duration, err error) {
s.requests = append(s.requests, recordedRPCRequest{
callName: callName,
latency: latency,
err: err,
})
}

func TestAdapter_LatestBlock(t *testing.T) {
t.Run("LatestBlock", func(t *testing.T) {
rpc := newTestRPC(t)
latestChainInfo, highestChainInfo := rpc.GetInterceptedChainInfo()
require.Equal(t, int64(0), latestChainInfo.BlockNumber)
require.Equal(t, int64(0), highestChainInfo.BlockNumber)
head, err := rpc.LatestBlock(tests.Context(t))
head, err := rpc.LatestBlock(t.Context())
require.NoError(t, err)
require.True(t, head.IsValid())
latestChainInfo, highestChainInfo = rpc.GetInterceptedChainInfo()
Expand All @@ -91,7 +112,7 @@ func TestAdapter_LatestBlock(t *testing.T) {
latestChainInfo, highestChainInfo := rpc.GetInterceptedChainInfo()
require.Equal(t, int64(0), latestChainInfo.FinalizedBlockNumber)
require.Equal(t, int64(0), highestChainInfo.FinalizedBlockNumber)
finalizedHead, err := rpc.LatestFinalizedBlock(tests.Context(t))
finalizedHead, err := rpc.LatestFinalizedBlock(t.Context())
require.NoError(t, err)
require.True(t, finalizedHead.IsValid())
latestChainInfo, highestChainInfo = rpc.GetInterceptedChainInfo()
Expand All @@ -100,6 +121,101 @@ func TestAdapter_LatestBlock(t *testing.T) {
})
}

func TestRPCClientBase_RecordsRPCMetrics(t *testing.T) {
requestTimeout := 5 * time.Second
lggr := logger.Test(t)
cfg := &config.MultiNodeConfig{
MultiNode: config.MultiNode{
Enabled: ptr(true),
PollFailureThreshold: ptr(uint32(5)),
PollInterval: common.MustNewDuration(15 * time.Second),
SelectionMode: ptr(NodeSelectionModePriorityLevel),
SyncThreshold: ptr(uint32(10)),
LeaseDuration: common.MustNewDuration(time.Minute),
NodeIsSyncingEnabled: ptr(false),
NewHeadsPollInterval: common.MustNewDuration(5 * time.Second),
FinalizedBlockPollInterval: common.MustNewDuration(5 * time.Second),
EnforceRepeatableRead: ptr(true),
DeathDeclarationDelay: common.MustNewDuration(20 * time.Second),
NodeNoNewHeadsThreshold: common.MustNewDuration(20 * time.Second),
NoNewFinalizedHeadsThreshold: common.MustNewDuration(20 * time.Second),
FinalityTagEnabled: ptr(true),
FinalityDepth: ptr(uint32(0)),
FinalizedBlockOffset: ptr(uint32(50)),
},
}

t.Run("records successful latest block requests", func(t *testing.T) {
spy := &spyRPCClientMetrics{}
rpc := NewRPCClientBase[*testHead](
cfg,
requestTimeout,
lggr,
func(context.Context) (*testHead, error) {
return &testHead{blockNumber: 7}, nil
},
func(context.Context) (*testHead, error) {
return &testHead{blockNumber: 8}, nil
},
spy,
)

head, err := rpc.LatestBlock(t.Context())
require.NoError(t, err)
require.Equal(t, int64(7), head.BlockNumber())
require.Len(t, spy.requests, 1)
require.Equal(t, rpcCallNameLatestBlock, spy.requests[0].callName)
require.NoError(t, spy.requests[0].err)
require.Positive(t, spy.requests[0].latency)
})

t.Run("records failed finalized block requests", func(t *testing.T) {
spy := &spyRPCClientMetrics{}
expectedErr := errors.New("boom")
rpc := NewRPCClientBase[*testHead](
cfg,
requestTimeout,
lggr,
func(context.Context) (*testHead, error) {
return &testHead{blockNumber: 7}, nil
},
func(context.Context) (*testHead, error) {
return nil, expectedErr
},
spy,
)

_, err := rpc.LatestFinalizedBlock(t.Context())
require.ErrorIs(t, err, expectedErr)
require.Len(t, spy.requests, 1)
require.Equal(t, rpcCallNameLatestFinalizedBlock, spy.requests[0].callName)
require.ErrorIs(t, spy.requests[0].err, expectedErr)
require.Positive(t, spy.requests[0].latency)
})

t.Run("records invalid heads as failed requests", func(t *testing.T) {
spy := &spyRPCClientMetrics{}
rpc := NewRPCClientBase[*testHead](
cfg,
requestTimeout,
lggr,
func(context.Context) (*testHead, error) {
return &testHead{}, nil
},
func(context.Context) (*testHead, error) {
return &testHead{blockNumber: 8}, nil
},
spy,
)

_, err := rpc.LatestBlock(t.Context())
require.ErrorIs(t, err, errInvalidHead)
require.Len(t, spy.requests, 1)
require.Equal(t, rpcCallNameLatestBlock, spy.requests[0].callName)
require.ErrorIs(t, spy.requests[0].err, errInvalidHead)
})
}

func TestAdapter_OnNewHeadFunctions(t *testing.T) {
timeout := 10 * time.Second
t.Run("OnNewHead and OnNewFinalizedHead updates chain info", func(t *testing.T) {
Expand All @@ -110,7 +226,7 @@ func TestAdapter_OnNewHeadFunctions(t *testing.T) {
require.Equal(t, int64(0), highestChainInfo.BlockNumber)
require.Equal(t, int64(0), highestChainInfo.FinalizedBlockNumber)

ctx, cancel, lifeCycleCh := rpc.AcquireQueryCtx(tests.Context(t), timeout)
ctx, cancel, lifeCycleCh := rpc.AcquireQueryCtx(t.Context(), timeout)
defer cancel()
rpc.OnNewHead(ctx, lifeCycleCh, &testHead{blockNumber: 10})
rpc.OnNewFinalizedHead(ctx, lifeCycleCh, &testHead{blockNumber: 3})
Expand All @@ -132,7 +248,7 @@ func TestAdapter_OnNewHeadFunctions(t *testing.T) {
require.Equal(t, int64(0), highestChainInfo.BlockNumber)
require.Equal(t, int64(0), highestChainInfo.FinalizedBlockNumber)

healthCheckCtx := CtxAddHealthCheckFlag(tests.Context(t))
healthCheckCtx := CtxAddHealthCheckFlag(t.Context())

ctx, cancel, lifeCycleCh := rpc.AcquireQueryCtx(healthCheckCtx, timeout)
defer cancel()
Expand All @@ -158,7 +274,7 @@ func TestAdapter_OnNewHeadFunctions(t *testing.T) {
require.Equal(t, int64(0), highestChainInfo.BlockNumber)
require.Equal(t, int64(0), highestChainInfo.FinalizedBlockNumber)

ctx, cancel, lifeCycleCh := rpc.AcquireQueryCtx(tests.Context(t), timeout)
ctx, cancel, lifeCycleCh := rpc.AcquireQueryCtx(t.Context(), timeout)
defer cancel()
rpc.CancelLifeCycle()

Expand All @@ -180,11 +296,11 @@ func TestAdapter_OnNewHeadFunctions(t *testing.T) {
func TestAdapter_HeadSubscriptions(t *testing.T) {
t.Run("SubscribeToHeads", func(t *testing.T) {
rpc := newTestRPC(t)
ch, sub, err := rpc.SubscribeToHeads(tests.Context(t))
ch, sub, err := rpc.SubscribeToHeads(t.Context())
require.NoError(t, err)
defer sub.Unsubscribe()

ctx, cancel := context.WithTimeout(tests.Context(t), time.Minute)
ctx, cancel := context.WithTimeout(t.Context(), time.Minute)
defer cancel()
select {
case head := <-ch:
Expand All @@ -197,11 +313,11 @@ func TestAdapter_HeadSubscriptions(t *testing.T) {

t.Run("SubscribeToFinalizedHeads", func(t *testing.T) {
rpc := newTestRPC(t)
finalizedCh, finalizedSub, err := rpc.SubscribeToFinalizedHeads(tests.Context(t))
finalizedCh, finalizedSub, err := rpc.SubscribeToFinalizedHeads(t.Context())
require.NoError(t, err)
defer finalizedSub.Unsubscribe()

ctx, cancel := context.WithTimeout(tests.Context(t), time.Minute)
ctx, cancel := context.WithTimeout(t.Context(), time.Minute)
defer cancel()
select {
case finalizedHead := <-finalizedCh:
Expand All @@ -214,10 +330,10 @@ func TestAdapter_HeadSubscriptions(t *testing.T) {

t.Run("Remove Subscription on Unsubscribe", func(t *testing.T) {
rpc := newTestRPC(t)
_, sub1, err := rpc.SubscribeToHeads(tests.Context(t))
_, sub1, err := rpc.SubscribeToHeads(t.Context())
require.NoError(t, err)
require.Equal(t, 1, rpc.lenSubs())
_, sub2, err := rpc.SubscribeToFinalizedHeads(tests.Context(t))
_, sub2, err := rpc.SubscribeToFinalizedHeads(t.Context())
require.NoError(t, err)
require.Equal(t, 2, rpc.lenSubs())

Expand All @@ -229,10 +345,10 @@ func TestAdapter_HeadSubscriptions(t *testing.T) {

t.Run("Ensure no deadlock on UnsubscribeAll", func(t *testing.T) {
rpc := newTestRPC(t)
_, _, err := rpc.SubscribeToHeads(tests.Context(t))
_, _, err := rpc.SubscribeToHeads(t.Context())
require.NoError(t, err)
require.Equal(t, 1, rpc.lenSubs())
_, _, err = rpc.SubscribeToFinalizedHeads(tests.Context(t))
_, _, err = rpc.SubscribeToFinalizedHeads(t.Context())
require.NoError(t, err)
require.Equal(t, 2, rpc.lenSubs())
rpc.UnsubscribeAllExcept()
Expand Down
Loading