From 217a97d9b6ef84b2b24984f948c752cc3b249e4a Mon Sep 17 00:00:00 2001 From: ET Date: Mon, 16 Mar 2020 16:28:33 -0700 Subject: [PATCH] Replace ErrNoLastValue and ErrEmptyDataSet by ErrNoData (#557) Handle ForEach returning an error --- exporters/metric/internal/statsd/conn.go | 12 +- exporters/metric/prometheus/prometheus.go | 6 +- exporters/metric/stdout/stdout.go | 83 ++++------- exporters/metric/stdout/stdout_test.go | 2 +- exporters/metric/test/test.go | 9 +- .../otlp/internal/transform/metric_test.go | 8 +- exporters/otlp/otlp.go | 5 +- sdk/export/metric/aggregator/aggregator.go | 16 +- sdk/export/metric/metric.go | 9 +- sdk/metric/aggregator/array/array.go | 2 +- sdk/metric/aggregator/array/array_test.go | 6 +- sdk/metric/aggregator/ddsketch/ddsketch.go | 2 +- sdk/metric/aggregator/lastvalue/lastvalue.go | 4 +- .../aggregator/lastvalue/lastvalue_test.go | 2 +- sdk/metric/aggregator/minmaxsumcount/mmsc.go | 8 +- .../aggregator/minmaxsumcount/mmsc_test.go | 2 +- sdk/metric/batcher/defaultkeys/defaultkeys.go | 9 +- .../batcher/defaultkeys/defaultkeys_test.go | 18 ++- sdk/metric/batcher/test/test.go | 3 +- sdk/metric/batcher/ungrouped/ungrouped.go | 11 +- .../batcher/ungrouped/ungrouped_test.go | 13 +- sdk/metric/controller/push/push.go | 4 +- sdk/metric/controller/push/push_test.go | 138 ++++++++++++------ sdk/metric/stress_test.go | 2 +- 24 files changed, 213 insertions(+), 161 deletions(-) diff --git a/exporters/metric/internal/statsd/conn.go b/exporters/metric/internal/statsd/conn.go index ba28386b329..4b8214cf6fc 100644 --- a/exporters/metric/internal/statsd/conn.go +++ b/exporters/metric/internal/statsd/conn.go @@ -163,16 +163,15 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) var aggErr error var sendErr error - checkpointSet.ForEach(func(rec export.Record) { + aggErr = checkpointSet.ForEach(func(rec export.Record) error { before := buf.Len() - if err := e.formatMetric(rec, buf); err != nil && aggErr == nil { - aggErr = err - return + if err := e.formatMetric(rec, buf); err != nil { + return err } if buf.Len() < e.config.MaxPacketSize { - return + return nil } if before == 0 { // A single metric >= packet size @@ -180,7 +179,7 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) sendErr = err } buf.Reset() - return + return nil } // Send and copy the leftover @@ -193,6 +192,7 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) copy(buf.Bytes()[0:leftover], buf.Bytes()[before:]) buf.Truncate(leftover) + return nil }) if err := e.send(buf.Bytes()); err != nil && sendErr == nil { sendErr = err diff --git a/exporters/metric/prometheus/prometheus.go b/exporters/metric/prometheus/prometheus.go index 133a58de387..3de10ee982e 100644 --- a/exporters/metric/prometheus/prometheus.go +++ b/exporters/metric/prometheus/prometheus.go @@ -184,8 +184,9 @@ func (c *collector) Describe(ch chan<- *prometheus.Desc) { return } - c.exp.snapshot.ForEach(func(record export.Record) { + _ = c.exp.snapshot.ForEach(func(record export.Record) error { ch <- c.toDesc(&record) + return nil }) } @@ -198,7 +199,7 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) { return } - c.exp.snapshot.ForEach(func(record export.Record) { + _ = c.exp.snapshot.ForEach(func(record export.Record) error { agg := record.Aggregator() numberKind := record.Descriptor().NumberKind() labels := labelValues(record.Labels()) @@ -222,6 +223,7 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) { } else if lastValue, ok := agg.(aggregator.LastValue); ok { c.exportLastValue(ch, lastValue, numberKind, desc, labels) } + return nil }) } diff --git a/exporters/metric/stdout/stdout.go b/exporters/metric/stdout/stdout.go index 465afdf7529..65838050d8d 100644 --- a/exporters/metric/stdout/stdout.go +++ b/exporters/metric/stdout/stdout.go @@ -139,15 +139,13 @@ func NewExportPipeline(config Config, period time.Duration) (*push.Controller, e } func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error { - // N.B. Only return one aggError, if any occur. They're likely - // to be duplicates of the same error. var aggError error var batch expoBatch if !e.config.DoNotPrintTime { ts := time.Now() batch.Timestamp = &ts } - checkpointSet.ForEach(func(record export.Record) { + aggError = checkpointSet.ForEach(func(record export.Record) error { desc := record.Descriptor() agg := record.Aggregator() kind := desc.NumberKind() @@ -155,47 +153,31 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) var expose expoLine if sum, ok := agg.(aggregator.Sum); ok { - if value, err := sum.Sum(); err != nil { - aggError = err - expose.Sum = "NaN" - } else { - expose.Sum = value.AsInterface(kind) + value, err := sum.Sum() + if err != nil { + return err } + expose.Sum = value.AsInterface(kind) } if mmsc, ok := agg.(aggregator.MinMaxSumCount); ok { - if count, err := mmsc.Count(); err != nil { - aggError = err - expose.Count = "NaN" - } else { - expose.Count = count + count, err := mmsc.Count() + if err != nil { + return err } + expose.Count = count - if max, err := mmsc.Max(); err != nil { - if err == aggregator.ErrEmptyDataSet { - // This is a special case, indicates an aggregator that - // was checkpointed before its first value was set. - return - } - - aggError = err - expose.Max = "NaN" - } else { - expose.Max = max.AsInterface(kind) + max, err := mmsc.Max() + if err != nil { + return err } + expose.Max = max.AsInterface(kind) - if min, err := mmsc.Min(); err != nil { - if err == aggregator.ErrEmptyDataSet { - // This is a special case, indicates an aggregator that - // was checkpointed before its first value was set. - return - } - - aggError = err - expose.Min = "NaN" - } else { - expose.Min = min.AsInterface(kind) + min, err := mmsc.Min() + if err != nil { + return err } + expose.Min = min.AsInterface(kind) if dist, ok := agg.(aggregator.Distribution); ok && len(e.config.Quantiles) != 0 { summary := make([]expoQuantile, len(e.config.Quantiles)) @@ -203,12 +185,11 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) for i, q := range e.config.Quantiles { var vstr interface{} - if value, err := dist.Quantile(q); err != nil { - aggError = err - vstr = "NaN" - } else { - vstr = value.AsInterface(kind) + value, err := dist.Quantile(q) + if err != nil { + return err } + vstr = value.AsInterface(kind) summary[i] = expoQuantile{ Q: q, V: vstr, @@ -216,21 +197,14 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) } } } else if lv, ok := agg.(aggregator.LastValue); ok { - if value, timestamp, err := lv.LastValue(); err != nil { - if err == aggregator.ErrNoLastValue { - // This is a special case, indicates an aggregator that - // was checkpointed before its first value was set. - return - } - - aggError = err - expose.LastValue = "NaN" - } else { - expose.LastValue = value.AsInterface(kind) + value, timestamp, err := lv.LastValue() + if err != nil { + return err + } + expose.LastValue = value.AsInterface(kind) - if !e.config.DoNotPrintTime { - expose.Timestamp = ×tamp - } + if !e.config.DoNotPrintTime { + expose.Timestamp = ×tamp } } @@ -264,6 +238,7 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) expose.Name = sb.String() batch.Updates = append(batch.Updates, expose) + return nil }) var data []byte diff --git a/exporters/metric/stdout/stdout_test.go b/exporters/metric/stdout/stdout_test.go index d9c6544b0d4..eb8d0b1c28a 100644 --- a/exporters/metric/stdout/stdout_test.go +++ b/exporters/metric/stdout/stdout_test.go @@ -221,7 +221,7 @@ func TestStdoutMeasureFormat(t *testing.T) { }`, fix.Output()) } -func TestStdoutEmptyDataSet(t *testing.T) { +func TestStdoutNoData(t *testing.T) { desc := export.NewDescriptor("test.name", export.MeasureKind, nil, "", "", core.Float64NumberKind) for name, tc := range map[string]export.Aggregator{ "ddsketch": ddsketch.New(ddsketch.NewDefaultConfig(), desc), diff --git a/exporters/metric/test/test.go b/exporters/metric/test/test.go index 89172e8982b..71b97af051b 100644 --- a/exporters/metric/test/test.go +++ b/exporters/metric/test/test.go @@ -2,9 +2,11 @@ package test import ( "context" + "errors" "go.opentelemetry.io/otel/api/core" export "go.opentelemetry.io/otel/sdk/export/metric" + "go.opentelemetry.io/otel/sdk/export/metric/aggregator" "go.opentelemetry.io/otel/sdk/metric/aggregator/array" "go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue" "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" @@ -82,8 +84,11 @@ func (p *CheckpointSet) updateAggregator(desc *export.Descriptor, newAgg export. } } -func (p *CheckpointSet) ForEach(f func(export.Record)) { +func (p *CheckpointSet) ForEach(f func(export.Record) error) error { for _, r := range p.updates { - f(r) + if err := f(r); err != nil && !errors.Is(err, aggregator.ErrNoData) { + return err + } } + return nil } diff --git a/exporters/otlp/internal/transform/metric_test.go b/exporters/otlp/internal/transform/metric_test.go index b8eb613363b..7f337bb5358 100644 --- a/exporters/otlp/internal/transform/metric_test.go +++ b/exporters/otlp/internal/transform/metric_test.go @@ -77,9 +77,9 @@ func TestMinMaxSumCountValue(t *testing.T) { assert.NoError(t, mmsc.Update(context.Background(), 1, &metricsdk.Descriptor{})) assert.NoError(t, mmsc.Update(context.Background(), 10, &metricsdk.Descriptor{})) - // Prior to checkpointing ErrEmptyDataSet should be returned. + // Prior to checkpointing ErrNoData should be returned. _, _, _, _, err := minMaxSumCountValues(mmsc) - assert.Error(t, err, aggregator.ErrEmptyDataSet) + assert.EqualError(t, err, aggregator.ErrNoData.Error()) // Checkpoint to set non-zero values mmsc.Checkpoint(context.Background(), &metricsdk.Descriptor{}) @@ -186,13 +186,13 @@ func TestMinMaxSumCountDatapoints(t *testing.T) { } func TestMinMaxSumCountPropagatesErrors(t *testing.T) { - // ErrEmptyDataSet should be returned by both the Min and Max values of + // ErrNoData should be returned by both the Min and Max values of // a MinMaxSumCount Aggregator. Use this fact to check the error is // correctly returned. mmsc := minmaxsumcount.New(&metricsdk.Descriptor{}) _, _, _, _, err := minMaxSumCountValues(mmsc) assert.Error(t, err) - assert.Equal(t, aggregator.ErrEmptyDataSet, err) + assert.Equal(t, aggregator.ErrNoData, err) } func TestSumMetricDescriptor(t *testing.T) { diff --git a/exporters/otlp/otlp.go b/exporters/otlp/otlp.go index 4e9d99320ac..352f03baddb 100644 --- a/exporters/otlp/otlp.go +++ b/exporters/otlp/otlp.go @@ -217,12 +217,13 @@ func (e *Exporter) Export(ctx context.Context, cps metricsdk.CheckpointSet) erro // Seed records into the work processing pool. records := make(chan metricsdk.Record) go func() { - cps.ForEach(func(record metricsdk.Record) { + _ = cps.ForEach(func(record metricsdk.Record) (err error) { select { case <-e.stopCh: case <-ctx.Done(): case records <- record: } + return }) close(records) }() @@ -268,7 +269,7 @@ func (e *Exporter) processMetrics(ctx context.Context, out chan<- *metricpb.Metr for r := range in { m, err := transform.Record(r) if err != nil { - if err == aggregator.ErrEmptyDataSet { + if err == aggregator.ErrNoData { // The Aggregator was checkpointed before the first value // was set, skipping. continue diff --git a/sdk/export/metric/aggregator/aggregator.go b/sdk/export/metric/aggregator/aggregator.go index 27766d59666..2b738c575f1 100644 --- a/sdk/export/metric/aggregator/aggregator.go +++ b/sdk/export/metric/aggregator/aggregator.go @@ -97,20 +97,12 @@ var ( ErrInvalidQuantile = fmt.Errorf("the requested quantile is out of range") ErrNegativeInput = fmt.Errorf("negative value is out of range for this instrument") ErrNaNInput = fmt.Errorf("NaN value is an invalid input") - ErrNonMonotoneInput = fmt.Errorf("the new value is not monotone") ErrInconsistentType = fmt.Errorf("inconsistent aggregator types") - // ErrNoLastValue is returned by the LastValue interface when - // (due to a race with collection) the Aggregator is - // checkpointed before the first value is set. The aggregator - // should simply be skipped in this case. - ErrNoLastValue = fmt.Errorf("no value has been set") - - // ErrEmptyDataSet is returned by Max and Quantile interfaces - // when (due to a race with collection) the Aggregator is - // checkpointed before the first value is set. The aggregator - // should simply be skipped in this case. - ErrEmptyDataSet = fmt.Errorf("the result is not defined on an empty data set") + // ErrNoData is returned when (due to a race with collection) + // the Aggregator is check-pointed before the first value is set. + // The aggregator should simply be skipped in this case. + ErrNoData = fmt.Errorf("no data collected by this aggregator") ) // NewInconsistentMergeError formats an error describing an attempt to diff --git a/sdk/export/metric/metric.go b/sdk/export/metric/metric.go index dd5adae8d45..a626950b0f4 100644 --- a/sdk/export/metric/metric.go +++ b/sdk/export/metric/metric.go @@ -201,8 +201,13 @@ type LabelEncoder interface { type CheckpointSet interface { // ForEach iterates over aggregated checkpoints for all // metrics that were updated during the last collection - // period. - ForEach(func(Record)) + // period. Each aggregated checkpoint returned by the + // function parameter may return an error. + // ForEach tolerates ErrNoData silently, as this is + // expected from the Meter implementation. Any other kind + // of error will immediately halt ForEach and return + // the error to the caller. + ForEach(func(Record) error) error } // Record contains the exported data for a single metric instrument diff --git a/sdk/metric/aggregator/array/array.go b/sdk/metric/aggregator/array/array.go index 3855e8cc28c..ea8d1a8cd84 100644 --- a/sdk/metric/aggregator/array/array.go +++ b/sdk/metric/aggregator/array/array.go @@ -177,7 +177,7 @@ func (p *points) Swap(i, j int) { // of a quantile. func (p *points) Quantile(q float64) (core.Number, error) { if len(*p) == 0 { - return core.Number(0), aggregator.ErrEmptyDataSet + return core.Number(0), aggregator.ErrNoData } if q < 0 || q > 1 { diff --git a/sdk/metric/aggregator/array/array_test.go b/sdk/metric/aggregator/array/array_test.go index a7ff5fcd5ca..7bf3b2f4b5e 100644 --- a/sdk/metric/aggregator/array/array_test.go +++ b/sdk/metric/aggregator/array/array_test.go @@ -204,15 +204,15 @@ func TestArrayErrors(t *testing.T) { _, err := agg.Max() require.Error(t, err) - require.Equal(t, err, aggregator.ErrEmptyDataSet) + require.Equal(t, err, aggregator.ErrNoData) _, err = agg.Min() require.Error(t, err) - require.Equal(t, err, aggregator.ErrEmptyDataSet) + require.Equal(t, err, aggregator.ErrNoData) _, err = agg.Quantile(0.1) require.Error(t, err) - require.Equal(t, err, aggregator.ErrEmptyDataSet) + require.Equal(t, err, aggregator.ErrNoData) ctx := context.Background() diff --git a/sdk/metric/aggregator/ddsketch/ddsketch.go b/sdk/metric/aggregator/ddsketch/ddsketch.go index 324edca7c93..7af36f835ea 100644 --- a/sdk/metric/aggregator/ddsketch/ddsketch.go +++ b/sdk/metric/aggregator/ddsketch/ddsketch.go @@ -85,7 +85,7 @@ func (c *Aggregator) Min() (core.Number, error) { // It is an error if `q` is less than 0 or greated than 1. func (c *Aggregator) Quantile(q float64) (core.Number, error) { if c.checkpoint.Count() == 0 { - return core.Number(0), aggregator.ErrEmptyDataSet + return core.Number(0), aggregator.ErrNoData } f := c.checkpoint.Quantile(q) if math.IsNaN(f) { diff --git a/sdk/metric/aggregator/lastvalue/lastvalue.go b/sdk/metric/aggregator/lastvalue/lastvalue.go index f1997537bd7..f7bf76ce931 100644 --- a/sdk/metric/aggregator/lastvalue/lastvalue.go +++ b/sdk/metric/aggregator/lastvalue/lastvalue.go @@ -68,13 +68,13 @@ func New() *Aggregator { } // LastValue returns the last-recorded lastValue value and the -// corresponding timestamp. The error value aggregator.ErrNoLastValue +// corresponding timestamp. The error value aggregator.ErrNoData // will be returned if (due to a race condition) the checkpoint was // computed before the first value was set. func (g *Aggregator) LastValue() (core.Number, time.Time, error) { gd := (*lastValueData)(g.checkpoint) if gd == unsetLastValue { - return core.Number(0), time.Time{}, aggregator.ErrNoLastValue + return core.Number(0), time.Time{}, aggregator.ErrNoData } return gd.value.AsNumber(), gd.timestamp, nil } diff --git a/sdk/metric/aggregator/lastvalue/lastvalue_test.go b/sdk/metric/aggregator/lastvalue/lastvalue_test.go index 7705e48b6ca..300c03b2c52 100644 --- a/sdk/metric/aggregator/lastvalue/lastvalue_test.go +++ b/sdk/metric/aggregator/lastvalue/lastvalue_test.go @@ -113,7 +113,7 @@ func TestLastValueNotSet(t *testing.T) { g.Checkpoint(context.Background(), descriptor) value, timestamp, err := g.LastValue() - require.Equal(t, aggregator.ErrNoLastValue, err) + require.Equal(t, aggregator.ErrNoData, err) require.True(t, timestamp.IsZero()) require.Equal(t, core.Number(0), value) } diff --git a/sdk/metric/aggregator/minmaxsumcount/mmsc.go b/sdk/metric/aggregator/minmaxsumcount/mmsc.go index bbd7a2bccd5..75372c57765 100644 --- a/sdk/metric/aggregator/minmaxsumcount/mmsc.go +++ b/sdk/metric/aggregator/minmaxsumcount/mmsc.go @@ -86,25 +86,25 @@ func (c *Aggregator) Count() (int64, error) { } // Min returns the minimum value in the checkpoint. -// The error value aggregator.ErrEmptyDataSet will be returned +// The error value aggregator.ErrNoData will be returned // if there were no measurements recorded during the checkpoint. func (c *Aggregator) Min() (core.Number, error) { c.lock.Lock() defer c.lock.Unlock() if c.checkpoint().count.IsZero(core.Uint64NumberKind) { - return c.kind.Zero(), aggregator.ErrEmptyDataSet + return c.kind.Zero(), aggregator.ErrNoData } return c.checkpoint().min, nil } // Max returns the maximum value in the checkpoint. -// The error value aggregator.ErrEmptyDataSet will be returned +// The error value aggregator.ErrNoData will be returned // if there were no measurements recorded during the checkpoint. func (c *Aggregator) Max() (core.Number, error) { c.lock.Lock() defer c.lock.Unlock() if c.checkpoint().count.IsZero(core.Uint64NumberKind) { - return c.kind.Zero(), aggregator.ErrEmptyDataSet + return c.kind.Zero(), aggregator.ErrNoData } return c.checkpoint().max, nil } diff --git a/sdk/metric/aggregator/minmaxsumcount/mmsc_test.go b/sdk/metric/aggregator/minmaxsumcount/mmsc_test.go index 5b365890297..8b6a8f24876 100644 --- a/sdk/metric/aggregator/minmaxsumcount/mmsc_test.go +++ b/sdk/metric/aggregator/minmaxsumcount/mmsc_test.go @@ -234,7 +234,7 @@ func TestMaxSumCountNotSet(t *testing.T) { require.Nil(t, err) max, err := agg.Max() - require.Equal(t, aggregator.ErrEmptyDataSet, err) + require.Equal(t, aggregator.ErrNoData, err) require.Equal(t, core.Number(0), max) }) } diff --git a/sdk/metric/batcher/defaultkeys/defaultkeys.go b/sdk/metric/batcher/defaultkeys/defaultkeys.go index 4b0a36ebb27..57757df0874 100644 --- a/sdk/metric/batcher/defaultkeys/defaultkeys.go +++ b/sdk/metric/batcher/defaultkeys/defaultkeys.go @@ -16,9 +16,11 @@ package defaultkeys // import "go.opentelemetry.io/otel/sdk/metric/batcher/defau import ( "context" + "errors" "go.opentelemetry.io/otel/api/core" export "go.opentelemetry.io/otel/sdk/export/metric" + "go.opentelemetry.io/otel/sdk/export/metric/aggregator" ) type ( @@ -153,8 +155,11 @@ func (b *Batcher) FinishedCollection() { } } -func (p *checkpointSet) ForEach(f func(export.Record)) { +func (p *checkpointSet) ForEach(f func(export.Record) error) error { for _, entry := range p.aggCheckpointMap { - f(entry) + if err := f(entry); err != nil && !errors.Is(err, aggregator.ErrNoData) { + return err + } } + return nil } diff --git a/sdk/metric/batcher/defaultkeys/defaultkeys_test.go b/sdk/metric/batcher/defaultkeys/defaultkeys_test.go index 0a5974daced..8509c18c3ff 100644 --- a/sdk/metric/batcher/defaultkeys/defaultkeys_test.go +++ b/sdk/metric/batcher/defaultkeys/defaultkeys_test.go @@ -50,7 +50,8 @@ func TestGroupingStateless(t *testing.T) { b.FinishedCollection() records := test.Output{} - checkpointSet.ForEach(records.AddTo) + err := checkpointSet.ForEach(records.AddTo) + require.NoError(t, err) // Repeat for {counter,lastvalue}.{1,2}. // Output lastvalue should have only the "G=H" and "G=" keys. @@ -69,8 +70,9 @@ func TestGroupingStateless(t *testing.T) { // Verify that state is reset by FinishedCollection() checkpointSet = b.CheckpointSet() b.FinishedCollection() - checkpointSet.ForEach(func(rec export.Record) { + _ = checkpointSet.ForEach(func(rec export.Record) error { t.Fatal("Unexpected call") + return nil }) } @@ -90,7 +92,8 @@ func TestGroupingStateful(t *testing.T) { b.FinishedCollection() records1 := test.Output{} - checkpointSet.ForEach(records1.AddTo) + err := checkpointSet.ForEach(records1.AddTo) + require.NoError(t, err) require.EqualValues(t, map[string]int64{ "sum.a/C=D": 10, // labels1 @@ -102,7 +105,8 @@ func TestGroupingStateful(t *testing.T) { b.FinishedCollection() records2 := test.Output{} - checkpointSet.ForEach(records2.AddTo) + err = checkpointSet.ForEach(records2.AddTo) + require.NoError(t, err) require.EqualValues(t, records1, records2) @@ -118,7 +122,8 @@ func TestGroupingStateful(t *testing.T) { b.FinishedCollection() records3 := test.Output{} - checkpointSet.ForEach(records3.AddTo) + err = checkpointSet.ForEach(records3.AddTo) + require.NoError(t, err) require.EqualValues(t, records1, records3) @@ -130,7 +135,8 @@ func TestGroupingStateful(t *testing.T) { b.FinishedCollection() records4 := test.Output{} - checkpointSet.ForEach(records4.AddTo) + err = checkpointSet.ForEach(records4.AddTo) + require.NoError(t, err) require.EqualValues(t, map[string]int64{ "sum.a/C=D": 30, diff --git a/sdk/metric/batcher/test/test.go b/sdk/metric/batcher/test/test.go index 4a791f1a8b6..539954a2f91 100644 --- a/sdk/metric/batcher/test/test.go +++ b/sdk/metric/batcher/test/test.go @@ -134,7 +134,7 @@ func CounterAgg(desc *export.Descriptor, v int64) export.Aggregator { // AddTo adds a name/label-encoding entry with the lastValue or counter // value to the output map. -func (o Output) AddTo(rec export.Record) { +func (o Output) AddTo(rec export.Record) error { labels := rec.Labels() key := fmt.Sprint(rec.Descriptor().Name(), "/", labels.Encoded()) var value int64 @@ -147,4 +147,5 @@ func (o Output) AddTo(rec export.Record) { value = lv.AsInt64() } o[key] = value + return nil } diff --git a/sdk/metric/batcher/ungrouped/ungrouped.go b/sdk/metric/batcher/ungrouped/ungrouped.go index 76f53fc2bb7..6e098f73146 100644 --- a/sdk/metric/batcher/ungrouped/ungrouped.go +++ b/sdk/metric/batcher/ungrouped/ungrouped.go @@ -16,8 +16,10 @@ package ungrouped // import "go.opentelemetry.io/otel/sdk/metric/batcher/ungroup import ( "context" + "errors" export "go.opentelemetry.io/otel/sdk/export/metric" + "go.opentelemetry.io/otel/sdk/export/metric/aggregator" ) type ( @@ -101,12 +103,15 @@ func (b *Batcher) FinishedCollection() { } } -func (c batchMap) ForEach(f func(export.Record)) { +func (c batchMap) ForEach(f func(export.Record) error) error { for key, value := range c { - f(export.NewRecord( + if err := f(export.NewRecord( key.descriptor, value.labels, value.aggregator, - )) + )); err != nil && !errors.Is(err, aggregator.ErrNoData) { + return err + } } + return nil } diff --git a/sdk/metric/batcher/ungrouped/ungrouped_test.go b/sdk/metric/batcher/ungrouped/ungrouped_test.go index efde762e7b1..6e9392cf57e 100644 --- a/sdk/metric/batcher/ungrouped/ungrouped_test.go +++ b/sdk/metric/batcher/ungrouped/ungrouped_test.go @@ -62,7 +62,7 @@ func TestUngroupedStateless(t *testing.T) { b.FinishedCollection() records := test.Output{} - checkpointSet.ForEach(records.AddTo) + _ = checkpointSet.ForEach(records.AddTo) // Output lastvalue should have only the "G=H" and "G=" keys. // Output counter should have only the "C=D" and "C=" keys. @@ -84,8 +84,9 @@ func TestUngroupedStateless(t *testing.T) { // Verify that state was reset checkpointSet = b.CheckpointSet() b.FinishedCollection() - checkpointSet.ForEach(func(rec export.Record) { + _ = checkpointSet.ForEach(func(rec export.Record) error { t.Fatal("Unexpected call") + return nil }) } @@ -105,7 +106,7 @@ func TestUngroupedStateful(t *testing.T) { b.FinishedCollection() records1 := test.Output{} - checkpointSet.ForEach(records1.AddTo) + _ = checkpointSet.ForEach(records1.AddTo) require.EqualValues(t, map[string]int64{ "sum.a/G~H&C~D": 10, // labels1 @@ -117,7 +118,7 @@ func TestUngroupedStateful(t *testing.T) { b.FinishedCollection() records2 := test.Output{} - checkpointSet.ForEach(records2.AddTo) + _ = checkpointSet.ForEach(records2.AddTo) require.EqualValues(t, records1, records2) @@ -133,7 +134,7 @@ func TestUngroupedStateful(t *testing.T) { b.FinishedCollection() records3 := test.Output{} - checkpointSet.ForEach(records3.AddTo) + _ = checkpointSet.ForEach(records3.AddTo) require.EqualValues(t, records1, records3) @@ -145,7 +146,7 @@ func TestUngroupedStateful(t *testing.T) { b.FinishedCollection() records4 := test.Output{} - checkpointSet.ForEach(records4.AddTo) + _ = checkpointSet.ForEach(records4.AddTo) require.EqualValues(t, map[string]int64{ "sum.a/G~H&C~D": 30, diff --git a/sdk/metric/controller/push/push.go b/sdk/metric/controller/push/push.go index cb2294a1c74..22bcc2c5575 100644 --- a/sdk/metric/controller/push/push.go +++ b/sdk/metric/controller/push/push.go @@ -190,10 +190,10 @@ type syncCheckpointSet struct { var _ export.CheckpointSet = (*syncCheckpointSet)(nil) -func (c syncCheckpointSet) ForEach(fn func(export.Record)) { +func (c syncCheckpointSet) ForEach(fn func(export.Record) error) error { c.mtx.Lock() defer c.mtx.Unlock() - c.delegate.ForEach(fn) + return c.delegate.ForEach(fn) } func (realClock) Now() time.Time { diff --git a/sdk/metric/controller/push/push_test.go b/sdk/metric/controller/push/push_test.go index 82b725b7465..6ecf57a4c21 100644 --- a/sdk/metric/controller/push/push_test.go +++ b/sdk/metric/controller/push/push_test.go @@ -43,11 +43,11 @@ type testBatcher struct { } type testExporter struct { - t *testing.T - lock sync.Mutex - exports int - records []export.Record - retErr error + t *testing.T + lock sync.Mutex + exports int + records []export.Record + injectErr func(r export.Record) error } type testFixture struct { @@ -118,10 +118,20 @@ func (e *testExporter) Export(_ context.Context, checkpointSet export.Checkpoint e.lock.Lock() defer e.lock.Unlock() e.exports++ - checkpointSet.ForEach(func(r export.Record) { - e.records = append(e.records, r) - }) - return e.retErr + var records []export.Record + if err := checkpointSet.ForEach(func(r export.Record) error { + if e.injectErr != nil { + if err := e.injectErr(r); err != nil { + return err + } + } + records = append(records, r) + return nil + }); err != nil { + return err + } + e.records = records + return nil } func (e *testExporter) resetRecords() ([]export.Record, int) { @@ -230,37 +240,81 @@ func TestPushTicker(t *testing.T) { } func TestPushExportError(t *testing.T) { - fix := newFixture(t) - fix.exporter.retErr = fmt.Errorf("test export error") - - p := push.New(fix.batcher, fix.exporter, time.Second) - - var err error - var lock sync.Mutex - p.SetErrorHandler(func(sdkErr error) { - lock.Lock() - defer lock.Unlock() - err = sdkErr - }) - - mock := mockClock{clock.NewMock()} - p.SetClock(mock) - - p.Start() - runtime.Gosched() - - require.Equal(t, 0, fix.exporter.exports) - require.Nil(t, err) - - mock.Add(time.Second) - runtime.Gosched() - - lock.Lock() - _, exports := fix.batcher.getCounts() - require.Equal(t, 1, exports) - require.Error(t, err) - require.Equal(t, fix.exporter.retErr, err) - lock.Unlock() - - p.Stop() + injector := func(name string, e error) func(r export.Record) error { + return func(r export.Record) error { + if r.Descriptor().Name() == name { + return e + } + return nil + } + } + var errAggregator = fmt.Errorf("unexpected error") + var tests = []struct { + name string + injectedError error + expectedDescriptors []string + expectedError error + }{ + {"errNone", nil, []string{"counter1", "counter2"}, nil}, + {"errNoData", aggregator.ErrNoData, []string{"counter2"}, nil}, + {"errUnexpected", errAggregator, []string{}, errAggregator}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fix := newFixture(t) + fix.exporter.injectErr = injector("counter1", tt.injectedError) + + p := push.New(fix.batcher, fix.exporter, time.Second) + + var err error + var lock sync.Mutex + p.SetErrorHandler(func(sdkErr error) { + lock.Lock() + defer lock.Unlock() + err = sdkErr + }) + + mock := mockClock{clock.NewMock()} + p.SetClock(mock) + + ctx := context.Background() + + meter := p.Meter("name") + counter1 := metric.Must(meter).NewInt64Counter("counter1") + counter2 := metric.Must(meter).NewInt64Counter("counter2") + + p.Start() + runtime.Gosched() + + counter1.Add(ctx, 3, meter.Labels()) + counter2.Add(ctx, 5, meter.Labels()) + + require.Equal(t, 0, fix.exporter.exports) + require.Nil(t, err) + + mock.Add(time.Second) + runtime.Gosched() + + records, exports := fix.exporter.resetRecords() + checkpoints, finishes := fix.batcher.getCounts() + require.Equal(t, 1, exports) + require.Equal(t, 1, checkpoints) + require.Equal(t, 1, finishes) + lock.Lock() + if tt.expectedError == nil { + require.NoError(t, err) + } else { + require.Error(t, err) + require.Equal(t, tt.expectedError, err) + } + lock.Unlock() + require.Equal(t, len(tt.expectedDescriptors), len(records)) + for _, r := range records { + require.Contains(t, tt.expectedDescriptors, r.Descriptor().Name()) + } + + p.Stop() + + }) + } } diff --git a/sdk/metric/stress_test.go b/sdk/metric/stress_test.go index d00727c6c8c..fd44dfabb79 100644 --- a/sdk/metric/stress_test.go +++ b/sdk/metric/stress_test.go @@ -272,7 +272,7 @@ func (f *testFixture) Process(_ context.Context, record export.Record) error { f.impl.storeCollect(actual, sum, time.Time{}) case export.MeasureKind: lv, ts, err := agg.(aggregator.LastValue).LastValue() - if err != nil && err != aggregator.ErrNoLastValue { + if err != nil && err != aggregator.ErrNoData { f.T.Fatal("Last value error: ", err) } f.impl.storeCollect(actual, lv, ts)