From 7efeb7aeeef0e1ab13bd7f791cd1503b647bdaab Mon Sep 17 00:00:00 2001 From: Markus Stenberg Date: Fri, 10 Jan 2020 15:54:01 +0200 Subject: [PATCH] Influxdb importer endpoint (at /api/v1/influxdb/write) --- glide.lock | 4 + glide.yaml | 5 + src/query/api/v1/handler/influxdb/rewrite.go | 91 ++++++ .../api/v1/handler/influxdb/rewrite_test.go | 53 ++++ src/query/api/v1/handler/influxdb/write.go | 290 ++++++++++++++++++ .../api/v1/handler/influxdb/write_test.go | 96 ++++++ src/query/api/v1/httpd/handler.go | 5 + 7 files changed, 544 insertions(+) create mode 100644 src/query/api/v1/handler/influxdb/rewrite.go create mode 100644 src/query/api/v1/handler/influxdb/rewrite_test.go create mode 100644 src/query/api/v1/handler/influxdb/write.go create mode 100644 src/query/api/v1/handler/influxdb/write_test.go diff --git a/glide.lock b/glide.lock index d02eea1f6c..49362f728d 100644 --- a/glide.lock +++ b/glide.lock @@ -641,6 +641,10 @@ imports: - stylecheck - unused - version +- name: github.com/influxdata/influxdb + version: 01c8dd416270f424ab0c40f9291e269ac6921964 + subpackages: + - models testImports: - name: github.com/glycerine/go-unsnap-stream version: 98d31706395aaac22e29676617f2ee37bee55b5a diff --git a/glide.yaml b/glide.yaml index f5c67a1714..9b85e6ed00 100644 --- a/glide.yaml +++ b/glide.yaml @@ -1,5 +1,10 @@ package: github.com/m3db/m3 import: + - package: github.com/influxdata/influxdb + version: 01c8dd416270f424ab0c40f9291e269ac6921964 + subpackages: + - models + - package: github.com/m3db/bitset version: 07973db6b78acb62ac207d0538055e874b49d90d diff --git a/src/query/api/v1/handler/influxdb/rewrite.go b/src/query/api/v1/handler/influxdb/rewrite.go new file mode 100644 index 0000000000..615ad1b6c8 --- /dev/null +++ b/src/query/api/v1/handler/influxdb/rewrite.go @@ -0,0 +1,91 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package influxdb + +import ( + "regexp" +) + +type regexpRewriter struct { + okStart, okRest [256]bool + replacement byte +} + +func newRegexpRewriter(startRe, restRe string) *regexpRewriter { + createArray := func(okRe string) (ret [256]bool) { + re := regexp.MustCompile(okRe) + // Check for only 7 bit non-control ASCII characters + for i := 32; i < 128; i++ { + if re.Match([]byte{byte(i)}) { + ret[i] = true + } + } + return + } + return ®expRewriter{okStart: createArray(startRe), okRest: createArray(restRe), replacement: byte('_')} +} + +func (rr *regexpRewriter) rewrite(input []byte) { + if len(input) == 0 { + return + } + if !rr.okStart[input[0]] { + input[0] = rr.replacement + } + for i := 1; i < len(input); i++ { + if !rr.okRest[input[i]] { + input[i] = rr.replacement + } + } +} + +// Utility, which handles both __name__ ('metric') tag, as well as +// rest of tags ('labels') +// +// It allow using any influxdb client, rewriting the tag names + the +// magic __name__ tag to match what Prometheus expects +type promRewriter struct { + metric, metricTail, label *regexpRewriter +} + +func newPromRewriter() *promRewriter { + return &promRewriter{ + metric: newRegexpRewriter( + "[a-zA-Z_:]", + "[a-zA-Z0-9_:]"), + metricTail: newRegexpRewriter( + "[a-zA-Z0-9_:]", + "[a-zA-Z0-9_:]"), + label: newRegexpRewriter( + "[a-zA-Z_]", "[a-zA-Z0-9_]")} +} + +func (pr *promRewriter) rewriteMetric(data []byte) { + pr.metric.rewrite(data) +} + +func (pr *promRewriter) rewriteMetricTail(data []byte) { + pr.metricTail.rewrite(data) +} + +func (pr *promRewriter) rewriteLabel(data []byte) { + pr.label.rewrite(data) +} diff --git a/src/query/api/v1/handler/influxdb/rewrite_test.go b/src/query/api/v1/handler/influxdb/rewrite_test.go new file mode 100644 index 0000000000..213a7b7239 --- /dev/null +++ b/src/query/api/v1/handler/influxdb/rewrite_test.go @@ -0,0 +1,53 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package influxdb + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +type test struct { + in, outMetric, outMetricTail, outLabel string +} + +func TestPromRewriter(t *testing.T) { + r := newPromRewriter() + tests := []test{{"foo", "foo", "foo", "foo"}, + {".bar", "_bar", "_bar", "_bar"}, + {"b.ar", "b_ar", "b_ar", "b_ar"}, + {":bar", ":bar", ":bar", "_bar"}, + {"ba:r", "ba:r", "ba:r", "ba_r"}, + {"9bar", "_bar", "9bar", "_bar"}, + } + for _, test := range tests { + in1 := []byte(test.in) + r.rewriteMetric(in1) + assert.Equal(t, test.outMetric, string(in1)) + in2 := []byte(test.in) + r.rewriteMetricTail(in2) + assert.Equal(t, test.outMetricTail, string(in2)) + in3 := []byte(test.in) + r.rewriteLabel(in3) + assert.Equal(t, test.outLabel, string(in3)) + } +} diff --git a/src/query/api/v1/handler/influxdb/write.go b/src/query/api/v1/handler/influxdb/write.go new file mode 100644 index 0000000000..ba8546e6b9 --- /dev/null +++ b/src/query/api/v1/handler/influxdb/write.go @@ -0,0 +1,290 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package influxdb + +import ( + "bytes" + "errors" + "fmt" + "io/ioutil" + "net/http" + + "github.com/m3db/m3/src/cmd/services/m3coordinator/ingest" + "github.com/m3db/m3/src/dbnode/client" + "github.com/m3db/m3/src/query/api/v1/handler" + "github.com/m3db/m3/src/query/api/v1/options" + "github.com/m3db/m3/src/query/models" + "github.com/m3db/m3/src/query/ts" + "github.com/m3db/m3/src/query/util/logging" + + xerrors "github.com/m3db/m3/src/x/errors" + xhttp "github.com/m3db/m3/src/x/net/http" + xtime "github.com/m3db/m3/src/x/time" + imodels "github.com/influxdata/influxdb/models" + "go.uber.org/zap" +) + +const ( + // InfluxWriteURL is the Influx DB write handler URL + InfluxWriteURL = handler.RoutePrefixV1 + "/influxdb/write" + + // InfluxWriteHTTPMethod is the HTTP method used with this resource + InfluxWriteHTTPMethod = http.MethodPost +) + +type ingestWriteHandler struct { + handlerOpts options.HandlerOptions + tagOpts models.TagOptions + promRewriter *promRewriter +} + +type ingestField struct { + name []byte // to be stored in __name__; rest of tags stay constant for the Point + value float64 +} + +type ingestIterator struct { + // what is being iterated (comes from outside) + points []imodels.Point + tagOpts models.TagOptions + promRewriter *promRewriter + + // internal + pointIndex int + err xerrors.MultiError + + // following entries are within current point, and initialized + // when we go to the first entry in the current point + fields []*ingestField + nextFieldIndex int + tags models.Tags +} + +func (ii *ingestIterator) populateFields() bool { + point := ii.points[ii.pointIndex] + it := point.FieldIterator() + n := 0 + ii.fields = make([]*ingestField, 0, 10) + bname := make([]byte, 0, len(point.Name())+1) + bname = append(bname, point.Name()...) + bname = append(bname, byte('_')) + bnamelen := len(bname) + ii.promRewriter.rewriteMetric(bname) + for it.Next() { + var value float64 = 0 + n += 1 + switch it.Type() { + case imodels.Boolean: + v, err := it.BooleanValue() + if err != nil { + ii.err = ii.err.Add(err) + continue + } + if v { + value = 1.0 + } + case imodels.Integer: + v, err := it.IntegerValue() + if err != nil { + ii.err = ii.err.Add(err) + continue + } + value = float64(v) + case imodels.Unsigned: + v, err := it.UnsignedValue() + if err != nil { + ii.err = ii.err.Add(err) + continue + } + value = float64(v) + case imodels.Float: + v, err := it.FloatValue() + if err != nil { + ii.err = ii.err.Add(err) + continue + } + value = v + default: + // TBD if we should stick strings as + // tags or not; to prevent cardinality + // explosion, we drop them for now + continue + } + tail := it.FieldKey() + name := make([]byte, 0, bnamelen+len(tail)) + name = append(name, bname...) + name = append(name, tail...) + ii.promRewriter.rewriteMetricTail(name[bnamelen:]) + ii.fields = append(ii.fields, &ingestField{name: name, value: value}) + } + return n > 0 +} + +func (ii *ingestIterator) Next() bool { + for len(ii.points) > ii.pointIndex { + if ii.nextFieldIndex == 0 { + // Populate tags only if we have fields we care about + if ii.populateFields() { + point := ii.points[ii.pointIndex] + ptags := point.Tags() + tags := models.NewTags(len(ptags), ii.tagOpts) + for _, tag := range ptags { + name := make([]byte, len(tag.Key)) + copy(name, tag.Key) + ii.promRewriter.rewriteLabel(name) + tags = tags.AddTagWithoutNormalizing(models.Tag{Name: name, Value: tag.Value}) + } + // sanity check no duplicate Name's; + // after Normalize, they are sorted so + // can just check them sequentially + valid := true + if len(tags.Tags) > 0 { + // Dummy w/o value set; used for dupe check and value is rewrittein in-place in SetName later on + tags = tags.AddTag(models.Tag{Name: tags.Opts.MetricName()}) + name := tags.Tags[0].Name + for i := 1; i < len(tags.Tags); i++ { + iname := tags.Tags[i].Name + if bytes.Equal(name, iname) { + ii.err = ii.err.Add(fmt.Errorf("non-unique Prometheus label %v", string(iname))) + valid = false + break + } + name = iname + } + } + if !valid { + ii.pointIndex += 1 + continue + } + ii.tags = tags + } + } + ii.nextFieldIndex += 1 + if ii.nextFieldIndex > len(ii.fields) { + ii.pointIndex += 1 + ii.nextFieldIndex = 0 + continue + } + return true + } + return false +} + +func (ii *ingestIterator) Current() (models.Tags, ts.Datapoints, xtime.Unit, []byte) { + if ii.pointIndex < len(ii.points) && ii.nextFieldIndex > 0 && len(ii.fields) > (ii.nextFieldIndex-1) { + point := ii.points[ii.pointIndex] + field := ii.fields[ii.nextFieldIndex-1] + tags := ii.tags.SetName(field.name) + + return tags, []ts.Datapoint{ts.Datapoint{Timestamp: point.Time(), + Value: field.value}}, xtime.Nanosecond, nil + } + return models.EmptyTags(), nil, 0, nil +} + +func (ii *ingestIterator) Reset() error { + ii.pointIndex = 0 + ii.nextFieldIndex = 0 + ii.err = xerrors.NewMultiError() + return nil +} + +func (ii *ingestIterator) Error() error { + return ii.err.FinalError() +} + +func NewInfluxWriterHandler(options options.HandlerOptions) http.Handler { + return &ingestWriteHandler{handlerOpts: options, + tagOpts: options.TagOptions(), + promRewriter: newPromRewriter()} +} + +func (iwh *ingestWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + bytes, err := ioutil.ReadAll(r.Body) + if err != nil { + xhttp.Error(w, err, http.StatusInternalServerError) + return + } + points, err := imodels.ParsePoints(bytes) + if err != nil { + xhttp.Error(w, err, http.StatusInternalServerError) + return + } + opts := ingest.WriteOptions{} + iter := &ingestIterator{points: points, tagOpts: iwh.tagOpts, promRewriter: iwh.promRewriter} + batchErr := iwh.handlerOpts.DownsamplerAndWriter().WriteBatch(r.Context(), iter, opts) + if batchErr == nil { + w.WriteHeader(http.StatusNoContent) + return + } + var ( + errs = batchErr.Errors() + lastRegularErr string + lastBadRequestErr string + numRegular int + numBadRequest int + ) + for _, err := range errs { + switch { + case client.IsBadRequestError(err): + numBadRequest++ + lastBadRequestErr = err.Error() + case xerrors.IsInvalidParams(err): + numBadRequest++ + lastBadRequestErr = err.Error() + default: + numRegular++ + lastRegularErr = err.Error() + } + } + + var status int + switch { + case numBadRequest == len(errs): + status = http.StatusBadRequest + default: + status = http.StatusInternalServerError + } + + logger := logging.WithContext(r.Context(), iwh.handlerOpts.InstrumentOpts()) + logger.Error("write error", + zap.String("remoteAddr", r.RemoteAddr), + zap.Int("httpResponseStatusCode", status), + zap.Int("numRegularErrors", numRegular), + zap.Int("numBadRequestErrors", numBadRequest), + zap.String("lastRegularError", lastRegularErr), + zap.String("lastBadRequestErr", lastBadRequestErr)) + + var resultErr string + if lastRegularErr != "" { + resultErr = fmt.Sprintf("retryable_errors: count=%d, last=%s", + numRegular, lastRegularErr) + } + if lastBadRequestErr != "" { + var sep string + if lastRegularErr != "" { + sep = ", " + } + resultErr = fmt.Sprintf("%s%sbad_request_errors: count=%d, last=%s", + resultErr, sep, numBadRequest, lastBadRequestErr) + } + xhttp.Error(w, errors.New(resultErr), status) +} diff --git a/src/query/api/v1/handler/influxdb/write_test.go b/src/query/api/v1/handler/influxdb/write_test.go new file mode 100644 index 0000000000..4314f6ff88 --- /dev/null +++ b/src/query/api/v1/handler/influxdb/write_test.go @@ -0,0 +1,96 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package influxdb + +import ( + "fmt" + "testing" + + imodels "github.com/influxdata/influxdb/models" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// human-readable string out of what the iterator produces; +// they are easiest for human to handle +func (self *ingestIterator) pop(t *testing.T) string { + if self.Next() { + tags, dp, _, _ := self.Current() + assert.Equal(t, 1, len(dp)) + + return fmt.Sprintf("%s %v %s", tags.String(), dp[0].Value, dp[0].Timestamp) + } + return "" +} + +func TestIngestIterator(t *testing.T) { + // test prometheus-illegal measure and label components (should be _s) + // as well as all value types influxdb supports + s := `?measure:!,?tag1:!=tval1,?tag2:!=tval2 ?key1:!=3,?key2:!=2i 1574838670386469800 +?measure:!,?tag1:!=tval1,?tag2:!=tval2 ?key3:!="string",?key4:!=T 1574838670386469801 +` + points, err := imodels.ParsePoints([]byte(s)) + require.NoError(t, err) + iter := &ingestIterator{points: points, promRewriter: newPromRewriter()} + require.NoError(t, iter.Error()) + for _, line := range []string{ + "__name__: _measure:___key1:_, _tag1__: tval1, _tag2__: tval2 3 2019-11-27 07:11:10.3864698 +0000 UTC", + "__name__: _measure:___key2:_, _tag1__: tval1, _tag2__: tval2 2 2019-11-27 07:11:10.3864698 +0000 UTC", + "__name__: _measure:___key4:_, _tag1__: tval1, _tag2__: tval2 1 2019-11-27 07:11:10.386469801 +0000 UTC", + "", + "", + } { + assert.Equal(t, line, iter.pop(t)) + } + require.NoError(t, iter.Error()) +} + +func TestIngestIteratorDuplicateTag(t *testing.T) { + // Ensure that duplicate tag causes error and no metrics entries + s := `measure,lab!=2,lab?=3 key=2i 1574838670386469800 +` + points, err := imodels.ParsePoints([]byte(s)) + require.NoError(t, err) + iter := &ingestIterator{points: points, promRewriter: newPromRewriter()} + require.NoError(t, iter.Error()) + for _, line := range []string{ + "", + } { + assert.Equal(t, line, iter.pop(t)) + } + require.EqualError(t, iter.Error(), "non-unique Prometheus label lab_") +} + +func TestIngestIteratorDuplicateNameTag(t *testing.T) { + // Ensure that duplicate name tag causes error and no metrics entries + s := `measure,__name__=x key=2i 1574838670386469800 +` + points, err := imodels.ParsePoints([]byte(s)) + require.NoError(t, err) + iter := &ingestIterator{points: points, promRewriter: newPromRewriter()} + require.NoError(t, iter.Error()) + for _, line := range []string{ + "", + } { + assert.Equal(t, line, iter.pop(t)) + } + require.EqualError(t, iter.Error(), "non-unique Prometheus label __name__") +} diff --git a/src/query/api/v1/httpd/handler.go b/src/query/api/v1/httpd/handler.go index da2e4f07f2..6a37244a32 100644 --- a/src/query/api/v1/httpd/handler.go +++ b/src/query/api/v1/httpd/handler.go @@ -31,6 +31,7 @@ import ( "github.com/m3db/m3/src/query/api/v1/handler" "github.com/m3db/m3/src/query/api/v1/handler/database" "github.com/m3db/m3/src/query/api/v1/handler/graphite" + "github.com/m3db/m3/src/query/api/v1/handler/influxdb" m3json "github.com/m3db/m3/src/query/api/v1/handler/json" "github.com/m3db/m3/src/query/api/v1/handler/namespace" "github.com/m3db/m3/src/query/api/v1/handler/openapi" @@ -169,6 +170,10 @@ func (h *Handler) RegisterRoutes() error { wrapped(native.NewPromReadInstantHandler(h.options)).ServeHTTP, ).Methods(native.PromReadInstantHTTPMethods...) + // InfluxDB write endpoint. + h.router.HandleFunc(influxdb.InfluxWriteURL, + wrapped(influxdb.NewInfluxWriterHandler(h.options)).ServeHTTP).Methods(influxdb.InfluxWriteHTTPMethod) + // Native M3 search and write endpoints. h.router.HandleFunc(handler.SearchURL, wrapped(handler.NewSearchHandler(h.options)).ServeHTTP,