From 85fe1ba552afd95f42dd6ac9251bbe0136e10c15 Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Sat, 2 Nov 2024 12:49:46 -0300 Subject: [PATCH] feat: receiver/prometheusremotewrite - Implement body unmarshaling (#35624) #### Description This PR builds on top of #35535 and #35565. We're now making sure we're able to unmarshal a remote write request, while also exercising the decompression that is made by OTel's confighttp. Signed-off-by: Arthur Silva Sens --- .chloggen/prwreceiver-bodyunmarshal.yaml | 27 ++++++++++++++ receiver/prometheusremotewritereceiver/go.mod | 9 +++-- receiver/prometheusremotewritereceiver/go.sum | 12 +++++++ .../prometheusremotewritereceiver/receiver.go | 35 ++++++++++++++++++- .../receiver_test.go | 29 +++++++++++++-- 5 files changed, 105 insertions(+), 7 deletions(-) create mode 100644 .chloggen/prwreceiver-bodyunmarshal.yaml diff --git a/.chloggen/prwreceiver-bodyunmarshal.yaml b/.chloggen/prwreceiver-bodyunmarshal.yaml new file mode 100644 index 000000000000..e7f2b784eeb9 --- /dev/null +++ b/.chloggen/prwreceiver-bodyunmarshal.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: receiver/prometheusremotewrite + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Implement body unmarshaling for Prometheus Remote Write requests + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35624] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: Warning - The HTTP Server still doesn't do anything. It's just a placeholder for now. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api, user] diff --git a/receiver/prometheusremotewritereceiver/go.mod b/receiver/prometheusremotewritereceiver/go.mod index 520f9d3e0e75..72f0aa1a4d76 100644 --- a/receiver/prometheusremotewritereceiver/go.mod +++ b/receiver/prometheusremotewritereceiver/go.mod @@ -3,6 +3,8 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/promet go 1.22.0 require ( + github.com/gogo/protobuf v1.3.2 + github.com/golang/snappy v0.0.4 github.com/prometheus/prometheus v0.54.1 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.112.0 @@ -11,6 +13,7 @@ require ( go.opentelemetry.io/collector/confmap v1.18.0 go.opentelemetry.io/collector/consumer v0.112.0 go.opentelemetry.io/collector/consumer/consumertest v0.112.0 + go.opentelemetry.io/collector/pdata v1.18.0 go.opentelemetry.io/collector/receiver v0.112.0 go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 @@ -26,6 +29,7 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/dennwc/varint v1.0.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/go-kit/log v0.2.1 // indirect @@ -33,9 +37,7 @@ require ( github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-viper/mapstructure/v2 v2.2.1 // indirect - github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-jwt/jwt/v5 v5.2.1 // indirect - github.com/golang/snappy v0.0.4 // indirect github.com/google/uuid v1.6.0 // indirect github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect @@ -72,16 +74,17 @@ require ( go.opentelemetry.io/collector/consumer/consumerprofiles v0.112.0 // indirect go.opentelemetry.io/collector/extension v0.112.0 // indirect go.opentelemetry.io/collector/extension/auth v0.112.0 // indirect - go.opentelemetry.io/collector/pdata v1.18.0 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.112.0 // indirect go.opentelemetry.io/collector/pipeline v0.112.0 // indirect go.opentelemetry.io/collector/receiver/receiverprofiles v0.112.0 // indirect + go.opentelemetry.io/collector/semconv v0.112.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 // indirect go.opentelemetry.io/otel v1.31.0 // indirect go.opentelemetry.io/otel/metric v1.31.0 // indirect go.opentelemetry.io/otel/sdk v1.31.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.31.0 // indirect go.opentelemetry.io/otel/trace v1.31.0 // indirect + go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.28.0 // indirect golang.org/x/net v0.30.0 // indirect diff --git a/receiver/prometheusremotewritereceiver/go.sum b/receiver/prometheusremotewritereceiver/go.sum index 4d821ba99720..09b719743e85 100644 --- a/receiver/prometheusremotewritereceiver/go.sum +++ b/receiver/prometheusremotewritereceiver/go.sum @@ -61,6 +61,8 @@ github.com/armon/go-metrics v0.4.1/go.mod h1:E6amYzXo6aW1tqzoZGT755KkbgrJsSdpwZ+ github.com/aws/aws-sdk-go v1.38.35/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro= github.com/aws/aws-sdk-go v1.54.19 h1:tyWV+07jagrNiCcGRzRhdtVjQs7Vy41NwsuOcl0IbVI= github.com/aws/aws-sdk-go v1.54.19/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU= +github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3 h1:6df1vn4bBlDDo4tARvBm7l6KA9iVMnE3NWizDeWSrps= +github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3/go.mod h1:CIWtjkly68+yqLPbvwwR/fjNJA/idrtULjZWh2v1ys0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= @@ -80,6 +82,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dennwc/varint v1.0.0 h1:kGNFFSSw8ToIy3obO/kKr8U9GZYUAxQEVuix4zfDWzE= +github.com/dennwc/varint v1.0.0/go.mod h1:hnItb35rvZvJrbTALZtY/iQfDs48JKRG1RPpgziApxA= github.com/digitalocean/godo v1.118.0 h1:lkzGFQmACrVCp7UqH1sAi4JK/PWwlc5aaxubgorKmC4= github.com/digitalocean/godo v1.118.0/go.mod h1:Vk0vpCot2HOAJwc5WE8wljZGtJ3ZtWIc8MQ8rF38sdo= github.com/distribution/reference v0.5.0 h1:/FUIFXtfc/x2gpa5/VGfiGLuOIdYa1t65IKK2OFGvA0= @@ -90,6 +94,8 @@ github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKoh github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= +github.com/edsrzf/mmap-go v1.1.0 h1:6EUwBLQ/Mcr1EYLE4Tn1VdW1A4ckqCQWZBw8Hr0kjpQ= +github.com/edsrzf/mmap-go v1.1.0/go.mod h1:19H/e8pUPLicwkyNgOykDXkJ9F0MHE+Z52B8EIth78Q= github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g= github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -100,6 +106,8 @@ github.com/envoyproxy/go-control-plane v0.13.0/go.mod h1:GRaKG3dwvFoTg4nj7aXdZnv github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/envoyproxy/protoc-gen-validate v1.1.0 h1:tntQDh69XqOCOZsDz0lVJQez/2L6Uu2PdjCQwWCJ3bM= github.com/envoyproxy/protoc-gen-validate v1.1.0/go.mod h1:sXRDRVmzEbkM7CVcM06s9shE/m23dg3wzjl0UWqJ2q4= +github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb h1:IT4JYU7k4ikYg1SCxNI1/Tieq/NFvh6dzLdgi7eu0tM= +github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb/go.mod h1:bH6Xx7IW64qjjJq8M2u4dxNaBiDfKK+z/3eGDpXEQhc= github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= @@ -318,6 +326,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8m github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+SVif2QVs3tOP0zanoHgBEVAwHxUSIzRqU= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= +github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.0.2 h1:9yCKha/T5XdGtO0q9Q9a6T5NUCsTn/DrBg0D7ufOcFM= @@ -449,6 +459,8 @@ go.opentelemetry.io/collector/receiver v0.112.0 h1:gdTBDOPGKMZlZghtN5A7ZLNlNwCHW go.opentelemetry.io/collector/receiver v0.112.0/go.mod h1:3QmfSUiyFzRTnHUqF8fyEvQpU5q/xuwS43jGt8JXEEA= go.opentelemetry.io/collector/receiver/receiverprofiles v0.112.0 h1:SShkZsWRsFss3iWZa9JwMC7h4gD5RbWDhUcz1/9dXSs= go.opentelemetry.io/collector/receiver/receiverprofiles v0.112.0/go.mod h1:615smszDXiz4YWwXslxlAjX7FzOVDU7Bk6xARFk+zpk= +go.opentelemetry.io/collector/semconv v0.112.0 h1:JPQyvZhlNLVSuVI+FScONaiFygB7h7NTZceUEKIQUEc= +go.opentelemetry.io/collector/semconv v0.112.0/go.mod h1:zCJ5njhWpejR+A40kiEoeFm1xq1uzyZwMnRNX6/D82A= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 h1:UP6IpuHFkUgOQL9FFQFrZ+5LiwhhYRbi7VZSIx6Nj5s= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0/go.mod h1:qxuZLtbq5QDtdeSHsS7bcf6EH6uO6jUAgk764zd3rhM= go.opentelemetry.io/otel v1.31.0 h1:NsJcKPIW0D0H3NgzPDHmo0WW6SptzPdqg/L1zsIm2hY= diff --git a/receiver/prometheusremotewritereceiver/receiver.go b/receiver/prometheusremotewritereceiver/receiver.go index efb60b51b795..61195af2fa3c 100644 --- a/receiver/prometheusremotewritereceiver/receiver.go +++ b/receiver/prometheusremotewritereceiver/receiver.go @@ -7,14 +7,19 @@ import ( "context" "errors" "fmt" + "io" "net/http" "strings" "time" + "github.com/gogo/protobuf/proto" promconfig "github.com/prometheus/prometheus/config" + writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" + promremote "github.com/prometheus/prometheus/storage/remote" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver" "go.uber.org/zap/zapcore" ) @@ -90,7 +95,28 @@ func (prw *prometheusRemoteWriteReceiver) handlePRW(w http.ResponseWriter, req * // After parsing the content-type header, the next step would be to handle content-encoding. // Luckly confighttp's Server has middleware that already decompress the request body for us. - // The next step in follow up PRs would be to decode the request body. + body, err := io.ReadAll(req.Body) + if err != nil { + prw.settings.Logger.Warn("Error decoding remote write request", zapcore.Field{Key: "error", Type: zapcore.ErrorType, Interface: err}) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + var prw2Req writev2.Request + if err = proto.Unmarshal(body, &prw2Req); err != nil { + prw.settings.Logger.Warn("Error decoding remote write request", zapcore.Field{Key: "error", Type: zapcore.ErrorType, Interface: err}) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + _, stats, err := prw.translateV2(req.Context(), &prw2Req) + stats.SetHeaders(w) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) // Following instructions at https://prometheus.io/docs/specs/remote_write_spec_2_0/#invalid-samples + return + } + + w.WriteHeader(http.StatusNoContent) } // parseProto parses the content-type header and returns the version of the remote-write protocol. @@ -122,3 +148,10 @@ func (prw *prometheusRemoteWriteReceiver) parseProto(contentType string) (promco // No "proto=" parameter found, assume v1. return promconfig.RemoteWriteProtoMsgV1, nil } + +// translateV2 translates a v2 remote-write request into OTLP metrics. +// For now translateV2 is not implemented and returns an empty metrics. +// nolint +func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, _ *writev2.Request) (pmetric.Metrics, promremote.WriteResponseStats, error) { + return pmetric.NewMetrics(), promremote.WriteResponseStats{}, nil +} diff --git a/receiver/prometheusremotewritereceiver/receiver_test.go b/receiver/prometheusremotewritereceiver/receiver_test.go index cccc307bcbe3..8c5c9e659cfc 100644 --- a/receiver/prometheusremotewritereceiver/receiver_test.go +++ b/receiver/prometheusremotewritereceiver/receiver_test.go @@ -4,19 +4,25 @@ package prometheusremotewritereceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusremotewritereceiver" import ( + "bytes" "context" "fmt" "net/http" "testing" + "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" promconfig "github.com/prometheus/prometheus/config" + writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/receiver/receivertest" ) -func TestHandlePRWContentTypeNegotiation(t *testing.T) { +func setupServer(t *testing.T) { + t.Helper() + factory := NewFactory() cfg := factory.CreateDefaultConfig() @@ -31,6 +37,10 @@ func TestHandlePRWContentTypeNegotiation(t *testing.T) { t.Cleanup(func() { assert.NoError(t, prwReceiver.Shutdown(ctx), "Must not error shutting down") }) +} + +func TestHandlePRWContentTypeNegotiation(t *testing.T) { + setupServer(t) for _, tc := range []struct { name string @@ -60,18 +70,31 @@ func TestHandlePRWContentTypeNegotiation(t *testing.T) { { name: "x-protobuf/v2 proto parameter", contentType: fmt.Sprintf("application/x-protobuf;proto=%s", promconfig.RemoteWriteProtoMsgV2), - extectedCode: http.StatusOK, + extectedCode: http.StatusNoContent, }, } { t.Run(tc.name, func(t *testing.T) { - req, err := http.NewRequest(http.MethodPost, "http://localhost:9090/api/v1/write", nil) + body := writev2.Request{} + pBuf := proto.NewBuffer(nil) + err := pBuf.Marshal(&body) + assert.NoError(t, err) + + var compressedBody []byte + snappy.Encode(compressedBody, pBuf.Bytes()) + req, err := http.NewRequest(http.MethodPost, "http://localhost:9090/api/v1/write", bytes.NewBuffer(compressedBody)) assert.NoError(t, err) req.Header.Set("Content-Type", tc.contentType) + req.Header.Set("Content-Encoding", "snappy") resp, err := http.DefaultClient.Do(req) assert.NoError(t, err) assert.Equal(t, tc.extectedCode, resp.StatusCode) + if tc.extectedCode == http.StatusNoContent { // We went until the end + assert.NotEmpty(t, resp.Header.Get("X-Prometheus-Remote-Write-Samples-Written")) + assert.NotEmpty(t, resp.Header.Get("X-Prometheus-Remote-Write-Histograms-Written")) + assert.NotEmpty(t, resp.Header.Get("X-Prometheus-Remote-Write-Exemplars-Written")) + } }) } }