Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use StateLocker in MinMaxSumCount #546

Merged
merged 5 commits into from
Mar 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions api/core/number.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,20 @@ const (
Uint64NumberKind
)

// Zero returns the zero value appropriate for the receiver's NumberKind.
// Any kind other than Int64NumberKind, Float64NumberKind or
// Uint64NumberKind falls through to a raw Number(0).
func (k NumberKind) Zero() Number {
	switch k {
	case Uint64NumberKind:
		return NewUint64Number(0)
	case Float64NumberKind:
		return NewFloat64Number(0.)
	case Int64NumberKind:
		return NewInt64Number(0)
	}
	return Number(0)
}

// Minimum returns the minimum representable value
// for a given NumberKind
func (k NumberKind) Minimum() Number {
Expand Down
152 changes: 86 additions & 66 deletions sdk/metric/aggregator/minmaxsumcount/mmsc.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@ import (
"go.opentelemetry.io/otel/api/core"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
"go.opentelemetry.io/otel/sdk/internal"
)

type (
// Aggregator aggregates measure events, keeping only the max,
// sum, and count.
Aggregator struct {
// current has to be aligned for 64-bit atomic operations.
current state
// checkpoint has to be aligned for 64-bit atomic operations.
checkpoint state
kind core.NumberKind
// states has to be aligned for 64-bit atomic operations.
states [2]state
lock internal.StateLocker
kind core.NumberKind
}

state struct {
Expand All @@ -48,104 +48,116 @@ var _ aggregator.MinMaxSumCount = &Aggregator{}
// New returns a new measure aggregator for computing min, max, sum, and
// count. It does not compute quantile information other than Max.
//
// Note that this aggregator maintains each value using independent
// atomic operations, which introduces the possibility that
// checkpoints are inconsistent. For greater consistency and lower
// performance, consider using Array or DDSketch aggregators.
// This aggregator uses the StateLocker pattern to guarantee
// the count, sum, min and max are consistent within a checkpoint.
evantorrie marked this conversation as resolved.
Show resolved Hide resolved
func New(desc *export.Descriptor) *Aggregator {
kind := desc.NumberKind()
return &Aggregator{
kind: desc.NumberKind(),
current: unsetMinMaxSumCount(desc.NumberKind()),
kind: kind,
states: [2]state{
{
count: core.NewUint64Number(0),
sum: kind.Zero(),
min: kind.Maximum(),
max: kind.Minimum(),
},
{
count: core.NewUint64Number(0),
sum: kind.Zero(),
min: kind.Maximum(),
max: kind.Minimum(),
},
},
}
}

func unsetMinMaxSumCount(kind core.NumberKind) state {
return state{min: kind.Maximum(), max: kind.Minimum()}
}

// Sum returns the sum of values in the checkpoint.
func (c *Aggregator) Sum() (core.Number, error) {
return c.checkpoint.sum, nil
c.lock.Lock()
defer c.lock.Unlock()
return c.checkpoint().sum, nil
}

// Count returns the number of values in the checkpoint.
func (c *Aggregator) Count() (int64, error) {
return int64(c.checkpoint.count.AsUint64()), nil
c.lock.Lock()
defer c.lock.Unlock()
return c.checkpoint().count.CoerceToInt64(core.Uint64NumberKind), nil
}

// Min returns the minimum value in the checkpoint.
// The error value aggregator.ErrEmptyDataSet will be returned if
// (due to a race condition) the checkpoint was set prior to
// current.min being computed in Update().
//
// Note: If a measure's recorded values for a given checkpoint are
// all equal to NumberKind.Maximum(), Min() will return ErrEmptyDataSet
// The error value aggregator.ErrEmptyDataSet will be returned
// if there were no measurements recorded during the checkpoint.
evantorrie marked this conversation as resolved.
Show resolved Hide resolved
func (c *Aggregator) Min() (core.Number, error) {
if c.checkpoint.min == c.kind.Maximum() {
return core.Number(0), aggregator.ErrEmptyDataSet
c.lock.Lock()
defer c.lock.Unlock()
if c.checkpoint().count.IsZero(core.Uint64NumberKind) {
return c.kind.Zero(), aggregator.ErrEmptyDataSet
}
return c.checkpoint.min, nil
return c.checkpoint().min, nil
}

// Max returns the maximum value in the checkpoint.
// The error value aggregator.ErrEmptyDataSet will be returned if
// (due to a race condition) the checkpoint was set prior to
// current.max being computed in Update().
//
// Note: If a measure's recorded values for a given checkpoint are
// all equal to NumberKind.Minimum(), Max() will return ErrEmptyDataSet
// The error value aggregator.ErrEmptyDataSet will be returned
// if there were no measurements recorded during the checkpoint.
func (c *Aggregator) Max() (core.Number, error) {
if c.checkpoint.max == c.kind.Minimum() {
return core.Number(0), aggregator.ErrEmptyDataSet
c.lock.Lock()
defer c.lock.Unlock()
if c.checkpoint().count.IsZero(core.Uint64NumberKind) {
return c.kind.Zero(), aggregator.ErrEmptyDataSet
}
return c.checkpoint.max, nil
return c.checkpoint().max, nil
}

// Checkpoint saves the current state and resets the current state to
// the empty set. Since no locks are taken, there is a chance that
// the independent Min, Max, Sum, and Count are not consistent with each
// other.
// the empty set.
func (c *Aggregator) Checkpoint(ctx context.Context, desc *export.Descriptor) {
// N.B. There is no atomic operation that can update all three
// values at once without a memory allocation.
//
// This aggregator is intended to trade this correctness for
// speed.
//
// Therefore, atomically swap fields independently, knowing
// that individually the three parts of this aggregation could
// be spread across multiple collections in rare cases.

c.checkpoint.count.SetUint64(c.current.count.SwapUint64Atomic(0))
c.checkpoint.sum = c.current.sum.SwapNumberAtomic(core.Number(0))
c.checkpoint.max = c.current.max.SwapNumberAtomic(c.kind.Minimum())
c.checkpoint.min = c.current.min.SwapNumberAtomic(c.kind.Maximum())
c.lock.SwapActiveState(c.resetCheckpoint)
}

// checkpoint returns a pointer to the "cold" state, i.e. the element of
// c.states selected by the lock's ColdIdx(). This is the state collected
// prior to the most recent Checkpoint() call.
func (c *Aggregator) checkpoint() *state {
	coldIdx := c.lock.ColdIdx()
	return &c.states[coldIdx]
}

// resetCheckpoint restores the cold state to the empty data set: count and
// sum become zero, min becomes the kind's Maximum() and max becomes the
// kind's Minimum(). Checkpoint passes it to the lock's SwapActiveState.
func (c *Aggregator) resetCheckpoint() {
	kind := c.kind
	cold := c.checkpoint()

	cold.count.SetUint64(0)
	cold.sum.SetNumber(kind.Zero())
	cold.max.SetNumber(kind.Minimum())
	cold.min.SetNumber(kind.Maximum())
}

// Update adds the recorded measurement to the current data set.
func (c *Aggregator) Update(_ context.Context, number core.Number, desc *export.Descriptor) error {
kind := desc.NumberKind()

c.current.count.AddUint64Atomic(1)
c.current.sum.AddNumberAtomic(kind, number)
cIdx := c.lock.Start()
defer c.lock.End(cIdx)

current := &c.states[cIdx]
current.count.AddUint64Atomic(1)
current.sum.AddNumberAtomic(kind, number)

for {
current := c.current.min.AsNumberAtomic()
cmin := current.min.AsNumberAtomic()

if number.CompareNumber(kind, current) >= 0 {
if number.CompareNumber(kind, cmin) >= 0 {
break
}
if c.current.min.CompareAndSwapNumber(current, number) {
if current.min.CompareAndSwapNumber(cmin, number) {
break
}
}
for {
current := c.current.max.AsNumberAtomic()
cmax := current.max.AsNumberAtomic()

if number.CompareNumber(kind, current) <= 0 {
if number.CompareNumber(kind, cmax) <= 0 {
break
}
if c.current.max.CompareAndSwapNumber(current, number) {
if current.max.CompareAndSwapNumber(cmax, number) {
break
}
}
Expand All @@ -159,14 +171,22 @@ func (c *Aggregator) Merge(oa export.Aggregator, desc *export.Descriptor) error
return aggregator.NewInconsistentMergeError(c, oa)
}

c.checkpoint.sum.AddNumber(desc.NumberKind(), o.checkpoint.sum)
c.checkpoint.count.AddNumber(core.Uint64NumberKind, o.checkpoint.count)
// Lock() synchronizes Merge() and Checkpoint() to ensure all operations of
// Merge() are performed on the same state.
c.lock.Lock()
defer c.lock.Unlock()

current := c.checkpoint()
ocheckpoint := o.checkpoint()

current.count.AddNumber(core.Uint64NumberKind, ocheckpoint.count)
current.sum.AddNumber(desc.NumberKind(), ocheckpoint.sum)

if c.checkpoint.min.CompareNumber(desc.NumberKind(), o.checkpoint.min) > 0 {
c.checkpoint.min.SetNumber(o.checkpoint.min)
if current.min.CompareNumber(desc.NumberKind(), ocheckpoint.min) > 0 {
current.min.SetNumber(ocheckpoint.min)
}
if c.checkpoint.max.CompareNumber(desc.NumberKind(), o.checkpoint.max) < 0 {
c.checkpoint.max.SetNumber(o.checkpoint.max)
if current.max.CompareNumber(desc.NumberKind(), ocheckpoint.max) < 0 {
current.max.SetNumber(ocheckpoint.max)
}
return nil
}
8 changes: 2 additions & 6 deletions sdk/metric/aggregator/minmaxsumcount/mmsc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,8 @@ var (
func TestMain(m *testing.M) {
fields := []ottest.FieldOffset{
{
Name: "Aggregator.current",
Offset: unsafe.Offsetof(Aggregator{}.current),
},
{
Name: "Aggregator.checkpoint",
Offset: unsafe.Offsetof(Aggregator{}.checkpoint),
Name: "Aggregator.states",
Offset: unsafe.Offsetof(Aggregator{}.states),
},
{
Name: "state.count",
Expand Down
83 changes: 83 additions & 0 deletions sdk/metric/minmaxsumcount_stress_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// This test is too large for the race detector. This SDK uses no locks
// that the race detector would help with, anyway.
// +build !race

package metric_test

import (
"context"
"math/rand"
"testing"
"time"

"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount"
)

func TestStressInt64MinMaxSumCount(t *testing.T) {
desc := metric.NewDescriptor("some_metric", metric.MeasureKind, nil, "", "", core.Int64NumberKind)
mmsc := minmaxsumcount.New(desc)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
rnd := rand.New(rand.NewSource(time.Now().Unix()))
v := rnd.Int63() % 103
for {
select {
case <-ctx.Done():
return
default:
_ = mmsc.Update(ctx, core.NewInt64Number(v), desc)
}
v++
}
}()

startTime := time.Now()
for time.Since(startTime) < time.Second {
mmsc.Checkpoint(context.Background(), desc)

s, _ := mmsc.Sum()
c, _ := mmsc.Count()
min, e1 := mmsc.Min()
max, e2 := mmsc.Max()
if c == 0 && (e1 == nil || e2 == nil || s.AsInt64() != 0) {
t.Fail()
}
if c != 0 {
if e1 != nil || e2 != nil {
t.Fail()
}
lo, hi, sum := min.AsInt64(), max.AsInt64(), s.AsInt64()

if hi-lo+1 != c {
t.Fail()
}
if c == 1 {
if lo != hi || lo != sum {
t.Fail()
}
} else {
if hi*(hi+1)/2-(lo-1)*lo/2 != sum {
t.Fail()
}
}
}
}
}
2 changes: 1 addition & 1 deletion sdk/metric/stress_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019, OpenTelemetry Authors
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down