[aggregator] Raw TCP Client write queueing/buffering refactor #3342
Conversation
Codecov Report
@@ Coverage Diff @@
## master #3342 +/- ##
=========================================
- Coverage 72.4% 72.4% -0.1%
=========================================
Files 1098 1098
Lines 101927 101864 -63
=========================================
- Hits 73897 73825 -72
+ Misses 22955 22952 -3
- Partials 5075 5087 +12
Flags with carried forward coverage won't be shown.
@@ -52,10 +52,10 @@ type Configuration struct {
ShardCutoverWarmupDuration *time.Duration `yaml:"shardCutoverWarmupDuration"`
ShardCutoffLingerDuration *time.Duration `yaml:"shardCutoffLingerDuration"`
Encoder EncoderConfiguration `yaml:"encoder"`
FlushSize int `yaml:"flushSize"`
Wouldn't that break parsing of existing yaml configs where this field is present? Maybe leave it with an omitempty annotation?
kept the old one w/ deprecated comment
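A minimal sketch of what keeping the deprecated field could look like, assuming strict YAML unmarshalling (where unknown keys are rejected); MaxBatchSize is a hypothetical stand-in for the new settings, not a name from this PR:

package client

// Configuration sketch: the deprecated field stays so existing configs still
// parse; the new (hypothetical) field replaces it.
type Configuration struct {
	// Deprecated: FlushSize is no longer used; it is retained only so that
	// existing YAML configs containing flushSize keep unmarshalling cleanly
	// under strict parsing.
	FlushSize int `yaml:"flushSize"`

	// MaxBatchSize is a made-up stand-in for the new queue/buffer settings.
	MaxBatchSize int `yaml:"maxBatchSize"`
}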
)

// Round up queue size to power of 2.
The code seems to be rounding down the number rather than rounding up :)
Given 3 ops in a row, it might be worth extracting it into a single-line function with a single "sanity" unit test.
This will round up as expected.
done, it def rounds up :)
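For reference, a round-up helper along the lines suggested above could be as small as the following; the name and loop form are illustrative, and the PR's final version may use bit operations instead:

package client

// roundUpToPowerOfTwo returns the smallest power of two that is >= n,
// assuming n >= 1.
func roundUpToPowerOfTwo(n int) int {
	p := 1
	for p < n {
		p <<= 1
	}
	return p
}

A table-driven sanity test over a few inputs (1→1, 3→4, 4→4, 1000→1024) covers the round-up-vs-round-down behavior this thread debates.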
src/aggregator/client/queue.go
Outdated
case <-q.doneCh:
return
}
if cap(*buf) < _queueMaxWriteBufSize {
Ultra-nit: use len instead of cap for consistency?
src/aggregator/client/queue.go
Outdated
case <-q.doneCh:
return
}
if cap(*buf) < _queueMaxWriteBufSize {
Don't we want to reset the buffer upon successful write regardless of any checks?
Not immediately clear what the goal is here - we don't want to pool slices we've expanded past the max buffer size? Is this just to minimize unexpected memory growth and keep things bounded? Can you add a quick comment?
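For context, here is the pattern under discussion, sketched with an assumed sync.Pool of *[]byte; _queueMaxWriteBufSize appears in the diff, but its value and the function name below are made up for illustration:

package client

import (
	"io"
	"sync"
)

const _queueMaxWriteBufSize = 512 * 1024 // assumed value, for illustration only

var bufPool = sync.Pool{
	New: func() interface{} {
		b := make([]byte, 0, 4096)
		return &b
	},
}

// writeAndRelease flushes the buffer, then recycles it only if its capacity
// stayed under the cap. Checking capacity rather than length keeps pooled
// memory bounded: a slice that a write spike grew to many megabytes is
// dropped and garbage collected instead of living in the pool forever.
func writeAndRelease(w io.Writer, buf *[]byte) error {
	_, err := w.Write(*buf)
	if cap(*buf) < _queueMaxWriteBufSize {
		*buf = (*buf)[:0] // reset length, keep the backing array
		bufPool.Put(buf)
	}
	return err
}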
src/aggregator/client/queue.go
Outdated
b := q.buf.shift()

bytes := b.Bytes()
if bytes == nil {
nit: can you do a len check here instead? not sure if it's possible to have a zero-length, non-nil slice returned.
yes, good idea, this relies too much on protobuf.Buffer's implementation - it doesn't have an easier way to test if it's a zero value.
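The nil-versus-len distinction behind this exchange, as a self-contained snippet:

package main

import "fmt"

func main() {
	var nilSlice []byte              // nil, length 0
	emptySlice := make([]byte, 0, 8) // non-nil, length 0

	fmt.Println(nilSlice == nil, emptySlice == nil) // true false
	fmt.Println(len(nilSlice), len(emptySlice))     // 0 0

	// len(b) == 0 covers both "nil" and "allocated but empty", which is the
	// safer "nothing to write" check the review comment asks for.
}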
src/aggregator/client/queue.go
Outdated
q.writeAndReset()
lastDrain = time.Now()
// Check buffer capacity, not length, to make sure we're not pooling slices that are too large.
// Otherwise, it could in multi-megabyte slices hanging around, in case we get a spike in writes.
ultra-nit: "... could result in ...". Thanks for comment btw
* master:
  [dbnode] Remove unused shardBlockVolume (#3347)
  Fix new Go 1.15+ vet check failures (#3345)
  [coordinator] Add config option to make rollup rules untimed (#3343)
  [aggregator] Raw TCP Client write queueing/buffering refactor (#3342)
  [dbnode] Fail M3TSZ encoding on DeltaOfDelta overflow (#3329)
What this PR does / why we need it:
prepareEnqueueBufferWithLock meant it was possible to NOT flush metrics until there was enough traffic to a shard to reach flushSize. This caused weird side effects, where metrics were emitted at very inconsistent intervals if the volume was small enough.

Special notes for your reviewer:

Does this PR introduce a user-facing and/or backwards incompatible change?:

Does this PR require updating code package or user-facing documentation?:
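Not part of the PR template, but the flush behavior the description points to looks roughly like the sketch below: a background loop that drains on a fixed interval regardless of how little has been buffered. writeAndReset and doneCh appear in the diff snippets above; the queue type shown here, drainEvery, and the loop shape are assumptions for illustration.

package client

import "time"

type queue struct {
	drainEvery time.Duration // hypothetical flush interval
	doneCh     chan struct{}
}

// writeAndReset is stubbed here; in the real client it flushes whatever has
// been buffered to the TCP connection and resets the buffer.
func (q *queue) writeAndReset() {}

// drainLoop flushes on a timer, so low-traffic shards still emit metrics at a
// consistent interval instead of waiting to accumulate flushSize bytes.
func (q *queue) drainLoop() {
	ticker := time.NewTicker(q.drainEvery)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			q.writeAndReset()
		case <-q.doneCh:
			q.writeAndReset() // final flush on shutdown
			return
		}
	}
}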