From 4167b43c9d5f8df90c8c458cd68ea859f2e93a67 Mon Sep 17 00:00:00 2001 From: shaan420 Date: Thu, 22 Sep 2022 09:16:08 -0700 Subject: [PATCH] allow non-blocking flushes across writers (#4132) added queue size bytes threshold perform queue report in flush to avoid locking queue simplified MultipleWriters UnitTest linter fix integration test add/fix flush metrics Update src/aggregator/client/writer.go Co-authored-by: Vytenis Darulis Close() busy loop to clear out unsent data review comments change default queue size bytes to unlimited recify multierror handling in integration test fix linter fix integration test --- src/aggregator/client/config.go | 4 + src/aggregator/client/conn.go | 2 +- src/aggregator/client/options.go | 28 ++ src/aggregator/client/queue.go | 107 ++++++-- src/aggregator/client/queue_mock.go | 14 + src/aggregator/client/queue_test.go | 136 ++++++++++ src/aggregator/client/writer.go | 125 ++++++--- .../client/writer_benchmark_test.go | 1 + src/aggregator/client/writer_mgr.go | 65 +---- src/aggregator/client/writer_mgr_test.go | 178 +++++++++++- src/aggregator/client/writer_test.go | 253 +++++++++++++++++- src/aggregator/integration/client.go | 12 +- src/x/errors/errors.go | 18 ++ src/x/errors/errors_test.go | 49 ++++ 14 files changed, 843 insertions(+), 149 deletions(-) diff --git a/src/aggregator/client/config.go b/src/aggregator/client/config.go index 0ddf074889..fe0d53eaf0 100644 --- a/src/aggregator/client/config.go +++ b/src/aggregator/client/config.go @@ -58,6 +58,7 @@ type Configuration struct { MaxBatchSize int `yaml:"maxBatchSize"` MaxTimerBatchSize int `yaml:"maxTimerBatchSize"` QueueSize int `yaml:"queueSize"` + QueueSizeBytes int `yaml:"queueSizeBytes"` QueueDropType *DropType `yaml:"queueDropType"` Connection ConnectionConfiguration `yaml:"connection"` } @@ -191,6 +192,9 @@ func (c *Configuration) newClientOptions( if c.QueueSize != 0 { opts = opts.SetInstanceQueueSize(c.QueueSize) } + if c.QueueSizeBytes != 0 { + opts = opts.SetInstanceMaxQueueSizeBytes(c.QueueSizeBytes) + } if c.QueueDropType != nil { opts = opts.SetQueueDropType(*c.QueueDropType) } diff --git a/src/aggregator/client/conn.go b/src/aggregator/client/conn.go index 705ff1d48f..bad6e889c8 100644 --- a/src/aggregator/client/conn.go +++ b/src/aggregator/client/conn.go @@ -102,9 +102,9 @@ func newConnection(addr string, opts ConnectionOptions) *connection { ), metrics: newConnectionMetrics(opts.InstrumentOptions().MetricsScope()), } + c.connectWithLockFn = c.connectWithLock c.writeWithLockFn = c.writeWithLock - return c } diff --git a/src/aggregator/client/options.go b/src/aggregator/client/options.go index f42dc69411..a7da0f4250 100644 --- a/src/aggregator/client/options.go +++ b/src/aggregator/client/options.go @@ -58,6 +58,12 @@ const ( // before it must wait for an existing batch to be flushed to an instance. defaultInstanceQueueSize = 128 + // defaultInstanceMaxQueueSizeBytes determines how many bytes across all payloads + // can be buffered in the queue before the eviction policy kicks in (policy is + // specified in DropType) + // 0 = no-limit (size-based limiting disabled by default) + defaultInstanceMaxQueueSizeBytes = 0 + // By default traffic is cut over to shards 10 minutes before the designated // cutover time in case there are issues with the instances owning the shards. defaultShardCutoverWarmupDuration = 10 * time.Minute @@ -212,6 +218,16 @@ type Options interface { // InstanceQueueSize returns the instance queue size. 
InstanceQueueSize() int + // SetInstanceMaxQueueSizeBytes sets the instance max + // queue size threshold in bytes across all items in the queue. + SetInstanceMaxQueueSizeBytes(value int) Options + + // InstanceMaxQueueSizeBytes returns the instance max + // queue size threshold in bytes across all items in the queue + // after which the eviction policy (dictated by DropType) + // should be triggered. + InstanceMaxQueueSizeBytes() int + // SetQueueDropType sets the strategy for which metrics should metrics should be dropped when // the queue is full. SetQueueDropType(value DropType) Options @@ -247,6 +263,7 @@ type options struct { forceFlushEvery time.Duration maxTimerBatchSize int instanceQueueSize int + instanceMaxQueueSizeBytes int dropType DropType maxBatchSize int flushWorkerCount int @@ -267,6 +284,7 @@ func NewOptions() Options { flushWorkerCount: defaultFlushWorkerCount, maxTimerBatchSize: defaultMaxTimerBatchSize, instanceQueueSize: defaultInstanceQueueSize, + instanceMaxQueueSizeBytes: defaultInstanceMaxQueueSizeBytes, dropType: defaultDropType, maxBatchSize: defaultMaxBatchSize, rwOpts: xio.NewOptions(), @@ -436,6 +454,16 @@ func (o *options) InstanceQueueSize() int { return o.instanceQueueSize } +func (o *options) SetInstanceMaxQueueSizeBytes(value int) Options { + opts := *o + opts.instanceMaxQueueSizeBytes = value + return &opts +} + +func (o *options) InstanceMaxQueueSizeBytes() int { + return o.instanceMaxQueueSizeBytes +} + func (o *options) SetQueueDropType(value DropType) Options { opts := *o opts.dropType = value diff --git a/src/aggregator/client/queue.go b/src/aggregator/client/queue.go index 471290f1b6..b774d1f581 100644 --- a/src/aggregator/client/queue.go +++ b/src/aggregator/client/queue.go @@ -37,8 +37,14 @@ import ( ) const ( - _queueMinWriteBufSize = 65536 - _queueMaxWriteBufSize = 8 * _queueMinWriteBufSize + _queueMinWriteBufSize = 65536 + _queueMaxWriteBufSize = 8 * _queueMinWriteBufSize + _queueFlushBytesMetricBuckets = 16 + _queueFlushBytesMetricBucketStart = 1024 + _queueFlushItemsMetricBuckets = 8 + _queueFlushItemsMetricBucketStart = 16 + _queueBufSizesMetricBuckets = 16 + _queueBufSizesMetricBucketStart = 128 ) var ( @@ -111,6 +117,9 @@ type instanceQueue interface { // Size returns the number of items in the queue. Size() int + // SizeBytes returns the total bytes held up in the queue. + SizeBytes() int + // Close closes the queue, it blocks until the queue is drained. Close() error @@ -129,7 +138,7 @@ type queue struct { buf qbuf dropType DropType closed atomic.Bool - mtx sync.Mutex + bufMtx sync.Mutex } func newInstanceQueue(instance placement.Instance, opts Options) instanceQueue { @@ -140,9 +149,10 @@ func newInstanceQueue(instance placement.Instance, opts Options) instanceQueue { connOpts = opts.ConnectionOptions(). SetInstrumentOptions(connInstrumentOpts). SetRWOptions(opts.RWOptions()) - conn = newConnection(instance.Endpoint(), connOpts) - iOpts = opts.InstrumentOptions() - queueSize = opts.InstanceQueueSize() + conn = newConnection(instance.Endpoint(), connOpts) + iOpts = opts.InstrumentOptions() + queueSize = opts.InstanceQueueSize() + maxQueueSizeBytes = opts.InstanceMaxQueueSizeBytes() ) // Round up queue size to power of 2.
@@ -157,7 +167,8 @@ func newInstanceQueue(instance placement.Instance, opts Options) instanceQueue { instance: instance, conn: conn, buf: qbuf{ - b: make([]protobuf.Buffer, int(qsize)), + b: make([]protobuf.Buffer, int(qsize)), + maxSizeBytes: uint32(maxQueueSizeBytes), }, } q.writeFn = q.conn.Write @@ -171,14 +182,15 @@ func (q *queue) Enqueue(buf protobuf.Buffer) error { return errInstanceQueueClosed } + q.bufMtx.Lock() + defer q.bufMtx.Unlock() + if len(buf.Bytes()) == 0 { return nil } - q.mtx.Lock() - defer q.mtx.Unlock() - - if full := q.buf.full(); full { + full := q.buf.full() + for full { switch q.dropType { case DropCurrent: // Close the current buffer so it's resources are freed. @@ -193,10 +205,15 @@ func (q *queue) Enqueue(buf protobuf.Buffer) error { default: return errInvalidDropType } + + full = q.buf.full() } + // NB: The qbuf can still hold a single super huge buffer way bigger + // maxSizeBytes. q.buf.push(buf) q.metrics.enqueueSuccesses.Inc(1) + q.metrics.queueBufSizes.RecordValue(float64(len(buf.Bytes()))) return nil } @@ -243,15 +260,24 @@ func (q *queue) Flush() { func (q *queue) flush(tmpWriteBuf *[]byte) (int, error) { var n int - q.mtx.Lock() + // Some bits and pieces of this logic could be done under + // a read lock as opposed to taking a full lock but that + // would unnecessarily add complexity for no meaningful gain + // in performance. Besides, grabbing and releasing multiple times + // could be more expensive than grabbing a full lock once. + q.bufMtx.Lock() + + // Before initiating a flush, record the size of the queue + q.metrics.bytesToFlush.RecordValue(float64(q.buf.getSizeBytes())) + q.metrics.itemsToFlush.RecordValue(float64(q.buf.sizeItems())) - if q.buf.size() == 0 { - q.mtx.Unlock() + if q.buf.sizeItems() == 0 { + q.bufMtx.Unlock() return n, io.EOF } *tmpWriteBuf = (*tmpWriteBuf)[:0] - for q.buf.size() > 0 { + for q.buf.sizeItems() > 0 { protoBuffer := q.buf.peek() bytes := protoBuffer.Bytes() @@ -270,8 +296,8 @@ func (q *queue) flush(tmpWriteBuf *[]byte) (int, error) { protoBuffer.Close() } - // mutex is not held while doing IO - q.mtx.Unlock() + q.bufMtx.Unlock() + // Perform the write after releasing the bufMtx if n == 0 { return n, io.EOF @@ -288,7 +314,11 @@ func (q *queue) flush(tmpWriteBuf *[]byte) (int, error) { } func (q *queue) Size() int { - return int(q.buf.size()) + return int(q.buf.sizeItems()) +} + +func (q *queue) SizeBytes() int { + return int(q.buf.getSizeBytes()) } type queueMetrics struct { @@ -296,13 +326,33 @@ type queueMetrics struct { enqueueOldestDropped tally.Counter enqueueCurrentDropped tally.Counter enqueueClosedErrors tally.Counter + bytesToFlush tally.Histogram + itemsToFlush tally.Histogram + queueBufSizes tally.Histogram connWriteSuccesses tally.Counter connWriteErrors tally.Counter } func newQueueMetrics(s tally.Scope) queueMetrics { + bucketsItemsToFlush := append( + tally.ValueBuckets{0}, + tally.MustMakeExponentialValueBuckets(_queueFlushItemsMetricBucketStart, 2, _queueFlushItemsMetricBuckets)..., + ) + + bucketsBytesToFlush := append( + tally.ValueBuckets{0}, + tally.MustMakeExponentialValueBuckets(_queueFlushBytesMetricBucketStart, 2, _queueFlushBytesMetricBuckets)..., + ) + + bucketsQueueBufSizes := append( + tally.ValueBuckets{0}, + tally.MustMakeExponentialValueBuckets(_queueBufSizesMetricBucketStart, 2, _queueBufSizesMetricBuckets)..., + ) + enqueueScope := s.Tagged(map[string]string{"action": "enqueue"}) connWriteScope := s.Tagged(map[string]string{"action": "conn-write"}) + flushScope := 
s.Tagged(map[string]string{"action": "flush"}) + return queueMetrics{ enqueueSuccesses: enqueueScope.Counter("successes"), enqueueOldestDropped: enqueueScope.Tagged(map[string]string{"drop-type": "oldest"}). @@ -311,6 +361,10 @@ func newQueueMetrics(s tally.Scope) queueMetrics { Counter("dropped"), enqueueClosedErrors: enqueueScope.Tagged(map[string]string{"error-type": "queue-closed"}). Counter("errors"), + queueBufSizes: enqueueScope.Histogram("buf-sizes", bucketsQueueBufSizes), + bytesToFlush: flushScope.Histogram("flush-bytes", bucketsBytesToFlush), + itemsToFlush: flushScope.Histogram("flush-items", bucketsItemsToFlush), + connWriteSuccesses: connWriteScope.Counter("successes"), connWriteErrors: connWriteScope.Counter("errors"), } @@ -320,16 +374,23 @@ func newQueueMetrics(s tally.Scope) queueMetrics { type qbuf struct { b []protobuf.Buffer // buffer cursors - r uint32 - w uint32 + r uint32 + w uint32 + sizeBytes uint32 + maxSizeBytes uint32 } -func (q *qbuf) size() uint32 { +func (q *qbuf) sizeItems() uint32 { return q.w - q.r } +func (q *qbuf) getSizeBytes() uint32 { + return q.sizeBytes +} + func (q *qbuf) full() bool { - return q.size() == uint32(cap(q.b)) + return q.sizeItems() == uint32(cap(q.b)) || + (q.maxSizeBytes > 0 && q.sizeBytes >= q.maxSizeBytes) } func (q *qbuf) mask(idx uint32) uint32 { @@ -341,6 +402,7 @@ func (q *qbuf) push(buf protobuf.Buffer) { idx := q.mask(q.w) q.b[idx].Close() q.b[idx] = buf + q.sizeBytes += uint32(len(buf.Bytes())) } func (q *qbuf) shift() protobuf.Buffer { @@ -348,6 +410,7 @@ func (q *qbuf) shift() protobuf.Buffer { idx := q.mask(q.r) val := q.b[idx] q.b[idx] = protobuf.Buffer{} + q.sizeBytes -= uint32(len(val.Bytes())) return val } diff --git a/src/aggregator/client/queue_mock.go b/src/aggregator/client/queue_mock.go index 265f601785..45f15292db 100644 --- a/src/aggregator/client/queue_mock.go +++ b/src/aggregator/client/queue_mock.go @@ -108,3 +108,17 @@ func (mr *MockinstanceQueueMockRecorder) Size() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Size", reflect.TypeOf((*MockinstanceQueue)(nil).Size)) } + +// SizeBytes mocks base method. +func (m *MockinstanceQueue) SizeBytes() int { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SizeBytes") + ret0, _ := ret[0].(int) + return ret0 +} + +// SizeBytes indicates an expected call of SizeBytes. 
+func (mr *MockinstanceQueueMockRecorder) SizeBytes() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SizeBytes", reflect.TypeOf((*MockinstanceQueue)(nil).SizeBytes)) +} diff --git a/src/aggregator/client/queue_test.go b/src/aggregator/client/queue_test.go index 129e3197ac..17dffa6274 100644 --- a/src/aggregator/client/queue_test.go +++ b/src/aggregator/client/queue_test.go @@ -94,6 +94,142 @@ func TestInstanceQueueEnqueueQueueFullDropOldest(t *testing.T) { }, result) } +func TestInstanceQueueEnqueueQueueFullBytesDropCurrent(t *testing.T) { + opts := testOptions().SetQueueDropType(DropCurrent).SetInstanceMaxQueueSizeBytes(8) + queue := newInstanceQueue(testPlacementInstance, opts).(*queue) + + var result []byte + queue.writeFn = func(payload []byte) error { + result = payload + return nil + } + + require.NoError(t, queue.Enqueue(testNewBuffer([]byte{1, 2, 3}))) + require.NoError(t, queue.Enqueue(testNewBuffer([]byte{4, 5, 6}))) + require.NoError(t, queue.Enqueue(testNewBuffer([]byte{7, 8, 9}))) + + require.Equal(t, errWriterQueueFull, queue.Enqueue(testNewBuffer([]byte{42}))) + + queue.Flush() + + require.EqualValues(t, []byte{1, 2, 3, 4, 5, 6, 7, 8, 9}, result) +} + +func TestInstanceQueueEnqueueQueueFullBytesDropOldest(t *testing.T) { + tests := []struct { + name string + queueSizeBytes int + queueSizeItems int + toEnqueue []protobuf.Buffer + expected []byte + }{ + { + name: "last one removes all previous enqueued buffers", + queueSizeBytes: 5, + // default queueSizeItems (128) + toEnqueue: []protobuf.Buffer{ + testNewBuffer([]byte{42}), + testNewBuffer([]byte{42, 43, 44}), + testNewBuffer([]byte{45, 46, 47}), + testNewBuffer([]byte{1, 2, 3, 4, 5}), + testNewBuffer([]byte{1}), + }, + expected: []byte{1}, + }, + { + name: "last two buffers remain", + queueSizeBytes: 5, + // default queueSizeItems (128) + toEnqueue: []protobuf.Buffer{ + testNewBuffer([]byte{}), + testNewBuffer([]byte{1, 2, 3}), + testNewBuffer([]byte{42}), + testNewBuffer([]byte{}), + testNewBuffer([]byte{42, 43, 44}), + testNewBuffer([]byte{45, 46, 47}), + testNewBuffer([]byte{1}), + }, + expected: []byte{45, 46, 47, 1}, + }, + { + name: "zero length buffer enqueued", + queueSizeBytes: 5, + // default queueSizeItems (128) + toEnqueue: []protobuf.Buffer{}, + expected: []byte{}, + }, + { + name: "big last buffer expands queue beyond limit", + queueSizeBytes: 5, + // default queueSizeItems (128) + toEnqueue: []protobuf.Buffer{ + testNewBuffer([]byte{1, 2, 3}), + testNewBuffer([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}), + }, + expected: []byte{1, 2, 3, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0}, + }, + { + name: "last one dequeues all others", + queueSizeBytes: 5, + // default queueSizeItems (128) + toEnqueue: []protobuf.Buffer{ + testNewBuffer([]byte{1}), + testNewBuffer([]byte{1}), + testNewBuffer([]byte{1}), + testNewBuffer([]byte{1}), + testNewBuffer([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}), + testNewBuffer([]byte{2}), + }, + expected: []byte{2}, + }, + { + name: "default queue sizes", + // default queueSizeBytes (0) no-limit + // default queueSizeItems (128) + toEnqueue: []protobuf.Buffer{ + testNewBuffer([]byte{}), + testNewBuffer([]byte{1, 2, 3}), + testNewBuffer([]byte{42}), + testNewBuffer([]byte{}), + testNewBuffer([]byte{42, 43, 44}), + testNewBuffer([]byte{45, 46, 47}), + testNewBuffer([]byte{1}), + }, + expected: []byte{1, 2, 3, 42, 42, 43, 44, 45, 46, 47, 1}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + opts := testOptions() + if 
test.queueSizeBytes > 0 { + opts = opts.SetInstanceMaxQueueSizeBytes(test.queueSizeBytes) + } + if test.queueSizeItems > 0 { + opts = opts.SetInstanceQueueSize(test.queueSizeItems) + } + + queue := newInstanceQueue(testPlacementInstance, opts).(*queue) + + result := []byte{} + queue.writeFn = func(payload []byte) error { + result = payload + return nil + } + + for _, input := range test.toEnqueue { + require.NoError(t, queue.Enqueue(input), + "Enqueue failed for test %q", test.name) + } + + queue.Flush() + + require.EqualValues(t, test.expected, result, + "Enqueue expectation failed for test %q", test.name) + }) + } +} + func TestInstanceQueueEnqueueLargeBuffers(t *testing.T) { var ( opts = testOptions(). diff --git a/src/aggregator/client/writer.go b/src/aggregator/client/writer.go index 8bc9466f41..36f37c4a70 100644 --- a/src/aggregator/client/writer.go +++ b/src/aggregator/client/writer.go @@ -23,6 +23,7 @@ package client import ( "errors" "sync" + "time" "github.com/m3db/m3/src/cluster/placement" "github.com/m3db/m3/src/metrics/encoding" @@ -40,9 +41,31 @@ import ( ) var ( - errInstanceWriterClosed = errors.New("instance writer is closed") - errUnrecognizedMetricType = errors.New("unrecognized metric type") - errUnrecognizedPayloadType = errors.New("unrecognized payload type") + // ErrInstanceWriterClosed tracks write and flush attempts on closed writers. + ErrInstanceWriterClosed = errors.New("instance writer is closed") + + // ErrUnrecognizedMetricType tracks unrecognized metric types. + ErrUnrecognizedMetricType = errors.New("unrecognized metric type") + + // ErrUnrecognizedPayloadType tracks unrecognized payload types. + ErrUnrecognizedPayloadType = errors.New("unrecognized payload type") + + // ErrFlushInProgress is returned for a writer when it already has + // an ongoing flush operation. + ErrFlushInProgress = errors.New("flush is in progress") +) + +const ( + // When a Close() is invoked say during a placement change/expansion, + // we want to make sure we do a best-effort flush() of any data that + // is held up in the encoders. But when the Close() is invoked a flush() + // might already be in progress. So that we don't lose data accumulated + // between the last time a flush was invoked and this Close(), Close() + // will perform a busy-loop around flush() to make sure it gets to + // go one last time before terminating the writer. This param controls + // time duration between each successive poll of the isFlushActive + // flag. + _closeFlushSoakTimeMs = 100 ) type instanceWriter interface { @@ -70,8 +93,9 @@ type writer struct { newLockedEncoderFn newLockedEncoderFn maxTimerBatchSize int maxBatchSize int - sync.RWMutex - closed bool + encoderByShardMtx sync.RWMutex + closed atomic.Bool + isFlushActive atomic.Bool } func newInstanceWriter(instance placement.Instance, opts Options) instanceWriter { @@ -89,70 +113,85 @@ func newInstanceWriter(instance placement.Instance, opts Options) instanceWriter queue: newInstanceQueue(instance, queueOpts), encodersByShard: make(map[uint32]*lockedEncoder), } + + w.isFlushActive.Store(false) w.newLockedEncoderFn = newLockedEncoder return w } +// Write takes the payload and writes it to the per-shard encoder. 
func (w *writer) Write(shard uint32, payload payloadUnion) error { - w.RLock() - if w.closed { - w.RUnlock() - return errInstanceWriterClosed + if w.closed.Load() { + return ErrInstanceWriterClosed } + w.encoderByShardMtx.RLock() encoder, exists := w.encodersByShard[shard] if exists { err := w.encodeWithLock(encoder, payload) - w.RUnlock() + w.encoderByShardMtx.RUnlock() return err } - w.RUnlock() + w.encoderByShardMtx.RUnlock() - w.Lock() - if w.closed { - w.Unlock() - return errInstanceWriterClosed + w.encoderByShardMtx.Lock() + if w.closed.Load() { + w.encoderByShardMtx.Unlock() + return ErrInstanceWriterClosed } encoder, exists = w.encodersByShard[shard] if exists { err := w.encodeWithLock(encoder, payload) - w.Unlock() + w.encoderByShardMtx.Unlock() return err } encoder = w.newLockedEncoderFn(w.encoderOpts) w.encodersByShard[shard] = encoder err := w.encodeWithLock(encoder, payload) - w.Unlock() + w.encoderByShardMtx.Unlock() return err } +// Flush loops through all encoders in encodersByShard, grabs the encoded +// payload and writes it to the queue. From there the queue is drained +// towards the destination via the transport in a blocking manner func (w *writer) Flush() error { - w.RLock() - if w.closed { - w.RUnlock() - return errInstanceWriterClosed + if w.closed.Load() { + return ErrInstanceWriterClosed } - err := w.flushWithLock() - w.RUnlock() - if err != nil { + if !w.isFlushActive.CAS(false, true) { + // Flush is already active, bail + w.metrics.skippedFlushes.Inc(1) + return ErrFlushInProgress + } + + defer w.isFlushActive.Store(false) + + if err := w.flush(); err != nil { w.metrics.flushErrors.Inc(1) return err } + return nil } func (w *writer) Close() error { - w.Lock() - defer w.Unlock() + if !w.closed.CAS(false, true) { + return ErrInstanceWriterClosed + } - if w.closed { - return errInstanceWriterClosed + for !w.isFlushActive.CAS(false, true) { + // Busy-loop till we get another pass at flush + // to avoid losing any buffered up data. 
+ time.Sleep(_closeFlushSoakTimeMs * time.Millisecond) } - w.closed = true - if err := w.flushWithLock(); err != nil { + defer w.isFlushActive.Store(false) + + if err := w.flush(); err != nil { w.metrics.flushErrors.Inc(1) } + return w.queue.Close() } @@ -160,6 +199,10 @@ func (w *writer) QueueSize() int { return w.queue.Size() } +func (w *writer) QueueSizeBytes() int { + return w.queue.SizeBytes() +} + func (w *writer) encodeWithLock( encoder *lockedEncoder, payload payloadUnion, @@ -184,7 +227,7 @@ func (w *writer) encodeWithLock( case passthroughType: err = w.encodePassthroughWithLock(encoder, payload.passthrough.metric, payload.passthrough.storagePolicy) default: - err = errUnrecognizedPayloadType + err = ErrUnrecognizedPayloadType } if err != nil { @@ -288,7 +331,7 @@ func (w *writer) encodeUntimedWithLock( default: } - return errUnrecognizedMetricType + return ErrUnrecognizedMetricType } func (w *writer) encodeForwardedWithLock( @@ -351,8 +394,10 @@ func (w *writer) encodePassthroughWithLock( return encoder.EncodeMessage(msg) } -func (w *writer) flushWithLock() error { +func (w *writer) flush() error { multiErr := xerrors.NewMultiError() + + w.encoderByShardMtx.RLock() for _, encoder := range w.encodersByShard { encoder.Lock() if encoder.Len() == 0 { @@ -365,6 +410,7 @@ func (w *writer) flushWithLock() error { multiErr = multiErr.Add(err) } } + w.encoderByShardMtx.RUnlock() w.queue.Flush() @@ -390,6 +436,7 @@ type writerMetrics struct { encodeErrors tally.Counter enqueueErrors tally.Counter flushErrors tally.Counter + skippedFlushes tally.Counter } func newWriterMetrics(s tally.Scope) writerMetrics { @@ -398,6 +445,7 @@ func newWriterMetrics(s tally.Scope) writerMetrics { encodeErrors: s.Tagged(map[string]string{actionTag: "encode-error"}).Counter(buffersMetric), enqueueErrors: s.Tagged(map[string]string{actionTag: "enqueue-error"}).Counter(buffersMetric), flushErrors: s.Tagged(map[string]string{actionTag: "flush-error"}).Counter(buffersMetric), + skippedFlushes: s.Tagged(map[string]string{actionTag: "skipped-flush"}).Counter(buffersMetric), } } @@ -414,7 +462,6 @@ func newLockedEncoder(encoderOpts protobuf.UnaggregatedOptions) *lockedEncoder { type refCountedWriter struct { instanceWriter refCount - dirty atomic.Bool } func newRefCountedWriter(instance placement.Instance, opts Options) *refCountedWriter { @@ -424,8 +471,10 @@ func newRefCountedWriter(instance placement.Instance, opts Options) *refCountedW } func (rcWriter *refCountedWriter) Close() { - // NB: closing the writer needs to be done asynchronously because it may - // be called by writer manager while holding a lock that blocks any writes - // from proceeding. - go rcWriter.instanceWriter.Close() // nolint: errcheck + // The following Close() used to be called asyncly + // since Close() grabbed a lock before but it does not + // anymore so we can simply call it in the same + // context. This is called when the writeMgr is + // shutting down. 
+ rcWriter.instanceWriter.Close() // nolint: errcheck } diff --git a/src/aggregator/client/writer_benchmark_test.go b/src/aggregator/client/writer_benchmark_test.go index 50c6a3ef43..94587c44c6 100644 --- a/src/aggregator/client/writer_benchmark_test.go +++ b/src/aggregator/client/writer_benchmark_test.go @@ -149,6 +149,7 @@ type testNoOpQueue struct{} func (q testNoOpQueue) Enqueue(protobuf.Buffer) error { return nil } func (q testNoOpQueue) Close() error { return nil } func (q testNoOpQueue) Size() int { return 0 } +func (q testNoOpQueue) SizeBytes() int { return 0 } func (q testNoOpQueue) Flush() {} type testSerialWriter struct { diff --git a/src/aggregator/client/writer_mgr.go b/src/aggregator/client/writer_mgr.go index 49710201e0..46867974ef 100644 --- a/src/aggregator/client/writer_mgr.go +++ b/src/aggregator/client/writer_mgr.go @@ -38,12 +38,6 @@ var ( errInstanceWriterManagerClosed = errors.New("instance writer manager closed") ) -const ( - _queueMetricReportInterval = 10 * time.Second - _queueMetricBuckets = 8 - _queueMetricBucketStart = 64 -) - // instanceWriterManager manages instance writers. type instanceWriterManager interface { // AddInstances adds instances. @@ -67,23 +61,11 @@ type instanceWriterManager interface { } type writerManagerMetrics struct { - instancesAdded tally.Counter - instancesRemoved tally.Counter - queueLen tally.Histogram - dirtyWritersPercent tally.Histogram + instancesAdded tally.Counter + instancesRemoved tally.Counter } func newWriterManagerMetrics(scope tally.Scope) writerManagerMetrics { - buckets := append( - tally.ValueBuckets{0}, - tally.MustMakeExponentialValueBuckets(_queueMetricBucketStart, 2, _queueMetricBuckets)..., - ) - - percentBuckets := append( - tally.ValueBuckets{0}, - tally.MustMakeLinearValueBuckets(5, 5, 20)..., - ) - return writerManagerMetrics{ instancesAdded: scope.Tagged(map[string]string{ "action": "add", @@ -91,8 +73,6 @@ func newWriterManagerMetrics(scope tally.Scope) writerManagerMetrics { instancesRemoved: scope.Tagged(map[string]string{ "action": "remove", }).Counter("instances"), - queueLen: scope.Histogram("queue-length", buckets), - dirtyWritersPercent: scope.Histogram("dirty-writers-percent", percentBuckets), } } @@ -127,9 +107,6 @@ func newInstanceWriterManager(opts Options) (instanceWriterManager, error) { wm.pool = pool wm.pool.Init() - wm.wg.Add(1) - go wm.reportMetricsLoop() - if opts.ForceFlushEvery() > 0 { wm.wg.Add(1) go func() { @@ -200,7 +177,6 @@ func (mgr *writerManager) Write( mgr.RUnlock() return fmt.Errorf("writer for instance %s is not found", id) } - writer.dirty.Store(true) err := writer.Write(shardID, payload) mgr.RUnlock() @@ -221,30 +197,18 @@ func (mgr *writerManager) Flush() error { wg sync.WaitGroup ) - numDirty := 0 for _, w := range mgr.writers { - if !w.dirty.Load() { - continue - } - numDirty++ w := w wg.Add(1) mgr.pool.Go(func() { defer wg.Done() - w.dirty.CAS(true, false) if err := w.Flush(); err != nil { errCh <- err } }) } - percentInUse := 0.0 - if numDirty > 0 && len(mgr.writers) > 0 { - percentInUse = 100.0 * (float64(numDirty) / float64(len(mgr.writers))) - } - mgr.metrics.dirtyWritersPercent.RecordValue(percentInUse) - go func() { multiErr := xerrors.NewMultiError() for err := range errCh { @@ -279,31 +243,6 @@ func (mgr *writerManager) Close() error { return nil } -func (mgr *writerManager) reportMetricsLoop() { - defer mgr.wg.Done() - - ticker := time.NewTicker(_queueMetricReportInterval) - defer ticker.Stop() - - for { - select { - case <-mgr.doneCh: - return - case 
<-ticker.C: - mgr.reportMetrics() - } - } -} - -func (mgr *writerManager) reportMetrics() { - mgr.RLock() - defer mgr.RUnlock() - - for _, writer := range mgr.writers { - mgr.metrics.queueLen.RecordValue(float64(writer.QueueSize())) - } -} - func (mgr *writerManager) flushLoop(d time.Duration) { defer mgr.wg.Done() diff --git a/src/aggregator/client/writer_mgr_test.go b/src/aggregator/client/writer_mgr_test.go index c5a46fd55f..498f63e036 100644 --- a/src/aggregator/client/writer_mgr_test.go +++ b/src/aggregator/client/writer_mgr_test.go @@ -21,19 +21,25 @@ package client import ( + "context" "errors" + "net" + "strconv" "strings" + "sync" "testing" "time" "github.com/golang/mock/gomock" + "github.com/m3db/m3/src/cluster/placement" + "github.com/m3db/m3/src/metrics/metric" + "github.com/m3db/m3/src/metrics/metric/unaggregated" + "github.com/m3db/m3/src/x/clock" + xtest "github.com/m3db/m3/src/x/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/atomic" "go.uber.org/goleak" - - "github.com/m3db/m3/src/cluster/placement" - "github.com/m3db/m3/src/x/clock" ) var ( @@ -65,6 +71,144 @@ func TestWriterManagerAddInstancesSingleRef(t *testing.T) { require.Equal(t, int32(2), w.refCount.n) } +// TestWriterManagerMultipleWriters tries to recreate the scenario that +// has multiple writers out of which one is slow. We have had multiple +// incidents where one slow writer blocks all the other writers in the +// aggregator client and cascades into a variety of issues including +// dropped metrics, OOM, etc. +// How does this test mimic the slow writer? +// It first creates a custom dialer for the TCPClient +// and makes the writeFn block on the context.Context. An instance created +// with this overridden connection options struct creates a slow writer. +// After the writer/instance is created, we re-override the connection options with +// another custom dialer that replaces the write with non-blocking +// instructions. These writers are called normal writers. Instances added after +// updating the opts with the re-overridden dialer create normal writers. +// The test creates several normal writers and one slow writer. +// First it writes and flushes to the slow writer and gets it to block. +// It then begins a loop of writing one payload to every writer: slow and normal. +// We initiate every write-loop in a separate goroutine. +// We keep track of completed writes of the normal clients by way of the +// counter writesCompleted. +// As soon as all write-loops are done, meaning that all writers have +// finished writing all they had to write, the context is canceled. This unblocks +// the slow writer and then we perform the necessary validations.
+func TestWriterManagerMultipleWriters(t *testing.T) { + ctrl := xtest.NewController(t) + waitForSlowWriter := make(chan bool) + + var writesCompleted atomic.Int32 + var flushInProgressCnt atomic.Int32 + + const ( + numIterations = 10 + numNormalInstances = 16 + ) + + ctx := context.Background() + ctx, cancelFn := context.WithCancel(ctx) + defer cancelFn() + + slowMockConn := NewMockConn(ctrl) + slowMockConn.EXPECT().Write(gomock.Any()).DoAndReturn(func(b []byte) (n int, err error) { + // Block till all normal writers have finished writing + // all write loops below + waitForSlowWriter <- true + <-ctx.Done() + return len(b), nil + }).AnyTimes() + slowMockConn.EXPECT().SetWriteDeadline(gomock.Any()).AnyTimes() + + slowWriterDialerFn := func(c context.Context, network string, address string) (net.Conn, error) { + return slowMockConn, nil + } + + normalMockConn := NewMockConn(ctrl) + normalMockConn.EXPECT().Write(gomock.Any()).DoAndReturn(func(b []byte) (n int, err error) { + writesCompleted.Inc() + return len(b), nil + }).AnyTimes() + normalMockConn.EXPECT().SetWriteDeadline(gomock.Any()).AnyTimes() + + normalWriterDialerFn := func(ctx context.Context, network string, address string) (net.Conn, error) { + return normalMockConn, nil + } + + // Override opts for slow writer + slowConnOpts := testConnectionOptions().SetContextDialer(slowWriterDialerFn) + + opts := testOptions().SetConnectionOptions(slowConnOpts).SetFlushWorkerCount(256) + mgr := mustMakeInstanceWriterManager(opts) + + slowInstance := placement.NewInstance(). + SetID("slowTestID"). + SetEndpoint("SlowTestEp") + + // Add slow writer/instance + require.NoError(t, mgr.AddInstances([]placement.Instance{slowInstance})) + + // Re-override opts for normal writers + mgr.opts = mgr.opts.SetConnectionOptions(testConnectionOptions().SetContextDialer(normalWriterDialerFn)) + + instances := []placement.Instance{} + for i := 0; i < numNormalInstances; i++ { + instances = append(instances, placement.NewInstance(). + SetID("testID"+strconv.Itoa(i)). + SetEndpoint("testEp"+strconv.Itoa(i)), + ) + } + + // Create normal writers + require.NoError(t, mgr.AddInstances(instances)) + + // start slow writer asynchronously and wait till it blocks + err := mgr.Write(slowInstance, 0, testCounterPayloadUnion(0)) + require.NoError(t, err) + + go mgr.Flush() //nolint:errcheck + <-waitForSlowWriter + + // Now the following write loop which contains writes over + // the slow writer and normal writers should be non-blocking. + var wg sync.WaitGroup + for i := 0; i < numIterations; i++ { + wg.Add(1) + go func(iterationID int64) { + defer wg.Done() + + payload := testCounterPayloadUnion(iterationID) + // The following Write() to the slow instance should increment + // flushInProgressCnt during Flush() + err := mgr.Write(slowInstance, 0, payload) + require.NoError(t, err) + + for _, instance := range instances { + err := mgr.Write(instance, 0, payload) + require.NoError(t, err) + } + + err = mgr.Flush() + if err != nil { + require.True(t, strings.Contains(err.Error(), ErrFlushInProgress.Error())) + flushInProgressCnt.Inc() + } + }(int64(i)) + } + + wg.Wait() + + // Now that all normal writers have finished writing + // cancel the slow writer and compare write counts + // to validate. + cancelFn() + + // Unfortunately we cannot compare the write counts because due to the + // unpredictable nature of how the goroutines race against each other + // some normal writes could be combined into one flush. Thus making the + // actual count less than the expected count. 
+ require.Equal(t, int32(numIterations), flushInProgressCnt.Load()) +} + func TestWriterManagerRemoveInstancesClosed(t *testing.T) { mgr := mustMakeInstanceWriterManager(testOptions()) mgr.Lock() @@ -90,7 +234,7 @@ func TestWriterManagerRemoveInstancesSuccess(t *testing.T) { mgr.Lock() require.Equal(t, 1, len(mgr.writers)) w := mgr.writers[testPlacementInstance.ID()].instanceWriter.(*writer) - require.False(t, w.closed) + require.False(t, w.closed.Load()) mgr.Unlock() // Remove the instance list again and assert the writer is now removed. @@ -101,9 +245,7 @@ func TestWriterManagerRemoveInstancesSuccess(t *testing.T) { require.NoError(t, mgr.RemoveInstances(toRemove)) require.Equal(t, 0, len(mgr.writers)) require.True(t, clock.WaitUntil(func() bool { - w.Lock() - defer w.Unlock() - return w.closed + return w.closed.Load() }, 3*time.Second)) } @@ -303,11 +445,7 @@ func TestWriterManagerCloseSuccess(t *testing.T) { require.True(t, clock.WaitUntil(func() bool { for _, w := range mgr.writers { wr := w.instanceWriter.(*writer) - wr.Lock() - closed := wr.closed - wr.Unlock() - - if !closed { + if !wr.closed.Load() { return false } } @@ -323,3 +461,19 @@ func mustMakeInstanceWriterManager(opts Options) *writerManager { return wm.(*writerManager) } + +func testCounterPayloadUnion(val int64) payloadUnion { + payload := payloadUnion{ + payloadType: untimedType, + untimed: untimedPayload{ + metric: unaggregated.MetricUnion{ + Type: metric.CounterType, + ID: []byte("foo"), + CounterVal: val, + }, + metadatas: testStagedMetadatas, + }, + } + + return payload +} diff --git a/src/aggregator/client/writer_test.go b/src/aggregator/client/writer_test.go index 8c6ca9bef4..67065b256e 100644 --- a/src/aggregator/client/writer_test.go +++ b/src/aggregator/client/writer_test.go @@ -22,10 +22,12 @@ package client import ( "bytes" + "context" "errors" "fmt" "io" "math" + "net" "sort" "strings" "sync" @@ -43,6 +45,7 @@ import ( "github.com/m3db/m3/src/x/instrument" "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber-go/tally" ) @@ -56,8 +59,8 @@ func TestWriterWriteClosed(t *testing.T) { }, } w := newInstanceWriter(testPlacementInstance, testOptions()).(*writer) - w.closed = true - require.Equal(t, errInstanceWriterClosed, w.Write(0, payload)) + w.closed.Store(true) + require.Equal(t, ErrInstanceWriterClosed, w.Write(0, payload)) } func TestWriterWriteUntimedCounterEncodeError(t *testing.T) { @@ -90,6 +93,68 @@ func TestWriterWriteUntimedCounterEncodeError(t *testing.T) { require.Equal(t, errTestEncodeMetric, w.Write(0, payload)) } +// TestWriterFlushInProgress tests that a writer does not +// have multiple flushes in progress at the same time. It +// can only ever have one flush active at any given point in +// time. 
+func TestWriterFlushInProgress(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + blockCh := make(chan bool) + ctx := context.Background() + ctx, cancelFn := context.WithTimeout(ctx, 2*time.Second) + defer cancelFn() + + slowMockConn := NewMockConn(ctrl) + slowMockConn.EXPECT().Write(gomock.Any()).DoAndReturn(func(b []byte) (n int, err error) { + // notify that the slow writer is about to block + blockCh <- true + <-ctx.Done() + return len(b), nil + }) + slowMockConn.EXPECT().SetWriteDeadline(gomock.Any()) + + slowWriterDialerFn := func(c context.Context, network string, address string) (net.Conn, error) { + return slowMockConn, nil + } + + slowConnOpts := NewConnectionOptions().SetContextDialer(slowWriterDialerFn) + opts := testOptions().SetConnectionOptions(slowConnOpts) + w := newInstanceWriter(testPlacementInstance, opts).(*writer) + + payload := payloadUnion{ + payloadType: untimedType, + untimed: untimedPayload{ + metric: testCounter, + metadatas: testStagedMetadatas, + }, + } + + require.Equal(t, nil, w.Write(0, payload)) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + w.Flush() //nolint:errcheck + }() + + // wait for previous flush to block + <-blockCh + + // now any newly initiated Flush should fail with err "in-progress" + // Note that if we didn't wait on blockCh above then this following + // Flush() can finish before the previous flush which defeats the + // purpose of the test :-) + require.EqualError(t, w.Flush(), ErrFlushInProgress.Error()) + + // unblock the slow writer + cancelFn() + + wg.Wait() +} + func TestWriterWriteUntimedCounterEncoderExists(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -151,7 +216,89 @@ func TestWriterWriteUntimedCounterEncoderDoesNotExist(t *testing.T) { require.NoError(t, w.Write(0, payload)) } -func TestWriterWriteUntimedCounterWithFlushingZeroSizeBefore(t *testing.T) { +func TestWriterEncoderSizeLimitItems(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + opts := testOptions().SetMaxBatchSize(1).SetInstanceQueueSize(1) + w := newInstanceWriter(testPlacementInstance, opts).(*writer) + + payload := payloadUnion{ + payloadType: untimedType, + untimed: untimedPayload{ + metric: testCounter, + metadatas: testStagedMetadatas, + }, + } + + // Perform a write which should enqueue the buf + require.NoError(t, w.Write(0, payload)) + sizeBefore := w.QueueSize() + + i := 0 + numWrites := 10 + for i < numWrites { + require.NoError(t, w.Write(0, payload)) + i++ + } + + // All writes must respect the queue size of 1 + // meaning that after numWrites attempts, + // the queueSize should still be 1 and the + // enqueued len in the encoders must be 0 + // since all buffers would have been relinquished + // on write since each write exceeded maxBatchSize of 1 + sizeAfter := w.QueueSize() + + enc, exists := w.encodersByShard[0] + require.True(t, exists) + require.NotNil(t, enc) + require.Equal(t, 1, len(w.encodersByShard)) + require.Equal(t, sizeBefore, sizeAfter) + require.Equal(t, 0, w.encodersByShard[0].Len()) +} + +func TestWriterEncoderSizeLimitBytes(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + // The encoded length of the payload below is 155 bytes. + // Set the MaxBatchSize to 200 so that it is relinquished + // after we write the payload twice. + // The queue is configured with a size limit of 300 bytes + // Therefore it can hold one such relinquished buffer of total + // size (155+155) 310. 
Any more attempts to write should + // respect the size limit and the queue size should remain 310 + // Whereas the encoders should be fully drained for even number + // of writes. + opts := testOptions().SetMaxBatchSize(1).SetInstanceMaxQueueSizeBytes(300) + w := newInstanceWriter(testPlacementInstance, opts).(*writer) + + payload := payloadUnion{ + payloadType: untimedType, + untimed: untimedPayload{ + metric: testCounter, + metadatas: testStagedMetadatas, + }, + } + + i := 0 + numWrites := 10 + for i < numWrites { + require.NoError(t, w.Write(0, payload)) + i++ + } + + enc, exists := w.encodersByShard[0] + require.True(t, exists) + require.NotNil(t, enc) + require.Equal(t, 1, len(w.encodersByShard)) + require.Equal(t, 310, w.QueueSizeBytes()) + require.Equal(t, 0, w.encodersByShard[0].Len()) +} + +//nolint:dupl +func TestWriterWriteUntimedCounterWithWriteZeroSizeBefore(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -179,6 +326,7 @@ func TestWriterWriteUntimedCounterWithFlushingZeroSizeBefore(t *testing.T) { enqueuedBuf = buf return nil }) + w := newInstanceWriter(testPlacementInstance, testOptions().SetMaxBatchSize(3)).(*writer) w.queue = queue w.newLockedEncoderFn = func(protobuf.UnaggregatedOptions) *lockedEncoder { @@ -201,7 +349,8 @@ func TestWriterWriteUntimedCounterWithFlushingZeroSizeBefore(t *testing.T) { require.Equal(t, []byte{1, 2, 3, 4, 5, 6, 7}, enqueuedBuf.Bytes()) } -func TestWriterWriteUntimedCounterWithFlushingPositiveSizeBefore(t *testing.T) { +//nolint:dupl +func TestWriterWriteUntimedCounterWithWritePositiveSizeBefore(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -230,6 +379,7 @@ func TestWriterWriteUntimedCounterWithFlushingPositiveSizeBefore(t *testing.T) { return nil }) w := newInstanceWriter(testPlacementInstance, testOptions().SetMaxBatchSize(3)).(*writer) + w.queue = queue w.newLockedEncoderFn = func(protobuf.UnaggregatedOptions) *lockedEncoder { return &lockedEncoder{UnaggregatedEncoder: encoder} @@ -505,6 +655,7 @@ func TestWriterWriteUntimedBatchTimerEnqueueError(t *testing.T) { errTestEnqueue := errors.New("test enqueue error") queue := NewMockinstanceQueue(ctrl) queue.EXPECT().Enqueue(gomock.Any()).Return(errTestEnqueue) + opts := testOptions(). SetMaxTimerBatchSize(1). 
SetMaxBatchSize(1) @@ -581,6 +732,7 @@ func TestWriterWriteForwardedWithFlushingZeroSizeBefore(t *testing.T) { return nil }) w := newInstanceWriter(testPlacementInstance, testOptions().SetMaxBatchSize(3)).(*writer) + w.queue = queue w.newLockedEncoderFn = func(protobuf.UnaggregatedOptions) *lockedEncoder { return &lockedEncoder{UnaggregatedEncoder: encoder} @@ -631,6 +783,7 @@ func TestWriterWriteForwardedWithFlushingPositiveSizeBefore(t *testing.T) { return nil }) w := newInstanceWriter(testPlacementInstance, testOptions().SetMaxBatchSize(3)).(*writer) + w.queue = queue w.newLockedEncoderFn = func(protobuf.UnaggregatedOptions) *lockedEncoder { return &lockedEncoder{UnaggregatedEncoder: encoder} @@ -707,8 +860,8 @@ func TestWriterWriteForwardedEnqueueError(t *testing.T) { func TestWriterFlushClosed(t *testing.T) { w := newInstanceWriter(testPlacementInstance, testOptions()).(*writer) - w.closed = true - require.Equal(t, errInstanceWriterClosed, w.Flush()) + w.closed.Store(true) + require.Equal(t, ErrInstanceWriterClosed, w.Flush()) } func TestWriterFlushPartialError(t *testing.T) { @@ -765,8 +918,8 @@ func TestWriterFlushPartialError(t *testing.T) { func TestWriterCloseAlreadyClosed(t *testing.T) { w := newInstanceWriter(testPlacementInstance, testOptions()).(*writer) - w.closed = true - require.Equal(t, errInstanceWriterClosed, w.Close()) + w.closed.Store(true) + require.Equal(t, ErrInstanceWriterClosed, w.Close()) } func TestWriterCloseSuccess(t *testing.T) { @@ -774,6 +927,84 @@ func TestWriterCloseSuccess(t *testing.T) { require.NoError(t, w.Close()) } +//nolint:dupl +func TestWriterCloseFlushInProgress(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + numWrites := 0 + + blockCh := make(chan bool) + ctx := context.Background() + ctx, cancelFn := context.WithTimeout(ctx, 2*time.Second) + defer cancelFn() + + slowMockConn := NewMockConn(ctrl) + slowMockConn.EXPECT().Write(gomock.Any()).DoAndReturn(func(b []byte) (n int, err error) { + // notify that the slow writer is about to block + blockCh <- true + <-ctx.Done() + numWrites++ + return len(b), nil + }).Times(2) + slowMockConn.EXPECT().SetWriteDeadline(gomock.Any()).Times(2) + + slowWriterDialerFn := func(c context.Context, network string, address string) (net.Conn, error) { + return slowMockConn, nil + } + + slowConnOpts := NewConnectionOptions().SetContextDialer(slowWriterDialerFn) + opts := testOptions().SetConnectionOptions(slowConnOpts) + w := newInstanceWriter(testPlacementInstance, opts).(*writer) + + payload := payloadUnion{ + payloadType: untimedType, + untimed: untimedPayload{ + metric: testCounter, + metadatas: testStagedMetadatas, + }, + } + + require.Equal(t, nil, w.Write(0, payload)) + + var wg sync.WaitGroup + + // Initiate first flush + wg.Add(1) + go func() { + defer wg.Done() + w.Flush() //nolint:errcheck + }() + + // Wait for the first flush to block + <-blockCh + + // Now buffer up some additional data + require.Equal(t, nil, w.Write(0, payload)) + + wg.Add(1) + go func() { + defer wg.Done() + w.Close() //nolint:errcheck + }() + + // Wait for Close()-flush to busy-loop behind first flush + // 200ms is enough time for the Close()-flush to + // start busy-looping since the _closeFlushSoakTimeMs is + // 100ms + time.Sleep(200 * time.Millisecond) + + // Unblock the slow writer + cancelFn() + + // Unblock the Close()-flush + <-blockCh + + // Wait for first flush and Close() to complete + wg.Wait() + + assert.Equal(t, 2, numWrites) +} + func TestWriterConcurrentWriteStress(t *testing.T) { params := 
[]struct { maxInputBatchSize int @@ -1090,13 +1321,11 @@ func TestRefCountedWriter(t *testing.T) { w := newRefCountedWriter(testPlacementInstance, opts) w.IncRef() - require.False(t, w.instanceWriter.(*writer).closed) + require.False(t, w.instanceWriter.(*writer).closed.Load()) w.DecRef() require.True(t, clock.WaitUntil(func() bool { wr := w.instanceWriter.(*writer) - wr.Lock() - defer wr.Unlock() - return wr.closed + return wr.closed.Load() }, 3*time.Second)) } diff --git a/src/aggregator/integration/client.go b/src/aggregator/integration/client.go index 6b4d595806..23e00fe97e 100644 --- a/src/aggregator/integration/client.go +++ b/src/aggregator/integration/client.go @@ -21,6 +21,7 @@ package integration import ( + "errors" "fmt" aggclient "github.com/m3db/m3/src/aggregator/client" @@ -29,6 +30,7 @@ import ( "github.com/m3db/m3/src/metrics/metric/aggregated" "github.com/m3db/m3/src/metrics/metric/unaggregated" "github.com/m3db/m3/src/metrics/policy" + xerrors "github.com/m3db/m3/src/x/errors" ) type client struct { @@ -85,7 +87,15 @@ func (c *client) writePassthroughMetricWithMetadata( } func (c *client) flush() error { - return c.aggClient.Flush() + if err := c.aggClient.Flush(); err != nil { + for _, e := range xerrors.GetErrorsFromMultiError(err) { + if !errors.Is(e, aggclient.ErrFlushInProgress) { + return e + } + } + } + + return nil } func (c *client) close() error { diff --git a/src/x/errors/errors.go b/src/x/errors/errors.go index ad6c2381c8..14f01538bd 100644 --- a/src/x/errors/errors.go +++ b/src/x/errors/errors.go @@ -428,3 +428,21 @@ func (e Errors) Error() string { buf.WriteString("]") return buf.String() } + +// GetErrorsFromMultiError returns all errors in the multierror +// as an array of type "error". In case err is not of type +// MultiError then it simply returns an array with err in it. +// In case err is nil then the function will return nil as well. +func GetErrorsFromMultiError(err error) []error { + if err == nil { + return nil + } + + merr, ok := GetInnerMultiError(err) + if ok { + // is a MultiError + return merr.Errors() + } + + return []error{err} +} diff --git a/src/x/errors/errors_test.go b/src/x/errors/errors_test.go index 8a9b7d7540..d3fbc64bb0 100644 --- a/src/x/errors/errors_test.go +++ b/src/x/errors/errors_test.go @@ -130,3 +130,52 @@ func TestErrorsIsAnErrorAndFormatsErrors(t *testing.T) { assert.Equal(t, "[, "+ "]", errs.Error()) } + +func TestGetErrorsFromMultiError(t *testing.T) { + tests := []struct { + name string + createMultiErrorFn func() error + expectedNumErrors int + }{ + { + name: "standard MultiError with multiple errors", + createMultiErrorFn: func() error { + err := NewMultiError() + for _, errMsg := range []string{"foo", "bar", "baz"} { + err = err.Add(errors.New(errMsg)) + } + return err + }, + expectedNumErrors: 3, + }, + { + name: "empty MultiError", + createMultiErrorFn: func() error { + return NewMultiError() + }, + expectedNumErrors: 0, + }, + { + name: "nil MultiError", + createMultiErrorFn: func() error { + return nil + }, + expectedNumErrors: 0, + }, + { + name: "not a MultiError", + createMultiErrorFn: func() error { + return errors.New("random-error") + }, + expectedNumErrors: 1, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + err := test.createMultiErrorFn() + assert.Equal(t, test.expectedNumErrors, len(GetErrorsFromMultiError(err)), + "%q failed", test.name) + }) + } +}
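For reference, a minimal usage sketch of the byte-based queue limit introduced in this change, assuming the usual github.com/m3db/m3/src/aggregator/client import path. The option setters (SetInstanceQueueSize, SetInstanceMaxQueueSizeBytes, SetQueueDropType) come from the options.go changes above; the surrounding program, the 16 MiB figure, and the DropOldest choice are illustrative only. The same knob is exposed in YAML via the new queueSizeBytes field on the client Configuration.

package main

import (
	"fmt"

	aggclient "github.com/m3db/m3/src/aggregator/client"
)

func main() {
	// Illustrative only: cap each instance queue at 16 MiB of buffered payloads
	// in addition to the existing item-count limit; 0 (the new default) leaves
	// the byte-based limit disabled.
	opts := aggclient.NewOptions().
		SetInstanceQueueSize(256).
		SetInstanceMaxQueueSizeBytes(16 * 1024 * 1024).
		SetQueueDropType(aggclient.DropOldest)

	fmt.Println(opts.InstanceMaxQueueSizeBytes()) // 16777216
}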