-
Notifications
You must be signed in to change notification settings - Fork 455
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[aggregator] Add passthrough functionality in m3aggregator using rawtcp server #2235
Conversation
Is it worth investigation of how to improve performance of m3msg consumer with #2105 ? FWIW we are using M3Msg to talk directly to aggregators for all metrics flow and it performs quite well, granted we are avoiding all allocations at message consumption time (reuse a single protobuf message each time a message is processed): |
m3msg needs a significant rework to be usable for this - connection per topic and per consumer incurs a significant overhead. |
There's also a cost on more concurrent workloads due to pooling in m3msg - at high throughput, it can serialize everything. |
@@ -402,6 +422,8 @@ func computeExpectedAggregationBuckets( | |||
values, err = addForwardedMetricToAggregation(values, mu.forwarded) | |||
case timedMetric: | |||
values, err = addTimedMetricToAggregation(values, mu.timed) | |||
case passthroughMetric: | |||
err = nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
still pending?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No.
Passthrough metrics need no aggregation, so the aggregation bucket calculation is skipped here as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hm could you add a comment indicating the same thing
// Validate results. | ||
expected := computeExpectedPassthroughResults(t, dataset) | ||
actual := testServer.sortedResults() | ||
require.Equal(t, expected, actual) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this going to flake sometimes -- e.g. in case of retries or something?
i'd expect this to either dedupe actual, or check expected contained in actual. not sure if that's required tho, you tell me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. Adding a dedup function for both expected and actual results.
fix integration test fix integration test fix Add metrics for passthrough in follower instances [aggregator] Add compatibility for rollup rules with timed metrics (#2251) Add pass-through in m3aggregator using rawtcp server fix
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM w/ one final nit
What this PR does / why we need it:
This PR introduces the pass-through functionality into m3aggregator.
Special notes for your reviewer:
#2105 is very cpu-demanding, so I removed the m3msg consumer in m3aggregator and use rawtcp instead.
Does this PR introduce a user-facing and/or backwards incompatible change?:
Does this PR require updating code package or user-facing documentation?: