diff --git a/src/query/api/v1/handler/influxdb/write.go b/src/query/api/v1/handler/influxdb/write.go index 4496033add..06f3fa7cea 100644 --- a/src/query/api/v1/handler/influxdb/write.go +++ b/src/query/api/v1/handler/influxdb/write.go @@ -23,6 +23,7 @@ package influxdb import ( "bytes" "compress/gzip" + "encoding/json" "errors" "fmt" "io" @@ -32,6 +33,7 @@ import ( "github.com/m3db/m3/src/cmd/services/m3coordinator/ingest" "github.com/m3db/m3/src/dbnode/client" + "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" "github.com/m3db/m3/src/query/api/v1/options" "github.com/m3db/m3/src/query/api/v1/route" "github.com/m3db/m3/src/query/models" @@ -42,6 +44,7 @@ import ( "go.uber.org/zap" xerrors "github.com/m3db/m3/src/x/errors" + "github.com/m3db/m3/src/x/headers" xhttp "github.com/m3db/m3/src/x/net/http" xtime "github.com/m3db/m3/src/x/time" ) @@ -76,6 +79,7 @@ type ingestIterator struct { points []imodels.Point tagOpts models.TagOptions promRewriter *promRewriter + writeTags models.Tags // internal pointIndex int @@ -163,6 +167,10 @@ func (ii *ingestIterator) Next() bool { ii.promRewriter.rewriteLabel(name) tags = tags.AddTagWithoutNormalizing(models.Tag{Name: name, Value: tag.Value}) } + // Add or update tags given in Map-Tags-JSON header + for _, t := range ii.writeTags.Tags { + tags = tags.AddOrUpdateTag(t) + } // sanity check no duplicate Name's; // after Normalize, they are sorted so // can just check them sequentially @@ -279,9 +287,11 @@ func (ii *ingestIterator) CurrentMetadata() ts.Metadata { // NewInfluxWriterHandler returns a new influx write handler. func NewInfluxWriterHandler(options options.HandlerOptions) http.Handler { - return &ingestWriteHandler{handlerOpts: options, + return &ingestWriteHandler{ + handlerOpts: options, tagOpts: options.TagOptions(), - promRewriter: newPromRewriter()} + promRewriter: newPromRewriter(), + } } func (iwh *ingestWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -324,8 +334,49 @@ func (iwh *ingestWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) xhttp.WriteError(w, err) return } + + // Apply tags from "M3-Map-Tags-JSON" header + writeTags := models.NewTags(0, iwh.tagOpts) + var mapTagsOpts handleroptions.MapTagsOptions + if mapStr := r.Header.Get(headers.MapTagsByJSONHeader); mapStr != "" { + if err := json.Unmarshal([]byte(mapStr), &mapTagsOpts); err != nil { + xhttp.WriteError(w, xhttp.NewError(err, http.StatusBadRequest)) + return + } + for _, mapper := range mapTagsOpts.TagMappers { + if err := mapper.Validate(); err != nil { + xhttp.WriteError(w, xhttp.NewError(err, http.StatusBadRequest)) + return + } + + if op := mapper.Write; !op.IsEmpty() { + tag := []byte(op.Tag) + value := []byte(op.Value) + writeTags = writeTags.AddTag(models.Tag{Name: tag, Value: value}) + } + + if op := mapper.Drop; !op.IsEmpty() { + err := errors.New("'drop' operation is not yet supported") + xhttp.WriteError(w, xhttp.NewError(err, http.StatusBadRequest)) + return + } + + if op := mapper.DropWithValue; !op.IsEmpty() { + err := errors.New("'dropWithValue' operation is not yet supported") + xhttp.WriteError(w, xhttp.NewError(err, http.StatusBadRequest)) + return + } + + if op := mapper.Replace; !op.IsEmpty() { + err := errors.New("'replace' operation is not yet supported") + xhttp.WriteError(w, xhttp.NewError(err, http.StatusBadRequest)) + return + } + } + } + opts := ingest.WriteOptions{} - iter := &ingestIterator{points: points, tagOpts: iwh.tagOpts, promRewriter: iwh.promRewriter} + iter := &ingestIterator{points: points, tagOpts: iwh.tagOpts, promRewriter: iwh.promRewriter, writeTags: writeTags} batchErr := iwh.handlerOpts.DownsamplerAndWriter().WriteBatch(r.Context(), iter, opts) if batchErr == nil { w.WriteHeader(http.StatusNoContent) diff --git a/src/query/api/v1/handler/influxdb/write_test.go b/src/query/api/v1/handler/influxdb/write_test.go index 5a9f9523bf..3b74b5b246 100644 --- a/src/query/api/v1/handler/influxdb/write_test.go +++ b/src/query/api/v1/handler/influxdb/write_test.go @@ -35,6 +35,7 @@ import ( imodels "github.com/influxdata/influxdb/models" "github.com/m3db/m3/src/cmd/services/m3coordinator/ingest" "github.com/m3db/m3/src/query/api/v1/options" + "github.com/m3db/m3/src/query/models" xtest "github.com/m3db/m3/src/x/test" xtime "github.com/m3db/m3/src/x/time" "github.com/stretchr/testify/assert" @@ -128,6 +129,31 @@ func TestIngestIteratorIssue2125(t *testing.T) { assert.Equal(t, value2.Tags.String(), "__name__: measure_k2, lab: foo") } +func TestIngestIteratorWriteTags(t *testing.T) { + s := `measure,lab=foo k1=1,k2=2 1574838670386469800 +` + points, err := imodels.ParsePoints([]byte(s)) + require.NoError(t, err) + + writeTags := models.EmptyTags(). + AddTag(models.Tag{Name: []byte("lab"), Value: []byte("bar")}). + AddTag(models.Tag{Name: []byte("new"), Value: []byte("tag")}) + + iter := &ingestIterator{points: points, promRewriter: newPromRewriter(), writeTags: writeTags} + + assert.True(t, iter.Next()) + value1 := iter.Current() + require.NoError(t, iter.Error()) + + assert.Equal(t, value1.Tags.String(), "__name__: measure_k1, lab: bar, new: tag") + + assert.True(t, iter.Next()) + value2 := iter.Current() + require.NoError(t, iter.Error()) + + assert.Equal(t, value2.Tags.String(), "__name__: measure_k2, lab: bar, new: tag") +} + func TestDetermineTimeUnit(t *testing.T) { now := time.Now() zerot := now.Add(time.Duration(-now.UnixNano() % int64(time.Second))) @@ -160,11 +186,21 @@ func makeInfluxDBLineProtocolMessage(t *testing.T, isGzipped bool, time time.Tim } func TestInfluxDBWrite(t *testing.T) { + type checkWriteBatchFunc func(context.Context, *ingestIterator, ingest.WriteOptions) interface{} + + // small helper for tests where we dont want to check the batch + dontCheckWriteBatch := checkWriteBatchFunc( + func(context.Context, *ingestIterator, ingest.WriteOptions) interface{} { + return nil + }, + ) + tests := []struct { - name string - expectedStatus int - requestHeaders map[string]string - isGzipped bool + name string + expectedStatus int + requestHeaders map[string]string + isGzipped bool + checkWriteBatch checkWriteBatchFunc }{ { name: "Gzip Encoded Message", @@ -173,6 +209,7 @@ func TestInfluxDBWrite(t *testing.T) { requestHeaders: map[string]string{ "Content-Encoding": "gzip", }, + checkWriteBatch: dontCheckWriteBatch, }, { name: "Wrong Content Encoding", @@ -181,12 +218,29 @@ func TestInfluxDBWrite(t *testing.T) { requestHeaders: map[string]string{ "Content-Encoding": "gzip", }, + checkWriteBatch: dontCheckWriteBatch, + }, + { + name: "Plaintext Message", + expectedStatus: http.StatusNoContent, + isGzipped: false, + requestHeaders: map[string]string{}, + checkWriteBatch: dontCheckWriteBatch, }, { - name: "Plaintext Message", + name: "Map-Tags-JSON Add Tag", expectedStatus: http.StatusNoContent, isGzipped: false, - requestHeaders: map[string]string{}, + requestHeaders: map[string]string{ + "M3-Map-Tags-JSON": `{"tagMappers": [{"write": {"tag": "t", "value": "v"}}]}`, + }, + checkWriteBatch: checkWriteBatchFunc( + func(_ context.Context, iter *ingestIterator, opts ingest.WriteOptions) interface{} { + _, found := iter.writeTags.Get([]byte("t")) + require.True(t, found, "tag t will be overwritten") + return nil + }, + ), }, } @@ -201,7 +255,9 @@ func TestInfluxDBWrite(t *testing.T) { if testCase.expectedStatus != http.StatusBadRequest { mockDownsamplerAndWriter. EXPECT(). - WriteBatch(gomock.Any(), gomock.Any(), gomock.Any()) + WriteBatch(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(testCase.checkWriteBatch). + Times(1) } opts := makeOptions(mockDownsamplerAndWriter)