diff --git a/scripts/development/m3_stack/m3collector.yml b/scripts/development/m3_stack/m3collector.yml index f3d4c15eca..667b5d18d9 100644 --- a/scripts/development/m3_stack/m3collector.yml +++ b/scripts/development/m3_stack/m3collector.yml @@ -51,7 +51,6 @@ reporter: initWatchTimeout: 10s hashType: murmur32 shardCutoffLingerDuration: 1m - flushSize: 1440 maxTimerBatchSize: 1120 queueSize: 10000 queueDropType: oldest diff --git a/scripts/docker-integration-tests/aggregator_legacy/m3aggregator.yml b/scripts/docker-integration-tests/aggregator_legacy/m3aggregator.yml index e70caa4b59..5166a2981d 100644 --- a/scripts/docker-integration-tests/aggregator_legacy/m3aggregator.yml +++ b/scripts/docker-integration-tests/aggregator_legacy/m3aggregator.yml @@ -128,7 +128,6 @@ aggregator: watermark: low: 0.001 high: 0.01 - flushSize: 1440 maxTimerBatchSize: 140 queueSize: 1000 queueDropType: oldest diff --git a/scripts/docker-integration-tests/aggregator_legacy/m3coordinator.yml b/scripts/docker-integration-tests/aggregator_legacy/m3coordinator.yml index acc91e156e..281662faca 100644 --- a/scripts/docker-integration-tests/aggregator_legacy/m3coordinator.yml +++ b/scripts/docker-integration-tests/aggregator_legacy/m3coordinator.yml @@ -35,9 +35,10 @@ downsample: initWatchTimeout: 10s hashType: murmur32 shardCutoffLingerDuration: 1m - flushSize: 1440 + forceFlushEvery: 1s + flushWorkerCount: 4 maxTimerBatchSize: 1120 - queueSize: 10000 + queueSize: 100 queueDropType: oldest encoder: initBufferSize: 2048 @@ -56,7 +57,7 @@ downsample: ingest: ingester: - workerPoolSize: 10000 + workerPoolSize: 100 opPool: size: 10000 retry: diff --git a/src/aggregator/client/config.go b/src/aggregator/client/config.go index 60b980a1cb..0ddf074889 100644 --- a/src/aggregator/client/config.go +++ b/src/aggregator/client/config.go @@ -52,10 +52,11 @@ type Configuration struct { ShardCutoverWarmupDuration *time.Duration `yaml:"shardCutoverWarmupDuration"` ShardCutoffLingerDuration *time.Duration `yaml:"shardCutoffLingerDuration"` Encoder EncoderConfiguration `yaml:"encoder"` - FlushSize int `yaml:"flushSize"` + FlushSize int `yaml:"flushSize,omitempty"` // FlushSize is deprecated + FlushWorkerCount int `yaml:"flushWorkerCount"` + ForceFlushEvery time.Duration `yaml:"forceFlushEvery"` MaxBatchSize int `yaml:"maxBatchSize"` MaxTimerBatchSize int `yaml:"maxTimerBatchSize"` - BatchFlushDeadline time.Duration `yaml:"batchFlushDeadline"` QueueSize int `yaml:"queueSize"` QueueDropType *DropType `yaml:"queueDropType"` Connection ConnectionConfiguration `yaml:"connection"` @@ -175,8 +176,11 @@ func (c *Configuration) newClientOptions( if c.ShardCutoffLingerDuration != nil { opts = opts.SetShardCutoffLingerDuration(*c.ShardCutoffLingerDuration) } - if c.FlushSize != 0 { - opts = opts.SetFlushSize(c.FlushSize) + if c.FlushWorkerCount != 0 { + opts = opts.SetFlushWorkerCount(c.FlushWorkerCount) + } + if c.ForceFlushEvery != 0 { + opts = opts.SetForceFlushEvery(c.ForceFlushEvery) } if c.MaxBatchSize != 0 { opts = opts.SetMaxBatchSize(c.MaxBatchSize) @@ -184,9 +188,6 @@ func (c *Configuration) newClientOptions( if c.MaxTimerBatchSize != 0 { opts = opts.SetMaxTimerBatchSize(c.MaxTimerBatchSize) } - if c.BatchFlushDeadline != 0 { - opts = opts.SetBatchFlushDeadline(c.BatchFlushDeadline) - } if c.QueueSize != 0 { opts = opts.SetInstanceQueueSize(c.QueueSize) } diff --git a/src/aggregator/client/config_test.go b/src/aggregator/client/config_test.go index 5054cacbce..9033d0fcdb 100644 --- a/src/aggregator/client/config_test.go +++ 
b/src/aggregator/client/config_test.go @@ -61,10 +61,10 @@ encoder: watermark: low: 0.001 high: 0.01 -flushSize: 1440 +flushWorkerCount: 10 +forceFlushEvery: 123s maxBatchSize: 42 maxTimerBatchSize: 140 -batchFlushDeadline: 123ms queueSize: 1000 queueDropType: oldest connection: @@ -102,10 +102,10 @@ func TestConfigUnmarshal(t *testing.T) { }, cfg.Encoder.BytesPool.Buckets) require.Equal(t, 0.001, cfg.Encoder.BytesPool.Watermark.RefillLowWatermark) require.Equal(t, 0.01, cfg.Encoder.BytesPool.Watermark.RefillHighWatermark) - require.Equal(t, 1440, cfg.FlushSize) + require.Equal(t, 10, cfg.FlushWorkerCount) + require.Equal(t, 123*time.Second, cfg.ForceFlushEvery) require.Equal(t, 140, cfg.MaxTimerBatchSize) require.Equal(t, 42, cfg.MaxBatchSize) - require.Equal(t, 123*time.Millisecond, cfg.BatchFlushDeadline) require.Equal(t, 1000, cfg.QueueSize) require.Equal(t, DropOldest, *cfg.QueueDropType) require.Equal(t, time.Second, cfg.Connection.ConnectionTimeout) @@ -153,10 +153,10 @@ func TestNewClientOptions(t *testing.T) { require.True(t, store == opts.WatcherOptions().StagedPlacementStore()) require.Equal(t, 10*time.Minute, opts.ShardCutoverWarmupDuration()) require.Equal(t, time.Minute, opts.ShardCutoffLingerDuration()) - require.Equal(t, 1440, opts.FlushSize()) + require.Equal(t, 10, opts.FlushWorkerCount()) + require.Equal(t, 123*time.Second, opts.ForceFlushEvery()) require.Equal(t, 140, opts.MaxTimerBatchSize()) require.Equal(t, 42, opts.MaxBatchSize()) - require.Equal(t, 123*time.Millisecond, opts.BatchFlushDeadline()) require.Equal(t, DropOldest, opts.QueueDropType()) require.Equal(t, time.Second, opts.ConnectionOptions().ConnectionTimeout()) require.Equal(t, true, opts.ConnectionOptions().ConnectionKeepAlive()) diff --git a/src/aggregator/client/options.go b/src/aggregator/client/options.go index da6efb33d2..716539981f 100644 --- a/src/aggregator/client/options.go +++ b/src/aggregator/client/options.go @@ -48,15 +48,15 @@ const ( defaultAggregatorClient = LegacyAggregatorClient - defaultFlushSize = 1440 + defaultFlushWorkerCount = 64 // defaultMaxTimerBatchSize is the default maximum timer batch size. // By default there is no limit on the timer batch size. defaultMaxTimerBatchSize = 0 - // defaultInstanceQueueSize determines how many metrics can be buffered + // defaultInstanceQueueSize determines how many protobuf payloads can be buffered // before it must wait for an existing batch to be flushed to an instance. - defaultInstanceQueueSize = 2 << 15 // ~65k + defaultInstanceQueueSize = 128 // By default traffic is cut over to shards 10 minutes before the designated // cutover time in case there are issues with the instances owning the shards. @@ -72,9 +72,6 @@ const ( // By default set maximum batch size to 8mb. defaultMaxBatchSize = 2 << 22 - - // By default write at least every 100ms. - defaultBatchFlushDeadline = 100 * time.Millisecond ) var ( @@ -186,11 +183,17 @@ type Options interface { // ConnectionOptions returns the connection options. ConnectionOptions() ConnectionOptions - // SetFlushSize sets the buffer size to trigger a flush. - SetFlushSize(value int) Options + // SetFlushWorkerCount sets the max number of workers used for flushing. + SetFlushWorkerCount(value int) Options + + // FlushWorkerCount returns the max number of workers used for flushing. + FlushWorkerCount() int + + // SetForceFlushEvery sets the duration between forced flushes. + SetForceFlushEvery(value time.Duration) Options - // FlushSize returns the buffer size to trigger a flush. 
- FlushSize() int + // ForceFlushEvery returns the duration, if any, between forced flushes. + ForceFlushEvery() time.Duration // SetMaxTimerBatchSize sets the maximum timer batch size. SetMaxTimerBatchSize(value int) Options @@ -218,12 +221,6 @@ type Options interface { // MaxBatchSize returns the maximum buffer size that triggers a queue drain. MaxBatchSize() int - // SetBatchFlushDeadline sets the deadline that triggers a write of queued buffers. - SetBatchFlushDeadline(value time.Duration) Options - - // BatchFlushDeadline returns the deadline that triggers a write of queued buffers. - BatchFlushDeadline() time.Duration - // SetRWOptions sets RW options. SetRWOptions(value xio.Options) Options @@ -241,12 +238,12 @@ type options struct { shardCutoffLingerDuration time.Duration watcherOpts placement.WatcherOptions connOpts ConnectionOptions - flushSize int + flushWorkerCount int + forceFlushEvery time.Duration maxTimerBatchSize int instanceQueueSize int dropType DropType maxBatchSize int - batchFlushDeadline time.Duration m3msgOptions M3MsgOptions rwOpts xio.Options } @@ -262,12 +259,11 @@ func NewOptions() Options { shardCutoffLingerDuration: defaultShardCutoffLingerDuration, watcherOpts: placement.NewWatcherOptions(), connOpts: NewConnectionOptions(), - flushSize: defaultFlushSize, + flushWorkerCount: defaultFlushWorkerCount, maxTimerBatchSize: defaultMaxTimerBatchSize, instanceQueueSize: defaultInstanceQueueSize, dropType: defaultDropType, maxBatchSize: defaultMaxBatchSize, - batchFlushDeadline: defaultBatchFlushDeadline, rwOpts: xio.NewOptions(), } } @@ -395,14 +391,24 @@ func (o *options) ConnectionOptions() ConnectionOptions { return o.connOpts } -func (o *options) SetFlushSize(value int) Options { +func (o *options) SetFlushWorkerCount(value int) Options { opts := *o - opts.flushSize = value + opts.flushWorkerCount = value return &opts } -func (o *options) FlushSize() int { - return o.flushSize +func (o *options) FlushWorkerCount() int { + return o.flushWorkerCount +} + +func (o *options) SetForceFlushEvery(value time.Duration) Options { + opts := *o + opts.forceFlushEvery = value + return &opts +} + +func (o *options) ForceFlushEvery() time.Duration { + return o.forceFlushEvery } func (o *options) SetMaxTimerBatchSize(value int) Options { @@ -449,19 +455,6 @@ func (o *options) MaxBatchSize() int { return o.maxBatchSize } -func (o *options) SetBatchFlushDeadline(value time.Duration) Options { - opts := *o - if value < 0 { - value = defaultBatchFlushDeadline - } - opts.batchFlushDeadline = value - return &opts -} - -func (o *options) BatchFlushDeadline() time.Duration { - return o.batchFlushDeadline -} - func (o *options) SetRWOptions(value xio.Options) Options { opts := *o opts.rwOpts = value diff --git a/src/aggregator/client/queue.go b/src/aggregator/client/queue.go index d1dbcb88b4..a018da7e12 100644 --- a/src/aggregator/client/queue.go +++ b/src/aggregator/client/queue.go @@ -23,20 +23,32 @@ package client import ( "errors" "fmt" + "math" "strings" "sync" - "time" "github.com/m3db/m3/src/cluster/placement" "github.com/m3db/m3/src/metrics/encoding/protobuf" "github.com/uber-go/tally" + "go.uber.org/atomic" "go.uber.org/zap" ) +const ( + _queueMinWriteBufSize = 65536 + _queueMaxWriteBufSize = 4 * _queueMinWriteBufSize +) + var ( errInstanceQueueClosed = errors.New("instance queue is closed") errWriterQueueFull = errors.New("writer queue is full") + errInvalidDropType = errors.New("invalid queue drop type") + + _queueConnWriteBufPool = sync.Pool{New: 
func() interface{} { + b := make([]byte, 0, _queueMinWriteBufSize) + return &b + }} ) // DropType determines which metrics should be dropped when the queue is full. @@ -100,27 +112,23 @@ type instanceQueue interface { // Close closes the queue, it blocks until the queue is drained. Close() error + + // Flush flushes the queue, it blocks until the queue is drained. + Flush() } type writeFn func([]byte) error type queue struct { - sync.RWMutex - - log *zap.Logger - metrics queueMetrics - dropType DropType - instance placement.Instance - conn *connection - bufCh chan protobuf.Buffer - doneCh chan struct{} - closed bool - buf []byte - maxBatchSize int - batchFlushDeadline time.Duration - wg sync.WaitGroup - - writeFn writeFn + mtx sync.Mutex + closed atomic.Bool + conn *connection + log *zap.Logger + metrics queueMetrics + dropType DropType + buf qbuf + instance placement.Instance + writeFn writeFn } func newInstanceQueue(instance placement.Instance, opts Options) instanceQueue { @@ -131,92 +139,108 @@ func newInstanceQueue(instance placement.Instance, opts Options) instanceQueue { connOpts = opts.ConnectionOptions(). SetInstrumentOptions(connInstrumentOpts). SetRWOptions(opts.RWOptions()) - conn = newConnection(instance.Endpoint(), connOpts) - iOpts = opts.InstrumentOptions() - queueSize = opts.InstanceQueueSize() - maxBatchSize = opts.MaxBatchSize() - writeInterval = opts.BatchFlushDeadline() + conn = newConnection(instance.Endpoint(), connOpts) + iOpts = opts.InstrumentOptions() + queueSize = opts.InstanceQueueSize() ) + + // Round up queue size to power of 2. + // buf is a ring buffer of byte buffers, so it should definitely be many orders of magnitude + // below max uint32. + qsize := uint32(roundUpToPowerOfTwo(queueSize)) + q := &queue{ - dropType: opts.QueueDropType(), - log: iOpts.Logger(), - metrics: newQueueMetrics(iOpts.MetricsScope(), queueSize), - instance: instance, - conn: conn, - bufCh: make(chan protobuf.Buffer, queueSize), - doneCh: make(chan struct{}), - maxBatchSize: maxBatchSize, - batchFlushDeadline: writeInterval, - buf: make([]byte, 0, maxBatchSize), + dropType: opts.QueueDropType(), + log: iOpts.Logger(), + metrics: newQueueMetrics(iOpts.MetricsScope()), + instance: instance, + conn: conn, + buf: qbuf{ + b: make([]protobuf.Buffer, int(qsize)), + }, } q.writeFn = q.conn.Write - q.wg.Add(1) - go func() { - defer q.wg.Done() - q.drain() - }() - return q } func (q *queue) Enqueue(buf protobuf.Buffer) error { - q.RLock() - if q.closed { - q.RUnlock() + if q.closed.Load() { q.metrics.enqueueClosedErrors.Inc(1) return errInstanceQueueClosed } - for { - select { - case q.bufCh <- buf: - q.RUnlock() - q.metrics.enqueueSuccesses.Inc(1) - return nil - default: - if q.dropType == DropCurrent { - q.RUnlock() - - // Close the buffer so it's resources are freed. - buf.Close() - q.metrics.enqueueCurrentDropped.Inc(1) - return errWriterQueueFull - } - } + if len(buf.Bytes()) == 0 { + return nil + } + + q.mtx.Lock() + defer q.mtx.Unlock() - select { - case buf := <-q.bufCh: - // Close the buffer so it's resources are freed. + if full := q.buf.full(); full { + switch q.dropType { + case DropCurrent: + // Close the current buffer so it's resources are freed. buf.Close() + q.metrics.enqueueCurrentDropped.Inc(1) + return errWriterQueueFull + case DropOldest: + // Consume oldest buffer instead. 
+ oldest := q.buf.shift() + oldest.Close() q.metrics.enqueueOldestDropped.Inc(1) default: + return errInvalidDropType } } + + q.buf.push(buf) + q.metrics.enqueueSuccesses.Inc(1) + return nil } func (q *queue) Close() error { - q.Lock() - if q.closed { + if !q.closed.CAS(false, true) { return errInstanceQueueClosed } - q.closed = true - q.Unlock() - - close(q.doneCh) - q.wg.Wait() - close(q.bufCh) return nil } -func (q *queue) writeAndReset() { - if len(q.buf) == 0 { +func (q *queue) Flush() { + q.mtx.Lock() + + if q.buf.size() == 0 { + q.mtx.Unlock() return } - if err := q.writeFn(q.buf); err != nil { + + buf := _queueConnWriteBufPool.Get().(*[]byte) + + for q.buf.size() > 0 { + b := q.buf.shift() + + bytes := b.Bytes() + if len(bytes) == 0 { + continue + } + + (*buf) = append(*buf, bytes...) + b.Close() + } + + // mutex is not held while doing IO + q.mtx.Unlock() + + size := len(*buf) + if size == 0 { + _queueConnWriteBufPool.Put(buf) + return + } + + if err := q.writeFn(*buf); err != nil { q.log.Error("error writing data", - zap.Int("buffer_size", len(q.buf)), + zap.Int("buffer_size", size), zap.String("target_instance_id", q.instance.ID()), zap.String("target_instance", q.instance.Endpoint()), zap.Error(err), @@ -225,52 +249,19 @@ func (q *queue) writeAndReset() { } else { q.metrics.connWriteSuccesses.Inc(1) } - q.buf = q.buf[:0] -} -func (q *queue) drain() { - defer q.conn.Close() - timer := time.NewTimer(q.batchFlushDeadline) - lastDrain := time.Now() - write := func() { - q.writeAndReset() - lastDrain = time.Now() + // Check buffer capacity, not length, to make sure we're not pooling slices that are too large. + // Otherwise, it could result in multi-megabyte slices hanging around, in case we get a spike in writes. + if cap(*buf) > _queueMaxWriteBufSize { + return } - for { - select { - case qitem := <-q.bufCh: - drained := false - msg := qitem.Bytes() - if len(q.buf)+len(msg) > q.maxBatchSize { - write() - drained = true - } - q.buf = append(q.buf, msg...) 
- qitem.Close() - - if drained || (len(q.buf) < q.maxBatchSize && - time.Since(lastDrain) < q.batchFlushDeadline) { - continue - } - - write() - case ts := <-timer.C: - delta := ts.Sub(lastDrain) - if delta < q.batchFlushDeadline { - timer.Reset(q.batchFlushDeadline - delta) - continue - } - write() - timer.Reset(q.batchFlushDeadline) - case <-q.doneCh: - return - } - } + (*buf) = (*buf)[:0] + _queueConnWriteBufPool.Put(buf) } func (q *queue) Size() int { - return len(q.bufCh) + return int(q.buf.size()) } type queueMetrics struct { @@ -282,7 +273,7 @@ type queueMetrics struct { connWriteErrors tally.Counter } -func newQueueMetrics(s tally.Scope, queueSize int) queueMetrics { +func newQueueMetrics(s tally.Scope) queueMetrics { enqueueScope := s.Tagged(map[string]string{"action": "enqueue"}) connWriteScope := s.Tagged(map[string]string{"action": "conn-write"}) return queueMetrics{ @@ -297,3 +288,42 @@ func newQueueMetrics(s tally.Scope, queueSize int) queueMetrics { connWriteErrors: connWriteScope.Counter("errors"), } } + +// qbuf is a specialized ring buffer for proto payloads +type qbuf struct { + b []protobuf.Buffer + // buffer cursors + r uint32 + w uint32 +} + +func (q *qbuf) size() uint32 { + return q.w - q.r +} + +func (q *qbuf) full() bool { + return q.size() == uint32(cap(q.b)) +} + +func (q *qbuf) mask(idx uint32) uint32 { + return idx & (uint32(cap(q.b)) - 1) +} + +func (q *qbuf) push(buf protobuf.Buffer) { + q.w++ + idx := q.mask(q.w) + q.b[idx].Close() + q.b[idx] = buf +} + +func (q *qbuf) shift() protobuf.Buffer { + q.r++ + idx := q.mask(q.r) + val := q.b[idx] + q.b[idx] = protobuf.Buffer{} + return val +} + +func roundUpToPowerOfTwo(val int) int { + return int(math.Pow(2, math.Ceil(math.Log2(float64(val))))) +} diff --git a/src/aggregator/client/queue_mock.go b/src/aggregator/client/queue_mock.go index b40fbeb653..00e39b3e3b 100644 --- a/src/aggregator/client/queue_mock.go +++ b/src/aggregator/client/queue_mock.go @@ -1,7 +1,7 @@ // Code generated by MockGen. DO NOT EDIT. // Source: github.com/m3db/m3/src/aggregator/client/queue.go -// Copyright (c) 2020 Uber Technologies, Inc. +// Copyright (c) 2021 Uber Technologies, Inc. 
// // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -96,3 +96,15 @@ func (mr *MockinstanceQueueMockRecorder) Close() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockinstanceQueue)(nil).Close)) } + +// Flush mocks base method +func (m *MockinstanceQueue) Flush() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Flush") +} + +// Flush indicates an expected call of Flush +func (mr *MockinstanceQueueMockRecorder) Flush() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Flush", reflect.TypeOf((*MockinstanceQueue)(nil).Flush)) +} diff --git a/src/aggregator/client/queue_test.go b/src/aggregator/client/queue_test.go index 32f726d837..9e1c81cc37 100644 --- a/src/aggregator/client/queue_test.go +++ b/src/aggregator/client/queue_test.go @@ -21,9 +21,7 @@ package client import ( - "sync" "testing" - "time" "github.com/m3db/m3/src/metrics/encoding/protobuf" @@ -36,52 +34,64 @@ func TestInstanceQueueEnqueueClosed(t *testing.T) { opts := testOptions() queue := newInstanceQueue(testPlacementInstance, opts).(*queue) queue.writeFn = func([]byte) error { return nil } - queue.closed = true + queue.closed.Store(true) require.Equal(t, errInstanceQueueClosed, queue.Enqueue(testNewBuffer(nil))) } func TestInstanceQueueEnqueueQueueFullDropCurrent(t *testing.T) { opts := testOptions(). - SetInstanceQueueSize(1). - SetQueueDropType(DropCurrent). - SetBatchFlushDeadline(1 * time.Microsecond). - SetMaxBatchSize(1) + SetInstanceQueueSize(2). + SetQueueDropType(DropCurrent) queue := newInstanceQueue(testPlacementInstance, opts).(*queue) - ready := make(chan struct{}) - // Fill up the queue and park the draining goroutine so the queue remains full. - queue.writeFn = func([]byte) error { - ready <- struct{}{} - select {} + var result []byte + queue.writeFn = func(payload []byte) error { + result = payload + return nil } - - queue.bufCh <- testNewBuffer([]byte{42, 42, 42}) - queue.bufCh <- testNewBuffer([]byte{42, 42, 42}) - queue.bufCh <- testNewBuffer([]byte{42, 42, 42}) - <-ready + require.NoError(t, queue.Enqueue(testNewBuffer([]byte{42, 43, 44}))) + require.NoError(t, queue.Enqueue(testNewBuffer([]byte{45, 46, 47}))) require.Equal(t, errWriterQueueFull, queue.Enqueue(testNewBuffer([]byte{42}))) + queue.Flush() + require.EqualValues(t, []byte{42, 43, 44, 45, 46, 47}, result) } func TestInstanceQueueEnqueueQueueFullDropOldest(t *testing.T) { opts := testOptions(). - SetInstanceQueueSize(1). - SetBatchFlushDeadline(1 * time.Microsecond). - SetMaxBatchSize(1) + SetInstanceQueueSize(4) queue := newInstanceQueue(testPlacementInstance, opts).(*queue) - ready := make(chan struct{}) - // Fill up the queue and park the draining goroutine so the queue remains full - // until the enqueueing goroutine pulls a buffer off the channel. 
- queue.writeFn = func([]byte) error { - ready <- struct{}{} - select {} + var result []byte + queue.writeFn = func(payload []byte) error { + result = payload + return nil } - queue.bufCh <- testNewBuffer([]byte{42}) - queue.bufCh <- testNewBuffer([]byte{42}) - <-ready - require.NoError(t, queue.Enqueue(testNewBuffer(nil))) + require.NoError(t, queue.Enqueue(testNewBuffer([]byte{42}))) + require.NoError(t, queue.Enqueue(testNewBuffer([]byte{42, 43, 44}))) + require.NoError(t, queue.Enqueue(testNewBuffer([]byte{45, 46, 47}))) + require.NoError(t, queue.Enqueue(testNewBuffer([]byte{1, 2, 3}))) + require.NoError(t, queue.Enqueue(testNewBuffer([]byte{1}))) + + queue.Flush() + require.EqualValues(t, []byte{ + 42, 43, 44, 45, 46, 47, 1, 2, 3, 1, + }, result) + + require.NoError(t, queue.Enqueue(testNewBuffer([]byte{}))) + require.NoError(t, queue.Enqueue(testNewBuffer([]byte{1, 2, 3}))) + require.NoError(t, queue.Enqueue(testNewBuffer([]byte{42}))) + require.NoError(t, queue.Enqueue(testNewBuffer([]byte{}))) + require.NoError(t, queue.Enqueue(testNewBuffer([]byte{42, 43, 44}))) + require.NoError(t, queue.Enqueue(testNewBuffer([]byte{45, 46, 47}))) + require.NoError(t, queue.Enqueue(testNewBuffer([]byte{1}))) + + queue.Flush() + + require.EqualValues(t, []byte{ + 42, 42, 43, 44, 45, 46, 47, 1, + }, result) } func TestInstanceQueueEnqueueSuccessDrainSuccess(t *testing.T) { @@ -91,7 +101,7 @@ func TestInstanceQueueEnqueueSuccessDrainSuccess(t *testing.T) { res []byte ) - ready := make(chan struct{}) + ready := make(chan struct{}, 1) queue.writeFn = func(data []byte) error { defer func() { ready <- struct{}{} @@ -103,77 +113,16 @@ func TestInstanceQueueEnqueueSuccessDrainSuccess(t *testing.T) { data := []byte("foobar") require.NoError(t, queue.Enqueue(testNewBuffer(data))) + queue.Flush() <-ready - require.Equal(t, data, res) -} -func TestInstanceQueueDrainBatching(t *testing.T) { - var ( - res []byte - resLock sync.Mutex - ) - - newBatchedTestQueue := func(flushDeadline time.Duration, batchSize int) *queue { - res = []byte{} - opts := testOptions(). - SetBatchFlushDeadline(flushDeadline). - SetMaxBatchSize(batchSize) - - queue := newInstanceQueue(testPlacementInstance, opts).(*queue) - - queue.writeFn = func(data []byte) error { - resLock.Lock() - res = append(res, data...) - resLock.Unlock() - return nil - } - return queue - } - - // Test batching by size - data := []byte("foobar") - expected := []byte("foobarfoobarfoobar") - queue := newBatchedTestQueue(500*time.Millisecond, 3*len(data)) - assert.NoError(t, queue.Enqueue(testNewBuffer(data))) - assert.NoError(t, queue.Enqueue(testNewBuffer(data))) - assert.NoError(t, queue.Enqueue(testNewBuffer(data))) - - // Wait for the queue to be drained. 
- for i := 0; i <= 5; i++ { - resLock.Lock() - if len(res) == len(expected) { - resLock.Unlock() - break - } - resLock.Unlock() - // Total sleep must be less than flush deadline - time.Sleep(5 * time.Millisecond) - } - - assert.Equal(t, expected, res) - - // Test batching by time - queue = newBatchedTestQueue(40*time.Millisecond, 10000) - - assert.NoError(t, queue.Enqueue(testNewBuffer(data))) - assert.NoError(t, queue.Enqueue(testNewBuffer(data))) - - time.Sleep(20 * time.Millisecond) - resLock.Lock() - assert.Equal(t, []byte{}, res) - resLock.Unlock() - - time.Sleep(25 * time.Millisecond) - - resLock.Lock() - assert.Equal(t, []byte("foobarfoobar"), res) - resLock.Unlock() + require.Equal(t, data, res) } func TestInstanceQueueEnqueueSuccessDrainError(t *testing.T) { opts := testOptions() queue := newInstanceQueue(testPlacementInstance, opts).(*queue) - drained := make(chan struct{}) + drained := make(chan struct{}, 1) queue.writeFn = func(data []byte) error { defer func() { drained <- struct{}{} @@ -182,7 +131,7 @@ func TestInstanceQueueEnqueueSuccessDrainError(t *testing.T) { } require.NoError(t, queue.Enqueue(testNewBuffer([]byte{42}))) - + queue.Flush() // Wait for the queue to be drained. <-drained } @@ -190,7 +139,7 @@ func TestInstanceQueueEnqueueSuccessDrainError(t *testing.T) { func TestInstanceQueueEnqueueSuccessWriteError(t *testing.T) { opts := testOptions() queue := newInstanceQueue(testPlacementInstance, opts).(*queue) - done := make(chan struct{}) + done := make(chan struct{}, 1) queue.writeFn = func(data []byte) error { err := queue.conn.Write(data) done <- struct{}{} @@ -198,7 +147,7 @@ func TestInstanceQueueEnqueueSuccessWriteError(t *testing.T) { } require.NoError(t, queue.Enqueue(testNewBuffer([]byte{0x1, 0x2}))) - + queue.Flush() // Wait for the queue to be drained. 
<-done } @@ -206,7 +155,7 @@ func TestInstanceQueueEnqueueSuccessWriteError(t *testing.T) { func TestInstanceQueueCloseAlreadyClosed(t *testing.T) { opts := testOptions() queue := newInstanceQueue(testPlacementInstance, opts).(*queue) - queue.closed = true + queue.closed.Store(true) require.Equal(t, errInstanceQueueClosed, queue.Close()) } @@ -215,9 +164,26 @@ func TestInstanceQueueCloseSuccess(t *testing.T) { opts := testOptions() queue := newInstanceQueue(testPlacementInstance, opts).(*queue) require.NoError(t, queue.Close()) - require.True(t, queue.closed) - _, ok := <-queue.bufCh - require.False(t, ok) + require.True(t, queue.closed.Load()) + require.Error(t, queue.Enqueue(testNewBuffer([]byte("foo")))) +} + +func TestInstanceQueueSizeIsPowerOfTwo(t *testing.T) { + for _, tt := range []struct { + size int + expected int + }{ + {1, 1}, + {2, 2}, + {3, 4}, + {4, 4}, + {42, 64}, + {123, 128}, + } { + opts := testOptions().SetInstanceQueueSize(tt.size) + q := newInstanceQueue(testPlacementInstance, opts).(*queue) + require.Equal(t, tt.expected, cap(q.buf.b)) + } } func TestDropTypeUnmarshalYAML(t *testing.T) { @@ -247,4 +213,21 @@ func TestDropTypeUnmarshalYAML(t *testing.T) { } } +func TestRoundUpToPowerOfTwo(t *testing.T) { + for _, tt := range []struct { + in, out int + }{ + {1, 1}, + {2, 2}, + {3, 4}, + {4, 4}, + {5, 8}, + {7, 8}, + {33, 64}, + {42, 64}, + } { + assert.Equal(t, tt.out, roundUpToPowerOfTwo(tt.in)) + } +} + func testNewBuffer(data []byte) protobuf.Buffer { return protobuf.NewBuffer(data, nil) } diff --git a/src/aggregator/client/tcp_client.go b/src/aggregator/client/tcp_client.go index 79246ca73e..dc81ad6509 100644 --- a/src/aggregator/client/tcp_client.go +++ b/src/aggregator/client/tcp_client.go @@ -71,7 +71,10 @@ func NewTCPClient(opts Options) (*TCPClient, error) { writerMgrScope := instrumentOpts.MetricsScope().SubScope("writer-manager") writerMgrOpts := opts.SetInstrumentOptions(instrumentOpts.SetMetricsScope(writerMgrScope)) - writerMgr = newInstanceWriterManager(writerMgrOpts) + writerMgr, err := newInstanceWriterManager(writerMgrOpts) + if err != nil { + return nil, err + } onPlacementChangedFn := func(prev, curr placement.Placement) { writerMgr.AddInstances(curr.Instances()) // nolint: errcheck @@ -255,7 +258,8 @@ func (c *TCPClient) Flush() error { // Close closes the client. func (c *TCPClient) Close() error { - c.placementWatcher.Unwatch() // nolint: errcheck + c.writerMgr.Flush() //nolint:errcheck + c.placementWatcher.Unwatch() //nolint:errcheck // writerMgr errors out if trying to close twice return c.writerMgr.Close() } diff --git a/src/aggregator/client/tcp_client_test.go b/src/aggregator/client/tcp_client_test.go index 916740bf96..e01fb9cc09 100644 --- a/src/aggregator/client/tcp_client_test.go +++ b/src/aggregator/client/tcp_client_test.go @@ -842,5 +842,7 @@ func testTCPClientOptions() Options { SetShardCutoverWarmupDuration(time.Minute). SetShardCutoffLingerDuration(10 * time.Minute). SetAggregatorClientType(TCPAggregatorClient). - SetWatcherOptions(plOpts) + SetWatcherOptions(plOpts). + SetForceFlushEvery(0). 
+ SetFlushWorkerCount(8) } diff --git a/src/aggregator/client/writer.go b/src/aggregator/client/writer.go index 9a85d96447..a8eda6febf 100644 --- a/src/aggregator/client/writer.go +++ b/src/aggregator/client/writer.go @@ -22,7 +22,6 @@ package client import ( "errors" - "fmt" "sync" "github.com/m3db/m3/src/cluster/placement" @@ -36,12 +35,14 @@ import ( xerrors "github.com/m3db/m3/src/x/errors" "github.com/uber-go/tally" + "go.uber.org/atomic" "go.uber.org/zap" ) var ( - errInstanceWriterClosed = errors.New("instance writer is closed") - errUnrecognizedMetricType = errors.New("unrecognized metric type") + errInstanceWriterClosed = errors.New("instance writer is closed") + errUnrecognizedMetricType = errors.New("unrecognized metric type") + errUnrecognizedPayloadType = errors.New("unrecognized payload type") ) type instanceWriter interface { @@ -67,7 +68,7 @@ type writer struct { metrics writerMetrics encoderOpts protobuf.UnaggregatedOptions queue instanceQueue - flushSize int + maxBatchSize int maxTimerBatchSize int encodersByShard map[uint32]*lockedEncoder @@ -84,7 +85,7 @@ func newInstanceWriter(instance placement.Instance, opts Options) instanceWriter w := &writer{ log: iOpts.Logger(), metrics: newWriterMetrics(scope), - flushSize: opts.FlushSize(), + maxBatchSize: opts.MaxBatchSize(), maxTimerBatchSize: opts.MaxTimerBatchSize(), encoderOpts: opts.EncoderOptions(), queue: newInstanceQueue(instance, queueOpts), @@ -165,21 +166,51 @@ func (w *writer) encodeWithLock( encoder *lockedEncoder, payload payloadUnion, ) error { + encoder.Lock() + + var ( + sizeBefore = encoder.Len() + err error + ) + switch payload.payloadType { case untimedType: - return w.encodeUntimedWithLock(encoder, payload.untimed.metric, payload.untimed.metadatas) + err = w.encodeUntimedWithLock(encoder, payload.untimed.metric, payload.untimed.metadatas) case forwardedType: - return w.encodeForwardedWithLock(encoder, payload.forwarded.metric, payload.forwarded.metadata) + err = w.encodeForwardedWithLock(encoder, payload.forwarded.metric, payload.forwarded.metadata) case timedType: - return w.encodeTimedWithLock(encoder, payload.timed.metric, payload.timed.metadata) + err = w.encodeTimedWithLock(encoder, payload.timed.metric, payload.timed.metadata) case timedWithStagedMetadatasType: elem := payload.timedWithStagedMetadatas - return w.encodeTimedWithStagedMetadatasWithLock(encoder, elem.metric, elem.metadatas) + err = w.encodeTimedWithStagedMetadatasWithLock(encoder, elem.metric, elem.metadatas) case passthroughType: - return w.encodePassthroughWithLock(encoder, payload.passthrough.metric, payload.passthrough.storagePolicy) + err = w.encodePassthroughWithLock(encoder, payload.passthrough.metric, payload.passthrough.storagePolicy) default: - return fmt.Errorf("unknown payload type: %v", payload.payloadType) + err = errUnrecognizedPayloadType + } + + if err != nil { + w.metrics.encodeErrors.Inc(1) + w.log.Error("encode untimed metric error", + zap.Any("payload", payload), + zap.Int("payloadType", int(payload.payloadType)), + zap.Error(err), + ) + // Rewind buffer and clear out the encoder error. 
+ encoder.Truncate(sizeBefore) //nolint:errcheck + encoder.Unlock() + return err } + + if encoder.Len() < w.maxBatchSize { + encoder.Unlock() + return nil + } + + buffer := encoder.Relinquish() + encoder.Unlock() + + return w.enqueueBuffer(buffer) } func (w *writer) encodeUntimedWithLock( @@ -187,13 +218,6 @@ func (w *writer) encodeUntimedWithLock( metricUnion unaggregated.MetricUnion, metadatas metadata.StagedMetadatas, ) error { - encoder.Lock() - - var ( - sizeBefore = encoder.Len() - encodeErr error - enqueueErr error - ) switch metricUnion.Type { case metric.CounterType: msg := encoding.UnaggregatedMessageUnion{ @@ -202,7 +226,8 @@ func (w *writer) encodeUntimedWithLock( Counter: metricUnion.Counter(), StagedMetadatas: metadatas, }} - encodeErr = encoder.EncodeMessage(msg) + + return encoder.EncodeMessage(msg) case metric.TimerType: // If there is no limit on the timer batch size, write the full batch. if w.maxTimerBatchSize == 0 { @@ -212,8 +237,8 @@ func (w *writer) encodeUntimedWithLock( BatchTimer: metricUnion.BatchTimer(), StagedMetadatas: metadatas, }} - encodeErr = encoder.EncodeMessage(msg) - break + + return encoder.EncodeMessage(msg) } // Otherwise, honor maximum timer batch size. @@ -223,6 +248,7 @@ func (w *writer) encodeUntimedWithLock( numTimerValues = len(timerValues) start, end int ) + for start = 0; start < numTimerValues; start = end { end = start + w.maxTimerBatchSize if end > numTimerValues { @@ -239,35 +265,20 @@ func (w *writer) encodeUntimedWithLock( BatchTimer: singleBatchTimer, StagedMetadatas: metadatas, }} - encodeErr = encoder.EncodeMessage(msg) - if encodeErr != nil { - break - } - - // If the buffer isn't big enough continue to the next iteration. - if sizeAfter := encoder.Len(); sizeAfter < w.flushSize { - continue + if err := encoder.EncodeMessage(msg); err != nil { + return err } - // Otherwise we enqueue the current buffer. - buffer := w.prepareEnqueueBufferWithLock(encoder, sizeBefore) - - // Unlock the encoder before we enqueue the old buffer to ensure other - // goroutines have an oppurtunity to encode metrics while larger timer + // Unlock the encoder before we encode another metric to ensure other + // goroutines have an opportunity to encode metrics while larger timer // batches are being encoded. - encoder.Unlock() - - enqueueErr = w.enqueueBuffer(buffer) - - // Re-lock the encoder and update variables since the encoder's buffer - // may have been updated. - encoder.Lock() - sizeBefore = encoder.Len() - - if enqueueErr != nil { - break + if end < numTimerValues { + encoder.Unlock() + encoder.Lock() } } + + return nil case metric.GaugeType: msg := encoding.UnaggregatedMessageUnion{ Type: encoding.GaugeWithMetadatasType, @@ -275,39 +286,11 @@ func (w *writer) encodeUntimedWithLock( Gauge: metricUnion.Gauge(), StagedMetadatas: metadatas, }} - encodeErr = encoder.EncodeMessage(msg) + return encoder.EncodeMessage(msg) default: - encodeErr = errUnrecognizedMetricType } - if encodeErr != nil { - w.log.Error("encode untimed metric error", - zap.Any("metric", metricUnion), - zap.Any("metadatas", metadatas), - zap.Error(encodeErr), - ) - // Rewind buffer and clear out the encoder error. - encoder.Truncate(sizeBefore) - encoder.Unlock() - w.metrics.encodeErrors.Inc(1) - return encodeErr - } - - if enqueueErr != nil { - encoder.Unlock() - return enqueueErr - } - - // If the buffer size is not big enough, do nothing. - if sizeAfter := encoder.Len(); sizeAfter < w.flushSize { - encoder.Unlock() - return nil - } - - // Otherwise we enqueue the current buffer. 
- buffer := w.prepareEnqueueBufferWithLock(encoder, sizeBefore) - encoder.Unlock() - return w.enqueueBuffer(buffer) + return errUnrecognizedMetricType } func (w *writer) encodeForwardedWithLock( @@ -315,38 +298,14 @@ func (w *writer) encodeForwardedWithLock( metric aggregated.ForwardedMetric, metadata metadata.ForwardMetadata, ) error { - encoder.Lock() - - sizeBefore := encoder.Len() msg := encoding.UnaggregatedMessageUnion{ Type: encoding.ForwardedMetricWithMetadataType, ForwardedMetricWithMetadata: aggregated.ForwardedMetricWithMetadata{ ForwardedMetric: metric, ForwardMetadata: metadata, }} - if err := encoder.EncodeMessage(msg); err != nil { - w.log.Error("encode forwarded metric error", - zap.Any("metric", metric), - zap.Any("metadata", metadata), - zap.Error(err), - ) - // Rewind buffer and clear out the encoder error. - encoder.Truncate(sizeBefore) - encoder.Unlock() - w.metrics.encodeErrors.Inc(1) - return err - } - - // If the buffer size is not big enough, do nothing. - if sizeAfter := encoder.Len(); sizeAfter < w.flushSize { - encoder.Unlock() - return nil - } - // Otherwise we enqueue the current buffer. - buffer := w.prepareEnqueueBufferWithLock(encoder, sizeBefore) - encoder.Unlock() - return w.enqueueBuffer(buffer) + return encoder.EncodeMessage(msg) } func (w *writer) encodeTimedWithLock( @@ -354,38 +313,14 @@ func (w *writer) encodeTimedWithLock( metric aggregated.Metric, metadata metadata.TimedMetadata, ) error { - encoder.Lock() - - sizeBefore := encoder.Len() msg := encoding.UnaggregatedMessageUnion{ Type: encoding.TimedMetricWithMetadataType, TimedMetricWithMetadata: aggregated.TimedMetricWithMetadata{ Metric: metric, TimedMetadata: metadata, }} - if err := encoder.EncodeMessage(msg); err != nil { - w.log.Error("encode timed metric error", - zap.Any("metric", metric), - zap.Any("metadata", metadata), - zap.Error(err), - ) - // Rewind buffer and clear out the encoder error. - encoder.Truncate(sizeBefore) - encoder.Unlock() - w.metrics.encodeErrors.Inc(1) - return err - } - - // If the buffer size is not big enough, do nothing. - if sizeAfter := encoder.Len(); sizeAfter < w.flushSize { - encoder.Unlock() - return nil - } - // Otherwise we enqueue the current buffer. - buffer := w.prepareEnqueueBufferWithLock(encoder, sizeBefore) - encoder.Unlock() - return w.enqueueBuffer(buffer) + return encoder.EncodeMessage(msg) } func (w *writer) encodeTimedWithStagedMetadatasWithLock( @@ -393,38 +328,14 @@ func (w *writer) encodeTimedWithStagedMetadatasWithLock( metric aggregated.Metric, metadatas metadata.StagedMetadatas, ) error { - encoder.Lock() - - sizeBefore := encoder.Len() msg := encoding.UnaggregatedMessageUnion{ Type: encoding.TimedMetricWithMetadatasType, TimedMetricWithMetadatas: aggregated.TimedMetricWithMetadatas{ Metric: metric, StagedMetadatas: metadatas, }} - if err := encoder.EncodeMessage(msg); err != nil { - w.log.Error("encode timed metric error", - zap.Any("metric", metric), - zap.Any("metadatas", metadatas), - zap.Error(err), - ) - // Rewind buffer and clear out the encoder error. - encoder.Truncate(sizeBefore) - encoder.Unlock() - w.metrics.encodeErrors.Inc(1) - return err - } - - // If the buffer size is not big enough, do nothing. - if sizeAfter := encoder.Len(); sizeAfter < w.flushSize { - encoder.Unlock() - return nil - } - // Otherwise we enqueue the current buffer. 
- buffer := w.prepareEnqueueBufferWithLock(encoder, sizeBefore) - encoder.Unlock() - return w.enqueueBuffer(buffer) + return encoder.EncodeMessage(msg) } func (w *writer) encodePassthroughWithLock( @@ -432,59 +343,14 @@ func (w *writer) encodePassthroughWithLock( metric aggregated.Metric, storagePolicy policy.StoragePolicy, ) error { - encoder.Lock() - - sizeBefore := encoder.Len() msg := encoding.UnaggregatedMessageUnion{ Type: encoding.PassthroughMetricWithMetadataType, PassthroughMetricWithMetadata: aggregated.PassthroughMetricWithMetadata{ Metric: metric, StoragePolicy: storagePolicy, }} - if err := encoder.EncodeMessage(msg); err != nil { - w.log.Error("encode passthrough metric error", - zap.Any("metric", metric), - zap.Any("storagepolicy", storagePolicy), - zap.Error(err), - ) - // Rewind buffer and clear out the encoder error. - encoder.Truncate(sizeBefore) - encoder.Unlock() - w.metrics.encodeErrors.Inc(1) - return err - } - - // If the buffer size is not big enough, do nothing. - if sizeAfter := encoder.Len(); sizeAfter < w.flushSize { - encoder.Unlock() - return nil - } - // Otherwise we enqueue the current buffer. - buffer := w.prepareEnqueueBufferWithLock(encoder, sizeBefore) - encoder.Unlock() - return w.enqueueBuffer(buffer) -} - -// prepareEnqueueBufferWithLock prepares the writer to enqueue a -// buffer onto its instance queue. It gets a new buffer from pool, -// copies the bytes exceeding sizeBefore to it, resets the encoder -// with the new buffer, and returns the old buffer. -func (w *writer) prepareEnqueueBufferWithLock( - encoder *lockedEncoder, - sizeBefore int, -) protobuf.Buffer { - buf := encoder.Relinquish() - if sizeBefore == 0 { - // If a single write causes the buffer to exceed the flush size, - // reset and send the buffer as is. - return buf - } - // Otherwise we reset the buffer and copy the bytes exceeding sizeBefore, - // and return the old buffer. 
- encoder.Reset(buf.Bytes()[sizeBefore:]) - buf.Truncate(sizeBefore) - return buf + return encoder.EncodeMessage(msg) } func (w *writer) flushWithLock() error { @@ -501,6 +367,9 @@ func (w *writer) flushWithLock() error { multiErr = multiErr.Add(err) } } + + w.queue.Flush() + return multiErr.FinalError() } @@ -545,6 +414,7 @@ func newLockedEncoder(encoderOpts protobuf.UnaggregatedOptions) *lockedEncoder { } type refCountedWriter struct { + dirty atomic.Bool instanceWriter refCount } diff --git a/src/aggregator/client/writer_benchmark_test.go b/src/aggregator/client/writer_benchmark_test.go index 0a44d192b7..50c6a3ef43 100644 --- a/src/aggregator/client/writer_benchmark_test.go +++ b/src/aggregator/client/writer_benchmark_test.go @@ -32,8 +32,6 @@ import ( "github.com/uber-go/tally" ) -const () - var ( testLargerBatchTimer = unaggregated.MetricUnion{ Type: metric.TimerType, @@ -49,7 +47,7 @@ func BenchmarkParallelWriter(b *testing.B) { log: opts.InstrumentOptions().Logger(), metrics: newWriterMetrics(tally.NoopScope), encoderOpts: opts.EncoderOptions(), - flushSize: opts.FlushSize(), + maxBatchSize: opts.MaxBatchSize(), queue: testNoOpQueue{}, encodersByShard: make(map[uint32]*lockedEncoder), } @@ -82,7 +80,7 @@ func BenchmarkSerialOneShardWriter(b *testing.B) { log: opts.InstrumentOptions().Logger(), metrics: newWriterMetrics(tally.NoopScope), encoderOpts: opts.EncoderOptions(), - flushSize: opts.FlushSize(), + maxBatchSize: opts.MaxBatchSize(), queue: testNoOpQueue{}, encodersByShard: make(map[uint32]*lockedEncoder), } @@ -116,7 +114,7 @@ func BenchmarkSerialWriter(b *testing.B) { log: opts.InstrumentOptions().Logger(), metrics: newWriterMetrics(tally.NoopScope), encoderOpts: opts.EncoderOptions(), - flushSize: opts.FlushSize(), + maxBatchSize: opts.MaxBatchSize(), queue: testNoOpQueue{}, encodersByShard: make(map[uint32]*lockedEncoder), } @@ -151,6 +149,7 @@ type testNoOpQueue struct{} func (q testNoOpQueue) Enqueue(protobuf.Buffer) error { return nil } func (q testNoOpQueue) Close() error { return nil } func (q testNoOpQueue) Size() int { return 0 } +func (q testNoOpQueue) Flush() {} type testSerialWriter struct { *writer diff --git a/src/aggregator/client/writer_mgr.go b/src/aggregator/client/writer_mgr.go index 249236902a..aea69d4370 100644 --- a/src/aggregator/client/writer_mgr.go +++ b/src/aggregator/client/writer_mgr.go @@ -26,10 +26,12 @@ import ( "sync" "time" + "github.com/uber-go/tally" + "golang.org/x/sys/cpu" + "github.com/m3db/m3/src/cluster/placement" xerrors "github.com/m3db/m3/src/x/errors" - - "github.com/uber-go/tally" + xsync "github.com/m3db/m3/src/x/sync" ) var ( @@ -65,9 +67,10 @@ type instanceWriterManager interface { } type writerManagerMetrics struct { - instancesAdded tally.Counter - instancesRemoved tally.Counter - queueLen tally.Histogram + instancesAdded tally.Counter + instancesRemoved tally.Counter + queueLen tally.Histogram + dirtyWritersPercent tally.Histogram } func newWriterManagerMetrics(scope tally.Scope) writerManagerMetrics { @@ -76,6 +79,11 @@ func newWriterManagerMetrics(scope tally.Scope) writerManagerMetrics { tally.MustMakeExponentialValueBuckets(_queueMetricBucketStart, 2, _queueMetricBuckets)..., ) + percentBuckets := append( + tally.ValueBuckets{0}, + tally.MustMakeLinearValueBuckets(5, 5, 20)..., + ) + return writerManagerMetrics{ instancesAdded: scope.Tagged(map[string]string{ "action": "add", @@ -83,7 +91,8 @@ func newWriterManagerMetrics(scope tally.Scope) writerManagerMetrics { 
instancesRemoved: scope.Tagged(map[string]string{ "action": "remove", }).Counter("instances"), - queueLen: scope.Histogram("queue-length", buckets), + queueLen: scope.Histogram("queue-length", buckets), + dirtyWritersPercent: scope.Histogram("dirty-writers-percent", percentBuckets), } } @@ -95,18 +104,40 @@ type writerManager struct { writers map[string]*refCountedWriter closed bool metrics writerManagerMetrics + _ cpu.CacheLinePad + pool xsync.PooledWorkerPool } -func newInstanceWriterManager(opts Options) instanceWriterManager { +func newInstanceWriterManager(opts Options) (instanceWriterManager, error) { wm := &writerManager{ opts: opts, writers: make(map[string]*refCountedWriter), metrics: newWriterManagerMetrics(opts.InstrumentOptions().MetricsScope()), doneCh: make(chan struct{}), } + + pool, err := xsync.NewPooledWorkerPool( + opts.FlushWorkerCount(), + xsync.NewPooledWorkerPoolOptions().SetKillWorkerProbability(0.05), + ) + if err != nil { + return nil, err + } + + wm.pool = pool + wm.pool.Init() + wm.wg.Add(1) go wm.reportMetricsLoop() - return wm + + if opts.ForceFlushEvery() > 0 { + wm.wg.Add(1) + go func() { + wm.flushLoop(opts.ForceFlushEvery()) + }() + } + + return wm, nil } func (mgr *writerManager) AddInstances(instances []placement.Instance) error { @@ -169,24 +200,62 @@ func (mgr *writerManager) Write( mgr.RUnlock() return fmt.Errorf("writer for instance %s is not found", id) } + writer.dirty.Store(true) err := writer.Write(shardID, payload) mgr.RUnlock() + return err } func (mgr *writerManager) Flush() error { mgr.RLock() + defer mgr.RUnlock() + if mgr.closed { - mgr.RUnlock() return errInstanceWriterManagerClosed } - multiErr := xerrors.NewMultiError() + + var ( + errCh = make(chan error, 1) + mErrCh = make(chan xerrors.MultiError, 1) + wg sync.WaitGroup + ) + + numDirty := 0 for _, w := range mgr.writers { - if err := w.Flush(); err != nil { - multiErr = multiErr.Add(err) + if !w.dirty.Load() { + continue } + numDirty++ + w := w + wg.Add(1) + mgr.pool.Go(func() { + defer wg.Done() + + w.dirty.CAS(true, false) + if err := w.Flush(); err != nil { + errCh <- err + } + }) } - mgr.RUnlock() + + percentInUse := 0.0 + if numDirty > 0 && len(mgr.writers) > 0 { + percentInUse = 100.0 * (float64(numDirty) / float64(len(mgr.writers))) + } + mgr.metrics.dirtyWritersPercent.RecordValue(percentInUse) + + go func() { + multiErr := xerrors.NewMultiError() + for err := range errCh { + multiErr = multiErr.Add(err) + } + mErrCh <- multiErr + }() + wg.Wait() + close(errCh) + + multiErr := <-mErrCh return multiErr.FinalError() } @@ -234,3 +303,19 @@ func (mgr *writerManager) reportMetrics() { mgr.metrics.queueLen.RecordValue(float64(writer.QueueSize())) } } + +func (mgr *writerManager) flushLoop(d time.Duration) { + defer mgr.wg.Done() + + ticker := time.NewTicker(d) + defer ticker.Stop() + + for { + select { + case <-mgr.doneCh: + return + case <-ticker.C: + mgr.Flush() //nolint:errcheck + } + } +} diff --git a/src/aggregator/client/writer_mgr_test.go b/src/aggregator/client/writer_mgr_test.go index 17d962ef6d..c5a46fd55f 100644 --- a/src/aggregator/client/writer_mgr_test.go +++ b/src/aggregator/client/writer_mgr_test.go @@ -27,7 +27,9 @@ import ( "time" "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/atomic" "go.uber.org/goleak" "github.com/m3db/m3/src/cluster/placement" @@ -41,44 +43,55 @@ var ( ) func TestWriterManagerAddInstancesClosed(t *testing.T) { - mgr := 
newInstanceWriterManager(testOptions()).(*writerManager) + mgr := mustMakeInstanceWriterManager(testOptions()) + mgr.Lock() mgr.closed = true + mgr.Unlock() require.Equal(t, errInstanceWriterManagerClosed, mgr.AddInstances(nil)) } func TestWriterManagerAddInstancesSingleRef(t *testing.T) { - mgr := newInstanceWriterManager(testOptions()).(*writerManager) + mgr := mustMakeInstanceWriterManager(testOptions()) // Add instance lists twice and assert the writer refcount matches expectation. for i := 0; i < 2; i++ { require.NoError(t, mgr.AddInstances([]placement.Instance{testPlacementInstance})) } + mgr.Lock() require.Equal(t, 1, len(mgr.writers)) w, exists := mgr.writers[testPlacementInstance.ID()] + mgr.Unlock() require.True(t, exists) require.Equal(t, int32(2), w.refCount.n) } func TestWriterManagerRemoveInstancesClosed(t *testing.T) { - mgr := newInstanceWriterManager(testOptions()).(*writerManager) + mgr := mustMakeInstanceWriterManager(testOptions()) + mgr.Lock() mgr.closed = true + mgr.Unlock() require.Equal(t, errInstanceWriterManagerClosed, mgr.RemoveInstances(nil)) } func TestWriterManagerRemoveInstancesSuccess(t *testing.T) { - mgr := newInstanceWriterManager(testOptions()).(*writerManager) + mgr := mustMakeInstanceWriterManager(testOptions()) // Add instance lists twice. for i := 0; i < 2; i++ { require.NoError(t, mgr.AddInstances([]placement.Instance{testPlacementInstance})) } + mgr.Lock() require.Equal(t, 1, len(mgr.writers)) + mgr.Unlock() // Remove the instance list once and assert they are not closed. require.NoError(t, mgr.RemoveInstances([]placement.Instance{testPlacementInstance})) + + mgr.Lock() require.Equal(t, 1, len(mgr.writers)) w := mgr.writers[testPlacementInstance.ID()].instanceWriter.(*writer) require.False(t, w.closed) + mgr.Unlock() // Remove the instance list again and assert the writer is now removed. nonexistent := placement.NewInstance(). 
@@ -97,9 +110,11 @@ func TestWriterManagerRemoveInstancesSuccess(t *testing.T) { func TestWriterManagerRemoveInstancesNonBlocking(t *testing.T) { var ( opts = testOptions().SetInstanceQueueSize(200) - mgr = newInstanceWriterManager(opts).(*writerManager) + mgr = mustMakeInstanceWriterManager(opts) ) require.NoError(t, mgr.AddInstances([]placement.Instance{testPlacementInstance})) + + mgr.Lock() require.Equal(t, 1, len(mgr.writers)) w := mgr.writers[testPlacementInstance.ID()].instanceWriter.(*writer) @@ -107,6 +122,8 @@ func TestWriterManagerRemoveInstancesNonBlocking(t *testing.T) { time.Sleep(time.Second) return nil } + mgr.Unlock() + data := []byte("foo") for i := 0; i < opts.InstanceQueueSize(); i++ { require.NoError(t, w.queue.Enqueue(testNewBuffer(data))) @@ -128,8 +145,10 @@ func TestWriterManagerWriteUntimedClosed(t *testing.T) { metadatas: testStagedMetadatas, }, } - mgr := newInstanceWriterManager(testOptions()).(*writerManager) + mgr := mustMakeInstanceWriterManager(testOptions()) + mgr.Lock() mgr.closed = true + mgr.Unlock() err := mgr.Write(testPlacementInstance, 0, payload) require.Equal(t, errInstanceWriterManagerClosed, err) } @@ -142,8 +161,7 @@ func TestWriterManagerWriteUntimedNoInstances(t *testing.T) { metadatas: testStagedMetadatas, }, } - mgr := newInstanceWriterManager(testOptions()).(*writerManager) - mgr.closed = false + mgr := mustMakeInstanceWriterManager(testOptions()) err := mgr.Write(testPlacementInstance, 0, payload) require.Error(t, err) require.NoError(t, mgr.Close()) @@ -175,11 +193,13 @@ func TestWriterManagerWriteUntimedSuccess(t *testing.T) { payloadRes = payload return nil }) - mgr := newInstanceWriterManager(testOptions()).(*writerManager) + mgr := mustMakeInstanceWriterManager(testOptions()) + mgr.Lock() mgr.writers[instances[0].ID()] = &refCountedWriter{ refCount: refCount{n: 1}, instanceWriter: writer, } + mgr.Unlock() payload := payloadUnion{ payloadType: untimedType, @@ -189,7 +209,9 @@ func TestWriterManagerWriteUntimedSuccess(t *testing.T) { }, } require.NoError(t, mgr.Write(testPlacementInstance, 0, payload)) - require.Equal(t, 1, len(mgr.writers)) + mgr.Lock() + assert.Equal(t, 1, len(mgr.writers)) + mgr.Unlock() require.Equal(t, uint32(0), shardRes) require.Equal(t, untimedType, payloadRes.payloadType) require.Equal(t, testCounter, payloadRes.untimed.metric) @@ -197,7 +219,7 @@ func TestWriterManagerWriteUntimedSuccess(t *testing.T) { } func TestWriterManagerFlushClosed(t *testing.T) { - mgr := newInstanceWriterManager(testOptions()).(*writerManager) + mgr := mustMakeInstanceWriterManager(testOptions()) mgr.closed = true require.Equal(t, errInstanceWriterManagerClosed, mgr.Flush()) } @@ -207,7 +229,7 @@ func TestWriterManagerFlushPartialError(t *testing.T) { defer ctrl.Finish() var ( - numFlushes int + numFlushes atomic.Int64 instances = []placement.Instance{ testPlacementInstance, placement.NewInstance(). @@ -218,21 +240,24 @@ func TestWriterManagerFlushPartialError(t *testing.T) { writer1 := NewMockinstanceWriter(ctrl) writer1.EXPECT().QueueSize().AnyTimes() + writer1.EXPECT().Write(gomock.Any(), gomock.Any()) writer1.EXPECT(). Flush(). DoAndReturn(func() error { - numFlushes++ + numFlushes.Inc() return nil }) errTestFlush := errors.New("test flush error") writer2 := NewMockinstanceWriter(ctrl) writer2.EXPECT().QueueSize().AnyTimes() + writer2.EXPECT().Write(gomock.Any(), gomock.Any()) writer2.EXPECT(). Flush(). 
 		DoAndReturn(func() error { return errTestFlush })
-	mgr := newInstanceWriterManager(testOptions()).(*writerManager)
+	mgr := mustMakeInstanceWriterManager(testOptions())
+	mgr.Lock()
 	mgr.writers[instances[0].ID()] = &refCountedWriter{
 		refCount:       refCount{n: 1},
 		instanceWriter: writer1,
 	}
@@ -241,28 +266,40 @@ func TestWriterManagerFlushPartialError(t *testing.T) {
 		refCount:       refCount{n: 1},
 		instanceWriter: writer2,
 	}
+	mgr.Unlock()
+	mgr.Write(instances[0], 0, payloadUnion{}) //nolint:errcheck
+	mgr.Write(instances[1], 0, payloadUnion{}) //nolint:errcheck
 	err := mgr.Flush()
 	require.Error(t, err)
 	require.True(t, strings.Contains(err.Error(), errTestFlush.Error()))
-	require.Equal(t, 1, numFlushes)
+	require.Equal(t, int64(1), numFlushes.Load())
 }
 
 func TestWriterManagerCloseAlreadyClosed(t *testing.T) {
-	mgr := newInstanceWriterManager(testOptions()).(*writerManager)
+	mgr := mustMakeInstanceWriterManager(testOptions())
+	mgr.Lock()
 	mgr.closed = true
+	mgr.Unlock()
 	require.Equal(t, errInstanceWriterManagerClosed, mgr.Close())
 }
 
 func TestWriterManagerCloseSuccess(t *testing.T) {
-	opts := goleak.IgnoreCurrent() // TODO: other tests don't clean up properly
-	defer goleak.VerifyNone(t, opts)
+	// TODO: other tests don't clean up properly, and pool has no Shutdown method
+	defer goleak.VerifyNone(
+		t,
+		goleak.IgnoreCurrent(),
+		goleak.IgnoreTopFunction("github.com/m3db/m3/src/x/sync.(*pooledWorkerPool).spawnWorker.func1"),
+	)
 
-	mgr := newInstanceWriterManager(testOptions()).(*writerManager)
+	mgr := mustMakeInstanceWriterManager(testOptions())
 
 	// Add instance list and close.
 	require.NoError(t, mgr.AddInstances([]placement.Instance{testPlacementInstance}))
 	require.NoError(t, mgr.Close())
+	mgr.Lock()
 	require.True(t, mgr.closed)
+	mgr.Unlock()
+
 	require.True(t, clock.WaitUntil(func() bool {
 		for _, w := range mgr.writers {
 			wr := w.instanceWriter.(*writer)
@@ -277,3 +314,12 @@ func TestWriterManagerCloseSuccess(t *testing.T) {
 		return true
 	}, 3*time.Second))
 }
+
+func mustMakeInstanceWriterManager(opts Options) *writerManager {
+	wm, err := newInstanceWriterManager(opts)
+	if err != nil {
+		panic(err)
+	}
+
+	return wm.(*writerManager)
+}
diff --git a/src/aggregator/client/writer_test.go b/src/aggregator/client/writer_test.go
index cf29b4c633..8c6ca9bef4 100644
--- a/src/aggregator/client/writer_test.go
+++ b/src/aggregator/client/writer_test.go
@@ -179,7 +179,7 @@ func TestWriterWriteUntimedCounterWithFlushingZeroSizeBefore(t *testing.T) {
 			enqueuedBuf = buf
 			return nil
 		})
-	w := newInstanceWriter(testPlacementInstance, testOptions().SetFlushSize(3)).(*writer)
+	w := newInstanceWriter(testPlacementInstance, testOptions().SetMaxBatchSize(3)).(*writer)
 	w.queue = queue
 	w.newLockedEncoderFn = func(protobuf.UnaggregatedOptions) *lockedEncoder {
 		return &lockedEncoder{UnaggregatedEncoder: encoder}
@@ -207,7 +207,6 @@ func TestWriterWriteUntimedCounterWithFlushingPositiveSizeBefore(t *testing.T) {
 
 	var (
 		stream      = protobuf.NewBuffer([]byte{1, 2, 3, 4, 5, 6, 7}, nil)
-		resetBytes  []byte
 		enqueuedBuf protobuf.Buffer
 	)
 	encoder := protobuf.NewMockUnaggregatedEncoder(ctrl)
@@ -222,9 +221,6 @@ func TestWriterWriteUntimedCounterWithFlushingPositiveSizeBefore(t *testing.T) {
 		}).Return(nil),
 		encoder.EXPECT().Len().Return(7),
 		encoder.EXPECT().Relinquish().Return(stream),
-		encoder.EXPECT().
-			Reset([]byte{4, 5, 6, 7}).
-			DoAndReturn(func(data []byte) { resetBytes = data }),
 	)
 	queue := NewMockinstanceQueue(ctrl)
 	queue.EXPECT().
@@ -233,7 +229,7 @@ func TestWriterWriteUntimedCounterWithFlushingPositiveSizeBefore(t *testing.T) {
 			enqueuedBuf = buf
 			return nil
 		})
-	w := newInstanceWriter(testPlacementInstance, testOptions().SetFlushSize(3)).(*writer)
+	w := newInstanceWriter(testPlacementInstance, testOptions().SetMaxBatchSize(3)).(*writer)
 	w.queue = queue
 	w.newLockedEncoderFn = func(protobuf.UnaggregatedOptions) *lockedEncoder {
 		return &lockedEncoder{UnaggregatedEncoder: encoder}
@@ -252,8 +248,7 @@ func TestWriterWriteUntimedCounterWithFlushingPositiveSizeBefore(t *testing.T) {
 	require.True(t, exists)
 	require.NotNil(t, enc)
 	require.Equal(t, 1, len(w.encodersByShard))
-	require.Equal(t, []byte{0x4, 0x5, 0x6, 0x7}, resetBytes)
-	require.Equal(t, []byte{0x1, 0x2, 0x3}, enqueuedBuf.Bytes())
+	require.Equal(t, []byte{0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7}, enqueuedBuf.Bytes())
 }
 
 func TestWriterWriteUntimedBatchTimerNoBatchSizeLimit(t *testing.T) {
@@ -312,7 +307,7 @@ func TestWriterWriteUntimedBatchTimerSmallBatchSize(t *testing.T) {
 				StagedMetadatas: testStagedMetadatas,
 			},
 		}).Return(nil),
-		encoder.EXPECT().Len().Return(7).Times(2),
+		encoder.EXPECT().Len().Return(7),
 	)
 	opts := testOptions().SetMaxTimerBatchSize(140)
 	w := newInstanceWriter(testPlacementInstance, opts).(*writer)
@@ -354,7 +349,7 @@ func TestWriterWriteUntimedBatchTimerLargeBatchSize(t *testing.T) {
 		expectedNumBatches = int(math.Ceil(float64(numValues) / float64(maxBatchSize)))
 	)
 	encoder := protobuf.NewMockUnaggregatedEncoder(ctrl)
-	encoder.EXPECT().Len().Return(7).Times(expectedNumBatches + 2)
+	encoder.EXPECT().Len().Return(7).MinTimes(2)
 	encoder.EXPECT().
 		EncodeMessage(gomock.Any()).
 		DoAndReturn(func(msg encoding.UnaggregatedMessageUnion) error {
@@ -364,6 +359,8 @@ func TestWriterWriteUntimedBatchTimerLargeBatchSize(t *testing.T) {
 			metadataRes = append(metadataRes, msg.BatchTimerWithMetadatas.StagedMetadatas)
 			return nil
 		}).Times(expectedNumBatches)
+	encoder.EXPECT().Relinquish()
+
 	opts := testOptions().SetMaxTimerBatchSize(maxBatchSize)
 	w := newInstanceWriter(testPlacementInstance, opts).(*writer)
 	w.newLockedEncoderFn = func(protobuf.UnaggregatedOptions) *lockedEncoder {
@@ -378,6 +375,7 @@ func TestWriterWriteUntimedBatchTimerLargeBatchSize(t *testing.T) {
 		},
 	}
 	require.NoError(t, w.Write(0, payload))
+	require.NoError(t, w.Flush())
 
 	var (
 		expectedMsgTypes []encoding.UnaggregatedMessageType
@@ -403,37 +401,57 @@
 }
 
 func TestWriterWriteUntimedLargeBatchTimerUsesMultipleBuffers(t *testing.T) {
-	numValues := 1400
+	const (
+		numValues  = 1400
+		testIDName = "testLargeBatchTimer"
+	)
+
 	timerValues := make([]float64, numValues)
 	for i := 0; i < numValues; i++ {
 		timerValues[i] = float64(i)
 	}
 
-	testLargeBatchTimer := unaggregated.MetricUnion{
-		Type:          metric.TimerType,
-		ID:            []byte("testLargeBatchTimer"),
-		BatchTimerVal: timerValues,
-	}
-	testScope := tally.NewTestScope("", nil)
-	iOpts := instrument.NewOptions().SetMetricsScope(testScope)
-	opts := testOptions().
-		SetMaxTimerBatchSize(140).
-		SetInstrumentOptions(iOpts)
-
-	w := newInstanceWriter(testPlacementInstance, opts).(*writer)
+	var (
+		testLargeBatchTimer = unaggregated.MetricUnion{
+			Type:          metric.TimerType,
+			ID:            []byte(testIDName),
+			BatchTimerVal: timerValues,
+		}
+		payload = payloadUnion{
+			payloadType: untimedType,
+			untimed: untimedPayload{
+				metric:    testLargeBatchTimer,
+				metadatas: testStagedMetadatas,
+			},
+		}
+		testScope = tally.NewTestScope("", nil)
+		iOpts     = instrument.NewOptions().SetMetricsScope(testScope)
+		opts      = testOptions().
+				SetMaxBatchSize(1000).
+				SetMaxTimerBatchSize(10).
+				SetInstrumentOptions(iOpts)
+
+		w            = newInstanceWriter(testPlacementInstance, opts).(*writer)
+		q            = w.queue.(*queue)
+		payloadCount int
+	)
 
-	payload := payloadUnion{
-		payloadType: untimedType,
-		untimed: untimedPayload{
-			metric:    testLargeBatchTimer,
-			metadatas: testStagedMetadatas,
-		},
+	q.writeFn = func(payload []byte) error {
+		payloadCount += strings.Count(string(payload), testIDName)
+		return nil
 	}
+
 	require.NoError(t, w.Write(0, payload))
+	require.NoError(t, w.Flush())
+	time.Sleep(1 * time.Second) // TODO: remove once queue is sync
+	require.NoError(t, w.Close())
 
 	enqueuedCounter := testScope.Snapshot().Counters()["buffers+action=enqueued"]
 	require.NotNil(t, enqueuedCounter)
-	require.Equal(t, int64(5), enqueuedCounter.Value())
+	// Expect 1 byte buffer to be enqueued to write to network,
+	// but timer itself should be split to multiple protobuf payloads.
+	require.Equal(t, int64(1), enqueuedCounter.Value())
+	require.Equal(t, 140, payloadCount)
 }
 
 func TestWriterWriteUntimedBatchTimerWriteError(t *testing.T) {
@@ -458,10 +476,10 @@ func TestWriterWriteUntimedBatchTimerWriteError(t *testing.T) {
 		encoder.EXPECT().
 			EncodeMessage(gomock.Any()).
 			Return(nil),
-		encoder.EXPECT().Len().Return(5),
 		encoder.EXPECT().
 			EncodeMessage(gomock.Any()).
 			Return(errTestWrite),
+		encoder.EXPECT().Truncate(3).Return(nil),
 	)
 
 	opts := testOptions().SetMaxTimerBatchSize(3)
@@ -489,7 +507,7 @@ func TestWriterWriteUntimedBatchTimerEnqueueError(t *testing.T) {
 	queue.EXPECT().Enqueue(gomock.Any()).Return(errTestEnqueue)
 	opts := testOptions().
 		SetMaxTimerBatchSize(1).
-		SetFlushSize(1)
+		SetMaxBatchSize(1)
 	w := newInstanceWriter(testPlacementInstance, opts).(*writer)
 	w.queue = queue
 
@@ -562,7 +580,7 @@ func TestWriterWriteForwardedWithFlushingZeroSizeBefore(t *testing.T) {
 			enqueuedBuf = buf
 			return nil
 		})
-	w := newInstanceWriter(testPlacementInstance, testOptions().SetFlushSize(3)).(*writer)
+	w := newInstanceWriter(testPlacementInstance, testOptions().SetMaxBatchSize(3)).(*writer)
 	w.queue = queue
 	w.newLockedEncoderFn = func(protobuf.UnaggregatedOptions) *lockedEncoder {
 		return &lockedEncoder{UnaggregatedEncoder: encoder}
@@ -590,7 +608,6 @@ func TestWriterWriteForwardedWithFlushingPositiveSizeBefore(t *testing.T) {
 
 	var (
 		stream      = protobuf.NewBuffer([]byte{1, 2, 3, 4, 5, 6, 7}, nil)
-		resetBytes  []byte
 		enqueuedBuf protobuf.Buffer
 	)
 	encoder := protobuf.NewMockUnaggregatedEncoder(ctrl)
@@ -605,9 +622,6 @@ func TestWriterWriteForwardedWithFlushingPositiveSizeBefore(t *testing.T) {
 		}).Return(nil),
 		encoder.EXPECT().Len().Return(7),
 		encoder.EXPECT().Relinquish().Return(stream),
-		encoder.EXPECT().
-			Reset([]byte{4, 5, 6, 7}).
-			DoAndReturn(func(data []byte) { resetBytes = data }),
 	)
 	queue := NewMockinstanceQueue(ctrl)
 	queue.EXPECT().
@@ -616,7 +630,7 @@ func TestWriterWriteForwardedWithFlushingPositiveSizeBefore(t *testing.T) {
 			enqueuedBuf = buf
 			return nil
 		})
-	w := newInstanceWriter(testPlacementInstance, testOptions().SetFlushSize(3)).(*writer)
+	w := newInstanceWriter(testPlacementInstance, testOptions().SetMaxBatchSize(3)).(*writer)
 	w.queue = queue
 	w.newLockedEncoderFn = func(protobuf.UnaggregatedOptions) *lockedEncoder {
 		return &lockedEncoder{UnaggregatedEncoder: encoder}
@@ -635,8 +649,7 @@ func TestWriterWriteForwardedWithFlushingPositiveSizeBefore(t *testing.T) {
 	require.True(t, exists)
 	require.NotNil(t, enc)
 	require.Equal(t, 1, len(w.encodersByShard))
-	require.Equal(t, []byte{0x4, 0x5, 0x6, 0x7}, resetBytes)
-	require.Equal(t, []byte{0x1, 0x2, 0x3}, enqueuedBuf.Bytes())
+	require.Equal(t, []byte{0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7}, enqueuedBuf.Bytes())
 }
 
 func TestWriterWriteForwardedEncodeError(t *testing.T) {
@@ -678,7 +691,7 @@ func TestWriterWriteForwardedEnqueueError(t *testing.T) {
 	queue.EXPECT().Enqueue(gomock.Any()).Return(errTestEnqueue)
 	opts := testOptions().
 		SetMaxTimerBatchSize(1).
-		SetFlushSize(1)
+		SetMaxBatchSize(1)
 	w := newInstanceWriter(testPlacementInstance, opts).(*writer)
 	w.queue = queue
 
@@ -719,6 +732,7 @@ func TestWriterFlushPartialError(t *testing.T) {
 			return nil
 		}).
 		Times(2)
+	queue.EXPECT().Flush().MinTimes(1)
 	opts := testOptions()
 	w := newInstanceWriter(testPlacementInstance, opts).(*writer)
 	w.queue = queue
@@ -764,20 +778,20 @@ func TestWriterConcurrentWriteStress(t *testing.T) {
 	params := []struct {
 		maxInputBatchSize int
 		maxTimerBatchSize int
-		flushSize         int
+		maxBatchSize      int
 	}{
 		// High likelihood of counter/gauge encoding triggering a flush in between
 		// releasing and re-acquiring locks when encoding large timer batches.
 		{
 			maxInputBatchSize: 150,
 			maxTimerBatchSize: 150,
-			flushSize:         1000,
+			maxBatchSize:      1000,
 		},
 		// Large timer batches.
 		{
 			maxInputBatchSize: 1000,
 			maxTimerBatchSize: 140,
-			flushSize:         1440,
+			maxBatchSize:      1440,
 		},
 	}
 
@@ -786,7 +800,7 @@ func TestWriterConcurrentWriteStress(t *testing.T) {
 			t,
 			param.maxInputBatchSize,
 			param.maxTimerBatchSize,
-			param.flushSize,
+			param.maxBatchSize,
 		)
 	}
 }
@@ -795,7 +809,7 @@ func testWriterConcurrentWriteStress(
 	t *testing.T,
 	maxInputBatchSize int,
 	maxTimerBatchSize int,
-	flushSize int,
+	maxBatchSize int,
 ) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()
@@ -862,9 +876,10 @@ func testWriterConcurrentWriteStress(
 			return nil
 		}).
 		AnyTimes()
+	queue.EXPECT().Flush().MinTimes(1)
 
 	opts := testOptions().
 		SetMaxTimerBatchSize(maxTimerBatchSize).
-		SetFlushSize(flushSize)
+		SetMaxBatchSize(maxBatchSize)
 	w := newInstanceWriter(testPlacementInstance, opts).(*writer)
 	w.queue = queue
diff --git a/src/aggregator/config/m3aggregator.yml b/src/aggregator/config/m3aggregator.yml
index 366581d4e2..f933709641 100644
--- a/src/aggregator/config/m3aggregator.yml
+++ b/src/aggregator/config/m3aggregator.yml
@@ -130,7 +130,6 @@ aggregator:
         watermark:
           low: 0.001
           high: 0.01
-    flushSize: 1440
     maxTimerBatchSize: 140
     queueSize: 1000
     queueDropType: oldest
diff --git a/src/collector/config/m3collector.yml b/src/collector/config/m3collector.yml
index b555eeccd7..b9a0181b8d 100644
--- a/src/collector/config/m3collector.yml
+++ b/src/collector/config/m3collector.yml
@@ -51,7 +51,6 @@ reporter:
     initWatchTimeout: 10s
    hashType: murmur32
    shardCutoffLingerDuration: 1m
-    flushSize: 1440
    maxTimerBatchSize: 1120
    queueSize: 10000
    queueDropType: oldest
diff --git a/src/collector/integration/options.go b/src/collector/integration/options.go
index 5f0b236f3f..1a6a1b6178 100644
--- a/src/collector/integration/options.go
+++ b/src/collector/integration/options.go
@@ -110,10 +110,12 @@ func newTestOptions() testOptions {
 		instrumentOpts:           instrument.NewOptions(),
 		serverOpts:               server.NewOptions(),
 		serverStateChangeTimeout: defaultServerStateChangeTimeout,
-		store:                    mem.NewStore(),
-		cacheOpts:                cache.NewOptions(),
-		matcherOpts:              matcher.NewOptions(),
-		aggClientOpts:            aggclient.NewOptions(),
+		store:       mem.NewStore(),
+		cacheOpts:   cache.NewOptions(),
+		matcherOpts: matcher.NewOptions(),
+		aggClientOpts: aggclient.NewOptions().
+			SetMaxBatchSize(65536).
+			SetFlushWorkerCount(4),
 		aggReporterOpts: aggreporter.NewReporterOptions(),
 	}
 }
diff --git a/src/collector/integration/report_with_rule_updates_test.go b/src/collector/integration/report_with_rule_updates_test.go
index e4ebbf8f39..6fbbcca1cb 100644
--- a/src/collector/integration/report_with_rule_updates_test.go
+++ b/src/collector/integration/report_with_rule_updates_test.go
@@ -183,6 +183,7 @@ func testReportWithRuleUpdates(
 				})
 			}
 			expectedResultsLock.Unlock()
+			require.NoError(t, reporter.Flush())
 		}
 	}()
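
With flushSize and batchFlushDeadline removed, client-side batching is tuned through maxBatchSize, flushWorkerCount and forceFlushEvery. Below is a minimal sketch of how a caller might set these options programmatically, assuming the aggregator client package at github.com/m3db/m3/src/aggregator/client; the helper name and the values are illustrative only (the values mirror the integration defaults set in src/collector/integration/options.go above), not a recommended production configuration.

package example

import (
	"time"

	aggclient "github.com/m3db/m3/src/aggregator/client"
)

// newAggregatorClientOptions is a hypothetical helper illustrating the new
// tuning surface introduced by this change.
func newAggregatorClientOptions() aggclient.Options {
	return aggclient.NewOptions().
		SetMaxBatchSize(65536).          // size-based flush trigger (replaces flushSize)
		SetFlushWorkerCount(4).          // max number of concurrent flush workers
		SetForceFlushEvery(time.Second). // time-based forced flush (replaces batchFlushDeadline)
		SetMaxTimerBatchSize(140)        // unchanged: cap on timer values per encoded message
}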