Skip to content

Commit

Permalink
refactor: use module per signal type
Browse files Browse the repository at this point in the history
Mitigates open-telemetry/opentelemetry-collector#5168

Signed-off-by: Dominik Rosiek <[email protected]>
  • Loading branch information
Dominik Rosiek authored and Mikołaj Świątek committed Apr 25, 2022
1 parent d44bde0 commit 8e0fc85
Show file tree
Hide file tree
Showing 80 changed files with 1,183 additions and 1,162 deletions.
23 changes: 12 additions & 11 deletions pkg/exporter/sumologicexporter/carbon_formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ import (
"fmt"
"strings"

"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
)

// carbon2TagString returns all attributes as space spearated key=value pairs.
// In addition, metric name and unit are also included.
// In case `metric` or `unit` attributes has been set too, they are prefixed
// with underscore `_` to avoid overwriting the metric name and unit.
func carbon2TagString(metric pdata.Metric, attributes pdata.Map) string {
func carbon2TagString(metric pmetric.Metric, attributes pcommon.Map) string {
length := attributes.Len()

if _, ok := attributes.Get("metric"); ok {
Expand All @@ -37,7 +38,7 @@ func carbon2TagString(metric pdata.Metric, attributes pdata.Map) string {
}

returnValue := make([]string, 0, length)
attributes.Range(func(k string, v pdata.AttributeValue) bool {
attributes.Range(func(k string, v pcommon.Value) bool {
if k == "name" || k == "unit" {
k = fmt.Sprintf("_%s", k)
}
Expand Down Expand Up @@ -65,15 +66,15 @@ func sanitizeCarbonString(text string) string {

// carbon2NumberRecord converts NumberDataPoint to carbon2 metric string
// with additional information from metricPair.
func carbon2NumberRecord(metric pdata.Metric, attributes pdata.Map, dataPoint pdata.NumberDataPoint) string {
func carbon2NumberRecord(metric pmetric.Metric, attributes pcommon.Map, dataPoint pmetric.NumberDataPoint) string {
switch dataPoint.ValueType() {
case pdata.MetricValueTypeDouble:
case pmetric.MetricValueTypeDouble:
return fmt.Sprintf("%s %g %d",
carbon2TagString(metric, attributes),
dataPoint.DoubleVal(),
dataPoint.Timestamp()/1e9,
)
case pdata.MetricValueTypeInt:
case pmetric.MetricValueTypeInt:
return fmt.Sprintf("%s %d %d",
carbon2TagString(metric, attributes),
dataPoint.IntVal(),
Expand All @@ -84,25 +85,25 @@ func carbon2NumberRecord(metric pdata.Metric, attributes pdata.Map, dataPoint pd
}

// carbon2metric2String converts metric to Carbon2 formatted string.
func carbon2Metric2String(metric pdata.Metric, attributes pdata.Map) string {
func carbon2Metric2String(metric pmetric.Metric, attributes pcommon.Map) string {
var nextLines []string

switch metric.DataType() {
case pdata.MetricDataTypeGauge:
case pmetric.MetricDataTypeGauge:
dps := metric.Gauge().DataPoints()
nextLines = make([]string, 0, dps.Len())
for i := 0; i < dps.Len(); i++ {
nextLines = append(nextLines, carbon2NumberRecord(metric, attributes, dps.At(i)))
}
case pdata.MetricDataTypeSum:
case pmetric.MetricDataTypeSum:
dps := metric.Sum().DataPoints()
nextLines = make([]string, 0, dps.Len())
for i := 0; i < dps.Len(); i++ {
nextLines = append(nextLines, carbon2NumberRecord(metric, attributes, dps.At(i)))
}
// Skip complex metrics
case pdata.MetricDataTypeHistogram:
case pdata.MetricDataTypeSummary:
case pmetric.MetricDataTypeHistogram:
case pmetric.MetricDataTypeSummary:
}

return strings.Join(nextLines, "\n")
Expand Down
21 changes: 11 additions & 10 deletions pkg/exporter/sumologicexporter/carbon_formatter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
)

func TestCarbon2TagString(t *testing.T) {
Expand All @@ -40,9 +41,9 @@ func TestCarbon2TagString(t *testing.T) {
}

func TestCarbon2InvalidCharacters(t *testing.T) {
metric := pdata.NewMetric()
metric := pmetric.NewMetric()

attributes := pdata.NewAttributeMap()
attributes := pcommon.NewMap()
attributes.InsertString("= \n\r", "= \n\r")
metric.SetName("= \n\r")

Expand Down Expand Up @@ -105,49 +106,49 @@ func TestCarbonMetricDataTypeHistogram(t *testing.T) {
func TestCarbonMetrics(t *testing.T) {
type testCase struct {
name string
metricFunc func() (pdata.Metric, pdata.Map)
metricFunc func() (pmetric.Metric, pcommon.Map)
expected string
}

tests := []testCase{
{
name: "empty int gauge",
metricFunc: func() (pdata.Metric, pdata.Map) {
metricFunc: func() (pmetric.Metric, pcommon.Map) {
return buildExampleIntGaugeMetric(false)
},
expected: "",
},
{
name: "empty double gauge",
metricFunc: func() (pdata.Metric, pdata.Map) {
metricFunc: func() (pmetric.Metric, pcommon.Map) {
return buildExampleDoubleGaugeMetric(false)
},
expected: "",
},
{
name: "empty int sum",
metricFunc: func() (pdata.Metric, pdata.Map) {
metricFunc: func() (pmetric.Metric, pcommon.Map) {
return buildExampleIntSumMetric(false)
},
expected: "",
},
{
name: "empty double sum",
metricFunc: func() (pdata.Metric, pdata.Map) {
metricFunc: func() (pmetric.Metric, pcommon.Map) {
return buildExampleDoubleSumMetric(false)
},
expected: "",
},
{
name: "empty summary",
metricFunc: func() (pdata.Metric, pdata.Map) {
metricFunc: func() (pmetric.Metric, pcommon.Map) {
return buildExampleSummaryMetric(false)
},
expected: "",
},
{
name: "empty histogram",
metricFunc: func() (pdata.Metric, pdata.Map) {
metricFunc: func() (pmetric.Metric, pcommon.Map) {
return buildExampleHistogramMetric(false)
},
expected: "",
Expand Down
25 changes: 14 additions & 11 deletions pkg/exporter/sumologicexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/multierr"
"go.uber.org/zap"

Expand Down Expand Up @@ -181,7 +184,7 @@ func newTracesExporter(
// pushLogsData groups data with common metadata and sends them as separate batched requests.
// It returns the number of unsent logs and an error which contains a list of dropped records
// so they can be handled by OTC retry mechanism
func (se *sumologicexporter) pushLogsData(ctx context.Context, ld pdata.Logs) error {
func (se *sumologicexporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
compr, err := se.getCompressor()
if err != nil {
return consumererror.NewLogs(err, ld)
Expand Down Expand Up @@ -212,8 +215,8 @@ func (se *sumologicexporter) pushLogsData(ctx context.Context, ld pdata.Logs) er
}

type droppedResourceRecords struct {
resource pdata.Resource
records []pdata.LogRecord
resource pcommon.Resource
records []plog.LogRecord
}
var (
errs []error
Expand Down Expand Up @@ -241,7 +244,7 @@ func (se *sumologicexporter) pushLogsData(ctx context.Context, ld pdata.Logs) er
}

if len(dropped) > 0 {
ld = pdata.NewLogs()
ld = plog.NewLogs()

// Move all dropped records to Logs
// NOTE: we only copy resource and log records here.
Expand All @@ -268,7 +271,7 @@ func (se *sumologicexporter) pushLogsData(ctx context.Context, ld pdata.Logs) er
// pushMetricsData groups data with common metadata and send them as separate batched requests
// it returns number of unsent metrics and error which contains list of dropped records
// so they can be handle by the OTC retry mechanism
func (se *sumologicexporter) pushMetricsData(ctx context.Context, md pdata.Metrics) error {
func (se *sumologicexporter) pushMetricsData(ctx context.Context, md pmetric.Metrics) error {
compr, err := se.getCompressor()
if err != nil {
return consumererror.NewMetrics(err, md)
Expand Down Expand Up @@ -299,8 +302,8 @@ func (se *sumologicexporter) pushMetricsData(ctx context.Context, md pdata.Metri
}

type droppedResourceMetrics struct {
resource pdata.Resource
metrics []pdata.Metric
resource pcommon.Resource
metrics []pmetric.Metric
}
var (
errs []error
Expand Down Expand Up @@ -340,7 +343,7 @@ func (se *sumologicexporter) pushMetricsData(ctx context.Context, md pdata.Metri
}

if len(dropped) > 0 {
md = pdata.NewMetrics()
md = pmetric.NewMetrics()

// Move all dropped records to Metrics
// NOTE: we only copy resource and metrics here.
Expand Down Expand Up @@ -382,7 +385,7 @@ func (se *sumologicexporter) handleUnauthorizedErrors(ctx context.Context, errs
}
}

func (se *sumologicexporter) pushTracesData(ctx context.Context, td pdata.Traces) error {
func (se *sumologicexporter) pushTracesData(ctx context.Context, td ptrace.Traces) error {
compr, err := se.getCompressor()
if err != nil {
return consumererror.NewTraces(err, td)
Expand Down Expand Up @@ -526,6 +529,6 @@ func (se *sumologicexporter) shutdown(context.Context) error {
return nil
}

func (se *sumologicexporter) dropRoutingAttribute(attr pdata.AttributeMap) {
func (se *sumologicexporter) dropRoutingAttribute(attr pcommon.Map) {
attr.Delete(se.config.DropRoutingAttribute)
}
Loading

0 comments on commit 8e0fc85

Please sign in to comment.