Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[aggregator] Clean up aggregated metric writer and encoder #4112

Merged
merged 2 commits
May 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 8 additions & 19 deletions src/aggregator/aggregator/handler/writer/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,12 @@ import (
"github.com/m3db/m3/src/x/pool"
)

const (
defaultEncodingTimeSamplingRate = 0
)

// Options provide a set of options for the writer.
type Options interface {
// SetClockOptions sets the clock options.
// SetClockOptions is deprecated
SetClockOptions(value clock.Options) Options

// ClockOptions returns the clock options.
// ClockOptions is deprecated
ClockOptions() clock.Options

// SetInstrumentOptions sets the instrument options.
Expand All @@ -50,14 +46,10 @@ type Options interface {
// BytesPool returns the bytes pool.
BytesPool() pool.BytesPool

// SetEncodingTimeSamplingRate sets the sampling rate at which the encoding time is
// included in the encoded data. A value of 0 means the encoding time is never included,
// and a value of 1 means the encoding time is always included.
// SetEncodingTimeSamplingRate is deprecated
SetEncodingTimeSamplingRate(value float64) Options

// EncodingTimeSamplingRate returns the sampling rate at which the encoding time is
// included in the encoded data. A value of 0 means the encoding time is never included,
// and a value of 1 means the encoding time is always included.
// EncodingTimeSamplingRate is deprecated
EncodingTimeSamplingRate() float64
}

Expand All @@ -71,9 +63,8 @@ type options struct {
// NewOptions provide a set of writer options.
func NewOptions() Options {
return &options{
clockOpts: clock.NewOptions(),
instrumentOpts: instrument.NewOptions(),
encodingTimeSamplingRate: defaultEncodingTimeSamplingRate,
clockOpts: clock.NewOptions(),
instrumentOpts: instrument.NewOptions(),
}
}

Expand Down Expand Up @@ -108,11 +99,9 @@ func (o *options) BytesPool() pool.BytesPool {
}

func (o *options) SetEncodingTimeSamplingRate(value float64) Options {
opts := *o
opts.encodingTimeSamplingRate = value
return &opts
return o
}

func (o *options) EncodingTimeSamplingRate() float64 {
return o.encodingTimeSamplingRate
return 0
}
63 changes: 21 additions & 42 deletions src/aggregator/aggregator/handler/writer/protobuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,12 @@ package writer

import (
"errors"
"math/rand"

"github.com/m3db/m3/src/aggregator/sharding"
"github.com/m3db/m3/src/metrics/encoding/protobuf"
"github.com/m3db/m3/src/metrics/metric/aggregated"
"github.com/m3db/m3/src/metrics/policy"
"github.com/m3db/m3/src/msg/producer"
"github.com/m3db/m3/src/x/clock"

"github.com/uber-go/tally"
)
Expand All @@ -38,44 +36,34 @@ var (
errWriterClosed = errors.New("writer is closed")
)

type randFn func() float64

type protobufWriterMetrics struct {
writerClosed tally.Counter
encodeSuccess tally.Counter
encodeErrors tally.Counter
routeSuccess tally.Counter
routeErrors tally.Counter
writerClosed tally.Counter
encodeErrors tally.Counter
routeErrors tally.Counter
}

func newProtobufWriterMetrics(scope tally.Scope) protobufWriterMetrics {
encodeScope := scope.SubScope("encode")
routeScope := scope.SubScope("route")
return protobufWriterMetrics{
writerClosed: scope.Counter("writer-closed"),
encodeSuccess: encodeScope.Counter("success"),
encodeErrors: encodeScope.Counter("errors"),
routeSuccess: routeScope.Counter("success"),
routeErrors: routeScope.Counter("errors"),
writerClosed: scope.Counter("writer-closed"),
encodeErrors: encodeScope.Counter("errors"),
routeErrors: routeScope.Counter("errors"),
}
}

// protobufWriter encodes data and routes them to the backend.
// protobufWriter is not thread safe.
type protobufWriter struct {
encodingTimeSamplingRate float64
encoder protobuf.AggregatedEncoder
p producer.Producer
numShards uint32
encoder *protobuf.AggregatedEncoder
p producer.Producer
numShards uint32

closed bool
m aggregated.MetricWithStoragePolicy
rand *rand.Rand
metrics protobufWriterMetrics

nowFn clock.NowFn
randFn randFn
shardFn sharding.ShardFn
closed bool
}

// NewProtobufWriter creates a writer that encodes metric in protobuf.
Expand All @@ -84,20 +72,15 @@ func NewProtobufWriter(
shardFn sharding.ShardFn,
opts Options,
) Writer {
nowFn := opts.ClockOptions().NowFn()
instrumentOpts := opts.InstrumentOptions()
w := &protobufWriter{
encodingTimeSamplingRate: opts.EncodingTimeSamplingRate(),
encoder: protobuf.NewAggregatedEncoder(opts.BytesPool()),
p: producer,
numShards: producer.NumShards(),
closed: false,
rand: rand.New(rand.NewSource(nowFn().UnixNano())),
metrics: newProtobufWriterMetrics(instrumentOpts.MetricsScope()),
nowFn: nowFn,
shardFn: shardFn,
encoder: protobuf.NewAggregatedEncoder(opts.BytesPool()),
p: producer,
numShards: producer.NumShards(),
closed: false,
metrics: newProtobufWriterMetrics(instrumentOpts.MetricsScope()),
shardFn: shardFn,
}
w.randFn = w.rand.Float64
return w
}

Expand All @@ -106,22 +89,18 @@ func (w *protobufWriter) Write(mp aggregated.ChunkedMetricWithStoragePolicy) err
w.metrics.writerClosed.Inc(1)
return errWriterClosed
}
var encodeNanos int64
if w.encodingTimeSamplingRate > 0 && w.randFn() < w.encodingTimeSamplingRate {
encodeNanos = w.nowFn().UnixNano()
}

m, shard := w.prepare(mp)
if err := w.encoder.Encode(m, encodeNanos); err != nil {
if err := w.encoder.Encode(m); err != nil {
w.metrics.encodeErrors.Inc(1)
return err
}

w.metrics.encodeSuccess.Inc(1)
if err := w.p.Produce(newMessage(shard, mp.StoragePolicy, w.encoder.Buffer())); err != nil {
w.metrics.routeErrors.Inc(1)
return err
}
w.metrics.routeSuccess.Inc(1)

return nil
}

Expand Down Expand Up @@ -198,8 +177,8 @@ func (f storagePolicyFilter) Filter(m producer.Message) bool {
if !ok {
return true
}
for _, accepted := range f.acceptedStoragePolicies {
if accepted == msg.sp {
for i := 0; i < len(f.acceptedStoragePolicies); i++ {
if f.acceptedStoragePolicies[i].Equivalent(msg.sp) {
return true
}
}
Expand Down
29 changes: 3 additions & 26 deletions src/aggregator/aggregator/handler/writer/protobuf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/m3db/m3/src/metrics/metric/id"
"github.com/m3db/m3/src/metrics/policy"
"github.com/m3db/m3/src/msg/producer"
"github.com/m3db/m3/src/x/clock"
xtime "github.com/m3db/m3/src/x/time"

"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -108,12 +107,7 @@ func TestProtobufWriterWriteClosed(t *testing.T) {
}

func TestProtobufWriterWrite(t *testing.T) {
now := time.Now()
nowFn := func() time.Time { return now }
opts := NewOptions().
SetClockOptions(clock.NewOptions().SetNowFn(nowFn)).
SetEncodingTimeSamplingRate(0.5)

opts := NewOptions()
ctrl := gomock.NewController(t)
defer ctrl.Finish()
writer := testProtobufWriter(t, ctrl, opts)
Expand All @@ -133,7 +127,6 @@ func TestProtobufWriterWrite(t *testing.T) {
},
StoragePolicy: sp,
},
encodedAtNanos: d.EncodeNanos(),
})
return nil
}).AnyTimes()
Expand All @@ -147,14 +140,6 @@ func TestProtobufWriterWrite(t *testing.T) {
require.Fail(t, "unexpected chunked id %v", id)
return 0
}
var iter int
writer.randFn = func() float64 {
iter++
if iter%2 == 0 {
return 0.1
}
return 0.9
}

