Skip to content

Commit

Permalink
feat: receiver/prometheusremotewrite - Implement body unmarshaling (#…
Browse files Browse the repository at this point in the history
…35624)

#### Description
This PR builds on top of #35535 and #35565. We're now making sure we're
able to unmarshal a remote write request, while also exercising the
decompression that is made by OTel's confighttp.

Signed-off-by: Arthur Silva Sens <[email protected]>
  • Loading branch information
ArthurSens authored Nov 2, 2024
1 parent 6dc39a8 commit 85fe1ba
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 7 deletions.
27 changes: 27 additions & 0 deletions .chloggen/prwreceiver-bodyunmarshal.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: receiver/prometheusremotewrite

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Implement body unmarshaling for Prometheus Remote Write requests

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35624]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: Warning - The HTTP Server still doesn't do anything. It's just a placeholder for now.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api, user]
9 changes: 6 additions & 3 deletions receiver/prometheusremotewritereceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/promet
go 1.22.0

require (
github.com/gogo/protobuf v1.3.2
github.com/golang/snappy v0.0.4
github.com/prometheus/prometheus v0.54.1
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/component v0.112.0
Expand All @@ -11,6 +13,7 @@ require (
go.opentelemetry.io/collector/confmap v1.18.0
go.opentelemetry.io/collector/consumer v0.112.0
go.opentelemetry.io/collector/consumer/consumertest v0.112.0
go.opentelemetry.io/collector/pdata v1.18.0
go.opentelemetry.io/collector/receiver v0.112.0
go.uber.org/goleak v1.3.0
go.uber.org/zap v1.27.0
Expand All @@ -26,16 +29,15 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dennwc/varint v1.0.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-kit/log v0.2.1 // indirect
github.com/go-logfmt/logfmt v0.6.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v5 v5.2.1 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
Expand Down Expand Up @@ -72,16 +74,17 @@ require (
go.opentelemetry.io/collector/consumer/consumerprofiles v0.112.0 // indirect
go.opentelemetry.io/collector/extension v0.112.0 // indirect
go.opentelemetry.io/collector/extension/auth v0.112.0 // indirect
go.opentelemetry.io/collector/pdata v1.18.0 // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.112.0 // indirect
go.opentelemetry.io/collector/pipeline v0.112.0 // indirect
go.opentelemetry.io/collector/receiver/receiverprofiles v0.112.0 // indirect
go.opentelemetry.io/collector/semconv v0.112.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 // indirect
go.opentelemetry.io/otel v1.31.0 // indirect
go.opentelemetry.io/otel/metric v1.31.0 // indirect
go.opentelemetry.io/otel/sdk v1.31.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.31.0 // indirect
go.opentelemetry.io/otel/trace v1.31.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.28.0 // indirect
golang.org/x/net v0.30.0 // indirect
Expand Down
12 changes: 12 additions & 0 deletions receiver/prometheusremotewritereceiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 34 additions & 1 deletion receiver/prometheusremotewritereceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,19 @@ import (
"context"
"errors"
"fmt"
"io"
"net/http"
"strings"
"time"

"github.com/gogo/protobuf/proto"
promconfig "github.com/prometheus/prometheus/config"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
promremote "github.com/prometheus/prometheus/storage/remote"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver"
"go.uber.org/zap/zapcore"
)
Expand Down Expand Up @@ -90,7 +95,28 @@ func (prw *prometheusRemoteWriteReceiver) handlePRW(w http.ResponseWriter, req *
// After parsing the content-type header, the next step would be to handle content-encoding.
// Luckly confighttp's Server has middleware that already decompress the request body for us.

// The next step in follow up PRs would be to decode the request body.
body, err := io.ReadAll(req.Body)
if err != nil {
prw.settings.Logger.Warn("Error decoding remote write request", zapcore.Field{Key: "error", Type: zapcore.ErrorType, Interface: err})
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

var prw2Req writev2.Request
if err = proto.Unmarshal(body, &prw2Req); err != nil {
prw.settings.Logger.Warn("Error decoding remote write request", zapcore.Field{Key: "error", Type: zapcore.ErrorType, Interface: err})
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

_, stats, err := prw.translateV2(req.Context(), &prw2Req)
stats.SetHeaders(w)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest) // Following instructions at https://prometheus.io/docs/specs/remote_write_spec_2_0/#invalid-samples
return
}

w.WriteHeader(http.StatusNoContent)
}

// parseProto parses the content-type header and returns the version of the remote-write protocol.
Expand Down Expand Up @@ -122,3 +148,10 @@ func (prw *prometheusRemoteWriteReceiver) parseProto(contentType string) (promco
// No "proto=" parameter found, assume v1.
return promconfig.RemoteWriteProtoMsgV1, nil
}

// translateV2 translates a v2 remote-write request into OTLP metrics.
// For now translateV2 is not implemented and returns an empty metrics.
// nolint
func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, _ *writev2.Request) (pmetric.Metrics, promremote.WriteResponseStats, error) {
return pmetric.NewMetrics(), promremote.WriteResponseStats{}, nil
}
29 changes: 26 additions & 3 deletions receiver/prometheusremotewritereceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,25 @@
package prometheusremotewritereceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusremotewritereceiver"

import (
"bytes"
"context"
"fmt"
"net/http"
"testing"

"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
promconfig "github.com/prometheus/prometheus/config"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/receiver/receivertest"
)

func TestHandlePRWContentTypeNegotiation(t *testing.T) {
func setupServer(t *testing.T) {
t.Helper()

factory := NewFactory()
cfg := factory.CreateDefaultConfig()

Expand All @@ -31,6 +37,10 @@ func TestHandlePRWContentTypeNegotiation(t *testing.T) {
t.Cleanup(func() {
assert.NoError(t, prwReceiver.Shutdown(ctx), "Must not error shutting down")
})
}

func TestHandlePRWContentTypeNegotiation(t *testing.T) {
setupServer(t)

for _, tc := range []struct {
name string
Expand Down Expand Up @@ -60,18 +70,31 @@ func TestHandlePRWContentTypeNegotiation(t *testing.T) {
{
name: "x-protobuf/v2 proto parameter",
contentType: fmt.Sprintf("application/x-protobuf;proto=%s", promconfig.RemoteWriteProtoMsgV2),
extectedCode: http.StatusOK,
extectedCode: http.StatusNoContent,
},
} {
t.Run(tc.name, func(t *testing.T) {
req, err := http.NewRequest(http.MethodPost, "http://localhost:9090/api/v1/write", nil)
body := writev2.Request{}
pBuf := proto.NewBuffer(nil)
err := pBuf.Marshal(&body)
assert.NoError(t, err)

var compressedBody []byte
snappy.Encode(compressedBody, pBuf.Bytes())
req, err := http.NewRequest(http.MethodPost, "http://localhost:9090/api/v1/write", bytes.NewBuffer(compressedBody))
assert.NoError(t, err)

req.Header.Set("Content-Type", tc.contentType)
req.Header.Set("Content-Encoding", "snappy")
resp, err := http.DefaultClient.Do(req)
assert.NoError(t, err)

assert.Equal(t, tc.extectedCode, resp.StatusCode)
if tc.extectedCode == http.StatusNoContent { // We went until the end
assert.NotEmpty(t, resp.Header.Get("X-Prometheus-Remote-Write-Samples-Written"))
assert.NotEmpty(t, resp.Header.Get("X-Prometheus-Remote-Write-Histograms-Written"))
assert.NotEmpty(t, resp.Header.Get("X-Prometheus-Remote-Write-Exemplars-Written"))
}
})
}
}

0 comments on commit 85fe1ba

Please sign in to comment.