Skip to content

Commit

Permalink
allow non-blocking flushes across writers (#4132)
Browse files Browse the repository at this point in the history
added queue size bytes threshold

perform queue report in flush to avoid locking queue

simplified MultipleWriters UnitTest

linter

fix integration test

add/fix flush metrics

Update src/aggregator/client/writer.go
Co-authored-by: Vytenis Darulis <[email protected]>

Close() busy loop to clear out unsent data

review comments

change default queue size bytes to unlimited

rectify multierror handling in integration test

fix linter

fix integration test
  • Loading branch information
shaan420 authored Sep 22, 2022
1 parent 42e0064 commit 4167b43
Show file tree
Hide file tree
Showing 14 changed files with 843 additions and 149 deletions.
4 changes: 4 additions & 0 deletions src/aggregator/client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type Configuration struct {
MaxBatchSize int `yaml:"maxBatchSize"`
MaxTimerBatchSize int `yaml:"maxTimerBatchSize"`
QueueSize int `yaml:"queueSize"`
QueueSizeBytes int `yaml:"queueSizeBytes"`
QueueDropType *DropType `yaml:"queueDropType"`
Connection ConnectionConfiguration `yaml:"connection"`
}
Expand Down Expand Up @@ -191,6 +192,9 @@ func (c *Configuration) newClientOptions(
if c.QueueSize != 0 {
opts = opts.SetInstanceQueueSize(c.QueueSize)
}
if c.QueueSizeBytes != 0 {
opts = opts.SetInstanceMaxQueueSizeBytes(c.QueueSizeBytes)
}
if c.QueueDropType != nil {
opts = opts.SetQueueDropType(*c.QueueDropType)
}
Expand Down
2 changes: 1 addition & 1 deletion src/aggregator/client/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,9 @@ func newConnection(addr string, opts ConnectionOptions) *connection {
),
metrics: newConnectionMetrics(opts.InstrumentOptions().MetricsScope()),
}

c.connectWithLockFn = c.connectWithLock
c.writeWithLockFn = c.writeWithLock

return c
}

