[coordinator] Influxdb importer endpoint (at /api/v1/influxdb/write) #2083
Hey @fingon thanks for the change, going to take a pass reviewing this now - apologies for the delay.
 * Author: Markus Stenberg <[email protected]>
 *
 * Copyright (c) 2019 Aiven Oy
 */
nit: These headers will have to be replaced with the default headers as they appear in all other source files in the repo.
//
// It allows using any InfluxDB client, rewriting the tag names + the
// magic __name__ tag to match what Prometheus expects
type promRewriter struct {
Probably able to do this with just string utilities to avoid regexp matching? I think it's fine to merge this however without considering most optimal performance and iterate on this later, just wanted to call it out as potentially limiting for overall throughput.
Ah, I was thinking of a lookup table to make it even faster (i.e. [256]bool arrays describing whether each byte is valid), since this is a write endpoint and probably expected to be pretty hot.
[256]byte is ~10x faster than regexp match even with precompiled regexp (+ you get essentially free replace on top of it too so not needing the slow path at all). So changing to this, although I'd like to quote Knuth here, as it isn't really showing in our profiling :-)
)

const (
	InfluxWriteURL = handler.RoutePrefixV1 + "/influxdb/write"
Surprised the linter didn't catch this: since `InfluxWriteURL` is exported, it will need a doc comment, i.e. `// InfluxWriteURL is the Influx DB write handler URL`.
Yeah... I think our linter hasn't been running for a while (tbh kinda dreading turning it back on since I'm sure there'll be hours of boring renaming involved :p )
	return self.err.FinalError()
}

func NewInfluxWriterHandler(downsamplerAndWriter ingest.DownsamplerAndWriter,
Sorry, I was holding off on reviewing this since I was hoping to get #2073 in first, which refactors how we set up handlers a little (now this should just take an `opts options.HandlerOptions` and then take the downsampler and iOpts from there).
Adjusted to using it; now the API looks much cleaner at least.
src/query/api/v1/httpd/handler.go
Outdated
@@ -240,6 +241,10 @@ func (h *Handler) RegisterRoutes() error {
		h.tagOptions, h.timeoutOpts, h.instrumentOpts)).ServeHTTP,
	).Methods(native.PromReadInstantHTTPMethods...)

	// InfluxDB write endpoint
	h.router.HandleFunc(influxdb.InfluxWriteURL,
		wrapped(influxdb.NewInfluxWriterHandler(h.downsamplerAndWriter, h.instrumentOpts)).ServeHTTP).Methods(m3json.JSONWriteHTTPMethod)
nit: Could you define the HTTP methods within `handler/influxdb` instead?

func newPromRewriter() *promRewriter {
	return &promRewriter{
		Metric: newRegexpRewriter("^[a-zA-Z_:][a-zA-Z0-9_:]*$",
nit: I think this would be more straightforward to use if `promRewriter` didn't expose these, but instead had `validateMetric(..)` and `validateLabel(..)` methods.
	"io/ioutil"
	"net/http"

	imodels "github.com/influxdata/influxdb/models"
nit: our linter enforces the following grouping format:
// stdlib imports
// m3db imports
// other imports (here that would be `imodels "github.com/influxdata/influxdb/models"` and `"go.uber.org/zap"`)
point := self.points[self.pointIndex]
it := point.FieldIterator()
n := 0
for it.Next() {
It looks like `Next()` for `FieldIterator` is pretty heavy; iterating twice is likely to be more expensive than taking the hit from resizing allocations... it may be better to just init to a default value of `10` or so?
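A minimal sketch of the single-pass approach suggested above. The `fieldIterator` type, `collectKeys`, and `defaultFieldCapacity` are hypothetical stand-ins for illustration, not the InfluxDB `FieldIterator` API; the point is only that the slice starts with a small assumed capacity and `append` grows it when needed, so the iterator is walked once.

```go
package main

import "fmt"

// fieldIterator is a hypothetical stand-in for an iterator whose Next()
// is assumed to be expensive.
type fieldIterator struct {
	keys []string
	pos  int
}

// Next advances the iterator and reports whether a field is available.
func (it *fieldIterator) Next() bool {
	if it.pos >= len(it.keys) {
		return false
	}
	it.pos++
	return true
}

// Key returns the key of the current field.
func (it *fieldIterator) Key() string { return it.keys[it.pos-1] }

// defaultFieldCapacity is an assumed starting capacity ("10 or so").
const defaultFieldCapacity = 10

// collectKeys walks the iterator exactly once; append grows the slice
// only if a point has more fields than the default capacity.
func collectKeys(it *fieldIterator) []string {
	keys := make([]string, 0, defaultFieldCapacity)
	for it.Next() {
		keys = append(keys, it.Key())
	}
	return keys
}

func main() {
	it := &fieldIterator{keys: []string{"usage_idle", "usage_user"}}
	fmt.Println(collectKeys(it))
}
```

Points with more fields than the default still work; they just pay for one or two slice growths instead of a second full iteration.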
case imodels.Boolean:
	v, err := it.BooleanValue()
	if err != nil {
		err = fmt.Errorf("Error decoding boolean: %w", err)
nit; Not sure if this is a github issue or if this is some weird character that's making it show up red here, feel free to ignore this if it's working fine
case imodels.Integer:
	v, err := it.IntegerValue()
	if err != nil {
		err = fmt.Errorf("Error decoding integer: %w", err)
nit: errors should start with lower case; also, it looks like the influx errors are quite verbose already, so you should be able to just use `err` without wrapping it further.
@@ -0,0 +1,49 @@
/*
 * Author: Markus Stenberg <[email protected]>
Again, may have to use the default headers... it's super annoying but blame the lawyers 🤦
Not a problem for us, but in some jurisdictions (e.g. some EU countries IIRC) you cannot actually assign copyright from the original employee, just perma-license it onward.
	// explosion, we drop them for now
	continue
}
nlen := len(point.Name())
nit: can pull this outside of the loop, since this seems constant for each iteration (also can probably append the `_` character to `point.Name()` outside of the loop to avoid doing it here).
	return input
}

// Stateful utility, which handles both __name__ ('metric') tag, as
Does this need to be stateful, or can it just be utility methods? Doesn't look like it changes behavior at all
That's a bit of a historic comment anyway; I guess I'll just remove "stateful". Internally it has some state (regexps in the old version, and now [256]bool tables in the new one).
name := make([]byte, nlen+1+len(it.FieldKey()))
copy(name, point.Name())
copy(name[nlen:], []byte("_"))
copy(name[nlen+1:], it.FieldKey())
Surprisingly, `append` is almost as fast as `copy`, and provides some safeguards against out-of-bounds panics in case some code changes in the future.
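A sketch combining this suggestion with the earlier one about hoisting constant work out of the loop. `metricNames` is a hypothetical helper, not code from the PR: the measurement prefix (including the `_` separator) is built once, and each name is assembled with `append` into a buffer of known capacity, so no offsets have to be computed by hand.

```go
package main

import "fmt"

// metricNames builds "<measurement>_<field>" names. The prefix, including
// the "_" separator, is built once outside the loop.
func metricNames(measurement []byte, fieldKeys [][]byte) [][]byte {
	prefix := make([]byte, 0, len(measurement)+1)
	prefix = append(prefix, measurement...)
	prefix = append(prefix, '_')

	names := make([][]byte, 0, len(fieldKeys))
	for _, key := range fieldKeys {
		// A fresh buffer per name, so names never share backing storage.
		name := make([]byte, 0, len(prefix)+len(key))
		name = append(name, prefix...)
		name = append(name, key...)
		names = append(names, name)
	}
	return names
}

func main() {
	names := metricNames([]byte("cpu"), [][]byte{[]byte("usage_idle"), []byte("usage_user")})
	for _, n := range names {
		fmt.Println(string(n))
	}
}
```

Because the capacity is sized up front, `append` does not reallocate here; if the length calculation were ever wrong, it would grow the slice rather than panic.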
ptags := point.Tags()
tags := models.NewTags(len(ptags), nil)
for _, tag := range ptags {
	name := self.promRewriter.Label.ToValid(tag.Key)
Hm... I think there may be a chance of names clashing across different tags, which can lead to undefined behavior (i.e. if you have two labels, `foo.bar` and `foo_bar`, they both evaluate to `foo_bar`).
Correct. Also, sticking in `__name__` (whether intentionally or not) seems like a bad idea, so I will prevent that too.
}

func (self *regexpRewriter) rewrite(input []byte) {
	if len(input) > 0 {
nit: Can you make this early return to match the Effective Go guide on control flow?
https://golang.org/doc/effective_go.html
i.e.
	if len(input) == 0 {
		return
	}
	if !self.okStart[input[0]] {
		input[0] = self.replacement
	}
	for i := 1; i < len(input); i++ {
		if !self.okRest[input[i]] {
			input[i] = self.replacement
		}
	}
	return &regexpRewriter{okStart: createArray(startRe), okRest: createArray(restRe), replacement: byte('_')}
}

func (self *regexpRewriter) rewrite(input []byte) {
nit: Mind using a more "Go"-like self var ref? i.e. func (r *regexpRewriter)
Usually most go packages will use a single letter, most relevant to the base "thing" that the struct implements. "Rewriter" in this instance -> "r".
I'll switch to my backup style, e.g. letter per word. (My editor does the self bits by default but little search&replace won't hurt).
if self.populateFields() {
	point := self.points[self.pointIndex]
	ptags := point.Tags()
	tags := models.NewTags(len(ptags), nil)
You'll want to flow the tag options from the server into the second argument to `models.NewTags(...)` here. Otherwise the Tag ID scheme won't be the same as what is configured by the m3coordinator config.
See `tagOpts` and `tagOptions` in the Prometheus remote write endpoint for reference:
https://github.com/m3db/m3/blob/master/src/query/api/v1/handler/prometheus/remote/write.go
Really my only major comment is about flowing the tag options to the handler.
Otherwise LGTM and this is ready to go in.
An integration test would be great but not necessary.
LGTM
Codecov Report
@@ Coverage Diff @@
## master #2083 +/- ##
========================================
Coverage ? 72.3%
========================================
Files ? 1010
Lines ? 86847
Branches ? 0
========================================
Hits ? 62876
Misses ? 19789
Partials ? 4182
Continue to review full report at Codecov.
Hey, just as a heads up; we're attempting to release
createArray := func(okRe string) (ret [256]bool) {
	re := regexp.MustCompile(okRe)
	// Check for only 7 bit non-control ASCII characters
	for i := 32; i < 128; i++ {
		if re.Match([]byte{byte(i)}) {
			ret[i] = true
		}
	}
	return
}
nit: can we drop regex from this completely? i.e. generate this bool array from a list of valid characters directly? The fewer moving parts the better 👍
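A rough sketch of what dropping the regexp could look like, assuming the allowed characters are listed directly. `newTable`, `metricStart`, `metricRest`, and this free-function `rewrite` are hypothetical names, not the code that was merged. The character classes mirror the metric pattern quoted earlier in the review, `^[a-zA-Z_:][a-zA-Z0-9_:]*$`.

```go
package main

import "fmt"

const (
	letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
	digits  = "0123456789"
)

// newTable marks every byte listed in chars as allowed; no regexp involved.
func newTable(chars string) (table [256]bool) {
	for i := 0; i < len(chars); i++ {
		table[chars[i]] = true
	}
	return table
}

// Lookup tables for the first byte and the remaining bytes of a metric name.
var (
	metricStart = newTable(letters + "_:")
	metricRest  = newTable(letters + digits + "_:")
)

// rewrite replaces disallowed bytes in place with '_'.
func rewrite(input []byte, okStart, okRest *[256]bool) {
	if len(input) == 0 {
		return
	}
	if !okStart[input[0]] {
		input[0] = '_'
	}
	for i := 1; i < len(input); i++ {
		if !okRest[input[i]] {
			input[i] = '_'
		}
	}
}

func main() {
	name := []byte("9cpu.usage-idle")
	rewrite(name, &metricStart, &metricRest)
	fmt.Println(string(name))
}
```

A label table would be built the same way from `letters + "_"` and `letters + digits + "_"`, since Prometheus label names do not allow `:`.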
What this PR does / why we need it:
At least with telegraf, InfluxDB ingestion seems to work much better than Prometheus, and there are also a lot more InfluxDB clients out there. So a simple write endpoint seemed helpful, and at least in our case, it solved some scalability issues we had at the coordinators.
Does this PR introduce a user-facing and/or backwards incompatible change?:
Does this PR require updating code package or user-facing documentation?:
TBD - probably does, if this is desirable to start with.