Add a m3msg server for ingestion #1028
Conversation
Codecov Report
@@            Coverage Diff             @@
##           master    #1028      +/-   ##
==========================================
+ Coverage   76.99%   77.07%   +0.08%
==========================================
  Files         439      439
  Lines       37191    37191
==========================================
+ Hits        28636    28666      +30
+ Misses       6502     6481      -21
+ Partials     2053     2044       -9
Continue to review full report at Codecov.
	writeFn WriteFn,
	iOpts instrument.Options,
) (server.Server, error) {
	scope := iOpts.MetricsScope().Tagged(map[string]string{"server": "m3msg"})
Is what you're doing with Tagged different from calling SubScope?
Yep, it's different: SubScope appends the string to the metric name, while Tagged adds a tag to the metric and doesn't touch the name.
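To illustrate the difference, a minimal self-contained sketch using the tally metrics library (the prefix and metric names here are made up for the example):

package main

import "github.com/uber-go/tally"

func main() {
	// No reporter configured; tally falls back to a no-op reporter.
	scope, _ := tally.NewRootScope(tally.ScopeOptions{Prefix: "coordinator"}, 0)

	// SubScope extends the metric name: this emits "coordinator.m3msg.writes".
	scope.SubScope("m3msg").Counter("writes").Inc(1)

	// Tagged leaves the name alone and attaches a tag instead: this emits
	// "coordinator.writes" with the tag server=m3msg.
	scope.Tagged(map[string]string{"server": "m3msg"}).Counter("writes").Inc(1)
}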
		h.processMessage(msg)
	}
	if msgErr != nil && msgErr != io.EOF {
How will the loop get restarted?
The consumer is long-lived with a TCP connection; c.Message() is a blocking call, and the loop keeps calling it to decode messages from the connection.
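For readers following along, a hedged sketch of the loop shape being discussed, assuming a handler type with a processMessage method and a logger (neither name is guaranteed to match this diff exactly):

func (h *handler) Handle(c consumer.Consumer) {
	var (
		msg    consumer.Message
		msgErr error
	)
	for {
		// Message() blocks on the long-lived TCP connection until the next
		// message is decoded or the connection errors out, so the loop never
		// needs to be "restarted": it just calls Message() again.
		msg, msgErr = c.Message()
		if msgErr != nil {
			break
		}
		h.processMessage(msg)
	}
	if msgErr != nil && msgErr != io.EOF {
		// io.EOF simply means the peer closed the connection.
		h.logger.Errorf("could not read message: %v", msgErr)
	}
	c.Close()
}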
	metricTimeNanos int64,
	value float64,
	sp policy.StoragePolicy,
	callback *RefCountedCallback,
Super nit: might be nice to give this CallbackSuccess() and CallbackFailure() methods.
I'm open to it; I went with the current interface because it makes it cheaper to add more callback types in the future.
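If we did go that way, the wrappers could be thin sugar over the existing entry point; a sketch (OnFailure is hypothetical, only OnSuccess appears in this diff):

// CallbackSuccess and CallbackFailure as sugar over Callback(t CallbackType).
func (r *RefCountedCallback) CallbackSuccess() { r.Callback(OnSuccess) }
func (r *RefCountedCallback) CallbackFailure() { r.Callback(OnFailure) }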
// Callback performs the callback.
func (r *RefCountedCallback) Callback(t CallbackType) {
	if t == OnSuccess {
So if they call back with a failure, refCount will never reach 0 and m3msg will retry? Is there any concept of an explicit nack?
We actually do nack in statsdex: when we get a non-retriable error, we still call back with success and ack the message. I could use another callback type for that case so it's easier to understand.
}
// NewRefCountedCallback creates a RefCountedCallback.
func NewRefCountedCallback(msg consumer.Message) *RefCountedCallback {
If you're not doing any pooling, is all this ref counting necessary? It seems like you could just Ack or Nack at the end.
Yeah, the thing is, each message can contain more than one metric, so we only ack the message once all of its metrics have been ingested successfully. If one of them fails, we don't ack the message and it will be retried.
Could you "fail faster" by having Callback(OnRetriableError) immediately cause a nack?
Unfortunately m3msg does not support that right now; a retry only happens when a message is not acked within X amount of time. We could consider adding it in the future.
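Pulling the thread together, a hedged sketch of the ref-counting scheme described above; everything beyond the names quoted in the diff (e.g. IncRef and the refCount field) is an assumption, not the exact code:

// Assumes "sync/atomic" and the m3msg consumer package are imported.

type CallbackType int

const (
	// OnSuccess is the only callback type shown in this diff; more types
	// (e.g. for non-retriable errors) could be added later, per the thread.
	OnSuccess CallbackType = iota
)

// RefCountedCallback acks a message only after every metric decoded from it
// has been ingested successfully.
type RefCountedCallback struct {
	refCount int32
	msg      consumer.Message
}

// NewRefCountedCallback creates a RefCountedCallback.
func NewRefCountedCallback(msg consumer.Message) *RefCountedCallback {
	return &RefCountedCallback{msg: msg}
}

// IncRef is called once per metric decoded from the message.
func (r *RefCountedCallback) IncRef() {
	atomic.AddInt32(&r.refCount, 1)
}

// Callback performs the callback. Each success decrements the count; when it
// reaches zero the message is acked. If any metric fails, the count never
// reaches zero, the message is never acked, and m3msg redelivers it once the
// ack timeout expires.
func (r *RefCountedCallback) Callback(t CallbackType) {
	if t == OnSuccess {
		if atomic.AddInt32(&r.refCount, -1) == 0 {
			r.msg.Ack()
		}
	}
}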
LGTM
Part 1 of the ingestion change in M3coordinator.
This diff adds an m3msg server that decodes traffic from the m3msg consumer into metrics, taking in a WriteFn to write those metrics.
Part 2 will implement a storage-based ingester to fulfill the WriteFn.
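For a sense of the contract, the WriteFn plumbed through here plausibly looks something like the following, judging from the parameters quoted in the diff above (the ctx and id parameters are assumptions):

// WriteFn writes one decoded metric. Part 2's storage-based ingester will
// implement it, and it must invoke the callback so the m3msg message can
// eventually be acked.
type WriteFn func(
	ctx context.Context,
	id []byte,
	metricTimeNanos int64,
	value float64,
	sp policy.StoragePolicy,
	callback *RefCountedCallback,
)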