Expand Down
28 changes: 28 additions & 0 deletions src/aggregator/client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ const (
// before it must wait for an existing batch to be flushed to an instance.
defaultInstanceQueueSize = 128

// defaultInstanceMaxQueueSizeBytes determines how many bytes across all payloads
// can be buffered in the queue before the eviction policy kicks in (policy is
// specified in DropType)
// 0 = no-limit (size-based limiting disabled by default)
defaultInstanceMaxQueueSizeBytes = 0

// By default traffic is cut over to shards 10 minutes before the designated
// cutover time in case there are issues with the instances owning the shards.
defaultShardCutoverWarmupDuration = 10 * time.Minute
Expand Down Expand Up @@ -212,6 +218,16 @@ type Options interface {
// InstanceQueueSize returns the instance queue size.
InstanceQueueSize() int

// SetInstanceMaxQueueSizeBytes sets the instance max
// queue size threshold in bytes across all items in the queue.
SetInstanceMaxQueueSizeBytes(value int) Options

// InstanceMaxQueueSizeBytes returns the instance max
// queue size threshold in bytes across all items in the queue
// after which the eviction policy (dictated by DropType)
// should be triggered.
InstanceMaxQueueSizeBytes() int

// SetQueueDropType sets the strategy for which metrics should be dropped when
// the queue is full.
SetQueueDropType(value DropType) Options
Expand Down Expand Up @@ -247,6 +263,7 @@ type options struct {
forceFlushEvery time.Duration
maxTimerBatchSize int
instanceQueueSize int
instanceMaxQueueSizeBytes int
dropType DropType
maxBatchSize int
flushWorkerCount int
Expand All @@ -267,6 +284,7 @@ func NewOptions() Options {
flushWorkerCount: defaultFlushWorkerCount,
maxTimerBatchSize: defaultMaxTimerBatchSize,
instanceQueueSize: defaultInstanceQueueSize,
instanceMaxQueueSizeBytes: defaultInstanceMaxQueueSizeBytes,
dropType: defaultDropType,
maxBatchSize: defaultMaxBatchSize,
rwOpts: xio.NewOptions(),
Expand Down Expand Up @@ -436,6 +454,16 @@ func (o *options) InstanceQueueSize() int {
return o.instanceQueueSize
}

// SetInstanceMaxQueueSizeBytes returns a copy of the options with the
// per-instance max queue size (in bytes, summed across all queued
// payloads) set to value. The receiver is not modified, matching the
// copy-on-write convention of the other setters in this file.
func (o *options) SetInstanceMaxQueueSizeBytes(value int) Options {
	cloned := *o
	cloned.instanceMaxQueueSizeBytes = value
	return &cloned
}

// InstanceMaxQueueSizeBytes returns the configured upper bound, in bytes,
// on the total payload size buffered in an instance queue. A value of 0
// means the byte-based limit is disabled (see defaultInstanceMaxQueueSizeBytes).
func (o *options) InstanceMaxQueueSizeBytes() int {
	return o.instanceMaxQueueSizeBytes
}

func (o *options) SetQueueDropType(value DropType) Options {
opts := *o
opts.dropType = value
Expand Down
107 changes: 85 additions & 22 deletions src/aggregator/client/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,14 @@ import (
)

const (
_queueMinWriteBufSize = 65536
_queueMaxWriteBufSize = 8 * _queueMinWriteBufSize
_queueMinWriteBufSize = 65536
_queueMaxWriteBufSize = 8 * _queueMinWriteBufSize
_queueFlushBytesMetricBuckets = 16
_queueFlushBytesMetricBucketStart = 1024
_queueFlushItemsMetricBuckets = 8
_queueFlushItemsMetricBucketStart = 16
_queueBufSizesMetricBuckets = 16
_queueBufSizesMetricBucketStart = 128
)

var (
Expand Down Expand Up @@ -111,6 +117,9 @@ type instanceQueue interface {
// Size returns the number of items in the queue.
Size() int

// SizeBytes returns the total bytes held up in the queue.
SizeBytes() int

// Close closes the queue, it blocks until the queue is drained.
Close() error

Expand All @@ -129,7 +138,7 @@ type queue struct {
buf qbuf
dropType DropType
closed atomic.Bool
mtx sync.Mutex
bufMtx sync.Mutex
}

func newInstanceQueue(instance placement.Instance, opts Options) instanceQueue {
Expand All @@ -140,9 +149,10 @@ func newInstanceQueue(instance placement.Instance, opts Options) instanceQueue {
connOpts = opts.ConnectionOptions().
SetInstrumentOptions(connInstrumentOpts).
SetRWOptions(opts.RWOptions())
conn = newConnection(instance.Endpoint(), connOpts)
iOpts = opts.InstrumentOptions()
queueSize = opts.InstanceQueueSize()
conn = newConnection(instance.Endpoint(), connOpts)
iOpts = opts.InstrumentOptions()
queueSize = opts.InstanceQueueSize()
maxQueueSizeBytes = opts.InstanceMaxQueueSizeBytes()
)

// Round up queue size to power of 2.
Expand All @@ -157,7 +167,8 @@ func newInstanceQueue(instance placement.Instance, opts Options) instanceQueue {
instance: instance,
conn: conn,
buf: qbuf{
b: make([]protobuf.Buffer, int(qsize)),
b: make([]protobuf.Buffer, int(qsize)),
maxSizeBytes: uint32(maxQueueSizeBytes),
},
}
q.writeFn = q.conn.Write
Expand All @@ -171,14 +182,15 @@ func (q *queue) Enqueue(buf protobuf.Buffer) error {
return errInstanceQueueClosed
}

q.bufMtx.Lock()
defer q.bufMtx.Unlock()

if len(buf.Bytes()) == 0 {
return nil
}

q.mtx.Lock()
defer q.mtx.Unlock()

if full := q.buf.full(); full {
full := q.buf.full()
for full {
switch q.dropType {
case DropCurrent:
// Close the current buffer so it's resources are freed.
Expand All @@ -193,10 +205,15 @@ func (q *queue) Enqueue(buf protobuf.Buffer) error {
default:
return errInvalidDropType
}

full = q.buf.full()
}

// NB: The qbuf can still hold a single super huge buffer way bigger
// than maxSizeBytes; the byte threshold only gates subsequent enqueues.
q.buf.push(buf)
q.metrics.enqueueSuccesses.Inc(1)
q.metrics.queueBufSizes.RecordValue(float64(len(buf.Bytes())))
return nil
}

Expand Down Expand Up @@ -243,15 +260,24 @@ func (q *queue) Flush() {
func (q *queue) flush(tmpWriteBuf *[]byte) (int, error) {
var n int

q.mtx.Lock()
// Some bits and pieces of this logic could be done under
// a read lock as opposed to taking a full lock but that
// would unnecessarily add complexity for no meaningful gain
// in performance. Besides, grabbing and releasing multiple times
// could be more expensive than grabbing a full lock once.
q.bufMtx.Lock()

// Before initiating a flush, record the size of the queue
q.metrics.bytesToFlush.RecordValue(float64(q.buf.getSizeBytes()))
q.metrics.itemsToFlush.RecordValue(float64(q.buf.sizeItems()))

if q.buf.size() == 0 {
q.mtx.Unlock()
if q.buf.sizeItems() == 0 {
q.bufMtx.Unlock()
return n, io.EOF
}

*tmpWriteBuf = (*tmpWriteBuf)[:0]
for q.buf.size() > 0 {
for q.buf.sizeItems() > 0 {
protoBuffer := q.buf.peek()
bytes := protoBuffer.Bytes()

Expand All @@ -270,8 +296,8 @@ func (q *queue) flush(tmpWriteBuf *[]byte) (int, error) {
protoBuffer.Close()
}

// mutex is not held while doing IO
q.mtx.Unlock()
q.bufMtx.Unlock()
// Perform the write after releasing the bufMtx

if n == 0 {
return n, io.EOF
Expand All @@ -288,21 +314,45 @@ func (q *queue) flush(tmpWriteBuf *[]byte) (int, error) {
}

func (q *queue) Size() int {
return int(q.buf.size())
return int(q.buf.sizeItems())
}

// SizeBytes returns the total number of bytes currently held across all
// buffers in the queue.
// NOTE(review): this reads qbuf.sizeBytes without holding bufMtx, unlike
// Enqueue/flush — confirm a racy (approximate) read is acceptable here.
func (q *queue) SizeBytes() int {
	return int(q.buf.getSizeBytes())
}

type queueMetrics struct {
enqueueSuccesses tally.Counter
enqueueOldestDropped tally.Counter
enqueueCurrentDropped tally.Counter
enqueueClosedErrors tally.Counter
bytesToFlush tally.Histogram
itemsToFlush tally.Histogram
queueBufSizes tally.Histogram
connWriteSuccesses tally.Counter
connWriteErrors tally.Counter
}

func newQueueMetrics(s tally.Scope) queueMetrics {
bucketsItemsToFlush := append(
tally.ValueBuckets{0},
tally.MustMakeExponentialValueBuckets(_queueFlushItemsMetricBucketStart, 2, _queueFlushItemsMetricBuckets)...,
)

bucketsBytesToFlush := append(
tally.ValueBuckets{0},
tally.MustMakeExponentialValueBuckets(_queueFlushBytesMetricBucketStart, 2, _queueFlushBytesMetricBuckets)...,
)

bucketsQueueBufSizes := append(
tally.ValueBuckets{0},
tally.MustMakeExponentialValueBuckets(_queueBufSizesMetricBucketStart, 2, _queueBufSizesMetricBuckets)...,
)

enqueueScope := s.Tagged(map[string]string{"action": "enqueue"})
connWriteScope := s.Tagged(map[string]string{"action": "conn-write"})
flushScope := s.Tagged(map[string]string{"action": "flush"})

return queueMetrics{
enqueueSuccesses: enqueueScope.Counter("successes"),
enqueueOldestDropped: enqueueScope.Tagged(map[string]string{"drop-type": "oldest"}).
Expand All @@ -311,6 +361,10 @@ func newQueueMetrics(s tally.Scope) queueMetrics {
Counter("dropped"),
enqueueClosedErrors: enqueueScope.Tagged(map[string]string{"error-type": "queue-closed"}).
Counter("errors"),
queueBufSizes: enqueueScope.Histogram("buf-sizes", bucketsQueueBufSizes),
bytesToFlush: flushScope.Histogram("flush-bytes", bucketsBytesToFlush),
itemsToFlush: flushScope.Histogram("flush-items", bucketsItemsToFlush),

connWriteSuccesses: connWriteScope.Counter("successes"),
connWriteErrors: connWriteScope.Counter("errors"),
}
Expand All @@ -320,16 +374,23 @@ func newQueueMetrics(s tally.Scope) queueMetrics {
type qbuf struct {
b []protobuf.Buffer
// buffer cursors
r uint32
w uint32
r uint32
w uint32
sizeBytes uint32
maxSizeBytes uint32
}

func (q *qbuf) size() uint32 {
func (q *qbuf) sizeItems() uint32 {
return q.w - q.r
}

// getSizeBytes returns the running total of payload bytes held by all
// buffers currently in the ring (maintained by push/shift). qbuf itself
// is not synchronized; the enclosing queue guards mutation with bufMtx.
func (q *qbuf) getSizeBytes() uint32 {
	return q.sizeBytes
}

func (q *qbuf) full() bool {
return q.size() == uint32(cap(q.b))
return q.sizeItems() == uint32(cap(q.b)) ||
(q.maxSizeBytes > 0 && q.sizeBytes >= q.maxSizeBytes)
}

func (q *qbuf) mask(idx uint32) uint32 {
Expand All @@ -341,13 +402,15 @@ func (q *qbuf) push(buf protobuf.Buffer) {
idx := q.mask(q.w)
q.b[idx].Close()
q.b[idx] = buf
q.sizeBytes += uint32(len(buf.Bytes()))
}

// shift removes and returns the oldest buffer in the ring, clearing its
// slot and subtracting its payload length from the running byte total.
// Ownership of the returned buffer transfers to the caller, who is
// responsible for closing it.
func (q *qbuf) shift() protobuf.Buffer {
	q.r++
	slot := q.mask(q.r)
	out := q.b[slot]
	q.sizeBytes -= uint32(len(out.Bytes()))
	q.b[slot] = protobuf.Buffer{}
	return out
}

Expand Down
14 changes: 14 additions & 0 deletions src/aggregator/client/queue_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 4167b43

Please sign in to comment.