diff --git a/exporter/prometheusremotewriteexporter/README.md b/exporter/prometheusremotewriteexporter/README.md
new file mode 100644
index 00000000000..a481618c1a0
--- /dev/null
+++ b/exporter/prometheusremotewriteexporter/README.md
@@ -0,0 +1,49 @@
+This exporter sends metrics data in Prometheus TimeSeries format to any Prometheus Remote Write compatible backend, such as Cortex.
+
+Non-cumulative monotonic, histogram, and summary OTLP metrics are dropped by this exporter.
+
+The following settings are required:
+
+- `endpoint`: protocol:host:port to which the exporter sends metrics, using
+the HTTP/HTTPS protocol.
+
+- `namespace`: prefix attached to each exported metric name.
+
+The following settings can be optionally configured:
+- `headers`: additional headers attached to each HTTP request. `X-Prometheus-Remote-Write-Version` cannot be set by users
+and is attached to each request.
+- `insecure` (default = false): whether to disable client transport security for
+the exporter's connection.
+- `ca_file`: path to the CA cert. For a client this verifies the server certificate. Should
+only be used if `insecure` is set to false.
+- `cert_file`: path to the TLS cert to use for TLS required connections. Should
+only be used if `insecure` is set to false.
+- `key_file`: path to the TLS key to use for TLS required connections. Should
+only be used if `insecure` is set to false.
+- `timeout` (default = 5s): how long to wait until the connection is closed.
+- `read_buffer_size` (default = 0): ReadBufferSize for the HTTP client.
+- `write_buffer_size` (default = 512 * 1024): WriteBufferSize for the HTTP client.
+
+Example:
+
+```yaml
+exporters:
+  prometheusremotewrite:
+    http_setting:
+      endpoint: "http://some.url:9411/api/prom/push"
+```
+The full list of settings exposed for this exporter is documented [here](./config.go)
+with detailed sample configurations [here](./testdata/config.yaml).
+
+_Here is a link to the overall project [design](https://github.com/open-telemetry/opentelemetry-collector/pull/1464)_
+
+File structure:
+
+- `exporter.go`: exporter implementation. Converts and sends OTLP metrics
+
+- `helper.go`: helper functions that exporter.go uses. Performs tasks such as sanitizing labels and generating signature strings
+
+- `config.go`: configuration struct of the exporter
+
+- `factory.go`: initialization methods for creating the default configuration and the exporter
+
+Feature in development: derive the Prometheus `job` or `instance` label from Resource, or allow users to configure which Resource attributes need to be added as metric labels.
diff --git a/exporter/prometheusremotewriteexporter/config.go b/exporter/prometheusremotewriteexporter/config.go
new file mode 100644
index 00000000000..80303230f69
--- /dev/null
+++ b/exporter/prometheusremotewriteexporter/config.go
@@ -0,0 +1,43 @@
+// Copyright 2020 The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.

+// This package defines the standard and necessary parameters of the exporter config struct.
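For reference, a fuller configuration sketch combining the settings documented in the README above with the `http_setting` block used in `testdata/config.yaml` might look like the following; the endpoint, namespace, and header values are illustrative placeholders, not defaults.

```yaml
exporters:
  prometheusremotewrite:
    namespace: "test-space"            # prefix added to every exported metric name
    timeout: 10s
    headers:                           # optional extra HTTP headers
      tenant-id: "234"
    retry_on_failure:
      enabled: true
      initial_interval: 10s
    http_setting:
      endpoint: "http://localhost:9009/api/prom/push"  # placeholder backend
      ca_file: "/var/lib/mycert.pem"   # verifies the server certificate when insecure is false
      write_buffer_size: 524288
```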
+// The YAML file for the entire collector pipeline must include a section underneath
+// `exporters` titled `prometheusremotewrite(/#)`. Example in testdata/config.yaml.
+
+package prometheusremotewriteexporter
+
+import (
+	"go.opentelemetry.io/collector/config/confighttp"
+	"go.opentelemetry.io/collector/config/configmodels"
+	"go.opentelemetry.io/collector/exporter/exporterhelper"
+)
+
+// Config defines configuration for the Remote Write exporter.
+type Config struct {
+	// squash ensures fields are correctly decoded in embedded struct.
+	configmodels.ExporterSettings  `mapstructure:",squash"`
+	exporterhelper.TimeoutSettings `mapstructure:",squash"`
+
+	exporterhelper.QueueSettings `mapstructure:"sending_queue"`
+	exporterhelper.RetrySettings `mapstructure:"retry_on_failure"`
+
+	// Namespace, if set, exports metrics under the provided value.
+	Namespace string `mapstructure:"namespace"`
+
+	// Optional headers configuration for authorization and security/extra metadata.
+	Headers map[string]string `mapstructure:"headers"`
+
+	HTTPClientSettings confighttp.HTTPClientSettings `mapstructure:"http_setting"`
+}
diff --git a/exporter/prometheusremotewriteexporter/config_test.go b/exporter/prometheusremotewriteexporter/config_test.go
new file mode 100644
index 00000000000..33c1a226b45
--- /dev/null
+++ b/exporter/prometheusremotewriteexporter/config_test.go
@@ -0,0 +1,92 @@
+// Copyright 2020 The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package prometheusremotewriteexporter
+
+import (
+	"path"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"go.opentelemetry.io/collector/component/componenttest"
+	"go.opentelemetry.io/collector/config/confighttp"
+	"go.opentelemetry.io/collector/config/configmodels"
+	"go.opentelemetry.io/collector/config/configtest"
+	"go.opentelemetry.io/collector/config/configtls"
+	"go.opentelemetry.io/collector/exporter/exporterhelper"
+)
+
+// TestLoadConfig checks whether yaml configuration can be loaded correctly
+func TestLoadConfig(t *testing.T) {
+	factories, err := componenttest.ExampleComponents()
+	assert.NoError(t, err)
+
+	factory := NewFactory()
+	factories.Exporters[typeStr] = factory
+	cfg, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
+
+	require.NoError(t, err)
+	require.NotNil(t, cfg)
+
+	// From the default configurations -- checks if a correct exporter is instantiated
+	e0 := cfg.Exporters["prometheusremotewrite"]
+	assert.Equal(t, e0, factory.CreateDefaultConfig())
+
+	// checks if the correct Config struct can be instantiated from testdata/config.yaml
+	e1 := cfg.Exporters["prometheusremotewrite/2"]
+	assert.Equal(t, e1,
+		&Config{
+			ExporterSettings: configmodels.ExporterSettings{
+				NameVal: "prometheusremotewrite/2",
+				TypeVal: "prometheusremotewrite",
+			},
+			TimeoutSettings: exporterhelper.TimeoutSettings{
+				Timeout: 10 * time.Second,
+			},
+			QueueSettings: exporterhelper.QueueSettings{
+				Enabled:      true,
+				NumConsumers: 2,
+				QueueSize:    10,
+			},
+			RetrySettings: exporterhelper.RetrySettings{
+				Enabled:         true,
+				InitialInterval: 10 * time.Second,
+				MaxInterval:     1 * time.Minute,
+				MaxElapsedTime:  10 * time.Minute,
+			},
+			Namespace: "test-space",
+
+			Headers: map[string]string{
+				"prometheus-remote-write-version": "0.1.0",
+				"tenant-id":                       "234"},
+
+			HTTPClientSettings: confighttp.HTTPClientSettings{
+				Endpoint: "localhost:8888",
+				TLSSetting: configtls.TLSClientSetting{
+					TLSSetting: configtls.TLSSetting{
+						CAFile: "/var/lib/mycert.pem", // placeholder CA file; must match testdata/config.yaml
+					},
+					Insecure: false,
+				},
+				ReadBufferSize: 0,
+
+				WriteBufferSize: 512 * 1024,
+
+				Timeout: 5 * time.Second,
+			},
+		})
+}
diff --git a/exporter/prometheusremotewriteexporter/exporter.go b/exporter/prometheusremotewriteexporter/exporter.go
new file mode 100644
index 00000000000..2d8d4f289cc
--- /dev/null
+++ b/exporter/prometheusremotewriteexporter/exporter.go
@@ -0,0 +1,367 @@
+// Copyright 2020 The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// This file defines the prwExporter struct and its methods.
+// The exporter receives data from upstream in the Collector pipeline,
+// converts it to Prometheus TimeSeries data, and then sends it to the
+// configured Remote Write HTTP endpoint.
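The conversion described above ultimately produces the payload required by the Remote Write API: a `prompb.WriteRequest` serialized with gogo/protobuf and compressed with Snappy. A minimal, self-contained sketch of that wire format, with made-up label and sample values, is shown below; `export()` later in this file performs the same marshal-and-compress steps against its accumulated TimeSeries map.

```go
package main

import (
	"fmt"

	"github.com/gogo/protobuf/proto"
	"github.com/golang/snappy"
	"github.com/prometheus/prometheus/prompb"
)

func main() {
	// One TimeSeries wrapped in a WriteRequest; all values are illustrative only.
	req := &prompb.WriteRequest{
		Timeseries: []prompb.TimeSeries{{
			Labels: []prompb.Label{
				// This exporter stores the metric name under the "name" label.
				{Name: "name", Value: "test_metric_total"},
			},
			Samples: []prompb.Sample{{Value: 1.5, Timestamp: 1596000000000}},
		}},
	}

	// Marshal to protobuf wire format, then Snappy-compress, mirroring export().
	raw, err := proto.Marshal(req)
	if err != nil {
		panic(err)
	}
	compressed := snappy.Encode(nil, raw)
	fmt.Printf("uncompressed: %d bytes, compressed: %d bytes\n", len(raw), len(compressed))
}
```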
+
+package prometheusremotewriteexporter
+
+import (
+	"bufio"
+	"bytes"
+	"context"
+	"io"
+	"net/http"
+	"net/url"
+	"strconv"
+	"strings"
+	"sync"
+
+	"github.com/gogo/protobuf/proto"
+	"github.com/golang/snappy"
+	"github.com/pkg/errors"
+	"github.com/prometheus/prometheus/prompb"
+
+	"go.opentelemetry.io/collector/consumer/pdata"
+	"go.opentelemetry.io/collector/consumer/pdatautil"
+	"go.opentelemetry.io/collector/internal/data"
+	otlp "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/metrics/v1"
+)
+
+// TODO: get default labels such as job or instance from Resource
+
+// prwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint
+type prwExporter struct {
+	namespace   string
+	endpointURL *url.URL
+	client      *http.Client
+	headers     map[string]string
+	wg          *sync.WaitGroup
+	closeChan   chan struct{}
+}
+
+// newPrwExporter initializes a new prwExporter instance and sets fields accordingly.
+// client parameter cannot be nil.
+func newPrwExporter(namespace string, endpoint string, client *http.Client) (*prwExporter, error) {
+	if client == nil {
+		return nil, errors.Errorf("http client cannot be nil")
+	}
+
+	endpointURL, err := url.ParseRequestURI(endpoint)
+	if err != nil {
+		return nil, errors.Errorf("invalid endpoint")
+	}
+
+	return &prwExporter{
+		namespace:   namespace,
+		endpointURL: endpointURL,
+		client:      client,
+		wg:          new(sync.WaitGroup),
+		closeChan:   make(chan struct{}),
+	}, nil
+}
+
+// shutdown stops the exporter from accepting incoming calls (which then return an error), and waits for current
+// export operations to finish before returning
+func (prwe *prwExporter) shutdown(context.Context) error {
+	close(prwe.closeChan)
+	prwe.wg.Wait()
+	return nil
+}
+
+// pushMetrics converts metrics to Prometheus remote write TimeSeries and sends them to the remote endpoint. It
+// maintains a map of TimeSeries, validates and handles each individual metric, adding the converted TimeSeries to the
+// map, and finally exports the map.
+func (prwe *prwExporter) pushMetrics(ctx context.Context, md pdata.Metrics) (int, error) {
+	prwe.wg.Add(1)
+	defer prwe.wg.Done()
+	select {
+	case <-prwe.closeChan:
+		return pdatautil.MetricCount(md), errors.New("shutdown has been called")
+	default:
+		tsMap := map[string]*prompb.TimeSeries{}
+		dropped := 0
+		errs := []string{}
+
+		resourceMetrics := data.MetricDataToOtlp(pdatautil.MetricsToInternalMetrics(md))
+		for _, r := range resourceMetrics {
+			// TODO: add resource attributes as labels
+			for _, instMetrics := range r.InstrumentationLibraryMetrics {
+				// TODO: add instrumentation library information as labels
+				for _, metric := range instMetrics.Metrics {
+					// check for valid type and temporality combination
+					ok := validateMetrics(metric.MetricDescriptor)
+					if !ok {
+						dropped++
+						errs = append(errs, "invalid temporality and type combination")
+						continue
+					}
+					// handle individual metric based on type
+					switch metric.GetMetricDescriptor().GetType() {
+					case otlp.MetricDescriptor_MONOTONIC_INT64, otlp.MetricDescriptor_INT64,
+						otlp.MetricDescriptor_MONOTONIC_DOUBLE, otlp.MetricDescriptor_DOUBLE:
+						if err := prwe.handleScalarMetric(tsMap, metric); err != nil {
+							errs = append(errs, err.Error())
+						}
+					case otlp.MetricDescriptor_HISTOGRAM:
+						if err := prwe.handleHistogramMetric(tsMap, metric); err != nil {
+							errs = append(errs, err.Error())
+						}
+					case otlp.MetricDescriptor_SUMMARY:
+						if err := prwe.handleSummaryMetric(tsMap, metric); err != nil {
+							errs = append(errs, err.Error())
+						}
+					default:
+						dropped++
+						errs = append(errs, "invalid type")
+						continue
+					}
+				}
+			}
+		}
+
+		if err := prwe.export(ctx, tsMap); err != nil {
+			return pdatautil.MetricCount(md), err
+		}
+
+		if dropped != 0 {
+			return dropped, errors.New(strings.Join(errs, "\n"))
+		}
+
+		return 0, nil
+	}
+}
+
+// The following methods are called in the pushMetrics() method.
+
+// handleScalarMetric processes data points in a single OTLP scalar metric by adding each point as a Sample into
+// its corresponding TimeSeries in tsMap.
+// tsMap and metric cannot be nil, and metric must have a non-nil descriptor
+func (prwe *prwExporter) handleScalarMetric(tsMap map[string]*prompb.TimeSeries, metric *otlp.Metric) error {
+	// TODO: add check for nil metric and descriptor
+
+	mType := metric.MetricDescriptor.Type
+
+	switch mType {
+	// int points
+	case otlp.MetricDescriptor_MONOTONIC_INT64, otlp.MetricDescriptor_INT64:
+		if metric.Int64DataPoints == nil {
+			return errors.New("nil data point field in metric " + metric.GetMetricDescriptor().Name)
+		}
+
+		for _, pt := range metric.Int64DataPoints {
+
+			// create parameters for addSample
+			name := getPromMetricName(metric.GetMetricDescriptor(), prwe.namespace)
+			labels := createLabelSet(pt.GetLabels(), nameStr, name)
+			sample := &prompb.Sample{
+				Value:     float64(pt.Value),
+				Timestamp: int64(pt.TimeUnixNano),
+			}
+
+			addSample(tsMap, sample, labels, metric.GetMetricDescriptor().GetType())
+		}
+		return nil
+
+	// double points
+	case otlp.MetricDescriptor_MONOTONIC_DOUBLE, otlp.MetricDescriptor_DOUBLE:
+		if metric.DoubleDataPoints == nil {
+			return errors.New("nil data point field in metric " + metric.GetMetricDescriptor().Name)
+		}
+		for _, pt := range metric.DoubleDataPoints {
+
+			// create parameters for addSample
+			name := getPromMetricName(metric.GetMetricDescriptor(), prwe.namespace)
+			labels := createLabelSet(pt.GetLabels(), nameStr, name)
+			sample := &prompb.Sample{
+				Value:     pt.Value,
+				Timestamp: int64(pt.TimeUnixNano),
+			}
+
+			addSample(tsMap, sample, labels, metric.GetMetricDescriptor().GetType())
+		}
+		return nil
+	}
+
+	return errors.Errorf("invalid metric type: wants int or double data points")
+}
+
+// handleHistogramMetric processes data points in a single OTLP histogram metric by mapping the sum, count and each
+// bucket of every data point as a Sample, and adding each Sample to its corresponding TimeSeries.
+// tsMap and metric cannot be nil.
+func (prwe *prwExporter) handleHistogramMetric(tsMap map[string]*prompb.TimeSeries, metric *otlp.Metric) error {
+
+	if metric.HistogramDataPoints == nil {
+		return errors.Errorf("invalid metric type: wants histogram points")
+	}
+
+	for _, pt := range metric.HistogramDataPoints {
+
+		time := int64(pt.GetTimeUnixNano())
+		mType := metric.GetMetricDescriptor().GetType()
+
+		// sum, count, and buckets of the histogram should append suffix to baseName
+		baseName := getPromMetricName(metric.GetMetricDescriptor(), prwe.namespace)
+
+		// treat sum as a sample in an individual TimeSeries
+		sum := &prompb.Sample{
+			Value:     pt.GetSum(),
+			Timestamp: time,
+		}
+		sumlabels := createLabelSet(pt.GetLabels(), nameStr, baseName+sumStr)
+		addSample(tsMap, sum, sumlabels, mType)
+
+		// treat count as a sample in an individual TimeSeries
+		count := &prompb.Sample{
+			Value:     float64(pt.GetCount()),
+			Timestamp: time,
+		}
+		countlabels := createLabelSet(pt.GetLabels(), nameStr, baseName+countStr)
+		addSample(tsMap, count, countlabels, mType)
+
+		// cumulative count for the +Inf bound
+		var totalCount uint64
+
+		// process each bucket
+		for index, bk := range pt.GetBuckets() {
+			bucket := &prompb.Sample{
+				Value:     float64(bk.Count),
+				Timestamp: time,
+			}
+			boundStr := strconv.FormatFloat(pt.GetExplicitBounds()[index], 'f', -1, 64)
+			labels := createLabelSet(pt.GetLabels(), nameStr, baseName+bucketStr, leStr, boundStr)
+			addSample(tsMap, bucket, labels, mType)
+
+			totalCount += bk.GetCount()
+		}
+
+		infBucket := &prompb.Sample{
+			Value:     float64(totalCount),
+			Timestamp: time,
+		}
+		inflabels := createLabelSet(pt.GetLabels(), nameStr, baseName+bucketStr, leStr, pInfStr)
+		addSample(tsMap, infBucket, inflabels, mType)
+	}
+	return nil
+}
+
+// handleSummaryMetric processes data points in a single OTLP summary metric by mapping the sum, count and each
+// quantile of every data point as a Sample, and adding each Sample to its corresponding TimeSeries.
+// tsMap and metric cannot be nil.
+func (prwe *prwExporter) handleSummaryMetric(tsMap map[string]*prompb.TimeSeries, metric *otlp.Metric) error {
+
+	if metric.SummaryDataPoints == nil {
+		return errors.Errorf("invalid metric type: wants summary points")
+	}
+
+	for _, pt := range metric.SummaryDataPoints {
+
+		time := int64(pt.GetTimeUnixNano())
+		mType := metric.GetMetricDescriptor().GetType()
+
+		// sum and count of the Summary should append suffix to baseName
+		baseName := getPromMetricName(metric.GetMetricDescriptor(), prwe.namespace)
+
+		// treat sum as a sample in an individual TimeSeries
+		sum := &prompb.Sample{
+			Value:     pt.GetSum(),
+			Timestamp: time,
+		}
+		sumlabels := createLabelSet(pt.GetLabels(), nameStr, baseName+sumStr)
+		addSample(tsMap, sum, sumlabels, mType)
+
+		// treat count as a sample in an individual TimeSeries
+		count := &prompb.Sample{
+			Value:     float64(pt.GetCount()),
+			Timestamp: time,
+		}
+		countlabels := createLabelSet(pt.GetLabels(), nameStr, baseName+countStr)
+		addSample(tsMap, count, countlabels, mType)
+
+		// process each percentile/quantile
+		for _, qt := range pt.GetPercentileValues() {
+			quantile := &prompb.Sample{
+				Value:     float64(qt.Value),
+				Timestamp: time,
+			}
+			percentileStr := strconv.FormatFloat(qt.Percentile, 'f', -1, 64)
+			qtlabels := createLabelSet(pt.GetLabels(), nameStr, baseName, quantileStr, percentileStr)
+			addSample(tsMap, quantile, qtlabels, mType)
+		}
+	}
+	return nil
+}
+
+// export sends a Snappy-compressed WriteRequest containing the given TimeSeries to the remote write endpoint. We
+// adhere closely to the Remote Write API, so the endpoint can properly receive our metrics data.
+func (prwe *prwExporter) export(ctx context.Context, tsMap map[string]*prompb.TimeSeries) error {
+	// Convert the map of TimeSeries into a WriteRequest
+	req, err := wrapTimeSeries(tsMap)
+	if err != nil {
+		return err
+	}
+
+	// Use proto.Marshal to convert the WriteRequest into wire format (a byte array)
+	data, err := proto.Marshal(req)
+	if err != nil {
+		return err
+	}
+	// Use the snappy compressor, as we are emulating the Remote Write package
+	compressedData := snappy.Encode(nil, data)
+
+	// Create the HTTP POST request to send to the endpoint
+	httpReq, err := http.NewRequest("POST", prwe.endpointURL.String(), bytes.NewReader(compressedData))
+	if err != nil {
+		return err
+	}
+
+	// Add optional headers
+	for name, value := range prwe.headers {
+		httpReq.Header.Set(name, value)
+	}
+
+	// Add necessary headers specified by:
+	// https://cortexmetrics.io/docs/apis/#remote-api
+	httpReq.Header.Add("Content-Encoding", "snappy")
+	httpReq.Header.Set("User-Agent", "otel-collector")
+	httpReq.Header.Set("Content-Type", "application/x-protobuf")
+	httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
+
+	// Bound the request by the caller's context and the client timeout
+	ctx, cancel := context.WithTimeout(ctx, prwe.client.Timeout)
+	defer cancel()
+	httpReq = httpReq.WithContext(ctx)
+
+	httpResp, err := prwe.client.Do(httpReq)
+	if err != nil {
+		return err
+	}
+	defer httpResp.Body.Close()
+
+	// Only 2xx status codes indicate success
+	if httpResp.StatusCode < 200 || httpResp.StatusCode > 226 {
+		scanner := bufio.NewScanner(io.LimitReader(httpResp.Body, 256))
+		line := ""
+		if scanner.Scan() {
+			line = scanner.Text()
+		}
+		err = errors.Errorf("server returned HTTP status %s: %s", httpResp.Status, line)
+	}
+	return err
+}
diff --git a/exporter/prometheusremotewriteexporter/exporter_test.go 
b/exporter/prometheusremotewriteexporter/exporter_test.go
new file mode 100644
index 00000000000..1766f5ebadb
--- /dev/null
+++ b/exporter/prometheusremotewriteexporter/exporter_test.go
@@ -0,0 +1,681 @@
+// Copyright 2020 The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package prometheusremotewriteexporter
+
+import (
+	"context"
+	"io/ioutil"
+	"net/http"
+	"net/http/httptest"
+	"net/url"
+	"strconv"
+	"sync"
+	"testing"
+
+	"github.com/gogo/protobuf/proto"
+	"github.com/golang/snappy"
+	"github.com/prometheus/prometheus/prompb"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"go.opentelemetry.io/collector/config/confighttp"
+	"go.opentelemetry.io/collector/config/configmodels"
+	"go.opentelemetry.io/collector/consumer/pdata"
+	"go.opentelemetry.io/collector/consumer/pdatautil"
+	"go.opentelemetry.io/collector/exporter/exporterhelper"
+	"go.opentelemetry.io/collector/internal/data"
+	otlp "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/metrics/v1"
+	"go.opentelemetry.io/collector/internal/data/testdata"
+)
+
+// TODO: add bucket and histogram test cases for Test_PushMetrics
+
+// Test_handleScalarMetric checks whether data points within a single scalar metric can be added to a map of
+// TimeSeries correctly.
+// Test cases are two data points belonging to the same TimeSeries, two data points belonging to different TimeSeries,
+// and a nil data points case.
+func Test_handleScalarMetric(t *testing.T) {
+	sameTs := map[string]*prompb.TimeSeries{
+		// string signature of the data point is the key of the map
+		typeMonotonicInt64 + "-name-same_ts_int_points_total" + lb1Sig: getTimeSeries(
+			getPromLabels(label11, value11, label12, value12, "name", "same_ts_int_points_total"),
+			getSample(float64(intVal1), time1),
+			getSample(float64(intVal2), time1)),
+	}
+	differentTs := map[string]*prompb.TimeSeries{
+		typeMonotonicDouble + "-name-different_ts_double_points_total" + lb1Sig: getTimeSeries(
+			getPromLabels(label11, value11, label12, value12, "name", "different_ts_double_points_total"),
+			getSample(floatVal1, time1)),
+		typeMonotonicDouble + "-name-different_ts_double_points_total" + lb2Sig: getTimeSeries(
+			getPromLabels(label21, value21, label22, value22, "name", "different_ts_double_points_total"),
+			getSample(floatVal2, time2)),
+	}
+
+	tests := []struct {
+		name        string
+		m           *otlp.Metric
+		returnError bool
+		want        map[string]*prompb.TimeSeries
+	}{
+		{
+			"invalid_nil_array",
+			&otlp.Metric{
+				MetricDescriptor:    getDescriptor("invalid_nil_array", monotonicInt64Comb, validCombinations),
+				Int64DataPoints:     nil,
+				DoubleDataPoints:    nil,
+				HistogramDataPoints: nil,
+				SummaryDataPoints:   nil,
+			},
+			true,
+			map[string]*prompb.TimeSeries{},
+		},
+		{
+			"same_ts_int_points",
+			&otlp.Metric{
+				MetricDescriptor: getDescriptor("same_ts_int_points", monotonicInt64Comb, validCombinations),
+				Int64DataPoints: []*otlp.Int64DataPoint{
+					getIntDataPoint(lbs1, intVal1, time1),
+					getIntDataPoint(lbs1, intVal2, time1),
+				},
+				DoubleDataPoints:    nil,
+				HistogramDataPoints: nil,
+				SummaryDataPoints:   nil,
+			},
+			false,
+			sameTs,
+		},
+		{
+			"different_ts_double_points",
+			&otlp.Metric{
+				MetricDescriptor: getDescriptor("different_ts_double_points", monotonicDoubleComb, validCombinations),
+				Int64DataPoints:  nil,
+				DoubleDataPoints: []*otlp.DoubleDataPoint{
+					getDoubleDataPoint(lbs1, floatVal1, time1),
+					getDoubleDataPoint(lbs2, floatVal2, time2),
+				},
+				HistogramDataPoints: nil,
+				SummaryDataPoints:   nil,
+			},
+			false,
+			differentTs,
+		},
+	}
+	// run tests
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			tsMap := map[string]*prompb.TimeSeries{}
+			prw := &prwExporter{}
+			ok := prw.handleScalarMetric(tsMap, tt.m)
+			if tt.returnError {
+				assert.Error(t, ok)
+				return
+			}
+			assert.Exactly(t, len(tt.want), len(tsMap))
+			for k, v := range tsMap {
+				require.NotNil(t, tt.want[k])
+				assert.ElementsMatch(t, tt.want[k].Labels, v.Labels)
+				assert.ElementsMatch(t, tt.want[k].Samples, v.Samples)
+			}
+		})
+	}
+}
+
+// Test_handleHistogramMetric checks whether data points (sum, count, buckets) within a single histogram metric can be
+// added to a map of TimeSeries correctly.
+// Test cases are a histogram data point with two buckets and a nil data points case.
+func Test_handleHistogramMetric(t *testing.T) {
+	sum := "sum"
+	count := "count"
+	bucket1 := "bucket1"
+	bucket2 := "bucket2"
+	bucketInf := "bucketInf"
+	histPoint := getHistogramDataPoint(
+		lbs1,
+		time1,
+		floatVal2,
+		uint64(intVal2), []float64{floatVal1, floatVal2},
+		[]uint64{uint64(intVal1), uint64(intVal1)})
+
+	// string signature of the data point is the key of the map
+	sigs := map[string]string{
+		sum:   typeHistogram + "-name-" + name1 + "_sum" + lb1Sig,
+		count: typeHistogram + "-name-" + name1 + "_count" + lb1Sig,
+		bucket1: typeHistogram + "-" + "le-" + strconv.FormatFloat(floatVal1, 'f', -1, 64) +
+			"-name-" + name1 + "_bucket" + lb1Sig,
+		bucket2: typeHistogram + "-" + "le-" + strconv.FormatFloat(floatVal2, 'f', -1, 64) +
+			"-name-" + name1 + "_bucket" + lb1Sig,
+		bucketInf: typeHistogram + "-" + "le-" + "+Inf" +
+			"-name-" + name1 + "_bucket" + lb1Sig,
+	}
+	labels := map[string][]prompb.Label{
+		sum:   append(promLbs1, getPromLabels("name", name1+"_sum")...),
+		count: append(promLbs1, getPromLabels("name", name1+"_count")...),
+		bucket1: append(promLbs1, getPromLabels("name", name1+"_bucket", "le",
+			strconv.FormatFloat(floatVal1, 'f', -1, 64))...),
+		bucket2: append(promLbs1, getPromLabels("name", name1+"_bucket", "le",
+			strconv.FormatFloat(floatVal2, 'f', -1, 64))...),
+		bucketInf: append(promLbs1, getPromLabels("name", name1+"_bucket", "le",
+			"+Inf")...),
+	}
+	tests := []struct {
+		name        string
+		m           otlp.Metric
+		returnError bool
+		want        map[string]*prompb.TimeSeries
+	}{
+		{
+			"invalid_nil_array",
+			otlp.Metric{
+				MetricDescriptor:    getDescriptor("invalid_nil_array", histogramComb, validCombinations),
+				Int64DataPoints:     nil,
+				DoubleDataPoints:    nil,
+				HistogramDataPoints: nil,
+				SummaryDataPoints:   nil,
+			},
+			true,
+			map[string]*prompb.TimeSeries{},
+		},
+		{
+			"single_histogram_point",
+			otlp.Metric{
+				MetricDescriptor:    getDescriptor(name1, histogramComb, validCombinations),
+				Int64DataPoints:     nil,
+				DoubleDataPoints:    nil,
+				HistogramDataPoints: []*otlp.HistogramDataPoint{histPoint},
+				SummaryDataPoints:   nil,
+			},
+			false,
+			map[string]*prompb.TimeSeries{
+				sigs[sum]:       getTimeSeries(labels[sum], getSample(floatVal2, time1)),
+				sigs[count]:     getTimeSeries(labels[count], getSample(float64(intVal2), time1)),
+				sigs[bucket1]:   getTimeSeries(labels[bucket1], getSample(float64(intVal1), time1)),
+				sigs[bucket2]:   getTimeSeries(labels[bucket2], getSample(float64(intVal1), time1)),
+				sigs[bucketInf]: getTimeSeries(labels[bucketInf], getSample(float64(intVal2), time1)),
+			},
+		},
+	}
+
+	// run tests
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			tsMap := map[string]*prompb.TimeSeries{}
+			prw := &prwExporter{}
+			ok := prw.handleHistogramMetric(tsMap, &tt.m)
+			if tt.returnError {
+				assert.Error(t, ok)
+				return
+			}
+			assert.Exactly(t, len(tt.want), len(tsMap))
+			for k, v := range tsMap {
+				require.NotNil(t, tt.want[k], k)
+				assert.ElementsMatch(t, tt.want[k].Labels, v.Labels)
+				assert.ElementsMatch(t, tt.want[k].Samples, v.Samples)
+			}
+		})
+	}
+}
+
+// Test_handleSummaryMetric checks whether data points (sum, count, quantiles) within a single summary metric can be
+// added to a map of TimeSeries correctly.
+// Test cases are a summary data point with two quantiles and a nil data points case.
+func Test_handleSummaryMetric(t *testing.T) { + sum := "sum" + count := "count" + q1 := "quantile1" + q2 := "quantile2" + // string signature is the key of the map + sigs := map[string]string{ + sum: typeSummary + "-name-" + name1 + "_sum" + lb1Sig, + count: typeSummary + "-name-" + name1 + "_count" + lb1Sig, + q1: typeSummary + "-name-" + name1 + "-" + "quantile-" + + strconv.FormatFloat(floatVal1, 'f', -1, 64) + lb1Sig, + q2: typeSummary + "-name-" + name1 + "-" + "quantile-" + + strconv.FormatFloat(floatVal2, 'f', -1, 64) + lb1Sig, + } + labels := map[string][]prompb.Label{ + sum: append(promLbs1, getPromLabels("name", name1+"_sum")...), + count: append(promLbs1, getPromLabels("name", name1+"_count")...), + q1: append(promLbs1, getPromLabels("name", name1, "quantile", + strconv.FormatFloat(floatVal1, 'f', -1, 64))...), + q2: append(promLbs1, getPromLabels("name", name1, "quantile", + strconv.FormatFloat(floatVal2, 'f', -1, 64))...), + } + + summaryPoint := getSummaryDataPoint(lbs1, time1, floatVal2, uint64(intVal2), []float64{floatVal1, floatVal2}, []float64{floatVal1, floatVal1}) + + tests := []struct { + name string + m otlp.Metric + returnError bool + want map[string]*prompb.TimeSeries + }{ + { + "invalid_nil_array", + otlp.Metric{ + MetricDescriptor: getDescriptor("invalid_nil_array", summaryComb, validCombinations), + Int64DataPoints: nil, + DoubleDataPoints: nil, + HistogramDataPoints: nil, + SummaryDataPoints: nil, + }, + true, + map[string]*prompb.TimeSeries{}, + }, + { + "single_summary_point", + otlp.Metric{ + MetricDescriptor: getDescriptor(name1, summaryComb, validCombinations), + Int64DataPoints: nil, + DoubleDataPoints: nil, + HistogramDataPoints: nil, + SummaryDataPoints: []*otlp.SummaryDataPoint{summaryPoint}, + }, + false, + map[string]*prompb.TimeSeries{ + sigs[sum]: getTimeSeries(labels[sum], getSample(floatVal2, time1)), + sigs[count]: getTimeSeries(labels[count], getSample(float64(intVal2), time1)), + sigs[q1]: getTimeSeries(labels[q1], getSample(float64(intVal1), time1)), + sigs[q2]: getTimeSeries(labels[q2], getSample(float64(intVal1), time1)), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tsMap := map[string]*prompb.TimeSeries{} + prw := &prwExporter{} + ok := prw.handleSummaryMetric(tsMap, &tt.m) + if tt.returnError { + assert.Error(t, ok) + return + } + assert.Equal(t, len(tt.want), len(tsMap)) + for k, v := range tsMap { + require.NotNil(t, tt.want[k], k) + assert.ElementsMatch(t, tt.want[k].Labels, v.Labels) + assert.ElementsMatch(t, tt.want[k].Samples, v.Samples) + } + }) + } +} + +// Test_newPrwExporter checks that a new exporter instance with non-nil fields is initialized +func Test_newPrwExporter(t *testing.T) { + config := &Config{ + ExporterSettings: configmodels.ExporterSettings{}, + TimeoutSettings: exporterhelper.TimeoutSettings{}, + QueueSettings: exporterhelper.QueueSettings{}, + RetrySettings: exporterhelper.RetrySettings{}, + Namespace: "", + HTTPClientSettings: confighttp.HTTPClientSettings{Endpoint: ""}, + } + tests := []struct { + name string + config *Config + namespace string + endpoint string + client *http.Client + returnError bool + }{ + { + "invalid_URL", + config, + "test", + "invalid URL", + http.DefaultClient, + true, + }, + { + "nil_client", + config, + "test", + "http://some.url:9411/api/prom/push", + nil, + true, + }, + { + "success_case", + config, + "test", + "http://some.url:9411/api/prom/push", + http.DefaultClient, + false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t 
*testing.T) {
+			prwe, err := newPrwExporter(tt.namespace, tt.endpoint, tt.client)
+			if tt.returnError {
+				assert.Error(t, err)
+				return
+			}
+			require.NotNil(t, prwe)
+			assert.NotNil(t, prwe.namespace)
+			assert.NotNil(t, prwe.endpointURL)
+			assert.NotNil(t, prwe.client)
+			assert.NotNil(t, prwe.closeChan)
+			assert.NotNil(t, prwe.wg)
+		})
+	}
+}
+
+// Test_shutdown checks that after shutdown is called, incoming calls to pushMetrics return an error.
+func Test_shutdown(t *testing.T) {
+	prwe := &prwExporter{
+		wg:        new(sync.WaitGroup),
+		closeChan: make(chan struct{}),
+	}
+	wg := new(sync.WaitGroup)
+	errChan := make(chan error, 5)
+	err := prwe.shutdown(context.Background())
+	require.NoError(t, err)
+	for i := 0; i < 5; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			_, ok := prwe.pushMetrics(context.Background(),
+				pdatautil.MetricsFromInternalMetrics(testdata.GenerateMetricDataEmpty()))
+			errChan <- ok
+		}()
+	}
+	wg.Wait()
+	close(errChan)
+	for ok := range errChan {
+		assert.Error(t, ok)
+	}
+}
+
+// Test_export checks whether the server receives the correct TimeSeries.
+// TODO: iterate over multiple TimeSeries in a loop, much akin to Test_pushMetrics.
+func Test_export(t *testing.T) {
+	// Instantiate a dummy TimeSeries instance to pass into the export call and to compare the HTTP request against
+	labels := getPromLabels(label11, value11, label12, value12, label21, value21, label22, value22)
+	sample1 := getSample(floatVal1, time1)
+	sample2 := getSample(floatVal2, time2)
+	ts1 := getTimeSeries(labels, sample1, sample2)
+	handleFunc := func(w http.ResponseWriter, r *http.Request, code int) {
+		// The following handler function reads the sent HTTP request, unmarshals it, and checks whether the
+		// WriteRequest preserves the TimeSeries data correctly
+		body, err := ioutil.ReadAll(r.Body)
+		if err != nil {
+			t.Fatal(err)
+		}
+		require.NotNil(t, body)
+		// Receive the HTTP request, then unzip, unmarshal, and extract the TimeSeries
+		assert.Equal(t, "0.1.0", r.Header.Get("X-Prometheus-Remote-Write-Version"))
+		assert.Equal(t, "snappy", r.Header.Get("Content-Encoding"))
+		writeReq := &prompb.WriteRequest{}
+		unzipped := []byte{}
+
+		dest, err := snappy.Decode(unzipped, body)
+		require.NoError(t, err)
+
+		ok := proto.Unmarshal(dest, writeReq)
+		require.NoError(t, ok)
+
+		assert.EqualValues(t, 1, len(writeReq.Timeseries))
+		require.NotNil(t, writeReq.GetTimeseries())
+		assert.Equal(t, *ts1, writeReq.GetTimeseries()[0])
+		w.WriteHeader(code)
+	}
+
+	// Test table to check whether different HTTP response codes or server errors
+	// are properly identified
+	tests := []struct {
+		name             string
+		ts               prompb.TimeSeries
+		serverUp         bool
+		httpResponseCode int
+		returnError      bool
+	}{
+		{"success_case",
+			*ts1,
+			true,
+			http.StatusAccepted,
+			false,
+		},
+		{
+			"server_no_response_case",
+			*ts1,
+			false,
+			http.StatusAccepted,
+			true,
+		}, {
+			"error_status_code_case",
+			*ts1,
+			true,
+			http.StatusForbidden,
+			true,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+				if handleFunc != nil {
+					handleFunc(w, r, tt.httpResponseCode)
+				}
+			}))
+			defer server.Close()
+			serverURL, uErr := url.Parse(server.URL)
+			assert.NoError(t, uErr)
+			if !tt.serverUp {
+				server.Close()
+			}
+			err := runExportPipeline(t, ts1, serverURL)
+			if tt.returnError {
+				assert.Error(t, err)
+				return
+			}
+			assert.NoError(t, err)
+		})
+	}
+}
+
+func runExportPipeline(t *testing.T, ts *prompb.TimeSeries, endpoint *url.URL) error {
+	// Construct a map of TimeSeries to export
+	testmap := make(map[string]*prompb.TimeSeries)
+	testmap["test"] = ts
+
+	HTTPClient := http.DefaultClient
+	// Instantiate a prwExporter with the default HTTP client and the endpoint set to the passed-in endpoint
+	prwe, err := newPrwExporter("test", endpoint.String(), HTTPClient)
+	if err != nil {
+		return err
+	}
+	return prwe.export(context.Background(), testmap)
+}
+
+// Test_pushMetrics checks that the number of TimeSeries received by the server and the number of metrics dropped are
+// the same as expected
+func Test_pushMetrics(t *testing.T) {
+	noTempBatch := pdatautil.MetricsFromInternalMetrics(testdata.GenerateMetricDataManyMetricsSameResource(10))
+	invalidTypeBatch := pdatautil.MetricsFromInternalMetrics(testdata.GenerateMetricDataMetricTypeInvalid())
+	nilDescBatch := pdatautil.MetricsFromInternalMetrics(testdata.GenerateMetricDataNilMetricDescriptor())
+
+	// 10 counter metrics, 2 points in each. Two TimeSeries in total
+	batch := testdata.GenerateMetricDataManyMetricsSameResource(10)
+	setCumulative(&batch)
+	scalarBatch := pdatautil.MetricsFromInternalMetrics(batch)
+
+	hist := data.MetricDataToOtlp(testdata.GenerateMetricDataOneMetric())
+	hist[0].InstrumentationLibraryMetrics[0].Metrics[0] = &otlp.Metric{
+		MetricDescriptor: getDescriptor("hist_test", histogramComb, validCombinations),
+		HistogramDataPoints: []*otlp.HistogramDataPoint{getHistogramDataPoint(
+			lbs1,
+			time1,
+			floatVal1,
+			uint64(intVal1),
+			[]float64{floatVal1},
+			[]uint64{uint64(intVal1)},
+		),
+		},
+	}
+
+	histBatch := pdatautil.MetricsFromInternalMetrics(data.MetricDataFromOtlp(hist))
+	checkFunc := func(t *testing.T, r *http.Request, expected int) {
+		body, err := ioutil.ReadAll(r.Body)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		buf := make([]byte, len(body))
+		dest, err := snappy.Decode(buf, body)
+		assert.Equal(t, "0.1.0", r.Header.Get("x-prometheus-remote-write-version"))
+		assert.Equal(t, "snappy", r.Header.Get("content-encoding"))
+		assert.NotNil(t, r.Header.Get("tenant-id"))
+		require.NoError(t, err)
+		wr := &prompb.WriteRequest{}
+		ok := proto.Unmarshal(dest, wr)
+		require.NoError(t, ok)
+		assert.EqualValues(t, expected, len(wr.Timeseries))
+	}
+
+	summary := data.MetricDataToOtlp(testdata.GenerateMetricDataOneMetric())
+	summary[0].InstrumentationLibraryMetrics[0].Metrics[0] = &otlp.Metric{
+		MetricDescriptor: getDescriptor("summary_test", summaryComb, validCombinations),
+		SummaryDataPoints: []*otlp.SummaryDataPoint{getSummaryDataPoint(
+			lbs1,
+			time1,
+			floatVal1,
+			uint64(intVal1),
+			[]float64{floatVal1},
+			[]float64{floatVal2},
+		),
+		},
+	}
+	summaryBatch := pdatautil.MetricsFromInternalMetrics(data.MetricDataFromOtlp(summary))
+
+	tests := []struct {
+		name                 string
+		md                   *pdata.Metrics
+		reqTestFunc          func(t *testing.T, r *http.Request, expected int)
+		expected             int
+		httpResponseCode     int
+		numDroppedTimeSeries int
+		returnErr            bool
+	}{
+		{
+			"invalid_type_case",
+			&invalidTypeBatch,
+			nil,
+			0,
+			http.StatusAccepted,
+			pdatautil.MetricCount(invalidTypeBatch),
+			true,
+		},
+		{
+			"nil_desc_case",
+			&nilDescBatch,
+			nil,
+			0,
+			http.StatusAccepted,
+			pdatautil.MetricCount(nilDescBatch),
+			true,
+		},
+		{
+			"no_temp_case",
+			&noTempBatch,
+			nil,
+			0,
+			http.StatusAccepted,
+			pdatautil.MetricCount(noTempBatch),
+			true,
+		},
+		{
+			"http_error_case",
+			&noTempBatch,
+			nil,
+			0,
+			http.StatusForbidden,
+			
pdatautil.MetricCount(noTempBatch),
+			true,
+		},
+		{
+			"scalar_case",
+			&scalarBatch,
+			checkFunc,
+			2,
+			http.StatusAccepted,
+			0,
+			false,
+		},
+		{"histogram_case",
+			&histBatch,
+			checkFunc,
+			4,
+			http.StatusAccepted,
+			0,
+			false,
+		},
+		{"summary_case",
+			&summaryBatch,
+			checkFunc,
+			3,
+			http.StatusAccepted,
+			0,
+			false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+				if tt.reqTestFunc != nil {
+					tt.reqTestFunc(t, r, tt.expected)
+				}
+				w.WriteHeader(tt.httpResponseCode)
+			}))
+
+			defer server.Close()
+
+			serverURL, uErr := url.Parse(server.URL)
+			assert.NoError(t, uErr)
+
+			config := &Config{
+				ExporterSettings: configmodels.ExporterSettings{
+					TypeVal: "prometheusremotewrite",
+					NameVal: "prometheusremotewrite",
+				},
+				Namespace: "",
+				Headers:   map[string]string{},
+
+				HTTPClientSettings: confighttp.HTTPClientSettings{
+					Endpoint: "http://some.url:9411/api/prom/push",
+					// We almost read 0 bytes, so no need to tune ReadBufferSize.
+					ReadBufferSize:  0,
+					WriteBufferSize: 512 * 1024,
+				},
+			}
+			assert.NotNil(t, config)
+			c := http.DefaultClient
+			prwe, nErr := newPrwExporter(config.Namespace, serverURL.String(), c)
+			require.NoError(t, nErr)
+			numDroppedTimeSeries, err := prwe.pushMetrics(context.Background(), *tt.md)
+			assert.Equal(t, tt.numDroppedTimeSeries, numDroppedTimeSeries)
+			if tt.returnErr {
+				assert.Error(t, err)
+				return
+			}
+			assert.NoError(t, err)
+		})
+	}
+}
diff --git a/exporter/prometheusremotewriteexporter/factory.go b/exporter/prometheusremotewriteexporter/factory.go
new file mode 100644
index 00000000000..e5e8ee8b331
--- /dev/null
+++ b/exporter/prometheusremotewriteexporter/factory.go
@@ -0,0 +1,97 @@
+// Copyright 2020 The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Implements the exporter factory used to instantiate metricsExporter
+// instances.
+
+package prometheusremotewriteexporter
+
+import (
+	"context"
+
+	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/config/confighttp"
+	"go.opentelemetry.io/collector/config/configmodels"
+	"go.opentelemetry.io/collector/exporter/exporterhelper"
+)
+
+const (
+	// The value of "type" key in configuration.
+	typeStr = "prometheusremotewrite"
+)
+
+func NewFactory() component.ExporterFactory {
+	return exporterhelper.NewFactory(
+		typeStr,
+		createDefaultConfig,
+		exporterhelper.WithMetrics(createMetricsExporter))
+}
+
+// createMetricsExporter creates a metrics exporter that adheres to the component.MetricsExporter interface
+func createMetricsExporter(_ context.Context, _ component.ExporterCreateParams,
+	cfg configmodels.Exporter) (component.MetricsExporter, error) {
+
+	cCfg := cfg.(*Config)
+
+	client, err := cCfg.HTTPClientSettings.ToClient()
+	if err != nil {
+		return nil, err
+	}
+
+	prwe, err := newPrwExporter(cCfg.Namespace, cCfg.HTTPClientSettings.Endpoint, client)
+	if err != nil {
+		return nil, err
+	}
+
+	prwexp, err := exporterhelper.NewMetricsExporter(
+		cfg,
+		prwe.pushMetrics,
+		exporterhelper.WithTimeout(cCfg.TimeoutSettings),
+		exporterhelper.WithQueue(cCfg.QueueSettings),
+		exporterhelper.WithRetry(cCfg.RetrySettings),
+		exporterhelper.WithShutdown(prwe.shutdown),
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	return prwexp, nil
+}
+
+func createDefaultConfig() configmodels.Exporter {
+	qs := exporterhelper.CreateDefaultQueueSettings()
+	qs.Enabled = false
+
+	return &Config{
+		ExporterSettings: configmodels.ExporterSettings{
+			TypeVal: typeStr,
+			NameVal: typeStr,
+		},
+		Namespace: "",
+		Headers:   map[string]string{},
+
+		TimeoutSettings: exporterhelper.CreateDefaultTimeoutSettings(),
+		RetrySettings:   exporterhelper.CreateDefaultRetrySettings(),
+		QueueSettings:   qs,
+		HTTPClientSettings: confighttp.HTTPClientSettings{
+			Endpoint: "http://some.url:9411/api/prom/push",
+			// We almost read 0 bytes, so no need to tune ReadBufferSize.
+			ReadBufferSize:  0,
+			WriteBufferSize: 512 * 1024,
+		},
+	}
+}
diff --git a/exporter/prometheusremotewriteexporter/factory_test.go b/exporter/prometheusremotewriteexporter/factory_test.go
new file mode 100644
index 00000000000..2328985ee2f
--- /dev/null
+++ b/exporter/prometheusremotewriteexporter/factory_test.go
@@ -0,0 +1,62 @@
+// Copyright 2020 The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
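Before the tests, a short sketch of how the factory above can be driven programmatically; the collector service normally performs these steps itself when it loads the YAML config. The helper name and the endpoint are hypothetical placeholders, not part of this package's API.

```go
package prometheusremotewriteexporter

import (
	"context"

	"go.opentelemetry.io/collector/component"
)

// newExampleExporter is a hypothetical helper showing programmatic use of the
// factory defined above; it mirrors what TestCreateMetricsExporter exercises.
func newExampleExporter() (component.MetricsExporter, error) {
	factory := NewFactory()
	cfg := factory.CreateDefaultConfig().(*Config)
	// Placeholder endpoint; point this at a real remote write backend.
	cfg.HTTPClientSettings.Endpoint = "http://localhost:9009/api/prom/push"
	return factory.CreateMetricsExporter(
		context.Background(), component.ExporterCreateParams{}, cfg)
}
```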
+
+package prometheusremotewriteexporter
+
+import (
+	"context"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/config/configcheck"
+	"go.opentelemetry.io/collector/config/configmodels"
+)
+
+// TestCreateDefaultConfig checks that the default exporter factory creates a valid default config
+func TestCreateDefaultConfig(t *testing.T) {
+	factory := NewFactory()
+	cfg := factory.CreateDefaultConfig()
+	assert.NotNil(t, cfg, "failed to create default config")
+	assert.NoError(t, configcheck.ValidateConfig(cfg))
+}
+
+// TestCreateMetricsExporter checks that a correct metrics exporter can be created from the default Config parameters
+func TestCreateMetricsExporter(t *testing.T) {
+	factory := NewFactory()
+	tests := []struct {
+		name        string
+		cfg         configmodels.Exporter
+		params      component.ExporterCreateParams
+		returnError bool
+	}{
+		{"success_case",
+			factory.CreateDefaultConfig(),
+			component.ExporterCreateParams{},
+			false,
+		},
+	}
+	// run tests
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			_, err := factory.CreateMetricsExporter(context.Background(), tt.params, tt.cfg)
+			if tt.returnError {
+				assert.Error(t, err)
+				return
+			}
+			assert.NoError(t, err)
+		})
+	}
+}
diff --git a/exporter/prometheusremotewriteexporter/helper.go b/exporter/prometheusremotewriteexporter/helper.go
new file mode 100644
index 00000000000..3d565f16178
--- /dev/null
+++ b/exporter/prometheusremotewriteexporter/helper.go
@@ -0,0 +1,235 @@
+// Copyright 2020 The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// This file defines helper functions used in exporter.go. All functions here are standalone functions rather than
+// methods, and they mainly serve to convert metrics data into different formats (sanitizing and adding labels,
+// building the TimeSeries map used in exporter.go, etc.)
+
+package prometheusremotewriteexporter
+
+import (
+	"log"
+	"sort"
+	"strings"
+	"unicode"
+
+	"github.com/pkg/errors"
+	"github.com/prometheus/prometheus/prompb"
+
+	common "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/common/v1"
+	otlp "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/metrics/v1"
+)
+
+const (
+	nameStr     = "name"
+	sumStr      = "_sum"
+	countStr    = "_count"
+	bucketStr   = "_bucket"
+	leStr       = "le"
+	quantileStr = "quantile"
+	pInfStr     = "+Inf"
+	totalStr    = "total"
+	delimeter   = "_"
+	keyStr      = "key"
+)
+
+// ByLabelName enables the usage of sort.Sort() with a slice of labels
+type ByLabelName []prompb.Label
+
+func (a ByLabelName) Len() int           { return len(a) }
+func (a ByLabelName) Less(i, j int) bool { return a[i].Name < a[j].Name }
+func (a ByLabelName) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
+
+// validateMetrics returns a bool representing whether the metric has a valid type and temporality combination.
+func validateMetrics(desc *otlp.MetricDescriptor) bool {
+
+	if desc == nil {
+		return false
+	}
+
+	switch desc.GetType() {
+	case otlp.MetricDescriptor_MONOTONIC_DOUBLE, otlp.MetricDescriptor_MONOTONIC_INT64,
+		otlp.MetricDescriptor_HISTOGRAM, otlp.MetricDescriptor_SUMMARY:
+		return desc.GetTemporality() == otlp.MetricDescriptor_CUMULATIVE
+	case otlp.MetricDescriptor_INT64, otlp.MetricDescriptor_DOUBLE:
+		return true
+	}
+
+	return false
+}
+
+// addSample finds a TimeSeries in tsMap that corresponds to the label set labels, and adds the sample to that
+// TimeSeries; it creates a new TimeSeries in the map if one is not found. tsMap is unmodified if any of the
+// parameters is nil.
+func addSample(tsMap map[string]*prompb.TimeSeries, sample *prompb.Sample, labels []prompb.Label,
+	ty otlp.MetricDescriptor_Type) {
+
+	if sample == nil || labels == nil || tsMap == nil {
+		return
+	}
+
+	sig := timeSeriesSignature(ty, &labels)
+	ts, ok := tsMap[sig]
+
+	if ok {
+		ts.Samples = append(ts.Samples, *sample)
+	} else {
+		newTs := &prompb.TimeSeries{
+			Labels:  labels,
+			Samples: []prompb.Sample{*sample},
+		}
+		tsMap[sig] = newTs
+	}
+}
+
+// timeSeriesSignature returns a string signature in the form of:
+// TYPE-label1-value1- ... -labelN-valueN
+// the label slice should not contain duplicate label names; this method sorts the slice by label name before creating
+// the signature.
+func timeSeriesSignature(t otlp.MetricDescriptor_Type, labels *[]prompb.Label) string {
+	b := strings.Builder{}
+	b.WriteString(t.String())
+
+	sort.Sort(ByLabelName(*labels))
+
+	for _, lb := range *labels {
+		b.WriteString("-")
+		b.WriteString(lb.GetName())
+		b.WriteString("-")
+		b.WriteString(lb.GetValue())
+	}
+
+	return b.String()
+}
+
+// createLabelSet creates a slice of Prometheus Labels from OTLP labels and pairs of string values.
+// An unpaired string value is ignored. String pairs overwrite OTLP labels if a collision happens, and the overwrite
+// is logged. Resultant label names are sanitized.
+func createLabelSet(labels []*common.StringKeyValue, extras ...string) []prompb.Label {
+
+	// map ensures no duplicate label name
+	l := map[string]prompb.Label{}
+
+	for _, lb := range labels {
+		l[lb.Key] = prompb.Label{
+			Name:  sanitize(lb.Key),
+			Value: lb.Value,
+		}
+	}
+
+	for i := 0; i < len(extras); i += 2 {
+		if i+1 >= len(extras) {
+			break
+		}
+		_, found := l[extras[i]]
+		if found {
+			log.Println("label " + extras[i] + " is overwritten. Check if Prometheus reserved labels are used.")
+		}
+		l[extras[i]] = prompb.Label{
+			Name:  sanitize(extras[i]),
+			Value: extras[i+1],
+		}
+	}
+
+	s := make([]prompb.Label, 0, len(l))
+
+	for _, lb := range l {
+		s = append(s, lb)
+	}
+
+	return s
+}
+
+// getPromMetricName creates a Prometheus metric name by attaching the namespace prefix, and a _total suffix for
+// Monotonic metrics.
+func getPromMetricName(desc *otlp.MetricDescriptor, ns string) string {
+
+	if desc == nil {
+		return ""
+	}
+	// whether a _total suffix should be applied
+	isCounter := desc.Type == otlp.MetricDescriptor_MONOTONIC_INT64 ||
+		desc.Type == otlp.MetricDescriptor_MONOTONIC_DOUBLE
+
+	b := strings.Builder{}
+
+	b.WriteString(ns)
+
+	if b.Len() > 0 {
+		b.WriteString(delimeter)
+	}
+	b.WriteString(desc.GetName())
+
+	// Including units makes two metrics with the same name and label set belong to two different TimeSeries if the
+	// units are different.
+	/*
+		if b.Len() > 0 && len(desc.GetUnit()) > 0{
+			fmt.Fprintf(&b, delimeter)
+			fmt.Fprintf(&b, desc.GetUnit())
+		}
+	*/
+
+	if b.Len() > 0 && isCounter {
+		b.WriteString(delimeter)
+		b.WriteString(totalStr)
+	}
+	return sanitize(b.String())
+}
+
+// wrapTimeSeries wraps the given map of TimeSeries into a WriteRequest for export.
+func wrapTimeSeries(tsMap map[string]*prompb.TimeSeries) (*prompb.WriteRequest, error) {
+	if len(tsMap) == 0 {
+		return nil, errors.Errorf("invalid tsMap: cannot be empty map")
+	}
+	tsArray := []prompb.TimeSeries{}
+	for _, v := range tsMap {
+		tsArray = append(tsArray, *v)
+	}
+	wrapped := prompb.WriteRequest{
+		Timeseries: tsArray,
+		// Other fields of the WriteRequest are unnecessary for our export
+	}
+	return &wrapped, nil
+}
+
+// copied from prometheus-go-metric-exporter
+// sanitize replaces non-alphanumeric characters with underscores in s.
+func sanitize(s string) string {
+	if len(s) == 0 {
+		return s
+	}
+
+	// Note: No length limit for label keys because Prometheus doesn't
+	// define a length limit, thus we should NOT be truncating label keys.
+	// See https://github.com/orijtech/prometheus-go-metrics-exporter/issues/4.
+	s = strings.Map(sanitizeRune, s)
+	if unicode.IsDigit(rune(s[0])) {
+		s = keyStr + delimeter + s
+	}
+	if s[0] == '_' {
+		s = keyStr + s
+	}
+	return s
+}
+
+// copied from prometheus-go-metric-exporter
+// sanitizeRune converts anything that is not a letter or digit to an underscore
+func sanitizeRune(r rune) rune {
+	if unicode.IsLetter(r) || unicode.IsDigit(r) {
+		return r
+	}
+	// Everything else turns into an underscore
+	return '_'
+}
diff --git a/exporter/prometheusremotewriteexporter/helper_test.go b/exporter/prometheusremotewriteexporter/helper_test.go
new file mode 100644
index 00000000000..b1e1efe93b0
--- /dev/null
+++ b/exporter/prometheusremotewriteexporter/helper_test.go
@@ -0,0 +1,276 @@
+// Copyright 2020 The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package prometheusremotewriteexporter
+
+import (
+	"strconv"
+	"testing"
+
+	"github.com/prometheus/prometheus/prompb"
+	"github.com/stretchr/testify/assert"
+
+	common "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/common/v1"
+	otlp "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/metrics/v1"
+)
+
+// Test_validateMetrics checks that validateMetrics returns true if a type and temporality combination is valid, false
+// otherwise.
+func Test_validateMetrics(t *testing.T) {
+	// define a single test
+	type combTest struct {
+		name string
+		desc *otlp.MetricDescriptor
+		want bool
+	}
+
+	tests := []combTest{}
+
+	// append true cases
+	for i := range validCombinations {
+		name := "valid_" + strconv.Itoa(i)
+		desc := getDescriptor(name, i, validCombinations)
+		tests = append(tests, combTest{
+			name,
+			desc,
+			true,
+		})
+	}
+	// append false cases
+	for i := range invalidCombinations {
+		name := "invalid_" + strconv.Itoa(i)
+		desc := getDescriptor(name, i, invalidCombinations)
+		tests = append(tests, combTest{
+			name,
+			desc,
+			false,
+		})
+	}
+	// append nil case
+	tests = append(tests, combTest{"invalid_nil", nil, false})
+
+	// run tests
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got := validateMetrics(tt.desc)
+			assert.Equal(t, tt.want, got)
+		})
+	}
+}
+
+// Test_addSample checks that addSample updates the map it receives correctly based on the sample and label set
+// it receives.
+// Test cases are two samples belonging to the same TimeSeries, two samples belonging to different TimeSeries, and a
+// nil case.
+func Test_addSample(t *testing.T) {
+	type testCase struct {
+		desc   otlp.MetricDescriptor_Type
+		sample prompb.Sample
+		labels []prompb.Label
+	}
+
+	tests := []struct {
+		name     string
+		orig     map[string]*prompb.TimeSeries
+		testCase []testCase
+		want     map[string]*prompb.TimeSeries
+	}{
+		{
+			"two_points_same_ts_same_metric",
+			map[string]*prompb.TimeSeries{},
+			[]testCase{
+				{otlp.MetricDescriptor_INT64,
+					getSample(float64(intVal1), time1),
+					promLbs1,
+				},
+				{
+					otlp.MetricDescriptor_INT64,
+					getSample(float64(intVal2), time2),
+					promLbs1,
+				},
+			},
+			twoPointsSameTs,
+		},
+		{
+			"two_points_different_ts_same_metric",
+			map[string]*prompb.TimeSeries{},
+			[]testCase{
+				{otlp.MetricDescriptor_INT64,
+					getSample(float64(intVal1), time1),
+					promLbs1,
+				},
+				{otlp.MetricDescriptor_INT64,
+					getSample(float64(intVal1), time2),
+					promLbs2,
+				},
+			},
+			twoPointsDifferentTs,
+		},
+	}
+	t.Run("nil_case", func(t *testing.T) {
+		tsMap := map[string]*prompb.TimeSeries{}
+		addSample(tsMap, nil, nil, 0)
+		assert.Exactly(t, tsMap, map[string]*prompb.TimeSeries{})
+	})
+	// run tests
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			addSample(tt.orig, &tt.testCase[0].sample, tt.testCase[0].labels, tt.testCase[0].desc)
+			addSample(tt.orig, &tt.testCase[1].sample, tt.testCase[1].labels, tt.testCase[1].desc)
+			assert.Exactly(t, tt.want, tt.orig)
+		})
+	}
+}
+
+// Test_timeSeriesSignature checks that timeSeriesSignature returns consistent and unique signatures for a distinct
+// label set and metric type combination.
+func Test_timeSeriesSignature(t *testing.T) {
+	tests := []struct {
+		name string
+		lbs  []prompb.Label
+		desc otlp.MetricDescriptor_Type
+		want string
+	}{
+		{
+			"int64_signature",
+			promLbs1,
+			otlp.MetricDescriptor_INT64,
+			typeInt64 + lb1Sig,
+		},
+		{
+			"histogram_signature",
+			promLbs2,
+			otlp.MetricDescriptor_HISTOGRAM,
+			typeHistogram + lb2Sig,
+		},
+		{
+			"unordered_signature",
+			getPromLabels(label22, value22, label21, value21),
+			otlp.MetricDescriptor_HISTOGRAM,
+			typeHistogram + lb2Sig,
+		},
+		// descriptor type cannot be nil, as checked by validateMetrics
+		{
+			"nil_case",
+			nil,
+			otlp.MetricDescriptor_HISTOGRAM,
+			typeHistogram,
+		},
+	}
+
+	// run tests
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			assert.EqualValues(t, tt.want, timeSeriesSignature(tt.desc, &tt.lbs))
+		})
+	}
+}
+
+// Test_createLabelSet checks that the resulting label names are sanitized and
+// that a label in extras overrides a label from orig when their names collide.
+// It does not verify the order of the resulting labels.
+func Test_createLabelSet(t *testing.T) {
+	tests := []struct {
+		name   string
+		orig   []*common.StringKeyValue
+		extras []string
+		want   []prompb.Label
+	}{
+		{
+			"labels_clean",
+			lbs1,
+			[]string{label31, value31, label32, value32},
+			getPromLabels(label11, value11, label12, value12, label31, value31, label32, value32),
+		},
+		{
+			"labels_duplicate_in_extras",
+			lbs1,
+			[]string{label11, value31},
+			getPromLabels(label11, value31, label12, value12),
+		},
+		{
+			"labels_dirty",
+			lbs1Dirty,
+			[]string{label31 + dirty1, value31, label32, value32},
+			getPromLabels(label11+"_", value11, "key_"+label12, value12, label31+"_", value31, label32, value32),
+		},
+		{
+			"no_extras_case",
+			nil,
+			[]string{label31, value31, label32, value32},
+			getPromLabels(label31, value31, label32, value32),
+		},
+		{
+			"single_left_over_case",
+			lbs1,
+			[]string{label31, value31, label32},
+			getPromLabels(label11, value11, label12, value12, label31, value31),
+		},
+	}
+	// run tests
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			assert.ElementsMatch(t, tt.want, createLabelSet(tt.orig, tt.extras...))
+		})
+	}
+}
+
+// Test_getPromMetricName checks that OTLP metric names are converted to Cortex
+// metric names correctly. Test cases are an empty namespace, monotonic metrics
+// that require a "_total" suffix, and metric names that contain invalid
+// characters.
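+// For example (see the cases below), namespace "test_ns" and the monotonic
+// metric "valid_single_int_point" produce "test_ns_valid_single_int_point_total".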
+func Test_getPromMetricName(t *testing.T) {
+	tests := []struct {
+		name string
+		desc *otlp.MetricDescriptor
+		ns   string
+		want string
+	}{
+		{
+			"nil_case",
+			nil,
+			ns1,
+			"",
+		},
+		{
+			"normal_case",
+			getDescriptor(name1, histogramComb, validCombinations),
+			ns1,
+			"test_ns_valid_single_int_point",
+		},
+		{
+			"empty_namespace",
+			getDescriptor(name1, summaryComb, validCombinations),
+			"",
+			"valid_single_int_point",
+		},
+		{
+			"total_suffix",
+			getDescriptor(name1, monotonicInt64Comb, validCombinations),
+			ns1,
+			"test_ns_valid_single_int_point_total",
+		},
+		{
+			"dirty_string",
+			getDescriptor(name1+dirty1, monotonicInt64Comb, validCombinations),
+			"7" + ns1,
+			"key_7test_ns_valid_single_int_point__total",
+		},
+	}
+	// run tests
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			assert.Equal(t, tt.want, getPromMetricName(tt.desc, tt.ns))
+		})
+	}
+}
diff --git a/exporter/prometheusremotewriteexporter/testdata/config.yaml b/exporter/prometheusremotewriteexporter/testdata/config.yaml
new file mode 100644
index 00000000000..ba02afab3c0
--- /dev/null
+++ b/exporter/prometheusremotewriteexporter/testdata/config.yaml
@@ -0,0 +1,37 @@
+receivers:
+  examplereceiver:
+
+processors:
+  exampleprocessor:
+
+exporters:
+  prometheusremotewrite:
+  prometheusremotewrite/2:
+    headers:
+      Prometheus-Remote-Write-Version: "0.1.0"
+      Tenant-id: "234"
+    namespace: "test-space"
+    timeout: 10s
+    sending_queue:
+      enabled: true
+      num_consumers: 2
+      queue_size: 10
+    retry_on_failure:
+      enabled: true
+      initial_interval: 10s
+      max_interval: 60s
+      max_elapsed_time: 10m
+    http_setting:
+      endpoint: "localhost:8888"
+      ca_file: "/var/lib/mycert.pem"
+      timeout: 5s
+      write_buffer_size: 524288
+
+service:
+  pipelines:
+    metrics:
+      receivers: [examplereceiver]
+      processors: [exampleprocessor]
+      exporters: [prometheusremotewrite]
+
+
diff --git a/exporter/prometheusremotewriteexporter/testutil_test.go b/exporter/prometheusremotewriteexporter/testutil_test.go
new file mode 100644
index 00000000000..50c3376c2ca
--- /dev/null
+++ b/exporter/prometheusremotewriteexporter/testutil_test.go
@@ -0,0 +1,248 @@
+// Copyright 2020 The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Utility variables and functions to easily create test data, both OTLP and Prometheus TimeSeries.
+
+package prometheusremotewriteexporter
+
+import (
+	"time"
+
+	"github.com/prometheus/prometheus/prompb"
+
+	"go.opentelemetry.io/collector/internal/data"
+	commonpb "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/common/v1"
+	otlp "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/metrics/v1"
+)
+
+type combination struct {
+	ty   otlp.MetricDescriptor_Type
+	temp otlp.MetricDescriptor_Temporality
+}
+
+var (
+	time1 = uint64(time.Now().UnixNano())
+	// time.Date(1970, 1, 0, ...) normalizes to Dec 31, 1969, so time2 is a
+	// pre-epoch (negative) timestamp wrapped into a uint64; getSample converts
+	// it back to a negative int64.
+	time2 = uint64(time.Date(1970, 1, 0, 0, 0, 0, 0, time.UTC).UnixNano())
+
+	typeInt64           = "INT64"
+	typeMonotonicInt64  = "MONOTONIC_INT64"
+	typeMonotonicDouble = "MONOTONIC_DOUBLE"
+	typeHistogram       = "HISTOGRAM"
+	typeSummary         = "SUMMARY"
+
+	label11 = "test_label11"
+	value11 = "test_value11"
+	label12 = "test_label12"
+	value12 = "test_value12"
+	label21 = "test_label21"
+	value21 = "test_value21"
+	label22 = "test_label22"
+	value22 = "test_value22"
+	label31 = "test_label31"
+	value31 = "test_value31"
+	label32 = "test_label32"
+	value32 = "test_value32"
+	dirty1  = "%"
+	dirty2  = "?"
+
+	intVal1   int64 = 1
+	intVal2   int64 = 2
+	floatVal1       = 1.0
+	floatVal2       = 2.0
+
+	lbs1      = getLabels(label11, value11, label12, value12)
+	lbs2      = getLabels(label21, value21, label22, value22)
+	lbs1Dirty = getLabels(label11+dirty1, value11, dirty2+label12, value12)
+
+	promLbs1 = getPromLabels(label11, value11, label12, value12)
+	promLbs2 = getPromLabels(label21, value21, label22, value22)
+
+	lb1Sig = "-" + label11 + "-" + value11 + "-" + label12 + "-" + value12
+	lb2Sig = "-" + label21 + "-" + value21 + "-" + label22 + "-" + value22
+	ns1    = "test_ns"
+	name1  = "valid_single_int_point"
+
+	monotonicInt64Comb  = 0
+	monotonicDoubleComb = 1
+	histogramComb       = 2
+	summaryComb         = 3
+	validCombinations   = []combination{
+		{otlp.MetricDescriptor_MONOTONIC_INT64, otlp.MetricDescriptor_CUMULATIVE},
+		{otlp.MetricDescriptor_MONOTONIC_DOUBLE, otlp.MetricDescriptor_CUMULATIVE},
+		{otlp.MetricDescriptor_HISTOGRAM, otlp.MetricDescriptor_CUMULATIVE},
+		{otlp.MetricDescriptor_SUMMARY, otlp.MetricDescriptor_CUMULATIVE},
+		{otlp.MetricDescriptor_INT64, otlp.MetricDescriptor_DELTA},
+		{otlp.MetricDescriptor_DOUBLE, otlp.MetricDescriptor_DELTA},
+		{otlp.MetricDescriptor_INT64, otlp.MetricDescriptor_INSTANTANEOUS},
+		{otlp.MetricDescriptor_DOUBLE, otlp.MetricDescriptor_INSTANTANEOUS},
+		{otlp.MetricDescriptor_INT64, otlp.MetricDescriptor_CUMULATIVE},
+		{otlp.MetricDescriptor_DOUBLE, otlp.MetricDescriptor_CUMULATIVE},
+	}
+	invalidCombinations = []combination{
+		{otlp.MetricDescriptor_MONOTONIC_INT64, otlp.MetricDescriptor_DELTA},
+		{otlp.MetricDescriptor_MONOTONIC_DOUBLE, otlp.MetricDescriptor_DELTA},
+		{otlp.MetricDescriptor_HISTOGRAM, otlp.MetricDescriptor_DELTA},
+		{otlp.MetricDescriptor_SUMMARY, otlp.MetricDescriptor_DELTA},
+		{ty: otlp.MetricDescriptor_INVALID_TYPE},
+		{temp: otlp.MetricDescriptor_INVALID_TEMPORALITY},
+		{},
+	}
+	twoPointsSameTs = map[string]*prompb.TimeSeries{
+		typeInt64 + "-" + label11 + "-" + value11 + "-" + label12 + "-" + value12: getTimeSeries(getPromLabels(label11, value11, label12, value12),
+			getSample(float64(intVal1), time1),
+
getSample(float64(intVal2), time2)), + } + twoPointsDifferentTs = map[string]*prompb.TimeSeries{ + typeInt64 + "-" + label11 + "-" + value11 + "-" + label12 + "-" + value12: getTimeSeries(getPromLabels(label11, value11, label12, value12), + getSample(float64(intVal1), time1)), + typeInt64 + "-" + label21 + "-" + value21 + "-" + label22 + "-" + value22: getTimeSeries(getPromLabels(label21, value21, label22, value22), + getSample(float64(intVal1), time2)), + } +) + +// OTLP metrics +// labels must come in pairs +func getLabels(labels ...string) []*commonpb.StringKeyValue { + var set []*commonpb.StringKeyValue + for i := 0; i < len(labels); i += 2 { + set = append(set, &commonpb.StringKeyValue{ + Key: labels[i], + Value: labels[i+1], + }) + } + return set +} + +func getDescriptor(name string, i int, comb []combination) *otlp.MetricDescriptor { + return &otlp.MetricDescriptor{ + Name: name, + Description: "", + Unit: "", + Type: comb[i].ty, + Temporality: comb[i].temp, + } +} + +func getIntDataPoint(labels []*commonpb.StringKeyValue, value int64, ts uint64) *otlp.Int64DataPoint { + return &otlp.Int64DataPoint{ + Labels: labels, + StartTimeUnixNano: 0, + TimeUnixNano: ts, + Value: value, + } +} + +func getDoubleDataPoint(labels []*commonpb.StringKeyValue, value float64, ts uint64) *otlp.DoubleDataPoint { + return &otlp.DoubleDataPoint{ + Labels: labels, + StartTimeUnixNano: 0, + TimeUnixNano: ts, + Value: value, + } +} + +func getHistogramDataPoint(labels []*commonpb.StringKeyValue, ts uint64, sum float64, count uint64, bounds []float64, buckets []uint64) *otlp.HistogramDataPoint { + bks := []*otlp.HistogramDataPoint_Bucket{} + for _, c := range buckets { + bks = append(bks, &otlp.HistogramDataPoint_Bucket{ + Count: c, + Exemplar: nil, + }) + } + return &otlp.HistogramDataPoint{ + Labels: labels, + StartTimeUnixNano: 0, + TimeUnixNano: ts, + Count: count, + Sum: sum, + Buckets: bks, + ExplicitBounds: bounds, + } +} + +func getSummaryDataPoint(labels []*commonpb.StringKeyValue, ts uint64, sum float64, count uint64, pcts []float64, values []float64) *otlp.SummaryDataPoint { + pcs := []*otlp.SummaryDataPoint_ValueAtPercentile{} + for i, v := range values { + pcs = append(pcs, &otlp.SummaryDataPoint_ValueAtPercentile{ + Percentile: pcts[i], + Value: v, + }) + } + return &otlp.SummaryDataPoint{ + Labels: labels, + StartTimeUnixNano: 0, + TimeUnixNano: ts, + Count: count, + Sum: sum, + PercentileValues: pcs, + } +} + +// Prometheus TimeSeries +func getPromLabels(lbs ...string) []prompb.Label { + pbLbs := prompb.Labels{ + Labels: []prompb.Label{}, + XXX_NoUnkeyedLiteral: struct{}{}, + XXX_unrecognized: nil, + XXX_sizecache: 0, + } + for i := 0; i < len(lbs); i += 2 { + pbLbs.Labels = append(pbLbs.Labels, getLabel(lbs[i], lbs[i+1])) + } + return pbLbs.Labels +} + +func getLabel(name string, value string) prompb.Label { + return prompb.Label{ + Name: name, + Value: value, + XXX_NoUnkeyedLiteral: struct{}{}, + XXX_unrecognized: nil, + XXX_sizecache: 0, + } +} + +func getSample(v float64, t uint64) prompb.Sample { + return prompb.Sample{ + Value: v, + Timestamp: int64(t), + XXX_NoUnkeyedLiteral: struct{}{}, + XXX_unrecognized: nil, + XXX_sizecache: 0, + } +} + +func getTimeSeries(labels []prompb.Label, samples ...prompb.Sample) *prompb.TimeSeries { + return &prompb.TimeSeries{ + Labels: labels, + Samples: samples, + XXX_NoUnkeyedLiteral: struct{}{}, + XXX_unrecognized: nil, + XXX_sizecache: 0, + } +} + +func setCumulative(metricsData *data.MetricData) { + for _, r := range 
data.MetricDataToOtlp(*metricsData) {
+		for _, instMetrics := range r.InstrumentationLibraryMetrics {
+			for _, m := range instMetrics.Metrics {
+				m.MetricDescriptor.Temporality = otlp.MetricDescriptor_CUMULATIVE
+			}
+		}
+	}
+}
diff --git a/exporter/zipkinexporter/zipkin.go b/exporter/zipkinexporter/zipkin.go
index 61dbc536520..89072c5c7ae 100644
--- a/exporter/zipkinexporter/zipkin.go
+++ b/exporter/zipkinexporter/zipkin.go
@@ -111,7 +111,7 @@ func (ze *zipkinExporter) PushTraceData(ctx context.Context, td consumerdata.Tra
 		return len(td.Spans), fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err)
 	}
 	_ = resp.Body.Close()
-	if resp.StatusCode < 200 || resp.StatusCode > 299 {
+	if resp.StatusCode/100 != 2 {
 		return len(td.Spans), fmt.Errorf("failed the request with status code %d", resp.StatusCode)
 	}
 	return 0, nil
diff --git a/go.mod b/go.mod
index 4ddf9e5ceb6..3fd2ab16117 100644
--- a/go.mod
+++ b/go.mod
@@ -19,6 +19,7 @@ require (
 	github.com/gogo/protobuf v1.3.1
 	github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e
 	github.com/golang/protobuf v1.4.2
+	github.com/golang/snappy v0.0.1
 	github.com/golangci/golangci-lint v1.29.0
 	github.com/google/addlicense v0.0.0-20200622132530-df58acafd6d5
 	github.com/google/go-cmp v0.5.1
@@ -44,6 +45,7 @@ require (
 	github.com/rs/cors v1.7.0
 	github.com/securego/gosec v0.0.0-20200316084457-7da9f46445fd
 	github.com/shirou/gopsutil v0.0.0-20200517204708-c89193f22d93 // c89193f22d9359848988f32aee972122bb2abdc2
+	github.com/shurcooL/go v0.0.0-20180423040247-9e1955d9fb6e
 	github.com/soheilhy/cmux v0.1.4
 	github.com/spaolacci/murmur3 v1.1.0 // indirect
 	github.com/spf13/cast v1.3.1
diff --git a/service/README.md b/service/README.md
new file mode 100644
index 00000000000..cdc5d1428bc
--- /dev/null
+++ b/service/README.md
@@ -0,0 +1,4 @@
+# General Information
+
+The service package implements the Collector's runtime: it loads the configuration, assembles pipelines from the configured receivers, processors, and exporters, and manages their lifecycle.
+
diff --git a/service/defaultcomponents/defaults.go b/service/defaultcomponents/defaults.go
index 8169436ce45..89974ba9ea7 100644
--- a/service/defaultcomponents/defaults.go
+++ b/service/defaultcomponents/defaults.go
@@ -25,6 +25,7 @@ import (
 	"go.opentelemetry.io/collector/exporter/opencensusexporter"
 	"go.opentelemetry.io/collector/exporter/otlpexporter"
 	"go.opentelemetry.io/collector/exporter/prometheusexporter"
+	"go.opentelemetry.io/collector/exporter/prometheusremotewriteexporter"
 	"go.opentelemetry.io/collector/exporter/zipkinexporter"
 	"go.opentelemetry.io/collector/extension/fluentbitextension"
 	"go.opentelemetry.io/collector/extension/healthcheckextension"
@@ -88,6 +89,7 @@ func Components() (
 		&fileexporter.Factory{},
 		otlpexporter.NewFactory(),
 		kafkaexporter.NewFactory(),
+		prometheusremotewriteexporter.NewFactory(),
 	)
 	if err != nil {
 		errs = append(errs, err)
diff --git a/service/defaultcomponents/defaults_test.go b/service/defaultcomponents/defaults_test.go
index 790fc90a45e..bb106cb599d 100644
--- a/service/defaultcomponents/defaults_test.go
+++ b/service/defaultcomponents/defaults_test.go
@@ -61,6 +61,7 @@ func TestDefaultComponents(t *testing.T) {
 		"file",
 		"otlp",
 		"kafka",
+		"prometheusremotewrite",
 	}
 
 	factories, err := Components()