Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion core/cmd/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/keystore"
"github.com/smartcontractkit/chainlink/v2/core/services/llo"
"github.com/smartcontractkit/chainlink/v2/core/services/llo/retirement"
ocr3beholderwrapper "github.com/smartcontractkit/chainlink/v2/core/services/ocr3/beholderwrapper"
ocr3_1beholderwrapper "github.com/smartcontractkit/chainlink/v2/core/services/ocr3_1/beholderwrapper"
"github.com/smartcontractkit/chainlink/v2/core/services/periodicbackup"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
Expand All @@ -74,7 +75,12 @@ var (
)

func metricViews() []sdkmetric.View {
return slices.Concat(workflowsmonitoring.MetricViews(), ccvcommon.MetricViews(), ocr3_1beholderwrapper.MetricViews())
return slices.Concat(
workflowsmonitoring.MetricViews(),
ccvcommon.MetricViews(),
ocr3beholderwrapper.MetricViews(),
ocr3_1beholderwrapper.MetricViews(),
)
}

func initGlobals(cfgProm config.Prometheus, cfgTracing config.Tracing, cfgTelemetry config.Telemetry, lggr logger.Logger, csaPubKeyHex string, beholderAuthHeaders map[string]string) error {
Expand Down
8 changes: 7 additions & 1 deletion core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ import (
ringconfig "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ring/config"
vaultocrplugin "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/vault"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/validate"
ocr3beholderwrapper "github.com/smartcontractkit/chainlink/v2/core/services/ocr3/beholderwrapper"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr3_1/beholderwrapper"
"github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
Expand Down Expand Up @@ -1037,10 +1038,15 @@ func (d *Delegate) newDonTimePlugin(
OnchainKeyring: onchainKeyringAdapter,
MetricsRegisterer: prometheus.WrapRegistererWith(map[string]string{"job_name": jb.Name.ValueOrZero()}, prometheus.DefaultRegisterer),
}
oracleArgs.ReportingPluginFactory, err = dontime.NewFactory(d.dontimeStore, lggr.Named("DonTimePluginFactory"))
baseFactory, err := dontime.NewFactory(d.dontimeStore, lggr.Named("DonTimePluginFactory"))
if err != nil {
return nil, err
}
oracleArgs.ReportingPluginFactory = ocr3beholderwrapper.NewReportingPluginFactory(
baseFactory,
lggr,
"dontime",
)

oracle, err := libocr2.NewOracle(oracleArgs)
if err != nil {
Expand Down
59 changes: 59 additions & 0 deletions core/services/ocr3/beholderwrapper/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package beholderwrapper

import (
"context"
"fmt"

sdkmetric "go.opentelemetry.io/otel/sdk/metric"

"github.com/smartcontractkit/chainlink-common/pkg/logger"

"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"

"github.com/smartcontractkit/chainlink/v2/core/services/ocr3/beholderwrapper/metrics"
)

var _ ocr3types.ReportingPluginFactory[any] = &ReportingPluginFactory[any]{}

type ReportingPluginFactory[RI any] struct {
wrapped ocr3types.ReportingPluginFactory[RI]
lggr logger.Logger
plugin string
}

func NewReportingPluginFactory[RI any](
wrapped ocr3types.ReportingPluginFactory[RI],
lggr logger.Logger,
plugin string,
) *ReportingPluginFactory[RI] {
return &ReportingPluginFactory[RI]{
wrapped: wrapped,
lggr: lggr,
plugin: plugin,
}
}

func (r ReportingPluginFactory[RI]) NewReportingPlugin(ctx context.Context, config ocr3types.ReportingPluginConfig) (ocr3types.ReportingPlugin[RI], ocr3types.ReportingPluginInfo, error) {
plugin, info, err := r.wrapped.NewReportingPlugin(ctx, config)
if err != nil {
return nil, ocr3types.ReportingPluginInfo{}, err
}

m, err := metrics.NewPluginMetrics(MetricPrefix, r.plugin, config.ConfigDigest.String())
if err != nil {
return nil, ocr3types.ReportingPluginInfo{}, fmt.Errorf("failed to create plugin metrics: %w", err)
}

r.lggr.Infow("Wrapping OCR3 ReportingPlugin with beholder metrics reporter",
"configDigest", config.ConfigDigest,
"oracleID", config.OracleID,
)

wrappedPlugin := newReportingPlugin(plugin, m)
return wrappedPlugin, info, nil
}

// MetricViews returns the histogram bucket views for registration with beholder
func MetricViews() []sdkmetric.View {
return metrics.MetricViews(MetricPrefix)
}
52 changes: 52 additions & 0 deletions core/services/ocr3/beholderwrapper/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package beholderwrapper

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/require"

"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"

"github.com/smartcontractkit/chainlink/v2/core/logger"
)

func Test_WrapperFactory(t *testing.T) {
validFactory := NewReportingPluginFactory[uint](
&fakeFactory[uint]{},
logger.TestLogger(t),
"plugin",
)
failingFactory := NewReportingPluginFactory[uint](
&fakeFactory[uint]{err: errors.New("error")},
logger.TestLogger(t),
"plugin",
)

plugin, _, err := validFactory.NewReportingPlugin(t.Context(), ocr3types.ReportingPluginConfig{})
require.NoError(t, err)

// Verify the wrapped plugin works
_, err = plugin.Outcome(t.Context(), ocr3types.OutcomeContext{}, nil, nil)
require.NoError(t, err)

_, _, err = failingFactory.NewReportingPlugin(t.Context(), ocr3types.ReportingPluginConfig{})
require.Error(t, err)
}

func Test_MetricViews(t *testing.T) {
views := MetricViews()
require.Len(t, views, 2)
}

type fakeFactory[RI any] struct {
err error
}

func (f *fakeFactory[RI]) NewReportingPlugin(context.Context, ocr3types.ReportingPluginConfig) (ocr3types.ReportingPlugin[RI], ocr3types.ReportingPluginInfo, error) {
if f.err != nil {
return nil, ocr3types.ReportingPluginInfo{}, f.err
}
return &fakePlugin[RI]{}, ocr3types.ReportingPluginInfo{}, nil
}
139 changes: 139 additions & 0 deletions core/services/ocr3/beholderwrapper/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package metrics

import (
"context"
"fmt"
"strconv"
"time"

"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"

"github.com/smartcontractkit/chainlink-common/pkg/beholder"
)

// FunctionType represents the OCR plugin function being measured
type FunctionType string

const (
Query FunctionType = "query"
Observation FunctionType = "observation"
ValidateObservation FunctionType = "validateObservation"
// OCR3 specific
Outcome FunctionType = "outcome"
// OCR3.1 specific
ObservationQuorum FunctionType = "observationQuorum"
StateTransition FunctionType = "stateTransition"
Committed FunctionType = "committed"
// Common
Reports FunctionType = "reports"
ShouldAccept FunctionType = "shouldAccept"
ShouldTransmit FunctionType = "shouldTransmit"
)

// PluginMetrics holds OTEL metrics for OCR plugin instrumentation
type PluginMetrics struct {
plugin string
configDigest string

durations metric.Int64Histogram
reportsGenerated metric.Int64Counter
sizes metric.Int64Histogram
status metric.Int64Gauge
}

// NewPluginMetrics creates metrics with the given prefix (e.g., "platform_ocr3_reporting_plugin" or "platform_ocr3_1_reporting_plugin")
func NewPluginMetrics(metricPrefix, plugin, configDigest string) (*PluginMetrics, error) {
durations, err := beholder.GetMeter().Int64Histogram(metricPrefix+"_duration_ms", metric.WithUnit("ms"))
if err != nil {
return nil, fmt.Errorf("failed to create duration histogram: %w", err)
}

reportsGenerated, err := beholder.GetMeter().Int64Counter(metricPrefix+"_reports_processed", metric.WithUnit("1"))
if err != nil {
return nil, fmt.Errorf("failed to create reports counter: %w", err)
}

sizes, err := beholder.GetMeter().Int64Histogram(metricPrefix+"_data_sizes", metric.WithUnit("By"))
if err != nil {
return nil, fmt.Errorf("failed to create sizes histogram: %w", err)
}

status, err := beholder.GetMeter().Int64Gauge(metricPrefix + "_status")
if err != nil {
return nil, fmt.Errorf("failed to create status gauge: %w", err)
}

return &PluginMetrics{
plugin: plugin,
configDigest: configDigest,
durations: durations,
reportsGenerated: reportsGenerated,
sizes: sizes,
status: status,
}, nil
}

// RecordDuration records the duration of a function execution
func (m *PluginMetrics) RecordDuration(ctx context.Context, function FunctionType, d time.Duration, success bool) {
m.durations.Record(ctx, d.Milliseconds(), metric.WithAttributes(
attribute.String("plugin", m.plugin),
attribute.String("function", string(function)),
attribute.String("success", strconv.FormatBool(success)),
attribute.String("configDigest", m.configDigest),
))
}

// TrackReports increments the reports processed counter
func (m *PluginMetrics) TrackReports(ctx context.Context, function FunctionType, count int, success bool) {
m.reportsGenerated.Add(ctx, int64(count), metric.WithAttributes(
attribute.String("plugin", m.plugin),
attribute.String("function", string(function)),
attribute.String("success", strconv.FormatBool(success)),
attribute.String("configDigest", m.configDigest),
))
}

// TrackSize records the size of data produced
func (m *PluginMetrics) TrackSize(ctx context.Context, function FunctionType, size int) {
m.sizes.Record(ctx, int64(size), metric.WithAttributes(
attribute.String("plugin", m.plugin),
attribute.String("function", string(function)),
attribute.String("configDigest", m.configDigest),
))
}

// UpdateStatus updates the plugin status gauge (1 = up, 0 = down)
func (m *PluginMetrics) UpdateStatus(ctx context.Context, up bool) {
val := int64(0)
if up {
val = 1
}
m.status.Record(ctx, val, metric.WithAttributes(
attribute.String("plugin", m.plugin),
attribute.String("configDigest", m.configDigest),
))
}

// MetricViews returns histogram bucket definitions for the given metric prefix.
// Note: due to the OTEL specification, all histogram buckets must be defined when the beholder client is created.
func MetricViews(metricPrefix string) []sdkmetric.View {
return []sdkmetric.View{
sdkmetric.NewView(
sdkmetric.Instrument{Name: metricPrefix + "_duration_ms"},
sdkmetric.Stream{Aggregation: sdkmetric.AggregationExplicitBucketHistogram{
// 5, 10, 20, 40, 80, 160, 320, 640, 1280, 2560, 5120, 10240, 20480, 40960
Boundaries: prometheus.ExponentialBuckets(5, 2, 14),
}},
),
sdkmetric.NewView(
sdkmetric.Instrument{Name: metricPrefix + "_data_sizes"},
sdkmetric.Stream{Aggregation: sdkmetric.AggregationExplicitBucketHistogram{
// 1KB, 2KB, 4KB, 8KB, 16KB, 32KB, 64KB, 128KB, 256KB, 512KB, 1024KB, 2048KB, 4096KB, 8192KB
Boundaries: prometheus.ExponentialBuckets(1024, 2, 14),
}},
),
}
}
76 changes: 76 additions & 0 deletions core/services/ocr3/beholderwrapper/metrics/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package metrics

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestNewPluginMetrics(t *testing.T) {
metrics, err := NewPluginMetrics("platform_ocr3_reporting_plugin", "test-plugin", "abc123")
require.NoError(t, err)
require.NotNil(t, metrics)
}

func TestRecordDuration(t *testing.T) {
metrics, err := NewPluginMetrics("platform_ocr3_reporting_plugin", "test-plugin", "abc123")
require.NoError(t, err)

// Should not panic and should complete without error
metrics.RecordDuration(context.Background(), Query, 100*time.Millisecond, true)
metrics.RecordDuration(context.Background(), Observation, 200*time.Millisecond, false)
metrics.RecordDuration(context.Background(), ValidateObservation, 50*time.Millisecond, true)
metrics.RecordDuration(context.Background(), Outcome, 150*time.Millisecond, true)
metrics.RecordDuration(context.Background(), Reports, 75*time.Millisecond, true)
metrics.RecordDuration(context.Background(), ShouldAccept, 10*time.Millisecond, true)
metrics.RecordDuration(context.Background(), ShouldTransmit, 5*time.Millisecond, false)
}

func TestTrackReports(t *testing.T) {
metrics, err := NewPluginMetrics("platform_ocr3_reporting_plugin", "test-plugin", "abc123")
require.NoError(t, err)

// Should not panic and should complete without error
metrics.TrackReports(context.Background(), Reports, 5, true)
metrics.TrackReports(context.Background(), ShouldAccept, 1, true)
metrics.TrackReports(context.Background(), ShouldTransmit, 0, false)
}

func TestTrackSize(t *testing.T) {
metrics, err := NewPluginMetrics("platform_ocr3_reporting_plugin", "test-plugin", "abc123")
require.NoError(t, err)

// Should not panic and should complete without error
metrics.TrackSize(context.Background(), Observation, 1024)
metrics.TrackSize(context.Background(), Outcome, 2048)
}

func TestUpdateStatus(t *testing.T) {
metrics, err := NewPluginMetrics("platform_ocr3_reporting_plugin", "test-plugin", "abc123")
require.NoError(t, err)

// Should not panic and should complete without error
metrics.UpdateStatus(context.Background(), true)
metrics.UpdateStatus(context.Background(), false)
}

func TestMetricViews(t *testing.T) {
views := MetricViews("platform_ocr3_reporting_plugin")
require.Len(t, views, 2)
}

func TestFunctionTypeConstants(t *testing.T) {
// Verify all expected function types exist
require.Equal(t, Query, FunctionType("query"))
require.Equal(t, Observation, FunctionType("observation"))
require.Equal(t, ValidateObservation, FunctionType("validateObservation"))
require.Equal(t, Outcome, FunctionType("outcome"))
require.Equal(t, ObservationQuorum, FunctionType("observationQuorum"))
require.Equal(t, StateTransition, FunctionType("stateTransition"))
require.Equal(t, Committed, FunctionType("committed"))
require.Equal(t, Reports, FunctionType("reports"))
require.Equal(t, ShouldAccept, FunctionType("shouldAccept"))
require.Equal(t, ShouldTransmit, FunctionType("shouldTransmit"))
}
Loading
Loading