Skip to content

Commit

Permalink
feat: receiver/prometheusremotewrite - Content-Type negotiation
Browse files Browse the repository at this point in the history
Signed-off-by: Arthur Silva Sens <[email protected]>
  • Loading branch information
ArthurSens committed Oct 2, 2024
1 parent 5b0f81e commit 8554671
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 4 deletions.
27 changes: 27 additions & 0 deletions .chloggen/prwreceiver-content-type.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: receiver/prometheusremotewrite

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Implement Content-Type negotiation for Prometheus Remote Write requests

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35565]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: Warning - The HTTP Server still doesn't do anything. It's just a placeholder for now.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api, user]
58 changes: 54 additions & 4 deletions receiver/prometheusremotewritereceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ import (
"errors"
"fmt"
"net/http"
"strings"
"time"

promConfig "github.com/prometheus/prometheus/config"

Check failure on line 14 in receiver/prometheusremotewritereceiver/receiver.go

View workflow job for this annotation

GitHub Actions / govulncheck (receiver-2)

could not import github.com/prometheus/prometheus/config (invalid package name: "")
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
"go.uber.org/zap/zapcore"
)

func newRemoteWriteReceiver(settings receiver.Settings, cfg *Config, nextConsumer consumer.Metrics) (receiver.Metrics, error) {
Expand All @@ -31,8 +34,8 @@ type prometheusRemoteWriteReceiver struct {
settings receiver.Settings
nextConsumer consumer.Metrics

config *Config
server *http.Server
config *Config
server *http.Server
}

func (prw *prometheusRemoteWriteReceiver) Start(ctx context.Context, host component.Host) error {
Expand Down Expand Up @@ -64,6 +67,53 @@ func (prw *prometheusRemoteWriteReceiver) Shutdown(ctx context.Context) error {
return prw.server.Shutdown(ctx)
}

func (prw *prometheusRemoteWriteReceiver) handlePRW(_ http.ResponseWriter, _ *http.Request) {
fmt.Println("handleWrite called")
func (prw *prometheusRemoteWriteReceiver) handlePRW(w http.ResponseWriter, req *http.Request) {
contentType := req.Header.Get("Content-Type")
if contentType == "" {
prw.settings.Logger.Warn("message received without Content-Type header, rejecting")
http.Error(w, "Content-Type header is required", http.StatusUnsupportedMediaType)
return
}

msgType, err := prw.parseProto(contentType)
if err != nil {
prw.settings.Logger.Warn("Error decoding remote-write request", zapcore.Field{Key: "error", Type: zapcore.ErrorType, Interface: err})
http.Error(w, err.Error(), http.StatusUnsupportedMediaType)
return
}
if msgType != promConfig.RemoteWriteProtoMsgV2 {
prw.settings.Logger.Warn("message received with unsupported proto version, rejecting")
http.Error(w, "Unsupported proto version", http.StatusUnsupportedMediaType)
return
}
}

// parseProto parses the content-type header and returns the version of the remote-write protocol.
// We can't expect that senders of remote-write v1 will add the "proto=" parameter since it was not
// a requirement in v1. So, if the parameter is not found, we assume v1.
func (prw *prometheusRemoteWriteReceiver) parseProto(contentType string) (promConfig.RemoteWriteProtoMsg, error) {
contentType = strings.TrimSpace(contentType)

parts := strings.Split(contentType, ";")
if parts[0] != "application/x-protobuf" {
return "", fmt.Errorf("expected %q as the first (media) part, got %v content-type", "application/x-protobuf", contentType)
}

for _, part := range parts[1:] {
parameter := strings.Split(part, "=")
if len(parameter) != 2 {
return "", fmt.Errorf("as per https://www.rfc-editor.org/rfc/rfc9110#parameter expected parameters to be key-values, got %v in %v content-type", part, contentType)
}

if strings.TrimSpace(parameter[0]) == "proto" {
ret := promConfig.RemoteWriteProtoMsg(parameter[1])
if err := ret.Validate(); err != nil {
return "", fmt.Errorf("got %v content type; %w", contentType, err)
}
return ret, nil
}
}

// No "proto=" parameter found, assume v1.
return promConfig.RemoteWriteProtoMsgV1, nil
}
77 changes: 77 additions & 0 deletions receiver/prometheusremotewritereceiver/receiver_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package prometheusremotewritereceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusremotewritereceiver"

import (
"context"
"fmt"
"net/http"
"testing"

promConfig "github.com/prometheus/prometheus/config"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/receiver/receivertest"
)

func TestHandlePRWContentTypeNegotiation(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()

prwReceiver, err := factory.CreateMetricsReceiver(context.Background(), receivertest.NewNopSettings(), cfg, consumertest.NewNop())
assert.NoError(t, err)
assert.NotNil(t, prwReceiver, "metrics receiver creation failed")

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

assert.NoError(t, prwReceiver.Start(ctx, componenttest.NewNopHost()))
t.Cleanup(func() {
assert.NoError(t, prwReceiver.Shutdown(ctx), "Must not error shutting down")
})

for _, tc := range []struct {
name string
contentType string
extectedCode int
}{
{
name: "no content type",
contentType: "",
extectedCode: http.StatusUnsupportedMediaType,
},
{
name: "unsupported content type",
contentType: "application/json",
extectedCode: http.StatusUnsupportedMediaType,
},
{
name: "x-protobuf/no proto parameter",
contentType: "application/x-protobuf",
extectedCode: http.StatusUnsupportedMediaType,
},
{
name: "x-protobuf/v1 proto parameter",
contentType: fmt.Sprintf("application/x-protobuf;proto=%s", promConfig.RemoteWriteProtoMsgV1),
extectedCode: http.StatusUnsupportedMediaType,
},
{
name: "x-protobuf/v2 proto parameter",
contentType: fmt.Sprintf("application/x-protobuf;proto=%s", promConfig.RemoteWriteProtoMsgV2),
extectedCode: http.StatusOK,
},
} {
t.Run(tc.name, func(t *testing.T) {
req, err := http.NewRequest(http.MethodPost, "http://localhost:9090/api/v1/write", nil)
assert.NoError(t, err)

req.Header.Set("Content-Type", tc.contentType)
resp, err := http.DefaultClient.Do(req)
assert.NoError(t, err)

assert.Equal(t, tc.extectedCode, resp.StatusCode)
})
}
}

0 comments on commit 8554671

Please sign in to comment.