inputs := []aggregated.ChunkedMetricWithStoragePolicy{
testChunkedMetricWithStoragePolicy,
Expand All @@ -165,27 +150,21 @@ func TestProtobufWriterWrite(t *testing.T) {
for _, input := range inputs {
require.NoError(t, writer.Write(input))
}

encodedAtNanos := now.UnixNano()
expectedData := map[uint32][]decodeData{
1: []decodeData{
1: {
{
MetricWithStoragePolicy: testMetricWithStoragePolicy,
encodedAtNanos: 0,
},
{
MetricWithStoragePolicy: testMetricWithStoragePolicy,
encodedAtNanos: encodedAtNanos,
},
},
2: []decodeData{
2: {
{
MetricWithStoragePolicy: testMetricWithStoragePolicy2,
encodedAtNanos: encodedAtNanos,
},
{
MetricWithStoragePolicy: testMetricWithStoragePolicy2,
encodedAtNanos: 0,
},
},
}
Expand All @@ -205,6 +184,4 @@ func testProtobufWriter(t *testing.T, ctrl *gomock.Controller, opts Options) *pr

type decodeData struct {
aggregated.MetricWithStoragePolicy

encodedAtNanos int64
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package m3msg

import (
"context"
"fmt"
"net"
"sync"
"testing"
Expand Down Expand Up @@ -85,7 +84,7 @@ func TestM3MsgServerWithProtobufHandler(t *testing.T) {
}

encoder := protobuf.NewAggregatedEncoder(nil)
require.NoError(t, encoder.Encode(m1, 2000))
require.NoError(t, encoder.Encode(m1))
enc := proto.NewEncoder(opts.EncoderOptions())
require.NoError(t, enc.Encode(&msgpb.Message{
Value: encoder.Buffer().Bytes(),
Expand All @@ -107,7 +106,7 @@ func TestM3MsgServerWithProtobufHandler(t *testing.T) {
},
StoragePolicy: precisionStoragePolicy,
}
require.NoError(t, encoder.Encode(m2, 3000))
require.NoError(t, encoder.Encode(m2))
enc = proto.NewEncoder(opts.EncoderOptions())
require.NoError(t, enc.Encode(&msgpb.Message{
Value: encoder.Buffer().Bytes(),
Expand All @@ -117,19 +116,17 @@ func TestM3MsgServerWithProtobufHandler(t *testing.T) {
require.NoError(t, dec.Decode(&a))
require.Equal(t, 2, w.ingested())

payload, ok := w.m[key(string(m1.ID), 2000)]
payload, ok := w.m[string(m1.ID)] //nolint:govet
require.True(t, ok)
require.Equal(t, string(m1.ID), payload.id)
require.Equal(t, m1.TimeNanos, payload.metricNanos)
require.Equal(t, 2000, int(payload.encodeNanos))
require.Equal(t, m1.Value, payload.value)
require.Equal(t, m1.StoragePolicy, payload.sp)

payload, ok = w.m[key(string(m2.ID), 3000)]
payload, ok = w.m[string(m2.ID)]
require.True(t, ok)
require.Equal(t, string(m2.ID), payload.id)
require.Equal(t, m2.TimeNanos, payload.metricNanos)
require.Equal(t, 3000, int(payload.encodeNanos))
require.Equal(t, m2.Value, payload.value)
require.Equal(t, m2.StoragePolicy, payload.sp)
}
Expand Down Expand Up @@ -168,7 +165,7 @@ func TestM3MsgServerWithProtobufHandler_Blackhole(t *testing.T) {
}

encoder := protobuf.NewAggregatedEncoder(nil)
require.NoError(t, encoder.Encode(m1, 2000))
require.NoError(t, encoder.Encode(m1))
enc := proto.NewEncoder(opts.EncoderOptions())
require.NoError(t, enc.Encode(&msgpb.Message{
Value: encoder.Buffer().Bytes(),
Expand All @@ -191,7 +188,7 @@ func TestM3MsgServerWithProtobufHandler_Blackhole(t *testing.T) {
},
StoragePolicy: policy.MustParseStoragePolicy("5m:180d"),
}
require.NoError(t, encoder.Encode(m2, 3000))
require.NoError(t, encoder.Encode(m2))
enc = proto.NewEncoder(opts.EncoderOptions())
require.NoError(t, enc.Encode(&msgpb.Message{
Value: encoder.Buffer().Bytes(),
Expand All @@ -212,7 +209,7 @@ func TestM3MsgServerWithProtobufHandler_Blackhole(t *testing.T) {
},
StoragePolicy: baseStoragePolicy,
}
require.NoError(t, encoder.Encode(m3, 3000))
require.NoError(t, encoder.Encode(m3))
enc = proto.NewEncoder(opts.EncoderOptions())
require.NoError(t, enc.Encode(&msgpb.Message{
Value: encoder.Buffer().Bytes(),
Expand Down Expand Up @@ -248,7 +245,7 @@ func (m *mockWriter) write(
value: value,
sp: sp,
}
m.m[key(payload.id, encodeNanos)] = payload
m.m[payload.id] = payload
m.Unlock()
callbackable.Callback(OnSuccess)
}
Expand All @@ -260,10 +257,6 @@ func (m *mockWriter) ingested() int {
return m.n
}

func key(id string, encodeTime int64) string {
return fmt.Sprintf("%s%d", id, encodeTime)
}

type payload struct {
id string
metricNanos int64
Expand Down
2 changes: 1 addition & 1 deletion src/metrics/encoding/protobuf/aggregated_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (d AggregatedDecoder) StoragePolicy() policy.StoragePolicy {
return d.sp
}

// EncodeNanos returns the decoded encodeNanos.
// EncodeNanos is deprecated.
func (d AggregatedDecoder) EncodeNanos() int64 {
return d.pb.EncodeNanos
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func BenchmarkDecodeStoragePolicy(b *testing.B) {
dec = NewAggregatedDecoder(nil)
sp policy.StoragePolicy
)
if err := enc.Encode(testAggregatedMetric1, 2000); err != nil {
if err := enc.Encode(testAggregatedMetric1); err != nil {
b.Fatal(err)
}

Expand Down
Loading