Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use StateLocker in MinMaxSumCount #546

Merged
merged 5 commits into from
Mar 12, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions api/core/number.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,20 @@ const (
Uint64NumberKind
)

// Zero returns a zero value for a given NumberKind
func (k NumberKind) Zero() Number {
switch k {
case Int64NumberKind:
return NewInt64Number(0)
case Float64NumberKind:
return NewFloat64Number(0.)
case Uint64NumberKind:
return NewUint64Number(0)
default:
return Number(0)
}
}

// Minimum returns the minimum representable value
// for a given NumberKind
func (k NumberKind) Minimum() Number {
Expand Down
150 changes: 85 additions & 65 deletions sdk/metric/aggregator/minmaxsumcount/mmsc.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@ import (
"go.opentelemetry.io/otel/api/core"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
"go.opentelemetry.io/otel/sdk/internal"
)

type (
// Aggregator aggregates measure events, keeping only the max,
// sum, and count.
Aggregator struct {
// current has to be aligned for 64-bit atomic operations.
current state
// checkpoint has to be aligned for 64-bit atomic operations.
checkpoint state
kind core.NumberKind
states [2]state
lock internal.StateLocker
kind core.NumberKind
}

state struct {
Expand All @@ -48,104 +48,116 @@ var _ aggregator.MinMaxSumCount = &Aggregator{}
// New returns a new measure aggregator for computing min, max, sum, and
// count. It does not compute quantile information other than Max.
//
// Note that this aggregator maintains each value using independent
// atomic operations, which introduces the possibility that
// checkpoints are inconsistent. For greater consistency and lower
// performance, consider using Array or DDSketch aggregators.
// This aggregator uses the StateLocker pattern to guarantee
// the count, sum, min and max are consistent within a checkpoint
func New(desc *export.Descriptor) *Aggregator {
kind := desc.NumberKind()
return &Aggregator{
kind: desc.NumberKind(),
current: unsetMinMaxSumCount(desc.NumberKind()),
kind: kind,
states: [2]state{
{
count: core.NewUint64Number(0),
sum: kind.Zero(),
min: kind.Maximum(),
max: kind.Minimum(),
},
{
count: core.NewUint64Number(0),
sum: kind.Zero(),
min: kind.Maximum(),
max: kind.Minimum(),
},
},
}
}

func unsetMinMaxSumCount(kind core.NumberKind) state {
return state{min: kind.Maximum(), max: kind.Minimum()}
}

// Sum returns the sum of values in the checkpoint.
func (c *Aggregator) Sum() (core.Number, error) {
return c.checkpoint.sum, nil
c.lock.Lock()
defer c.lock.Unlock()
return c.checkpoint().sum, nil
}

// Count returns the number of values in the checkpoint.
func (c *Aggregator) Count() (int64, error) {
return int64(c.checkpoint.count.AsUint64()), nil
c.lock.Lock()
defer c.lock.Unlock()
return c.checkpoint().count.CoerceToInt64(core.Uint64NumberKind), nil
}

// Min returns the minimum value in the checkpoint.
// The error value aggregator.ErrEmptyDataSet will be returned if
// (due to a race condition) the checkpoint was set prior to
// current.min being computed in Update().
//
// Note: If a measure's recorded values for a given checkpoint are
// all equal to NumberKind.Maximum(), Min() will return ErrEmptyDataSet
// The error value aggregator.ErrEmptyDataSet will be returned
// if there were no measurements recorded during the checkpoint.
func (c *Aggregator) Min() (core.Number, error) {
if c.checkpoint.min == c.kind.Maximum() {
return core.Number(0), aggregator.ErrEmptyDataSet
c.lock.Lock()
defer c.lock.Unlock()
if c.checkpoint().count.IsZero(core.Uint64NumberKind) {
return c.kind.Zero(), aggregator.ErrEmptyDataSet
}
return c.checkpoint.min, nil
return c.checkpoint().min, nil
}

// Max returns the maximum value in the checkpoint.
// The error value aggregator.ErrEmptyDataSet will be returned if
// (due to a race condition) the checkpoint was set prior to
// current.max being computed in Update().
//
// Note: If a measure's recorded values for a given checkpoint are
// all equal to NumberKind.Minimum(), Max() will return ErrEmptyDataSet
// The error value aggregator.ErrEmptyDataSet will be returned
// if there were no measurements recorded during the checkpoint.
func (c *Aggregator) Max() (core.Number, error) {
if c.checkpoint.max == c.kind.Minimum() {
return core.Number(0), aggregator.ErrEmptyDataSet
c.lock.Lock()
defer c.lock.Unlock()
if c.checkpoint().count.IsZero(core.Uint64NumberKind) {
return c.kind.Zero(), aggregator.ErrEmptyDataSet
}
return c.checkpoint.max, nil
return c.checkpoint().max, nil
}

// Checkpoint saves the current state and resets the current state to
// the empty set. Since no locks are taken, there is a chance that
// the independent Min, Max, Sum, and Count are not consistent with each
// other.
// the empty set.
func (c *Aggregator) Checkpoint(ctx context.Context, desc *export.Descriptor) {
// N.B. There is no atomic operation that can update all three
// values at once without a memory allocation.
//
// This aggregator is intended to trade this correctness for
// speed.
//
// Therefore, atomically swap fields independently, knowing
// that individually the three parts of this aggregation could
// be spread across multiple collections in rare cases.

c.checkpoint.count.SetUint64(c.current.count.SwapUint64Atomic(0))
c.checkpoint.sum = c.current.sum.SwapNumberAtomic(core.Number(0))
c.checkpoint.max = c.current.max.SwapNumberAtomic(c.kind.Minimum())
c.checkpoint.min = c.current.min.SwapNumberAtomic(c.kind.Maximum())
c.lock.SwapActiveState(c.resetCheckpoint)
}

// checkpoint returns the "cold" state, i.e. state collected prior to the
// most recent Checkpoint() call
func (c *Aggregator) checkpoint() *state {
return &c.states[c.lock.ColdIdx()]
}

func (c *Aggregator) resetCheckpoint() {
checkpoint := c.checkpoint()

checkpoint.count.SetUint64(0)
checkpoint.sum.SetNumber(c.kind.Zero())
checkpoint.min.SetNumber(c.kind.Maximum())
checkpoint.max.SetNumber(c.kind.Minimum())
}

// Update adds the recorded measurement to the current data set.
func (c *Aggregator) Update(_ context.Context, number core.Number, desc *export.Descriptor) error {
kind := desc.NumberKind()

c.current.count.AddUint64Atomic(1)
c.current.sum.AddNumberAtomic(kind, number)
cIdx := c.lock.Start()
defer c.lock.End(cIdx)

current := &c.states[cIdx]
current.count.AddUint64Atomic(1)
current.sum.AddNumberAtomic(kind, number)

for {
current := c.current.min.AsNumberAtomic()
cmin := current.min.AsNumberAtomic()

if number.CompareNumber(kind, current) >= 0 {
if number.CompareNumber(kind, cmin) >= 0 {
break
}
if c.current.min.CompareAndSwapNumber(current, number) {
if current.min.CompareAndSwapNumber(cmin, number) {
break
}
}
for {
current := c.current.max.AsNumberAtomic()
cmax := current.max.AsNumberAtomic()

if number.CompareNumber(kind, current) <= 0 {
if number.CompareNumber(kind, cmax) <= 0 {
break
}
if c.current.max.CompareAndSwapNumber(current, number) {
if current.max.CompareAndSwapNumber(cmax, number) {
break
}
}
Expand All @@ -159,14 +171,22 @@ func (c *Aggregator) Merge(oa export.Aggregator, desc *export.Descriptor) error
return aggregator.NewInconsistentMergeError(c, oa)
}

c.checkpoint.sum.AddNumber(desc.NumberKind(), o.checkpoint.sum)
c.checkpoint.count.AddNumber(core.Uint64NumberKind, o.checkpoint.count)
// Lock() synchronizes Merge() and Checkpoint() to ensure all operations of
// Merge() are performed on the same state.
c.lock.Lock()
defer c.lock.Unlock()

current := c.checkpoint()
ocheckpoint := o.checkpoint()

current.count.AddNumber(core.Uint64NumberKind, ocheckpoint.count)
current.sum.AddNumber(desc.NumberKind(), ocheckpoint.sum)

if c.checkpoint.min.CompareNumber(desc.NumberKind(), o.checkpoint.min) > 0 {
c.checkpoint.min.SetNumber(o.checkpoint.min)
if current.min.CompareNumber(desc.NumberKind(), ocheckpoint.min) > 0 {
current.min.SetNumber(ocheckpoint.min)
}
if c.checkpoint.max.CompareNumber(desc.NumberKind(), o.checkpoint.max) < 0 {
c.checkpoint.max.SetNumber(o.checkpoint.max)
if current.max.CompareNumber(desc.NumberKind(), ocheckpoint.max) < 0 {
current.max.SetNumber(ocheckpoint.max)
}
return nil
}
8 changes: 2 additions & 6 deletions sdk/metric/aggregator/minmaxsumcount/mmsc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,8 @@ var (
func TestMain(m *testing.M) {
fields := []ottest.FieldOffset{
{
Name: "Aggregator.current",
Offset: unsafe.Offsetof(Aggregator{}.current),
},
{
Name: "Aggregator.checkpoint",
Offset: unsafe.Offsetof(Aggregator{}.checkpoint),
Name: "Aggregator.states",
Offset: unsafe.Offsetof(Aggregator{}.states),
},
{
Name: "state.count",
Expand Down
76 changes: 76 additions & 0 deletions sdk/metric/minmaxsumcount_stress_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// This test is too large for the race detector. This SDK uses no locks
// that the race detector would help with, anyway.
// +build !race

package metric_test

import (
"context"
"math/rand"
"testing"
"time"

"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount"
)

func TestStressInt64MinMaxSumCount(t *testing.T) {
desc := metric.NewDescriptor("some_metric", metric.MeasureKind, nil, "", "", core.Int64NumberKind)
mmsc := minmaxsumcount.New(desc)

go func() {
rnd := rand.New(rand.NewSource(time.Now().Unix()))
v := rnd.Int63() % 103
for {
_ = mmsc.Update(context.Background(), core.NewInt64Number(v), desc)
v++
}
}()

startTime := time.Now()
for time.Since(startTime) < time.Second {
mmsc.Checkpoint(context.Background(), desc)

s, _ := mmsc.Sum()
c, _ := mmsc.Count()
min, e1 := mmsc.Min()
max, e2 := mmsc.Max()
if c == 0 && (e1 == nil || e2 == nil || s.AsInt64() != 0) {
t.Fail()
}
if c != 0 {
if e1 != nil || e2 != nil {
t.Fail()
}
lo, hi, sum := min.AsInt64(), max.AsInt64(), s.AsInt64()

if hi-lo+1 != c {
t.Fail()
}
if c == 1 {
if lo != hi || lo != sum {
t.Fail()
}
} else {
if hi*(hi+1)/2-(lo-1)*lo/2 != sum {
t.Fail()
}
}
}
}
}