Skip to content

Commit

Permalink
[aggregator] Raw TCP Client write queueing/buffering refactor (#3342)
Browse files Browse the repository at this point in the history
  • Loading branch information
vdarulis authored Mar 9, 2021
1 parent 5c30780 commit 079ac7d
Show file tree
Hide file tree
Showing 20 changed files with 600 additions and 560 deletions.
1 change: 0 additions & 1 deletion scripts/development/m3_stack/m3collector.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ reporter:
initWatchTimeout: 10s
hashType: murmur32
shardCutoffLingerDuration: 1m
flushSize: 1440
maxTimerBatchSize: 1120
queueSize: 10000
queueDropType: oldest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ aggregator:
watermark:
low: 0.001
high: 0.01
flushSize: 1440
maxTimerBatchSize: 140
queueSize: 1000
queueDropType: oldest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ downsample:
initWatchTimeout: 10s
hashType: murmur32
shardCutoffLingerDuration: 1m
flushSize: 1440
forceFlushEvery: 1s
flushWorkerCount: 4
maxTimerBatchSize: 1120
queueSize: 10000
queueSize: 100
queueDropType: oldest
encoder:
initBufferSize: 2048
Expand All @@ -56,7 +57,7 @@ downsample:

ingest:
ingester:
workerPoolSize: 10000
workerPoolSize: 100
opPool:
size: 10000
retry:
Expand Down
15 changes: 8 additions & 7 deletions src/aggregator/client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,11 @@ type Configuration struct {
ShardCutoverWarmupDuration *time.Duration `yaml:"shardCutoverWarmupDuration"`
ShardCutoffLingerDuration *time.Duration `yaml:"shardCutoffLingerDuration"`
Encoder EncoderConfiguration `yaml:"encoder"`
FlushSize int `yaml:"flushSize"`
FlushSize int `yaml:"flushSize,omitempty"` // FlushSize is deprecated
FlushWorkerCount int `yaml:"flushWorkerCount"`
ForceFlushEvery time.Duration `yaml:"forceFlushEvery"`
MaxBatchSize int `yaml:"maxBatchSize"`
MaxTimerBatchSize int `yaml:"maxTimerBatchSize"`
BatchFlushDeadline time.Duration `yaml:"batchFlushDeadline"`
QueueSize int `yaml:"queueSize"`
QueueDropType *DropType `yaml:"queueDropType"`
Connection ConnectionConfiguration `yaml:"connection"`
Expand Down Expand Up @@ -175,18 +176,18 @@ func (c *Configuration) newClientOptions(
if c.ShardCutoffLingerDuration != nil {
opts = opts.SetShardCutoffLingerDuration(*c.ShardCutoffLingerDuration)
}
if c.FlushSize != 0 {
opts = opts.SetFlushSize(c.FlushSize)
if c.FlushWorkerCount != 0 {
opts = opts.SetFlushWorkerCount(c.FlushWorkerCount)
}
if c.ForceFlushEvery != 0 {
opts = opts.SetForceFlushEvery(c.ForceFlushEvery)
}
if c.MaxBatchSize != 0 {
opts = opts.SetMaxBatchSize(c.MaxBatchSize)
}
if c.MaxTimerBatchSize != 0 {
opts = opts.SetMaxTimerBatchSize(c.MaxTimerBatchSize)
}
if c.BatchFlushDeadline != 0 {
opts = opts.SetBatchFlushDeadline(c.BatchFlushDeadline)
}
if c.QueueSize != 0 {
opts = opts.SetInstanceQueueSize(c.QueueSize)
}
Expand Down
12 changes: 6 additions & 6 deletions src/aggregator/client/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ encoder:
watermark:
low: 0.001
high: 0.01
flushSize: 1440
flushWorkerCount: 10
forceFlushEvery: 123s
maxBatchSize: 42
maxTimerBatchSize: 140
batchFlushDeadline: 123ms
queueSize: 1000
queueDropType: oldest
connection:
Expand Down Expand Up @@ -102,10 +102,10 @@ func TestConfigUnmarshal(t *testing.T) {
}, cfg.Encoder.BytesPool.Buckets)
require.Equal(t, 0.001, cfg.Encoder.BytesPool.Watermark.RefillLowWatermark)
require.Equal(t, 0.01, cfg.Encoder.BytesPool.Watermark.RefillHighWatermark)
require.Equal(t, 1440, cfg.FlushSize)
require.Equal(t, 10, cfg.FlushWorkerCount)
require.Equal(t, 123*time.Second, cfg.ForceFlushEvery)
require.Equal(t, 140, cfg.MaxTimerBatchSize)
require.Equal(t, 42, cfg.MaxBatchSize)
require.Equal(t, 123*time.Millisecond, cfg.BatchFlushDeadline)
require.Equal(t, 1000, cfg.QueueSize)
require.Equal(t, DropOldest, *cfg.QueueDropType)
require.Equal(t, time.Second, cfg.Connection.ConnectionTimeout)
Expand Down Expand Up @@ -153,10 +153,10 @@ func TestNewClientOptions(t *testing.T) {
require.True(t, store == opts.WatcherOptions().StagedPlacementStore())
require.Equal(t, 10*time.Minute, opts.ShardCutoverWarmupDuration())
require.Equal(t, time.Minute, opts.ShardCutoffLingerDuration())
require.Equal(t, 1440, opts.FlushSize())
require.Equal(t, 10, opts.FlushWorkerCount())
require.Equal(t, 123*time.Second, opts.ForceFlushEvery())
require.Equal(t, 140, opts.MaxTimerBatchSize())
require.Equal(t, 42, opts.MaxBatchSize())
require.Equal(t, 123*time.Millisecond, opts.BatchFlushDeadline())
require.Equal(t, DropOldest, opts.QueueDropType())
require.Equal(t, time.Second, opts.ConnectionOptions().ConnectionTimeout())
require.Equal(t, true, opts.ConnectionOptions().ConnectionKeepAlive())
Expand Down
67 changes: 30 additions & 37 deletions src/aggregator/client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ const (

defaultAggregatorClient = LegacyAggregatorClient

defaultFlushSize = 1440
defaultFlushWorkerCount = 64

// defaultMaxTimerBatchSize is the default maximum timer batch size.
// By default there is no limit on the timer batch size.
defaultMaxTimerBatchSize = 0

// defaultInstanceQueueSize determines how many metrics can be buffered
// defaultInstanceQueueSize determines how many protobuf payloads can be buffered
// before it must wait for an existing batch to be flushed to an instance.
defaultInstanceQueueSize = 2 << 15 // ~65k
defaultInstanceQueueSize = 128

// By default traffic is cut over to shards 10 minutes before the designated
// cutover time in case there are issues with the instances owning the shards.
Expand All @@ -72,9 +72,6 @@ const (

// By default set maximum batch size to 8 MiB (2 << 22 = 8,388,608 bytes).
defaultMaxBatchSize = 2 << 22

// By default write at least every 100ms.
defaultBatchFlushDeadline = 100 * time.Millisecond
)

var (
Expand Down Expand Up @@ -186,11 +183,17 @@ type Options interface {
// ConnectionOptions returns the connection options.
ConnectionOptions() ConnectionOptions

// SetFlushSize sets the buffer size to trigger a flush.
SetFlushSize(value int) Options
// SetFlushWorkerCount sets the max number of workers used for flushing.
SetFlushWorkerCount(value int) Options

// FlushWorkerCount returns the max number of workers used for flushing.
FlushWorkerCount() int

// SetForceFlushEvery sets the duration between forced flushes.
SetForceFlushEvery(value time.Duration) Options

// FlushSize returns the buffer size to trigger a flush.
FlushSize() int
// ForceFlushEvery returns the duration, if any, between forced flushes.
ForceFlushEvery() time.Duration

// SetMaxTimerBatchSize sets the maximum timer batch size.
SetMaxTimerBatchSize(value int) Options
Expand Down Expand Up @@ -218,12 +221,6 @@ type Options interface {
// MaxBatchSize returns the maximum buffer size that triggers a queue drain.
MaxBatchSize() int

// SetBatchFlushDeadline sets the deadline that triggers a write of queued buffers.
SetBatchFlushDeadline(value time.Duration) Options

// BatchFlushDeadline returns the deadline that triggers a write of queued buffers.
BatchFlushDeadline() time.Duration

// SetRWOptions sets RW options.
SetRWOptions(value xio.Options) Options

Expand All @@ -241,12 +238,12 @@ type options struct {
shardCutoffLingerDuration time.Duration
watcherOpts placement.WatcherOptions
connOpts ConnectionOptions
flushSize int
flushWorkerCount int
forceFlushEvery time.Duration
maxTimerBatchSize int
instanceQueueSize int
dropType DropType
maxBatchSize int
batchFlushDeadline time.Duration
m3msgOptions M3MsgOptions
rwOpts xio.Options
}
Expand All @@ -262,12 +259,11 @@ func NewOptions() Options {
shardCutoffLingerDuration: defaultShardCutoffLingerDuration,
watcherOpts: placement.NewWatcherOptions(),
connOpts: NewConnectionOptions(),
flushSize: defaultFlushSize,
flushWorkerCount: defaultFlushWorkerCount,
maxTimerBatchSize: defaultMaxTimerBatchSize,
instanceQueueSize: defaultInstanceQueueSize,
dropType: defaultDropType,
maxBatchSize: defaultMaxBatchSize,
batchFlushDeadline: defaultBatchFlushDeadline,
rwOpts: xio.NewOptions(),
}
}
Expand Down Expand Up @@ -395,14 +391,24 @@ func (o *options) ConnectionOptions() ConnectionOptions {
return o.connOpts
}

func (o *options) SetFlushSize(value int) Options {
func (o *options) SetFlushWorkerCount(value int) Options {
opts := *o
opts.flushSize = value
opts.flushWorkerCount = value
return &opts
}

func (o *options) FlushSize() int {
return o.flushSize
func (o *options) FlushWorkerCount() int {
return o.flushWorkerCount
}

// SetForceFlushEvery returns a new Options whose forced-flush interval is
// value. The receiver is left untouched: the method works on a shallow copy
// of the current options and hands back a pointer to that copy.
func (o *options) SetForceFlushEvery(value time.Duration) Options {
	updated := *o
	updated.forceFlushEvery = value
	return &updated
}

// ForceFlushEvery returns the duration, if any, between forced flushes,
// exactly as stored on these options.
func (o *options) ForceFlushEvery() time.Duration {
return o.forceFlushEvery
}

func (o *options) SetMaxTimerBatchSize(value int) Options {
Expand Down Expand Up @@ -449,19 +455,6 @@ func (o *options) MaxBatchSize() int {
return o.maxBatchSize
}

// SetBatchFlushDeadline returns a new Options whose batch flush deadline (the
// deadline that triggers a write of queued buffers) is value. A negative
// value is replaced with defaultBatchFlushDeadline; zero and positive values
// are stored as given. The receiver is not modified.
func (o *options) SetBatchFlushDeadline(value time.Duration) Options {
	deadline := value
	if deadline < 0 {
		// Negative deadlines are not meaningful; fall back to the default.
		deadline = defaultBatchFlushDeadline
	}

	updated := *o
	updated.batchFlushDeadline = deadline
	return &updated
}

// BatchFlushDeadline returns the deadline that triggers a write of queued
// buffers, exactly as stored on these options.
func (o *options) BatchFlushDeadline() time.Duration {
return o.batchFlushDeadline
}

func (o *options) SetRWOptions(value xio.Options) Options {
opts := *o
opts.rwOpts = value
Expand Down
Loading

0 comments on commit 079ac7d

Please sign in to comment.