Skip to content

Commit

Permalink
Replace ErrNoLastValue and ErrEmptyDataSet by ErrNoData (#557)
Browse files Browse the repository at this point in the history
Handle ForEach returning an error
  • Loading branch information
evantorrie authored Mar 16, 2020
1 parent 6ada85a commit 217a97d
Show file tree
Hide file tree
Showing 24 changed files with 213 additions and 161 deletions.
12 changes: 6 additions & 6 deletions exporters/metric/internal/statsd/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,24 +163,23 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet)
var aggErr error
var sendErr error

checkpointSet.ForEach(func(rec export.Record) {
aggErr = checkpointSet.ForEach(func(rec export.Record) error {
before := buf.Len()

if err := e.formatMetric(rec, buf); err != nil && aggErr == nil {
aggErr = err
return
if err := e.formatMetric(rec, buf); err != nil {
return err
}

if buf.Len() < e.config.MaxPacketSize {
return
return nil
}
if before == 0 {
// A single metric >= packet size
if err := e.send(buf.Bytes()); err != nil && sendErr == nil {
sendErr = err
}
buf.Reset()
return
return nil
}

// Send and copy the leftover
Expand All @@ -193,6 +192,7 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet)
copy(buf.Bytes()[0:leftover], buf.Bytes()[before:])

buf.Truncate(leftover)
return nil
})
if err := e.send(buf.Bytes()); err != nil && sendErr == nil {
sendErr = err
Expand Down
6 changes: 4 additions & 2 deletions exporters/metric/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,9 @@ func (c *collector) Describe(ch chan<- *prometheus.Desc) {
return
}

c.exp.snapshot.ForEach(func(record export.Record) {
_ = c.exp.snapshot.ForEach(func(record export.Record) error {
ch <- c.toDesc(&record)
return nil
})
}

Expand All @@ -198,7 +199,7 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
return
}

c.exp.snapshot.ForEach(func(record export.Record) {
_ = c.exp.snapshot.ForEach(func(record export.Record) error {
agg := record.Aggregator()
numberKind := record.Descriptor().NumberKind()
labels := labelValues(record.Labels())
Expand All @@ -222,6 +223,7 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
} else if lastValue, ok := agg.(aggregator.LastValue); ok {
c.exportLastValue(ch, lastValue, numberKind, desc, labels)
}
return nil
})
}

Expand Down
83 changes: 29 additions & 54 deletions exporters/metric/stdout/stdout.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,98 +139,72 @@ func NewExportPipeline(config Config, period time.Duration) (*push.Controller, e
}

func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error {
// N.B. Only return one aggError, if any occur. They're likely
// to be duplicates of the same error.
var aggError error
var batch expoBatch
if !e.config.DoNotPrintTime {
ts := time.Now()
batch.Timestamp = &ts
}
checkpointSet.ForEach(func(record export.Record) {
aggError = checkpointSet.ForEach(func(record export.Record) error {
desc := record.Descriptor()
agg := record.Aggregator()
kind := desc.NumberKind()

var expose expoLine

if sum, ok := agg.(aggregator.Sum); ok {
if value, err := sum.Sum(); err != nil {
aggError = err
expose.Sum = "NaN"
} else {
expose.Sum = value.AsInterface(kind)
value, err := sum.Sum()
if err != nil {
return err
}
expose.Sum = value.AsInterface(kind)
}

if mmsc, ok := agg.(aggregator.MinMaxSumCount); ok {
if count, err := mmsc.Count(); err != nil {
aggError = err
expose.Count = "NaN"
} else {
expose.Count = count
count, err := mmsc.Count()
if err != nil {
return err
}
expose.Count = count

if max, err := mmsc.Max(); err != nil {
if err == aggregator.ErrEmptyDataSet {
// This is a special case, indicates an aggregator that
// was checkpointed before its first value was set.
return
}

aggError = err
expose.Max = "NaN"
} else {
expose.Max = max.AsInterface(kind)
max, err := mmsc.Max()
if err != nil {
return err
}
expose.Max = max.AsInterface(kind)

if min, err := mmsc.Min(); err != nil {
if err == aggregator.ErrEmptyDataSet {
// This is a special case, indicates an aggregator that
// was checkpointed before its first value was set.
return
}

aggError = err
expose.Min = "NaN"
} else {
expose.Min = min.AsInterface(kind)
min, err := mmsc.Min()
if err != nil {
return err
}
expose.Min = min.AsInterface(kind)

if dist, ok := agg.(aggregator.Distribution); ok && len(e.config.Quantiles) != 0 {
summary := make([]expoQuantile, len(e.config.Quantiles))
expose.Quantiles = summary

for i, q := range e.config.Quantiles {
var vstr interface{}
if value, err := dist.Quantile(q); err != nil {
aggError = err
vstr = "NaN"
} else {
vstr = value.AsInterface(kind)
value, err := dist.Quantile(q)
if err != nil {
return err
}
vstr = value.AsInterface(kind)
summary[i] = expoQuantile{
Q: q,
V: vstr,
}
}
}
} else if lv, ok := agg.(aggregator.LastValue); ok {
if value, timestamp, err := lv.LastValue(); err != nil {
if err == aggregator.ErrNoLastValue {
// This is a special case, indicates an aggregator that
// was checkpointed before its first value was set.
return
}

aggError = err
expose.LastValue = "NaN"
} else {
expose.LastValue = value.AsInterface(kind)
value, timestamp, err := lv.LastValue()
if err != nil {
return err
}
expose.LastValue = value.AsInterface(kind)

if !e.config.DoNotPrintTime {
expose.Timestamp = &timestamp
}
if !e.config.DoNotPrintTime {
expose.Timestamp = &timestamp
}
}

Expand Down Expand Up @@ -264,6 +238,7 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet)
expose.Name = sb.String()

batch.Updates = append(batch.Updates, expose)
return nil
})

var data []byte
Expand Down
2 changes: 1 addition & 1 deletion exporters/metric/stdout/stdout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func TestStdoutMeasureFormat(t *testing.T) {
}`, fix.Output())
}

func TestStdoutEmptyDataSet(t *testing.T) {
func TestStdoutNoData(t *testing.T) {
desc := export.NewDescriptor("test.name", export.MeasureKind, nil, "", "", core.Float64NumberKind)
for name, tc := range map[string]export.Aggregator{
"ddsketch": ddsketch.New(ddsketch.NewDefaultConfig(), desc),
Expand Down
9 changes: 7 additions & 2 deletions exporters/metric/test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package test

import (
"context"
"errors"

"go.opentelemetry.io/otel/api/core"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/aggregator/array"
"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
Expand Down Expand Up @@ -82,8 +84,11 @@ func (p *CheckpointSet) updateAggregator(desc *export.Descriptor, newAgg export.
}
}

func (p *CheckpointSet) ForEach(f func(export.Record)) {
func (p *CheckpointSet) ForEach(f func(export.Record) error) error {
for _, r := range p.updates {
f(r)
if err := f(r); err != nil && !errors.Is(err, aggregator.ErrNoData) {
return err
}
}
return nil
}
8 changes: 4 additions & 4 deletions exporters/otlp/internal/transform/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ func TestMinMaxSumCountValue(t *testing.T) {
assert.NoError(t, mmsc.Update(context.Background(), 1, &metricsdk.Descriptor{}))
assert.NoError(t, mmsc.Update(context.Background(), 10, &metricsdk.Descriptor{}))

// Prior to checkpointing ErrEmptyDataSet should be returned.
// Prior to checkpointing ErrNoData should be returned.
_, _, _, _, err := minMaxSumCountValues(mmsc)
assert.Error(t, err, aggregator.ErrEmptyDataSet)
assert.EqualError(t, err, aggregator.ErrNoData.Error())

// Checkpoint to set non-zero values
mmsc.Checkpoint(context.Background(), &metricsdk.Descriptor{})
Expand Down Expand Up @@ -186,13 +186,13 @@ func TestMinMaxSumCountDatapoints(t *testing.T) {
}

func TestMinMaxSumCountPropagatesErrors(t *testing.T) {
// ErrEmptyDataSet should be returned by both the Min and Max values of
// ErrNoData should be returned by both the Min and Max values of
// a MinMaxSumCount Aggregator. Use this fact to check the error is
// correctly returned.
mmsc := minmaxsumcount.New(&metricsdk.Descriptor{})
_, _, _, _, err := minMaxSumCountValues(mmsc)
assert.Error(t, err)
assert.Equal(t, aggregator.ErrEmptyDataSet, err)
assert.Equal(t, aggregator.ErrNoData, err)
}

func TestSumMetricDescriptor(t *testing.T) {
Expand Down
5 changes: 3 additions & 2 deletions exporters/otlp/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,12 +217,13 @@ func (e *Exporter) Export(ctx context.Context, cps metricsdk.CheckpointSet) erro
// Seed records into the work processing pool.
records := make(chan metricsdk.Record)
go func() {
cps.ForEach(func(record metricsdk.Record) {
_ = cps.ForEach(func(record metricsdk.Record) (err error) {
select {
case <-e.stopCh:
case <-ctx.Done():
case records <- record:
}
return
})
close(records)
}()
Expand Down Expand Up @@ -268,7 +269,7 @@ func (e *Exporter) processMetrics(ctx context.Context, out chan<- *metricpb.Metr
for r := range in {
m, err := transform.Record(r)
if err != nil {
if err == aggregator.ErrEmptyDataSet {
if err == aggregator.ErrNoData {
// The Aggregator was checkpointed before the first value
// was set, skipping.
continue
Expand Down
16 changes: 4 additions & 12 deletions sdk/export/metric/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,20 +97,12 @@ var (
ErrInvalidQuantile = fmt.Errorf("the requested quantile is out of range")
ErrNegativeInput = fmt.Errorf("negative value is out of range for this instrument")
ErrNaNInput = fmt.Errorf("NaN value is an invalid input")
ErrNonMonotoneInput = fmt.Errorf("the new value is not monotone")
ErrInconsistentType = fmt.Errorf("inconsistent aggregator types")

// ErrNoLastValue is returned by the LastValue interface when
// (due to a race with collection) the Aggregator is
// checkpointed before the first value is set. The aggregator
// should simply be skipped in this case.
ErrNoLastValue = fmt.Errorf("no value has been set")

// ErrEmptyDataSet is returned by Max and Quantile interfaces
// when (due to a race with collection) the Aggregator is
// checkpointed before the first value is set. The aggregator
// should simply be skipped in this case.
ErrEmptyDataSet = fmt.Errorf("the result is not defined on an empty data set")
// ErrNoData is returned when (due to a race with collection)
// the Aggregator is check-pointed before the first value is set.
// The aggregator should simply be skipped in this case.
ErrNoData = fmt.Errorf("no data collected by this aggregator")
)

// NewInconsistentMergeError formats an error describing an attempt to
Expand Down
9 changes: 7 additions & 2 deletions sdk/export/metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,13 @@ type LabelEncoder interface {
type CheckpointSet interface {
// ForEach iterates over aggregated checkpoints for all
// metrics that were updated during the last collection
// period.
ForEach(func(Record))
// period. Each aggregated checkpoint returned by the
// function parameter may return an error.
// ForEach tolerates ErrNoData silently, as this is
// expected from the Meter implementation. Any other kind
// of error will immediately halt ForEach and return
// the error to the caller.
ForEach(func(Record) error) error
}

// Record contains the exported data for a single metric instrument
Expand Down
2 changes: 1 addition & 1 deletion sdk/metric/aggregator/array/array.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (p *points) Swap(i, j int) {
// of a quantile.
func (p *points) Quantile(q float64) (core.Number, error) {
if len(*p) == 0 {
return core.Number(0), aggregator.ErrEmptyDataSet
return core.Number(0), aggregator.ErrNoData
}

if q < 0 || q > 1 {
Expand Down
6 changes: 3 additions & 3 deletions sdk/metric/aggregator/array/array_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,15 +204,15 @@ func TestArrayErrors(t *testing.T) {

_, err := agg.Max()
require.Error(t, err)
require.Equal(t, err, aggregator.ErrEmptyDataSet)
require.Equal(t, err, aggregator.ErrNoData)

_, err = agg.Min()
require.Error(t, err)
require.Equal(t, err, aggregator.ErrEmptyDataSet)
require.Equal(t, err, aggregator.ErrNoData)

_, err = agg.Quantile(0.1)
require.Error(t, err)
require.Equal(t, err, aggregator.ErrEmptyDataSet)
require.Equal(t, err, aggregator.ErrNoData)

ctx := context.Background()

Expand Down
2 changes: 1 addition & 1 deletion sdk/metric/aggregator/ddsketch/ddsketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (c *Aggregator) Min() (core.Number, error) {
// It is an error if `q` is less than 0 or greated than 1.
func (c *Aggregator) Quantile(q float64) (core.Number, error) {
if c.checkpoint.Count() == 0 {
return core.Number(0), aggregator.ErrEmptyDataSet
return core.Number(0), aggregator.ErrNoData
}
f := c.checkpoint.Quantile(q)
if math.IsNaN(f) {
Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/aggregator/lastvalue/lastvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,13 @@ func New() *Aggregator {
}

// LastValue returns the last-recorded lastValue value and the
// corresponding timestamp. The error value aggregator.ErrNoLastValue
// corresponding timestamp. The error value aggregator.ErrNoData
// will be returned if (due to a race condition) the checkpoint was
// computed before the first value was set.
func (g *Aggregator) LastValue() (core.Number, time.Time, error) {
gd := (*lastValueData)(g.checkpoint)
if gd == unsetLastValue {
return core.Number(0), time.Time{}, aggregator.ErrNoLastValue
return core.Number(0), time.Time{}, aggregator.ErrNoData
}
return gd.value.AsNumber(), gd.timestamp, nil
}
Expand Down
2 changes: 1 addition & 1 deletion sdk/metric/aggregator/lastvalue/lastvalue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func TestLastValueNotSet(t *testing.T) {
g.Checkpoint(context.Background(), descriptor)

value, timestamp, err := g.LastValue()
require.Equal(t, aggregator.ErrNoLastValue, err)
require.Equal(t, aggregator.ErrNoData, err)
require.True(t, timestamp.IsZero())
require.Equal(t, core.Number(0), value)
}
Loading

0 comments on commit 217a97d

Please sign in to comment.