Skip to content

Commit

Permalink
Flexible schema telemetry
Browse files Browse the repository at this point in the history
  • Loading branch information
samsondav committed Jan 28, 2025
1 parent 383c026 commit f2e881a
Show file tree
Hide file tree
Showing 25 changed files with 949 additions and 75 deletions.
5 changes: 5 additions & 0 deletions .changeset/hungry-boats-kneel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Add telemetry for LLO plugin #added
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ require (
github.com/shopspring/decimal v1.4.0
github.com/smartcontractkit/chainlink-automation v0.8.1
github.com/smartcontractkit/chainlink-common v0.4.2-0.20250127125541-a8fa42cc0f36
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250115135646-ac859d85e7e3
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250128203428-08031923fbe5
github.com/smartcontractkit/chainlink-testing-framework/lib v1.50.13
github.com/smartcontractkit/libocr v0.0.0-20241223215956-e5b78d8e3919
github.com/spf13/cobra v1.8.1
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1176,8 +1176,8 @@ github.com/smartcontractkit/chainlink-common v0.4.2-0.20250127125541-a8fa42cc0f3
github.com/smartcontractkit/chainlink-common v0.4.2-0.20250127125541-a8fa42cc0f36/go.mod h1:Z2e1ynSJ4pg83b4Qldbmryc5lmnrI3ojOdg1FUloa68=
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20250121210000-2a9675d7a1b4 h1:w7w42ml8MOxdoyAZ9+og0342UkiH3deRM1V0Pj5JR5g=
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20250121210000-2a9675d7a1b4/go.mod h1:wtdAmAUMooLavbrTA7PgHg40lyDlKesxI/RR+5Xcz18=
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250115135646-ac859d85e7e3 h1:GcPYNVFYjB065CNq0h8nK/VeU08nUkHgBX0cJIEpuHY=
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250115135646-ac859d85e7e3/go.mod h1:pDZagSGjs9U+l4YIFhveDznMHqxuuz+5vRxvVgpbdr8=
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250128203428-08031923fbe5 h1:CvDfgWoLoYPapOumE/UZCplfCu5oNmy9BuH+6V6+fJ8=
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250128203428-08031923fbe5/go.mod h1:pDZagSGjs9U+l4YIFhveDznMHqxuuz+5vRxvVgpbdr8=
github.com/smartcontractkit/chainlink-feeds v0.1.1 h1:JzvUOM/OgGQA1sOqTXXl52R6AnNt+Wg64sVG+XSA49c=
github.com/smartcontractkit/chainlink-feeds v0.1.1/go.mod h1:55EZ94HlKCfAsUiKUTNI7QlE/3d3IwTlsU3YNa/nBb4=
github.com/smartcontractkit/chainlink-framework/chains v0.0.0-20250121195549-294ec6a40b92 h1:lJi0dWfgNJl4Um5KzeZZPVBi//CPDfzzeVmv4Z2OGNY=
Expand Down
21 changes: 18 additions & 3 deletions core/services/llo/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,16 @@ func newDataSource(lggr logger.Logger, registry Registry, t Telemeter) *dataSour
// Observe looks up all streams in the registry and populates a map of stream ID => value
func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, opts llo.DSOpts) error {
now := time.Now()
lggr := logger.With(d.lggr, "observationTimestamp", opts.ObservationTimestamp(), "configDigest", opts.ConfigDigest(), "seqNr", opts.OutCtx().SeqNr)

if opts.VerboseLogging() {
streamIDs := make([]streams.StreamID, 0, len(streamValues))
for streamID := range streamValues {
streamIDs = append(streamIDs, streamID)
}
sort.Slice(streamIDs, func(i, j int) bool { return streamIDs[i] < streamIDs[j] })
d.lggr.Debugw("Observing streams", "streamIDs", streamIDs, "configDigest", opts.ConfigDigest(), "seqNr", opts.OutCtx().SeqNr)
lggr = logger.With(lggr, "streamIDs", streamIDs)
lggr.Debugw("Observing streams")
}

var wg sync.WaitGroup
Expand All @@ -110,7 +112,20 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues,
var errs []ErrObservationFailed

// oc only lives for the duration of this Observe call
oc := NewObservationContext(d.registry, d.t)
oc := NewObservationContext(lggr, d.registry, d.t)

{
// Size needs to accommodate the max number of telemetry events that could be generated
// Standard case might be about 3 bridge requests per spec and one stream<=>spec
// Overallocate for safety (to avoid dropping packets)
telemCh := d.t.MakeTelemChannel(opts, 10*len(streamValues))
if telemCh != nil {
ctx = pipeline.WithTelemetryCh(ctx, telemCh)
// After all Observations have returned, nothing else will be sent to the
// telemetry channel, so it can safely be closed
defer close(telemCh)
}
}

for _, streamID := range maps.Keys(streamValues) {
go func(streamID llotypes.StreamID) {
Expand Down Expand Up @@ -153,7 +168,7 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues,
failedStreamIDs[i] = e.streamID
}

lggr := logger.With(d.lggr, "elapsed", elapsed, "nSuccessfulStreams", len(successfulStreamIDs), "nFailedStreams", len(failedStreamIDs), "successfulStreamIDs", successfulStreamIDs, "failedStreamIDs", failedStreamIDs, "errs", errStrs, "configDigest", opts.ConfigDigest(), "seqNr", opts.OutCtx().SeqNr)
lggr := logger.With(lggr, "elapsed", elapsed, "nSuccessfulStreams", len(successfulStreamIDs), "nFailedStreams", len(failedStreamIDs), "successfulStreamIDs", successfulStreamIDs, "failedStreamIDs", failedStreamIDs, "errs", errStrs)

if opts.VerboseLogging() {
lggr = logger.With(lggr, "streamValues", streamValues)
Expand Down
40 changes: 38 additions & 2 deletions core/services/llo/data_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package llo

import (
"context"
"encoding/hex"
"errors"
"fmt"
"math"
"math/big"
"sort"
"sync"
"testing"
"time"

"github.com/shopspring/decimal"
"github.com/stretchr/testify/assert"
Expand All @@ -26,6 +29,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/llo/telem"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/services/streams"
)
Expand Down Expand Up @@ -86,10 +90,14 @@ func (m *mockOpts) OutCtx() ocr3types.OutcomeContext {
func (m *mockOpts) ConfigDigest() ocr2types.ConfigDigest {
return ocr2types.ConfigDigest{6, 5, 4}
}
func (m *mockOpts) ObservationTimestamp() time.Time {
return time.Unix(1737936858, 0)
}

type mockTelemeter struct {
mu sync.Mutex
v3PremiumLegacyPackets []v3PremiumLegacyPacket
ch chan interface{}
}

type v3PremiumLegacyPacket struct {
Expand All @@ -109,6 +117,11 @@ func (m *mockTelemeter) EnqueueV3PremiumLegacy(run *pipeline.Run, trrs pipeline.
m.v3PremiumLegacyPackets = append(m.v3PremiumLegacyPackets, v3PremiumLegacyPacket{run, trrs, streamID, opts, val, err})
}

func (m *mockTelemeter) MakeTelemChannel(opts llo.DSOpts, size int) (ch chan<- interface{}) {
m.ch = make(chan interface{}, size)
return m.ch
}

func Test_DataSource(t *testing.T) {
lggr := logger.TestLogger(t)
reg := &mockRegistry{make(map[streams.StreamID]*mockPipeline)}
Expand Down Expand Up @@ -165,7 +178,9 @@ func Test_DataSource(t *testing.T) {

vals := makeStreamValues()
err := ds.Observe(ctx, vals, opts)
assert.NoError(t, err)
require.NoError(t, err)

close(tm.ch)

assert.Equal(t, llo.StreamValues{
2: llo.ToDecimal(decimal.NewFromInt(40602)),
Expand All @@ -184,7 +199,28 @@ func Test_DataSource(t *testing.T) {
assert.Equal(t, 1, int(pkt.streamID))
assert.Equal(t, opts, pkt.opts)
assert.Equal(t, "2181", pkt.val.(*llo.Decimal).String())
assert.Nil(t, pkt.err)
assert.NoError(t, pkt.err)

telems := []interface{}{}
for p := range tm.ch {
telems = append(telems, p)
}
require.Len(t, telems, 3)
sort.Slice(telems, func(i, j int) bool {
return telems[i].(*telem.LLOObservationTelemetry).StreamId < telems[j].(*telem.LLOObservationTelemetry).StreamId
})
require.IsType(t, telems[0], &telem.LLOObservationTelemetry{})
obsTelem := telems[0].(*telem.LLOObservationTelemetry)
assert.Equal(t, uint32(1), obsTelem.StreamId)
assert.Equal(t, int32(llo.LLOStreamValue_Decimal), obsTelem.StreamValueType)
assert.Equal(t, "00000000020885", hex.EncodeToString(obsTelem.StreamValueBinary))
assert.Equal(t, "2181", obsTelem.StreamValueText)
assert.Nil(t, obsTelem.ObservationError)
assert.Equal(t, int64(1737936858000000000), obsTelem.ObservationTimestamp)
assert.Greater(t, obsTelem.ObservationFinishedAt, int64(1737936858000000000))
assert.Equal(t, uint32(0), obsTelem.DonId)
assert.Equal(t, opts.SeqNr(), obsTelem.SeqNr)
assert.Equal(t, opts.ConfigDigest().Hex(), hex.EncodeToString(obsTelem.ConfigDigest))
})

t.Run("records telemetry for errors", func(t *testing.T) {
Expand Down
67 changes: 56 additions & 11 deletions core/services/llo/observation_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ import (
"context"
"fmt"
"sync"
"time"

"github.com/shopspring/decimal"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-data-streams/llo"
"github.com/smartcontractkit/chainlink/v2/core/services/llo/telem"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/services/streams"
"github.com/smartcontractkit/chainlink/v2/core/utils"
Expand All @@ -33,6 +36,7 @@ type execution struct {
}

type observationContext struct {
l logger.Logger
r Registry
t Telemeter

Expand All @@ -41,16 +45,17 @@ type observationContext struct {
executions map[streams.Pipeline]*execution
}

func NewObservationContext(r Registry, t Telemeter) ObservationContext {
return newObservationContext(r, t)
func NewObservationContext(l logger.Logger, r Registry, t Telemeter) ObservationContext {
return newObservationContext(l, r, t)
}

func newObservationContext(r Registry, t Telemeter) *observationContext {
return &observationContext{r, t, sync.Mutex{}, make(map[streams.Pipeline]*execution)}
func newObservationContext(l logger.Logger, r Registry, t Telemeter) *observationContext {
return &observationContext{l, r, t, sync.Mutex{}, make(map[streams.Pipeline]*execution)}
}

func (oc *observationContext) Observe(ctx context.Context, streamID streams.StreamID, opts llo.DSOpts) (val llo.StreamValue, err error) {
run, trrs, err := oc.run(ctx, streamID)
observationFinishedAt := time.Now()
if err != nil {
// FIXME: This is a hack specific for V3 telemetry, future schemas should
// use a generic stream value telemetry instead
Expand All @@ -59,25 +64,65 @@ func (oc *observationContext) Observe(ctx context.Context, streamID streams.Stre
return nil, err
}
// Extract stream value based on streamID attribute
found := false
for _, trr := range trrs {
if trr.Task.TaskStreamID() != nil && *trr.Task.TaskStreamID() == streamID {
val, err = resultToStreamValue(trr.Result.Value)
if err != nil {
return nil, fmt.Errorf("failed to convert result to StreamValue for streamID %d: %w", streamID, err)
}
return val, nil
if trr.FinishedAt.Valid {
observationFinishedAt = trr.FinishedAt.Time
}
found = true
break
}
}
// If no streamID attribute is found in the task results, then assume the
// final output is the stream ID and return that. This is safe to do since
// the registry will never return a spec that doesn't match either by tag
// or by spec streamID.

val, err = extractFinalResultAsStreamValue(trrs)
// FIXME: This is a hack specific for V3 telemetry, future schemas should
// use a generic stream value telemetry instead
// https://smartcontract-it.atlassian.net/browse/MERC-6290
oc.t.EnqueueV3PremiumLegacy(run, trrs, streamID, opts, val, err)
if !found {
// FIXME: This is a hack specific for V3 telemetry, future schemas should
// use the generic stream value telemetry instead
// https://smartcontract-it.atlassian.net/browse/MERC-6290
val, err = extractFinalResultAsStreamValue(trrs)
oc.t.EnqueueV3PremiumLegacy(run, trrs, streamID, opts, val, err)
}
if ch := pipeline.GetTelemetryCh(ctx); ch != nil {
cd := opts.ConfigDigest()
ot := &telem.LLOObservationTelemetry{
StreamId: streamID,
ObservationTimestamp: opts.ObservationTimestamp().UnixNano(),
ObservationFinishedAt: observationFinishedAt.UnixNano(),
SeqNr: opts.SeqNr(),
ConfigDigest: cd[:],
}
if err != nil {
ot.ObservationError = new(string)
*ot.ObservationError = err.Error()
}
if val != nil {
ot.StreamValueType = int32(val.Type())
b, err := val.MarshalBinary()
if err != nil {
oc.l.Errorw("failed to MarshalBinary on stream value", "error", err)
} else {
ot.StreamValueBinary = b
}
s, err := val.MarshalText()
if err != nil {
oc.l.Errorw("failed to MarshalText on stream value", "error", err)
} else {
ot.StreamValueText = string(s)
}
}
select {
case ch <- ot:
default:
oc.l.Error("telemetry channel is full, dropping observation telemetry")
}
}
return
}

Expand Down
10 changes: 6 additions & 4 deletions core/services/llo/observation_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ func TestObservationContext_Observe(t *testing.T) {
ctx := tests.Context(t)
r := &mockRegistry{}
telem := &mockTelemeter{}
oc := newObservationContext(r, telem)
lggr := logger.TestLogger(t)
oc := newObservationContext(lggr, r, telem)
opts := llo.DSOpts(nil)

missingStreamID := streams.StreamID(0)
Expand Down Expand Up @@ -128,7 +129,8 @@ func TestObservationContext_Observe_concurrencyStressTest(t *testing.T) {
ctx := tests.Context(t)
r := &mockRegistry{}
telem := &mockTelemeter{}
oc := newObservationContext(r, telem)
lggr := logger.TestLogger(t)
oc := newObservationContext(lggr, r, telem)
opts := llo.DSOpts(nil)

streamID := streams.StreamID(1)
Expand Down Expand Up @@ -252,7 +254,7 @@ result3 -> result3_parse -> multiply3;
require.NoError(t, err)

telem := &mockTelemeter{}
oc := newObservationContext(r, telem)
oc := newObservationContext(lggr, r, telem)
opts := llo.DSOpts(nil)

val, err := oc.Observe(ctx, streams.StreamID(1), opts)
Expand Down Expand Up @@ -337,7 +339,7 @@ result3 -> result3_parse -> multiply3;
}

telem := &mockTelemeter{}
oc := newObservationContext(r, telem)
oc := newObservationContext(lggr, r, telem)
opts := llo.DSOpts(nil)

// concurrency stress test
Expand Down
3 changes: 3 additions & 0 deletions core/services/llo/telem/generate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package telem

//go:generate protoc --go_out=. --go_opt=paths=source_relative telem_streams.proto
Loading

0 comments on commit f2e881a

Please sign in to comment.