[aggregator] Raw TCP Client write queueing/buffering refactor #3342

Merged · 13 commits · Mar 9, 2021
scripts/development/m3_stack/m3collector.yml (1 change: 0 additions & 1 deletion)
@@ -51,7 +51,6 @@ reporter:
initWatchTimeout: 10s
hashType: murmur32
shardCutoffLingerDuration: 1m
- flushSize: 1440
maxTimerBatchSize: 1120
queueSize: 10000
queueDropType: oldest
@@ -128,7 +128,6 @@ aggregator:
watermark:
low: 0.001
high: 0.01
- flushSize: 1440
maxTimerBatchSize: 140
queueSize: 1000
queueDropType: oldest
@@ -35,9 +35,10 @@ downsample:
initWatchTimeout: 10s
hashType: murmur32
shardCutoffLingerDuration: 1m
- flushSize: 1440
+ forceFlushEvery: 1s
+ flushWorkerCount: 4
maxTimerBatchSize: 1120
- queueSize: 10000
+ queueSize: 100
queueDropType: oldest
encoder:
initBufferSize: 2048
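For context on the two new settings: flushWorkerCount sizes the pool of flush workers, and forceFlushEvery bounds write latency by flushing even partial batches on a timer. A minimal sketch of such a forced-flush loop, assuming a hypothetical flushAll callback (not this PR's actual code):

func forcedFlushLoop(every time.Duration, flushAll func(), stop <-chan struct{}) {
	ticker := time.NewTicker(every) // e.g. forceFlushEvery: 1s
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			flushAll() // write out whatever is buffered, even a partial batch
		case <-stop:
			return
		}
	}
}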
@@ -56,7 +57,7 @@ downsample:

ingest:
ingester:
- workerPoolSize: 10000
+ workerPoolSize: 100
opPool:
size: 10000
retry:
src/aggregator/client/config.go (14 changes: 7 additions & 7 deletions)
@@ -52,10 +52,10 @@ type Configuration struct {
ShardCutoverWarmupDuration *time.Duration `yaml:"shardCutoverWarmupDuration"`
ShardCutoffLingerDuration *time.Duration `yaml:"shardCutoffLingerDuration"`
Encoder EncoderConfiguration `yaml:"encoder"`
- FlushSize int `yaml:"flushSize"`
Collaborator: Wouldn't that break parsing of existing yaml configs where this field is present? Maybe leave it with an omitempty annotation?

Collaborator (Author): Kept the old one with a deprecated comment.

+ FlushWorkerCount int `yaml:"flushWorkerCount"`
+ ForceFlushEvery time.Duration `yaml:"forceFlushEvery"`
MaxBatchSize int `yaml:"maxBatchSize"`
MaxTimerBatchSize int `yaml:"maxTimerBatchSize"`
- BatchFlushDeadline time.Duration `yaml:"batchFlushDeadline"`
QueueSize int `yaml:"queueSize"`
QueueDropType *DropType `yaml:"queueDropType"`
Connection ConnectionConfiguration `yaml:"connection"`
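The back-compat fix the author describes can be sketched as follows; the comment wording and exact placement are assumptions, but the idea is that the old yaml key keeps an (ignored) home so existing configs still parse:

type Configuration struct {
	// Deprecated: flushSize no longer has any effect; the field is kept
	// only so existing yaml configs that still set it unmarshal cleanly.
	FlushSize int `yaml:"flushSize,omitempty"`

	FlushWorkerCount int           `yaml:"flushWorkerCount"`
	ForceFlushEvery  time.Duration `yaml:"forceFlushEvery"`
	// ... remaining fields as in the diff above ...
}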
@@ -175,18 +175,18 @@ func (c *Configuration) newClientOptions(
if c.ShardCutoffLingerDuration != nil {
opts = opts.SetShardCutoffLingerDuration(*c.ShardCutoffLingerDuration)
}
- if c.FlushSize != 0 {
- opts = opts.SetFlushSize(c.FlushSize)
+ if c.FlushWorkerCount != 0 {
+ opts = opts.SetFlushWorkerCount(c.FlushWorkerCount)
}
+ if c.ForceFlushEvery != 0 {
+ opts = opts.SetForceFlushEvery(c.ForceFlushEvery)
+ }
if c.MaxBatchSize != 0 {
opts = opts.SetMaxBatchSize(c.MaxBatchSize)
}
if c.MaxTimerBatchSize != 0 {
opts = opts.SetMaxTimerBatchSize(c.MaxTimerBatchSize)
}
- if c.BatchFlushDeadline != 0 {
- opts = opts.SetBatchFlushDeadline(c.BatchFlushDeadline)
- }
if c.QueueSize != 0 {
opts = opts.SetInstanceQueueSize(c.QueueSize)
}
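Note why every branch above guards on the zero value: fields omitted from yaml unmarshal to their Go zero values, so only explicitly configured settings override the NewOptions() defaults. A hedged usage sketch, assuming gopkg.in/yaml.v2 and the names in this file:

func optionsFromYAML(doc []byte) (Options, error) {
	var cfg Configuration
	if err := yaml.Unmarshal(doc, &cfg); err != nil { // omitted fields stay zero
		return nil, err
	}
	opts := NewOptions() // defaults: 64 flush workers, queue size 128, ...
	if cfg.QueueSize != 0 {
		opts = opts.SetInstanceQueueSize(cfg.QueueSize)
	}
	// cfg.FlushWorkerCount == 0 when flushWorkerCount is absent from doc,
	// so the guard keeps the default worker count.
	return opts, nil
}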
src/aggregator/client/config_test.go (12 changes: 6 additions & 6 deletions)
@@ -61,10 +61,10 @@ encoder:
watermark:
low: 0.001
high: 0.01
- flushSize: 1440
+ flushWorkerCount: 10
+ forceFlushEvery: 123s
maxBatchSize: 42
maxTimerBatchSize: 140
- batchFlushDeadline: 123ms
queueSize: 1000
queueDropType: oldest
connection:
@@ -102,10 +102,10 @@ func TestConfigUnmarshal(t *testing.T) {
}, cfg.Encoder.BytesPool.Buckets)
require.Equal(t, 0.001, cfg.Encoder.BytesPool.Watermark.RefillLowWatermark)
require.Equal(t, 0.01, cfg.Encoder.BytesPool.Watermark.RefillHighWatermark)
- require.Equal(t, 1440, cfg.FlushSize)
+ require.Equal(t, 10, cfg.FlushWorkerCount)
+ require.Equal(t, 123*time.Second, cfg.ForceFlushEvery)
require.Equal(t, 140, cfg.MaxTimerBatchSize)
require.Equal(t, 42, cfg.MaxBatchSize)
- require.Equal(t, 123*time.Millisecond, cfg.BatchFlushDeadline)
require.Equal(t, 1000, cfg.QueueSize)
require.Equal(t, DropOldest, *cfg.QueueDropType)
require.Equal(t, time.Second, cfg.Connection.ConnectionTimeout)
@@ -153,10 +153,10 @@ func TestNewClientOptions(t *testing.T) {
require.True(t, store == opts.WatcherOptions().StagedPlacementStore())
require.Equal(t, 10*time.Minute, opts.ShardCutoverWarmupDuration())
require.Equal(t, time.Minute, opts.ShardCutoffLingerDuration())
- require.Equal(t, 1440, opts.FlushSize())
+ require.Equal(t, 10, opts.FlushWorkerCount())
+ require.Equal(t, 123*time.Second, opts.ForceFlushEvery())
require.Equal(t, 140, opts.MaxTimerBatchSize())
require.Equal(t, 42, opts.MaxBatchSize())
- require.Equal(t, 123*time.Millisecond, opts.BatchFlushDeadline())
require.Equal(t, DropOldest, opts.QueueDropType())
require.Equal(t, time.Second, opts.ConnectionOptions().ConnectionTimeout())
require.Equal(t, true, opts.ConnectionOptions().ConnectionKeepAlive())
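Since NewOptions() (see options.go below) does not set forceFlushEvery at all, its default is the zero duration, i.e. no timer-driven flushes unless configured. An illustrative test in the same style, not part of the PR:

func TestForceFlushEveryDefault(t *testing.T) {
	opts := NewOptions()
	// Not initialized in NewOptions, so the zero value applies:
	// forced flushes are disabled until explicitly configured.
	require.Equal(t, time.Duration(0), opts.ForceFlushEvery())
}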
src/aggregator/client/options.go (67 changes: 30 additions & 37 deletions)
@@ -48,15 +48,15 @@ const (

defaultAggregatorClient = LegacyAggregatorClient

- defaultFlushSize = 1440
+ defaultFlushWorkerCount = 64

// defaultMaxTimerBatchSize is the default maximum timer batch size.
// By default there is no limit on the timer batch size.
defaultMaxTimerBatchSize = 0

- // defaultInstanceQueueSize determines how many metrics can be buffered
+ // defaultInstanceQueueSize determines how many protobuf payloads can be buffered
// before it must wait for an existing batch to be flushed to an instance.
- defaultInstanceQueueSize = 2 << 15 // ~65k
+ defaultInstanceQueueSize = 128

// By default traffic is cut over to shards 10 minutes before the designated
// cutover time in case there are issues with the instances owning the shards.
@@ -72,9 +72,6 @@ const (

// By default set maximum batch size to 8mb.
defaultMaxBatchSize = 2 << 22

- // By default write at least every 100ms.
- defaultBatchFlushDeadline = 100 * time.Millisecond
)
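The drop from 2 << 15 (65,536) to 128 follows from the unit change: the queue now buffers whole protobuf payloads rather than individual metrics. A bounded channel models the semantics; the eviction below illustrates queueDropType: oldest and is not the PR's actual implementation:

// queue := make(chan []byte, 128) // defaultInstanceQueueSize payloads
func enqueue(queue chan []byte, payload []byte) {
	for {
		select {
		case queue <- payload:
			return
		default: // queue full
			select {
			case <-queue: // drop the oldest payload, then retry the send
			default:
			}
		}
	}
}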

var (
@@ -186,11 +183,17 @@ type Options interface {
// ConnectionOptions returns the connection options.
ConnectionOptions() ConnectionOptions

- // SetFlushSize sets the buffer size to trigger a flush.
- SetFlushSize(value int) Options
+ // SetFlushWorkerCount sets the max number of workers used for flushing.
+ SetFlushWorkerCount(value int) Options

+ // FlushWorkerCount returns the max number of workers used for flushing.
+ FlushWorkerCount() int

+ // SetForceFlushEvery sets the duration between forced flushes.
+ SetForceFlushEvery(value time.Duration) Options

- // FlushSize returns the buffer size to trigger a flush.
- FlushSize() int
+ // ForceFlushEvery returns the duration, if any, between forced flushes.
+ ForceFlushEvery() time.Duration

// SetMaxTimerBatchSize sets the maximum timer batch size.
SetMaxTimerBatchSize(value int) Options
@@ -218,12 +221,6 @@ type Options interface {
// MaxBatchSize returns the maximum buffer size that triggers a queue drain.
MaxBatchSize() int

- // SetBatchFlushDeadline sets the deadline that triggers a write of queued buffers.
- SetBatchFlushDeadline(value time.Duration) Options

- // BatchFlushDeadline returns the deadline that triggers a write of queued buffers.
- BatchFlushDeadline() time.Duration

// SetRWOptions sets RW options.
SetRWOptions(value xio.Options) Options
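Callers compose these through the fluent setters, each returning a derived Options; for example (values arbitrary):

opts := NewOptions().
	SetFlushWorkerCount(8).
	SetForceFlushEvery(500 * time.Millisecond).
	SetMaxTimerBatchSize(1120)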

@@ -241,12 +238,12 @@ type options struct {
shardCutoffLingerDuration time.Duration
watcherOpts placement.WatcherOptions
connOpts ConnectionOptions
- flushSize int
+ flushWorkerCount int
+ forceFlushEvery time.Duration
maxTimerBatchSize int
instanceQueueSize int
dropType DropType
maxBatchSize int
- batchFlushDeadline time.Duration
m3msgOptions M3MsgOptions
rwOpts xio.Options
}
@@ -262,12 +259,11 @@ func NewOptions() Options {
shardCutoffLingerDuration: defaultShardCutoffLingerDuration,
watcherOpts: placement.NewWatcherOptions(),
connOpts: NewConnectionOptions(),
- flushSize: defaultFlushSize,
+ flushWorkerCount: defaultFlushWorkerCount,
maxTimerBatchSize: defaultMaxTimerBatchSize,
instanceQueueSize: defaultInstanceQueueSize,
dropType: defaultDropType,
maxBatchSize: defaultMaxBatchSize,
- batchFlushDeadline: defaultBatchFlushDeadline,
rwOpts: xio.NewOptions(),
}
}
@@ -395,14 +391,24 @@ func (o *options) ConnectionOptions() ConnectionOptions {
return o.connOpts
}

- func (o *options) SetFlushSize(value int) Options {
+ func (o *options) SetFlushWorkerCount(value int) Options {
opts := *o
- opts.flushSize = value
+ opts.flushWorkerCount = value
return &opts
}

- func (o *options) FlushSize() int {
- return o.flushSize
+ func (o *options) FlushWorkerCount() int {
+ return o.flushWorkerCount
}

+ func (o *options) SetForceFlushEvery(value time.Duration) Options {
+ opts := *o
+ opts.forceFlushEvery = value
+ return &opts
+ }

+ func (o *options) ForceFlushEvery() time.Duration {
+ return o.forceFlushEvery
+ }

func (o *options) SetMaxTimerBatchSize(value int) Options {
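The opts := *o copy in each setter above is what makes the API safe to share: a setter returns a modified copy and never mutates the receiver. A quick illustration:

base := NewOptions()
tuned := base.SetForceFlushEvery(time.Second)
fmt.Println(base.ForceFlushEvery())  // 0s — base is untouched
fmt.Println(tuned.ForceFlushEvery()) // 1s — tuned carries the override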
@@ -449,19 +455,6 @@ func (o *options) MaxBatchSize() int {
return o.maxBatchSize
}

- func (o *options) SetBatchFlushDeadline(value time.Duration) Options {
- opts := *o
- if value < 0 {
- value = defaultBatchFlushDeadline
- }
- opts.batchFlushDeadline = value
- return &opts
- }

- func (o *options) BatchFlushDeadline() time.Duration {
- return o.batchFlushDeadline
- }

func (o *options) SetRWOptions(value xio.Options) Options {
opts := *o
opts.rwOpts = value