Skip to content

Commit

Permalink
[query] influxdb write improvements (#3373)
Browse files Browse the repository at this point in the history
* [query] add gzip support to influxdb write endpoint

* [query] influxdb write handles empty request body

* [query] influxdb write supports setting timestamp precision
  • Loading branch information
tsharju authored Oct 4, 2021
1 parent 6b1a24d commit 7672c09
Show file tree
Hide file tree
Showing 3 changed files with 213 additions and 2 deletions.
35 changes: 33 additions & 2 deletions src/query/api/v1/handler/influxdb/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ package influxdb

import (
"bytes"
"compress/gzip"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"time"
Expand Down Expand Up @@ -283,12 +285,41 @@ func NewInfluxWriterHandler(options options.HandlerOptions) http.Handler {
}

func (iwh *ingestWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
bytes, err := ioutil.ReadAll(r.Body)
if r.Body == http.NoBody {
xhttp.WriteError(w, xhttp.NewError(errors.New("empty request body"), http.StatusBadRequest))
return
}

var bytes []byte
var err error
var reader io.ReadCloser

if r.Header.Get("Content-Encoding") == "gzip" {
reader, err = gzip.NewReader(r.Body)
if err != nil {
xhttp.WriteError(w, xhttp.NewError(err, http.StatusBadRequest))
return
}
} else {
reader = r.Body
}

bytes, err = ioutil.ReadAll(reader)
if err != nil {
xhttp.WriteError(w, err)
return
}
points, err := imodels.ParsePoints(bytes)

err = reader.Close()
if err != nil {
xhttp.WriteError(w, err)
return
}

// InfluxDB line protocol v1.8 supports following precision values ns, u, ms, s, m and h
// If precision is not given, nanosecond precision is assumed
precision := r.URL.Query().Get("precision")
points, err := imodels.ParsePointsWithPrecision(bytes, time.Now().UTC(), precision)
if err != nil {
xhttp.WriteError(w, err)
return
Expand Down
165 changes: 165 additions & 0 deletions src/query/api/v1/handler/influxdb/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,21 @@
package influxdb

import (
"bytes"
"compress/gzip"
"context"
"fmt"
"io"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/golang/mock/gomock"
imodels "github.com/influxdata/influxdb/models"
"github.com/m3db/m3/src/cmd/services/m3coordinator/ingest"
"github.com/m3db/m3/src/query/api/v1/options"
xtest "github.com/m3db/m3/src/x/test"
xtime "github.com/m3db/m3/src/x/time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -125,5 +135,160 @@ func TestDetermineTimeUnit(t *testing.T) {
assert.Equal(t, determineTimeUnit(zerot.Add(2*time.Millisecond)), xtime.Millisecond)
assert.Equal(t, determineTimeUnit(zerot.Add(3*time.Microsecond)), xtime.Microsecond)
assert.Equal(t, determineTimeUnit(zerot.Add(4*time.Nanosecond)), xtime.Nanosecond)
}

func makeOptions(ds ingest.DownsamplerAndWriter) options.HandlerOptions {
return options.EmptyHandlerOptions().
SetDownsamplerAndWriter(ds)
}

func makeInfluxDBLineProtocolMessage(t *testing.T, isGzipped bool, time time.Time, precision time.Duration) io.Reader {
t.Helper()
ts := fmt.Sprintf("%d", time.UnixNano()/precision.Nanoseconds())
line := fmt.Sprintf("weather,location=us-midwest,season=summer temperature=82 %s", ts)
var msg bytes.Buffer
if isGzipped {
gz := gzip.NewWriter(&msg)
_, err := gz.Write([]byte(line))
require.NoError(t, err)
err = gz.Close()
require.NoError(t, err)
} else {
msg.WriteString(line)
}
return bytes.NewReader(msg.Bytes())
}

func TestInfluxDBWrite(t *testing.T) {
tests := []struct {
name string
expectedStatus int
requestHeaders map[string]string
isGzipped bool
}{
{
name: "Gzip Encoded Message",
expectedStatus: http.StatusNoContent,
isGzipped: true,
requestHeaders: map[string]string{
"Content-Encoding": "gzip",
},
},
{
name: "Wrong Content Encoding",
expectedStatus: http.StatusBadRequest,
isGzipped: false,
requestHeaders: map[string]string{
"Content-Encoding": "gzip",
},
},
{
name: "Plaintext Message",
expectedStatus: http.StatusNoContent,
isGzipped: false,
requestHeaders: map[string]string{},
},
}

ctrl := xtest.NewController(t)
defer ctrl.Finish()

for _, testCase := range tests {
testCase := testCase
t.Run(testCase.name, func(tt *testing.T) {
mockDownsamplerAndWriter := ingest.NewMockDownsamplerAndWriter(ctrl)
// For error reponses we don't expect WriteBatch to be called
if testCase.expectedStatus != http.StatusBadRequest {
mockDownsamplerAndWriter.
EXPECT().
WriteBatch(gomock.Any(), gomock.Any(), gomock.Any())
}

opts := makeOptions(mockDownsamplerAndWriter)
handler := NewInfluxWriterHandler(opts)
msg := makeInfluxDBLineProtocolMessage(t, testCase.isGzipped, time.Now(), time.Nanosecond)
req := httptest.NewRequest(InfluxWriteHTTPMethod, InfluxWriteURL, msg)
for header, value := range testCase.requestHeaders {
req.Header.Set(header, value)
}
writer := httptest.NewRecorder()
handler.ServeHTTP(writer, req)
resp := writer.Result()
require.Equal(t, testCase.expectedStatus, resp.StatusCode)
resp.Body.Close()
})
}
}

func TestInfluxDBWritePrecision(t *testing.T) {
tests := []struct {
name string
expectedStatus int
precision string
}{
{
name: "No precision",
expectedStatus: http.StatusNoContent,
precision: "",
},
{
name: "Millisecond precision",
expectedStatus: http.StatusNoContent,
precision: "ms",
},
{
name: "Second precision",
expectedStatus: http.StatusNoContent,
precision: "s",
},
}

ctrl := xtest.NewController(t)
defer ctrl.Finish()

for _, testCase := range tests {
testCase := testCase
t.Run(testCase.name, func(tt *testing.T) {
var precision time.Duration
switch testCase.precision {
case "":
precision = time.Nanosecond
case "ms":
precision = time.Millisecond
case "s":
precision = time.Second
}

now := time.Now()

mockDownsamplerAndWriter := ingest.NewMockDownsamplerAndWriter(ctrl)
mockDownsamplerAndWriter.
EXPECT().
WriteBatch(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(
_ context.Context,
iter *ingestIterator,
opts ingest.WriteOptions,
) interface{} {
require.Equal(tt, now.Truncate(precision).UnixNano(), iter.points[0].UnixNano(), "correct precision")
return nil
}).Times(1)

opts := makeOptions(mockDownsamplerAndWriter)
handler := NewInfluxWriterHandler(opts)

msg := makeInfluxDBLineProtocolMessage(t, false, now, precision)
var url string
if testCase.precision == "" {
url = InfluxWriteURL
} else {
url = InfluxWriteURL + fmt.Sprintf("?precision=%s", testCase.precision)
}
req := httptest.NewRequest(InfluxWriteHTTPMethod, url, msg)
writer := httptest.NewRecorder()
handler.ServeHTTP(writer, req)
resp := writer.Result()
require.Equal(t, testCase.expectedStatus, resp.StatusCode)
resp.Body.Close()
})
}
}
15 changes: 15 additions & 0 deletions src/query/api/v1/httpd/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/m3db/m3/src/cmd/services/m3coordinator/ingest"
"github.com/m3db/m3/src/cmd/services/m3query/config"
"github.com/m3db/m3/src/query/api/v1/handler/graphite"
"github.com/m3db/m3/src/query/api/v1/handler/influxdb"
m3json "github.com/m3db/m3/src/query/api/v1/handler/json"
"github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions"
"github.com/m3db/m3/src/query/api/v1/handler/prometheus/native"
Expand Down Expand Up @@ -236,6 +237,20 @@ func TestJSONWritePost(t *testing.T) {
require.Equal(t, http.StatusBadRequest, res.Code, "Empty request")
}

func TestInfluxDBWritePost(t *testing.T) {
req := httptest.NewRequest(influxdb.InfluxWriteHTTPMethod, influxdb.InfluxWriteURL, nil)
res := httptest.NewRecorder()
ctrl := gomock.NewController(t)
storage, _ := m3.NewStorageAndSession(t, ctrl)

h, err := setupHandler(storage)
require.NoError(t, err, "unable to setup handler")
err = h.RegisterRoutes()
require.NoError(t, err)
h.Router().ServeHTTP(res, req)
require.Equal(t, http.StatusBadRequest, res.Code, "Empty request")
}

func TestRoutesGet(t *testing.T) {
req := httptest.NewRequest("GET", routesURL, nil)
res := httptest.NewRecorder()
Expand Down

0 comments on commit 7672c09

Please sign in to comment.