From eca44dae016ae246b219b676af32938d26bdc784 Mon Sep 17 00:00:00 2001
From: Thomas Hamm
Date: Thu, 1 Aug 2024 18:21:11 +0200
Subject: [PATCH] [exporter/kafkaexporter] Add encoding extensions support
---
.../kafkaexporter-encoding-extensions.yaml | 27 ++++
exporter/kafkaexporter/README.md | 3 +-
exporter/kafkaexporter/config.go | 7 +-
exporter/kafkaexporter/config_test.go | 8 +-
exporter/kafkaexporter/factory.go | 11 +-
exporter/kafkaexporter/factory_test.go | 12 +-
exporter/kafkaexporter/jaeger_marshaler.go | 10 +-
.../kafkaexporter/jaeger_marshaler_test.go | 2 +-
exporter/kafkaexporter/kafka_exporter.go | 55 +++++++-
exporter/kafkaexporter/kafka_exporter_test.go | 54 ++++----
exporter/kafkaexporter/marshaler.go | 119 ++++++++++++++----
exporter/kafkaexporter/marshaler_test.go | 42 +++----
exporter/kafkaexporter/pdata_marshaler.go | 30 ++---
exporter/kafkaexporter/raw_marshaler.go | 2 +-
14 files changed, 266 insertions(+), 116 deletions(-)
create mode 100644 .chloggen/kafkaexporter-encoding-extensions.yaml
diff --git a/.chloggen/kafkaexporter-encoding-extensions.yaml b/.chloggen/kafkaexporter-encoding-extensions.yaml
new file mode 100644
index 0000000000000..3a4a24ad3cc4c
--- /dev/null
+++ b/.chloggen/kafkaexporter-encoding-extensions.yaml
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: breaking
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: kafkaexporter
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add support for encoding extensions in the Kafka exporter.
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [34384]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: |
+ Breaking: Renames the configuration field `encoding` to `format`, and reuses the `encoding` field to specify an encoding extension.
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]
diff --git a/exporter/kafkaexporter/README.md b/exporter/kafkaexporter/README.md
index c0ea3212cd9e4..aabfd0b0061bb 100644
--- a/exporter/kafkaexporter/README.md
+++ b/exporter/kafkaexporter/README.md
@@ -26,7 +26,8 @@ The following settings can be optionally configured:
- `client_id` (default = "sarama"): The client ID to configure the Sarama Kafka client with. The client ID will be used for all produce requests.
- `topic` (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): The name of the kafka topic to export to.
- `topic_from_attribute` (default = ""): Specify the resource attribute whose value should be used as the message's topic. This option, when set, will take precedence over the default topic. If `topic_from_attribute` is not set, the message's topic will be set to the value of the configuration option `topic` instead.
-- `encoding` (default = otlp_proto): The encoding of the traces sent to kafka. All available encodings:
+- `encoding` (default = none): If specified, uses an encoding extension to encode telemetry data. This option, when set, takes precedence over `format`.
+- `format` (default = otlp_proto): The format of the traces sent to kafka. All available format types:
- `otlp_proto`: payload is Protobuf serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs.
- `otlp_json`: payload is JSON serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs.
- The following encodings are valid *only* for **traces**.
diff --git a/exporter/kafkaexporter/config.go b/exporter/kafkaexporter/config.go
index e1c433a268d85..68b04c80117e5 100644
--- a/exporter/kafkaexporter/config.go
+++ b/exporter/kafkaexporter/config.go
@@ -43,8 +43,11 @@ type Config struct {
// TopicFromAttribute is the name of the attribute to use as the topic name.
TopicFromAttribute string `mapstructure:"topic_from_attribute"`
- // Encoding of messages (default "otlp_proto")
- Encoding string `mapstructure:"encoding"`
+ // Encoding is the optional ID of an encoding extension used to encode the messages (default nil)
+ Encoding *component.ID `mapstructure:"encoding"`
+
+ // FormatType of messages (default "otlp_proto")
+ FormatType string `mapstructure:"format"`
// PartitionTracesByID sets the message key of outgoing trace messages to the trace ID.
// Please note: does not have any effect on Jaeger encoding exporters since Jaeger exporters include
diff --git a/exporter/kafkaexporter/config_test.go b/exporter/kafkaexporter/config_test.go
index da2cebf8e808c..29564bc502a95 100644
--- a/exporter/kafkaexporter/config_test.go
+++ b/exporter/kafkaexporter/config_test.go
@@ -56,7 +56,8 @@ func TestLoadConfig(t *testing.T) {
QueueSize: 10,
},
Topic: "spans",
- Encoding: "otlp_proto",
+ Encoding: nil,
+ FormatType: "otlp_proto",
PartitionTracesByID: true,
PartitionMetricsByResourceAttributes: true,
PartitionLogsByResourceAttributes: true,
@@ -112,7 +113,8 @@ func TestLoadConfig(t *testing.T) {
QueueSize: 10,
},
Topic: "spans",
- Encoding: "otlp_proto",
+ Encoding: nil,
+ FormatType: "otlp_proto",
PartitionTracesByID: true,
PartitionMetricsByResourceAttributes: true,
PartitionLogsByResourceAttributes: true,
@@ -167,7 +169,7 @@ func TestLoadConfig(t *testing.T) {
QueueSize: 10,
},
Topic: "spans",
- Encoding: "otlp_proto",
+ FormatType: "otlp_proto",
PartitionTracesByID: true,
PartitionMetricsByResourceAttributes: true,
PartitionLogsByResourceAttributes: true,
diff --git a/exporter/kafkaexporter/factory.go b/exporter/kafkaexporter/factory.go
index 3334e3b268fc0..49b77bc209d5b 100644
--- a/exporter/kafkaexporter/factory.go
+++ b/exporter/kafkaexporter/factory.go
@@ -21,7 +21,7 @@ const (
defaultTracesTopic = "otlp_spans"
defaultMetricsTopic = "otlp_metrics"
defaultLogsTopic = "otlp_logs"
- defaultEncoding = "otlp_proto"
+ defaultFormatType = "otlp_proto"
defaultBroker = "localhost:9092"
defaultClientID = "sarama"
// default from sarama.NewConfig()
@@ -71,7 +71,8 @@ func createDefaultConfig() component.Config {
ClientID: defaultClientID,
// using an empty topic to track when it has not been set by user, default is based on traces or metrics.
Topic: "",
- Encoding: defaultEncoding,
+ Encoding: nil,
+ FormatType: defaultFormatType,
PartitionMetricsByResourceAttributes: defaultPartitionMetricsByResourceAttributesEnabled,
PartitionLogsByResourceAttributes: defaultPartitionLogsByResourceAttributesEnabled,
Metadata: Metadata{
@@ -102,7 +103,7 @@ func (f *kafkaExporterFactory) createTracesExporter(
if oCfg.Topic == "" {
oCfg.Topic = defaultTracesTopic
}
- if oCfg.Encoding == "otlp_json" {
+ if oCfg.FormatType == "otlp_json" {
set.Logger.Info("otlp_json is considered experimental and should not be used in a production environment")
}
exp, err := newTracesExporter(oCfg, set)
@@ -133,7 +134,7 @@ func (f *kafkaExporterFactory) createMetricsExporter(
if oCfg.Topic == "" {
oCfg.Topic = defaultMetricsTopic
}
- if oCfg.Encoding == "otlp_json" {
+ if oCfg.FormatType == "otlp_json" {
set.Logger.Info("otlp_json is considered experimental and should not be used in a production environment")
}
exp, err := newMetricsExporter(oCfg, set)
@@ -164,7 +165,7 @@ func (f *kafkaExporterFactory) createLogsExporter(
if oCfg.Topic == "" {
oCfg.Topic = defaultLogsTopic
}
- if oCfg.Encoding == "otlp_json" {
+ if oCfg.FormatType == "otlp_json" {
set.Logger.Info("otlp_json is considered experimental and should not be used in a production environment")
}
exp, err := newLogsExporter(oCfg, set)
diff --git a/exporter/kafkaexporter/factory_test.go b/exporter/kafkaexporter/factory_test.go
index 55dce9c8505d1..87b4e4ed541de 100644
--- a/exporter/kafkaexporter/factory_test.go
+++ b/exporter/kafkaexporter/factory_test.go
@@ -61,11 +61,11 @@ func TestCreateMetricExporter(t *testing.T) {
err: &net.DNSError{},
},
{
- name: "default_encoding",
+ name: "default_formattype",
conf: applyConfigOption(func(conf *Config) {
// Disabling broker check to ensure encoding work
conf.Metadata.Full = false
- conf.Encoding = defaultEncoding
+ conf.FormatType = defaultFormatType
}),
marshalers: nil,
err: nil,
@@ -126,11 +126,11 @@ func TestCreateLogExporter(t *testing.T) {
err: &net.DNSError{},
},
{
- name: "default_encoding",
+ name: "default_formattype",
conf: applyConfigOption(func(conf *Config) {
// Disabling broker check to ensure encoding work
conf.Metadata.Full = false
- conf.Encoding = defaultEncoding
+ conf.FormatType = defaultFormatType
}),
marshalers: nil,
err: nil,
@@ -191,11 +191,11 @@ func TestCreateTraceExporter(t *testing.T) {
err: &net.DNSError{},
},
{
- name: "default_encoding",
+ name: "default_formattype",
conf: applyConfigOption(func(conf *Config) {
// Disabling broker check to ensure encoding work
conf.Metadata.Full = false
- conf.Encoding = defaultEncoding
+ conf.FormatType = defaultFormatType
}),
marshalers: nil,
err: nil,
diff --git a/exporter/kafkaexporter/jaeger_marshaler.go b/exporter/kafkaexporter/jaeger_marshaler.go
index abc73c22f18a7..3afc4f0e4ee28 100644
--- a/exporter/kafkaexporter/jaeger_marshaler.go
+++ b/exporter/kafkaexporter/jaeger_marshaler.go
@@ -49,13 +49,13 @@ func (j jaegerMarshaler) Marshal(traces ptrace.Traces, topic string) ([]*sarama.
return messages, errs
}
-func (j jaegerMarshaler) Encoding() string {
- return j.marshaler.encoding()
+func (j jaegerMarshaler) FormatType() string {
+ return j.marshaler.formatType()
}
type jaegerSpanMarshaler interface {
marshal(span *jaegerproto.Span) ([]byte, error)
- encoding() string
+ formatType() string
}
type jaegerProtoSpanMarshaler struct {
@@ -67,7 +67,7 @@ func (p jaegerProtoSpanMarshaler) marshal(span *jaegerproto.Span) ([]byte, error
return span.Marshal()
}
-func (p jaegerProtoSpanMarshaler) encoding() string {
+func (p jaegerProtoSpanMarshaler) formatType() string {
return "jaeger_proto"
}
@@ -89,6 +89,6 @@ func (p jaegerJSONSpanMarshaler) marshal(span *jaegerproto.Span) ([]byte, error)
return out.Bytes(), err
}
-func (p jaegerJSONSpanMarshaler) encoding() string {
+func (p jaegerJSONSpanMarshaler) formatType() string {
return "jaeger_json"
}
diff --git a/exporter/kafkaexporter/jaeger_marshaler_test.go b/exporter/kafkaexporter/jaeger_marshaler_test.go
index 81a310c4a3534..bda1f300cf3e5 100644
--- a/exporter/kafkaexporter/jaeger_marshaler_test.go
+++ b/exporter/kafkaexporter/jaeger_marshaler_test.go
@@ -65,7 +65,7 @@ func TestJaegerMarshaler(t *testing.T) {
messages, err := test.unmarshaler.Marshal(td, "topic")
require.NoError(t, err)
assert.Equal(t, test.messages, messages)
- assert.Equal(t, test.encoding, test.unmarshaler.Encoding())
+ assert.Equal(t, test.encoding, test.unmarshaler.FormatType())
})
}
}
diff --git a/exporter/kafkaexporter/kafka_exporter.go b/exporter/kafkaexporter/kafka_exporter.go
index 5e57e5d636783..ba846b9dc710d 100644
--- a/exporter/kafkaexporter/kafka_exporter.go
+++ b/exporter/kafkaexporter/kafka_exporter.go
@@ -21,7 +21,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
)
-var errUnrecognizedEncoding = fmt.Errorf("unrecognized encoding")
+var errUnrecognizedFormatType = fmt.Errorf("unrecognized format type")
// kafkaTracesProducer uses sarama to produce trace messages to Kafka.
type kafkaTracesProducer struct {
@@ -65,7 +65,22 @@ func (e *kafkaTracesProducer) Close(context.Context) error {
return e.producer.Close()
}
-func (e *kafkaTracesProducer) start(_ context.Context, _ component.Host) error {
+func (e *kafkaTracesProducer) start(_ context.Context, host component.Host) error {
+ // the encoding extension, if configured, takes precedence over the format type and replaces the traces marshaler
+ if e.cfg.Encoding != nil {
+ encoding := host.GetExtensions()[*e.cfg.Encoding]
+ if encoding == nil {
+ return fmt.Errorf("unknown encoding %q", e.cfg.Encoding)
+ }
+ marshaler, ok := encoding.(ptrace.Marshaler)
+ if !ok {
+ return fmt.Errorf("extension %q is not a trace marshaler", e.cfg.Encoding)
+ }
+ e.marshaler = &tracesEncodingMarshaler{
+ marshaler: marshaler,
+ formatType: e.cfg.Encoding.String(),
+ }
+ }
producer, err := newSaramaProducer(e.cfg)
if err != nil {
return err
@@ -107,7 +122,22 @@ func (e *kafkaMetricsProducer) Close(context.Context) error {
return e.producer.Close()
}
-func (e *kafkaMetricsProducer) start(_ context.Context, _ component.Host) error {
+func (e *kafkaMetricsProducer) start(_ context.Context, host component.Host) error {
+ // the encoding extension, if configured, takes precedence over the format type and replaces the metrics marshaler
+ if e.cfg.Encoding != nil {
+ encoding := host.GetExtensions()[*e.cfg.Encoding]
+ if encoding == nil {
+ return fmt.Errorf("unknown encoding %q", e.cfg.Encoding)
+ }
+ marshaler, ok := encoding.(pmetric.Marshaler)
+ if !ok {
+ return fmt.Errorf("extension %q is not a metric marshaler", e.cfg.Encoding)
+ }
+ e.marshaler = &metricsEncodingMarshaler{
+ marshaler: marshaler,
+ formatType: e.cfg.Encoding.String(),
+ }
+ }
producer, err := newSaramaProducer(e.cfg)
if err != nil {
return err
@@ -149,7 +179,22 @@ func (e *kafkaLogsProducer) Close(context.Context) error {
return e.producer.Close()
}
-func (e *kafkaLogsProducer) start(_ context.Context, _ component.Host) error {
+func (e *kafkaLogsProducer) start(_ context.Context, host component.Host) error {
+ // the encoding extension, if configured, takes precedence over the format type and replaces the logs marshaler
+ if e.cfg.Encoding != nil {
+ encoding := host.GetExtensions()[*e.cfg.Encoding]
+ if encoding == nil {
+ return fmt.Errorf("unknown encoding %q", e.cfg.Encoding)
+ }
+ marshaler, ok := encoding.(plog.Marshaler)
+ if !ok {
+ return fmt.Errorf("extension %q is not a log marshaler", e.cfg.Encoding)
+ }
+ e.marshaler = &logsEncodingMarshaler{
+ marshaler: marshaler,
+ formatType: e.cfg.Encoding.String(),
+ }
+ }
producer, err := newSaramaProducer(e.cfg)
if err != nil {
return err
@@ -211,7 +256,7 @@ func newMetricsExporter(config Config, set exporter.Settings) (*kafkaMetricsProd
}
if marshaler == nil {
- return nil, errUnrecognizedEncoding
+ return nil, errUnrecognizedFormatType
}
return &kafkaMetricsProducer{
diff --git a/exporter/kafkaexporter/kafka_exporter_test.go b/exporter/kafkaexporter/kafka_exporter_test.go
index aff1fdb1fceaf..c3958a8ec0292 100644
--- a/exporter/kafkaexporter/kafka_exporter_test.go
+++ b/exporter/kafkaexporter/kafka_exporter_test.go
@@ -25,7 +25,7 @@ import (
)
func TestNewExporter_err_version(t *testing.T) {
- c := Config{ProtocolVersion: "0.0.0", Encoding: defaultEncoding}
+ c := Config{ProtocolVersion: "0.0.0", FormatType: defaultFormatType}
texp, err := newTracesExporter(c, exportertest.NewNopSettings())
require.NoError(t, err)
err = texp.start(context.Background(), componenttest.NewNopHost())
@@ -33,14 +33,14 @@ func TestNewExporter_err_version(t *testing.T) {
}
func TestNewExporter_err_encoding(t *testing.T) {
- c := Config{Encoding: "foo"}
+ c := Config{FormatType: "foo"}
texp, err := newTracesExporter(c, exportertest.NewNopSettings())
- assert.EqualError(t, err, errUnrecognizedEncoding.Error())
+ assert.EqualError(t, err, errUnrecognizedFormatType.Error())
assert.Nil(t, texp)
}
func TestNewMetricsExporter_err_version(t *testing.T) {
- c := Config{ProtocolVersion: "0.0.0", Encoding: defaultEncoding}
+ c := Config{ProtocolVersion: "0.0.0", FormatType: defaultFormatType}
mexp, err := newMetricsExporter(c, exportertest.NewNopSettings())
require.NoError(t, err)
err = mexp.start(context.Background(), componenttest.NewNopHost())
@@ -48,21 +48,21 @@ func TestNewMetricsExporter_err_version(t *testing.T) {
}
func TestNewMetricsExporter_err_encoding(t *testing.T) {
- c := Config{Encoding: "bar"}
+ c := Config{FormatType: "bar"}
mexp, err := newMetricsExporter(c, exportertest.NewNopSettings())
- assert.EqualError(t, err, errUnrecognizedEncoding.Error())
+ assert.EqualError(t, err, errUnrecognizedFormatType.Error())
assert.Nil(t, mexp)
}
func TestNewMetricsExporter_err_traces_encoding(t *testing.T) {
- c := Config{Encoding: "jaeger_proto"}
+ c := Config{FormatType: "jaeger_proto"}
mexp, err := newMetricsExporter(c, exportertest.NewNopSettings())
- assert.EqualError(t, err, errUnrecognizedEncoding.Error())
+ assert.EqualError(t, err, errUnrecognizedFormatType.Error())
assert.Nil(t, mexp)
}
func TestNewLogsExporter_err_version(t *testing.T) {
- c := Config{ProtocolVersion: "0.0.0", Encoding: defaultEncoding}
+ c := Config{ProtocolVersion: "0.0.0", FormatType: defaultFormatType}
lexp, err := newLogsExporter(c, exportertest.NewNopSettings())
require.NoError(t, err)
err = lexp.start(context.Background(), componenttest.NewNopHost())
@@ -70,16 +70,16 @@ func TestNewLogsExporter_err_version(t *testing.T) {
}
func TestNewLogsExporter_err_encoding(t *testing.T) {
- c := Config{Encoding: "bar"}
+ c := Config{FormatType: "bar"}
mexp, err := newLogsExporter(c, exportertest.NewNopSettings())
- assert.EqualError(t, err, errUnrecognizedEncoding.Error())
+ assert.EqualError(t, err, errUnrecognizedFormatType.Error())
assert.Nil(t, mexp)
}
func TestNewLogsExporter_err_traces_encoding(t *testing.T) {
- c := Config{Encoding: "jaeger_proto"}
+ c := Config{FormatType: "jaeger_proto"}
mexp, err := newLogsExporter(c, exportertest.NewNopSettings())
- assert.EqualError(t, err, errUnrecognizedEncoding.Error())
+ assert.EqualError(t, err, errUnrecognizedFormatType.Error())
assert.Nil(t, mexp)
}
@@ -93,7 +93,7 @@ func TestNewExporter_err_auth_type(t *testing.T) {
},
},
},
- Encoding: defaultEncoding,
+ FormatType: defaultFormatType,
Metadata: Metadata{
Full: false,
},
@@ -121,7 +121,7 @@ func TestNewExporter_err_auth_type(t *testing.T) {
func TestNewExporter_err_compression(t *testing.T) {
c := Config{
- Encoding: defaultEncoding,
+ FormatType: defaultFormatType,
Producer: Producer{
Compression: "idk",
},
@@ -140,7 +140,7 @@ func TestTracesPusher(t *testing.T) {
p := kafkaTracesProducer{
producer: producer,
- marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, false),
+ marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultFormatType, false),
}
t.Cleanup(func() {
require.NoError(t, p.Close(context.Background()))
@@ -159,7 +159,7 @@ func TestTracesPusher_attr(t *testing.T) {
TopicFromAttribute: "kafka_topic",
},
producer: producer,
- marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, false),
+ marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultFormatType, false),
}
t.Cleanup(func() {
require.NoError(t, p.Close(context.Background()))
@@ -176,7 +176,7 @@ func TestTracesPusher_err(t *testing.T) {
p := kafkaTracesProducer{
producer: producer,
- marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, false),
+ marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultFormatType, false),
logger: zap.NewNop(),
}
t.Cleanup(func() {
@@ -206,7 +206,7 @@ func TestMetricsDataPusher(t *testing.T) {
p := kafkaMetricsProducer{
producer: producer,
- marshaler: newPdataMetricsMarshaler(&pmetric.ProtoMarshaler{}, defaultEncoding, false),
+ marshaler: newPdataMetricsMarshaler(&pmetric.ProtoMarshaler{}, defaultFormatType, false),
}
t.Cleanup(func() {
require.NoError(t, p.Close(context.Background()))
@@ -225,7 +225,7 @@ func TestMetricsDataPusher_attr(t *testing.T) {
TopicFromAttribute: "kafka_topic",
},
producer: producer,
- marshaler: newPdataMetricsMarshaler(&pmetric.ProtoMarshaler{}, defaultEncoding, false),
+ marshaler: newPdataMetricsMarshaler(&pmetric.ProtoMarshaler{}, defaultFormatType, false),
}
t.Cleanup(func() {
require.NoError(t, p.Close(context.Background()))
@@ -242,7 +242,7 @@ func TestMetricsDataPusher_err(t *testing.T) {
p := kafkaMetricsProducer{
producer: producer,
- marshaler: newPdataMetricsMarshaler(&pmetric.ProtoMarshaler{}, defaultEncoding, false),
+ marshaler: newPdataMetricsMarshaler(&pmetric.ProtoMarshaler{}, defaultFormatType, false),
logger: zap.NewNop(),
}
t.Cleanup(func() {
@@ -272,7 +272,7 @@ func TestLogsDataPusher(t *testing.T) {
p := kafkaLogsProducer{
producer: producer,
- marshaler: newPdataLogsMarshaler(&plog.ProtoMarshaler{}, defaultEncoding, false),
+ marshaler: newPdataLogsMarshaler(&plog.ProtoMarshaler{}, defaultFormatType, false),
}
t.Cleanup(func() {
require.NoError(t, p.Close(context.Background()))
@@ -291,7 +291,7 @@ func TestLogsDataPusher_attr(t *testing.T) {
TopicFromAttribute: "kafka_topic",
},
producer: producer,
- marshaler: newPdataLogsMarshaler(&plog.ProtoMarshaler{}, defaultEncoding, false),
+ marshaler: newPdataLogsMarshaler(&plog.ProtoMarshaler{}, defaultFormatType, false),
}
t.Cleanup(func() {
require.NoError(t, p.Close(context.Background()))
@@ -308,7 +308,7 @@ func TestLogsDataPusher_err(t *testing.T) {
p := kafkaLogsProducer{
producer: producer,
- marshaler: newPdataLogsMarshaler(&plog.ProtoMarshaler{}, defaultEncoding, false),
+ marshaler: newPdataLogsMarshaler(&plog.ProtoMarshaler{}, defaultFormatType, false),
logger: zap.NewNop(),
}
t.Cleanup(func() {
@@ -347,7 +347,7 @@ func (e metricsErrorMarshaler) Marshal(_ pmetric.Metrics, _ string) ([]*sarama.P
return nil, e.err
}
-func (e metricsErrorMarshaler) Encoding() string {
+func (e metricsErrorMarshaler) FormatType() string {
panic("implement me")
}
@@ -357,7 +357,7 @@ func (e tracesErrorMarshaler) Marshal(_ ptrace.Traces, _ string) ([]*sarama.Prod
return nil, e.err
}
-func (e tracesErrorMarshaler) Encoding() string {
+func (e tracesErrorMarshaler) FormatType() string {
panic("implement me")
}
@@ -365,7 +365,7 @@ func (e logsErrorMarshaler) Marshal(_ plog.Logs, _ string) ([]*sarama.ProducerMe
return nil, e.err
}
-func (e logsErrorMarshaler) Encoding() string {
+func (e logsErrorMarshaler) FormatType() string {
panic("implement me")
}
diff --git a/exporter/kafkaexporter/marshaler.go b/exporter/kafkaexporter/marshaler.go
index 89b4bcc3893c1..4497a1b046f1c 100644
--- a/exporter/kafkaexporter/marshaler.go
+++ b/exporter/kafkaexporter/marshaler.go
@@ -4,6 +4,8 @@
package kafkaexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"
import (
+ "fmt"
+
"github.com/IBM/sarama"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
@@ -17,8 +19,8 @@ type TracesMarshaler interface {
// Marshal serializes spans into sarama's ProducerMessages
Marshal(traces ptrace.Traces, topic string) ([]*sarama.ProducerMessage, error)
- // Encoding returns encoding name
- Encoding() string
+ // FormatType returns format type
+ FormatType() string
}
// MetricsMarshaler marshals metrics into Message array
@@ -26,8 +28,8 @@ type MetricsMarshaler interface {
// Marshal serializes metrics into sarama's ProducerMessages
Marshal(metrics pmetric.Metrics, topic string) ([]*sarama.ProducerMessage, error)
- // Encoding returns encoding name
- Encoding() string
+ // FormatType returns format type
+ FormatType() string
}
// LogsMarshaler marshals logs into Message array
@@ -35,65 +37,134 @@ type LogsMarshaler interface {
// Marshal serializes logs into sarama's ProducerMessages
Marshal(logs plog.Logs, topic string) ([]*sarama.ProducerMessage, error)
- // Encoding returns encoding name
- Encoding() string
+ // FormatType returns format type
+ FormatType() string
}
// creates TracesMarshaler based on the provided config
func createTracesMarshaler(config Config) (TracesMarshaler, error) {
- encoding := config.Encoding
+ formatType := config.FormatType
partitionTracesByID := config.PartitionTracesByID
jaegerProto := jaegerMarshaler{marshaler: jaegerProtoSpanMarshaler{}}
jaegerJSON := jaegerMarshaler{marshaler: newJaegerJSONMarshaler()}
- switch encoding {
- case defaultEncoding:
- return newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, partitionTracesByID), nil
+ switch formatType {
+ case defaultFormatType:
+ return newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultFormatType, partitionTracesByID), nil
case "otlp_json":
return newPdataTracesMarshaler(&ptrace.JSONMarshaler{}, "otlp_json", partitionTracesByID), nil
case "zipkin_proto":
return newPdataTracesMarshaler(zipkinv2.NewProtobufTracesMarshaler(), "zipkin_proto", partitionTracesByID), nil
case "zipkin_json":
return newPdataTracesMarshaler(zipkinv2.NewJSONTracesMarshaler(), "zipkin_json", partitionTracesByID), nil
- case jaegerProtoSpanMarshaler{}.encoding():
+ case jaegerProtoSpanMarshaler{}.formatType():
return jaegerProto, nil
- case jaegerJSON.Encoding():
+ case jaegerJSON.FormatType():
return jaegerJSON, nil
default:
- return nil, errUnrecognizedEncoding
+ return nil, errUnrecognizedFormatType
}
}
// creates MetricsMarshaler based on the provided config
func createMetricMarshaler(config Config) (MetricsMarshaler, error) {
- encoding := config.Encoding
+ formatType := config.FormatType
partitionMetricsByResources := config.PartitionMetricsByResourceAttributes
- switch encoding {
- case defaultEncoding:
- return newPdataMetricsMarshaler(&pmetric.ProtoMarshaler{}, defaultEncoding, partitionMetricsByResources), nil
+ switch formatType {
+ case defaultFormatType:
+ return newPdataMetricsMarshaler(&pmetric.ProtoMarshaler{}, defaultFormatType, partitionMetricsByResources), nil
case "otlp_json":
return newPdataMetricsMarshaler(&pmetric.JSONMarshaler{}, "otlp_json", partitionMetricsByResources), nil
default:
- return nil, errUnrecognizedEncoding
+ return nil, errUnrecognizedFormatType
}
}
// creates LogsMarshalers based on the provided config
func createLogMarshaler(config Config) (LogsMarshaler, error) {
- encoding := config.Encoding
+ formatType := config.FormatType
partitionLogsByAttributes := config.PartitionLogsByResourceAttributes
raw := newRawMarshaler()
- switch encoding {
- case defaultEncoding:
- return newPdataLogsMarshaler(&plog.ProtoMarshaler{}, defaultEncoding, partitionLogsByAttributes), nil
+ switch formatType {
+ case defaultFormatType:
+ return newPdataLogsMarshaler(&plog.ProtoMarshaler{}, defaultFormatType, partitionLogsByAttributes), nil
case "otlp_json":
return newPdataLogsMarshaler(&plog.JSONMarshaler{}, "otlp_json", partitionLogsByAttributes), nil
- case raw.Encoding():
+ case raw.FormatType():
return raw, nil
default:
- return nil, errUnrecognizedEncoding
+ return nil, errUnrecognizedFormatType
+ }
+}
+
+// tracesEncodingMarshaler is a wrapper around ptrace.Marshaler that implements TracesMarshaler.
+type tracesEncodingMarshaler struct {
+ marshaler ptrace.Marshaler
+ formatType string
+}
+
+func (t *tracesEncodingMarshaler) Marshal(traces ptrace.Traces, topic string) ([]*sarama.ProducerMessage, error) {
+ var messages []*sarama.ProducerMessage
+ data, err := t.marshaler.MarshalTraces(traces)
+ if err != nil {
+ return nil, fmt.Errorf("failed to marshal traces: %w", err)
+ }
+ messages = append(messages, &sarama.ProducerMessage{
+ Topic: topic,
+ Value: sarama.ByteEncoder(data),
+ })
+ return messages, nil
+}
+
+func (t *tracesEncodingMarshaler) FormatType() string {
+ return t.formatType
+}
+
+// metricsEncodingMarshaler is a wrapper around pmetric.Marshaler that implements MetricsMarshaler.
+type metricsEncodingMarshaler struct {
+ marshaler pmetric.Marshaler
+ formatType string
+}
+
+func (m *metricsEncodingMarshaler) Marshal(metrics pmetric.Metrics, topic string) ([]*sarama.ProducerMessage, error) {
+ var messages []*sarama.ProducerMessage
+ data, err := m.marshaler.MarshalMetrics(metrics)
+ if err != nil {
+ return nil, fmt.Errorf("failed to marshal metrics: %w", err)
}
+ messages = append(messages, &sarama.ProducerMessage{
+ Topic: topic,
+ Value: sarama.ByteEncoder(data),
+ })
+ return messages, nil
+}
+
+func (m *metricsEncodingMarshaler) FormatType() string {
+ return m.formatType
+}
+
+// logsEncodingMarshaler is a wrapper around plog.Marshaler that implements LogsMarshaler.
+type logsEncodingMarshaler struct {
+ marshaler plog.Marshaler
+ formatType string
+}
+
+func (l *logsEncodingMarshaler) Marshal(logs plog.Logs, topic string) ([]*sarama.ProducerMessage, error) {
+ var messages []*sarama.ProducerMessage
+ data, err := l.marshaler.MarshalLogs(logs)
+ if err != nil {
+ return nil, fmt.Errorf("failed to marshal logs: %w", err)
+ }
+ messages = append(messages, &sarama.ProducerMessage{
+ Topic: topic,
+ Value: sarama.ByteEncoder(data),
+ })
+ return messages, nil
+}
+
+func (l *logsEncodingMarshaler) FormatType() string {
+ return l.formatType
}
diff --git a/exporter/kafkaexporter/marshaler_test.go b/exporter/kafkaexporter/marshaler_test.go
index a2ea4cd32f0b9..60faff87345a5 100644
--- a/exporter/kafkaexporter/marshaler_test.go
+++ b/exporter/kafkaexporter/marshaler_test.go
@@ -21,7 +21,7 @@ import (
)
func TestDefaultTracesMarshalers(t *testing.T) {
- expectedEncodings := []string{
+ expectedFormatTypes := []string{
"otlp_proto",
"otlp_json",
"zipkin_proto",
@@ -29,10 +29,10 @@ func TestDefaultTracesMarshalers(t *testing.T) {
"jaeger_proto",
"jaeger_json",
}
- for _, e := range expectedEncodings {
- t.Run(e, func(t *testing.T) {
+ for _, ft := range expectedFormatTypes {
+ t.Run(ft, func(t *testing.T) {
m, err := createTracesMarshaler(Config{
- Encoding: e,
+ FormatType: ft,
})
require.NoError(t, err)
assert.NotNil(t, m)
@@ -41,14 +41,14 @@ func TestDefaultTracesMarshalers(t *testing.T) {
}
func TestDefaultMetricsMarshalers(t *testing.T) {
- expectedEncodings := []string{
+ expectedFormatTypes := []string{
"otlp_proto",
"otlp_json",
}
- for _, e := range expectedEncodings {
- t.Run(e, func(t *testing.T) {
+ for _, ft := range expectedFormatTypes {
+ t.Run(ft, func(t *testing.T) {
m, err := createMetricMarshaler(Config{
- Encoding: e,
+ FormatType: ft,
})
require.NoError(t, err)
assert.NotNil(t, m)
@@ -57,15 +57,15 @@ func TestDefaultMetricsMarshalers(t *testing.T) {
}
func TestDefaultLogsMarshalers(t *testing.T) {
- expectedEncodings := []string{
+ expectedFormatTypes := []string{
"otlp_proto",
"otlp_json",
"raw",
}
- for _, e := range expectedEncodings {
- t.Run(e, func(t *testing.T) {
+ for _, ft := range expectedFormatTypes {
+ t.Run(ft, func(t *testing.T) {
m, err := createLogMarshaler(Config{
- Encoding: e,
+ FormatType: ft,
})
require.NoError(t, err)
assert.NotNil(t, m)
@@ -119,7 +119,7 @@ func TestOTLPMetricsJsonMarshaling(t *testing.T) {
marshaler, err := createMetricMarshaler(
Config{
- Encoding: "otlp_json",
+ FormatType: "otlp_json",
PartitionMetricsByResourceAttributes: tt.partitionByResources,
})
require.NoError(t, err)
@@ -180,7 +180,7 @@ func TestOTLPLogsJsonMarshaling(t *testing.T) {
marshaler, err := createLogMarshaler(
Config{
- Encoding: "otlp_json",
+ FormatType: "otlp_json",
PartitionLogsByResourceAttributes: tt.partitionByResources,
})
require.NoError(t, err)
@@ -442,26 +442,26 @@ func TestOTLPTracesJsonMarshaling(t *testing.T) {
keyedZipkinJSONResult[1] = keyedZipkinJSON2
tests := []struct {
- encoding string
+ formatType string
partitionTracesByID bool
numExpectedMessages int
expectedJSON []any
expectedMessageKey []sarama.Encoder
unmarshaled any
}{
- {encoding: "otlp_json", numExpectedMessages: 1, expectedJSON: unkeyedOtlpJSONResult, expectedMessageKey: unkeyedMessageKey, unmarshaled: map[string]any{}},
- {encoding: "otlp_json", partitionTracesByID: true, numExpectedMessages: 2, expectedJSON: keyedOtlpJSONResult, expectedMessageKey: keyedMessageKey, unmarshaled: map[string]any{}},
- {encoding: "zipkin_json", numExpectedMessages: 1, expectedJSON: unkeyedZipkinJSONResult, expectedMessageKey: unkeyedMessageKey, unmarshaled: []map[string]any{}},
- {encoding: "zipkin_json", partitionTracesByID: true, numExpectedMessages: 2, expectedJSON: keyedZipkinJSONResult, expectedMessageKey: keyedMessageKey, unmarshaled: []map[string]any{}},
+ {formatType: "otlp_json", numExpectedMessages: 1, expectedJSON: unkeyedOtlpJSONResult, expectedMessageKey: unkeyedMessageKey, unmarshaled: map[string]any{}},
+ {formatType: "otlp_json", partitionTracesByID: true, numExpectedMessages: 2, expectedJSON: keyedOtlpJSONResult, expectedMessageKey: keyedMessageKey, unmarshaled: map[string]any{}},
+ {formatType: "zipkin_json", numExpectedMessages: 1, expectedJSON: unkeyedZipkinJSONResult, expectedMessageKey: unkeyedMessageKey, unmarshaled: []map[string]any{}},
+ {formatType: "zipkin_json", partitionTracesByID: true, numExpectedMessages: 2, expectedJSON: keyedZipkinJSONResult, expectedMessageKey: keyedMessageKey, unmarshaled: []map[string]any{}},
}
for _, test := range tests {
marshaler, err := createTracesMarshaler(Config{
- Encoding: test.encoding,
+ FormatType: test.formatType,
PartitionTracesByID: test.partitionTracesByID,
})
- require.NoError(t, err, fmt.Sprintf("Must have %s marshaler", test.encoding))
+ require.NoError(t, err, fmt.Sprintf("Must have %s marshaler", test.formatType))
msg, err := marshaler.Marshal(traces, t.Name())
require.NoError(t, err, "Must have marshaled the data without error")
diff --git a/exporter/kafkaexporter/pdata_marshaler.go b/exporter/kafkaexporter/pdata_marshaler.go
index 72a90b54fb16a..9897b59aa4782 100644
--- a/exporter/kafkaexporter/pdata_marshaler.go
+++ b/exporter/kafkaexporter/pdata_marshaler.go
@@ -16,7 +16,7 @@ import (
type pdataLogsMarshaler struct {
marshaler plog.Marshaler
- encoding string
+ formatType string
partitionedByResources bool
}
@@ -55,21 +55,21 @@ func (p pdataLogsMarshaler) Marshal(ld plog.Logs, topic string) ([]*sarama.Produ
return msgs, nil
}
-func (p pdataLogsMarshaler) Encoding() string {
- return p.encoding
+func (p pdataLogsMarshaler) FormatType() string {
+ return p.formatType
}
-func newPdataLogsMarshaler(marshaler plog.Marshaler, encoding string, partitionedByResources bool) LogsMarshaler {
+func newPdataLogsMarshaler(marshaler plog.Marshaler, formatType string, partitionedByResources bool) LogsMarshaler {
return pdataLogsMarshaler{
marshaler: marshaler,
- encoding: encoding,
+ formatType: formatType,
partitionedByResources: partitionedByResources,
}
}
type pdataMetricsMarshaler struct {
marshaler pmetric.Marshaler
- encoding string
+ formatType string
partitionedByResources bool
}
@@ -109,21 +109,21 @@ func (p pdataMetricsMarshaler) Marshal(ld pmetric.Metrics, topic string) ([]*sar
return msgs, nil
}
-func (p pdataMetricsMarshaler) Encoding() string {
- return p.encoding
+func (p pdataMetricsMarshaler) FormatType() string {
+ return p.formatType
}
-func newPdataMetricsMarshaler(marshaler pmetric.Marshaler, encoding string, partitionedByResources bool) MetricsMarshaler {
+func newPdataMetricsMarshaler(marshaler pmetric.Marshaler, formatType string, partitionedByResources bool) MetricsMarshaler {
return &pdataMetricsMarshaler{
marshaler: marshaler,
- encoding: encoding,
+ formatType: formatType,
partitionedByResources: partitionedByResources,
}
}
type pdataTracesMarshaler struct {
marshaler ptrace.Marshaler
- encoding string
+ formatType string
partitionedByTraceID bool
}
@@ -156,14 +156,14 @@ func (p *pdataTracesMarshaler) Marshal(td ptrace.Traces, topic string) ([]*saram
return msgs, nil
}
-func (p *pdataTracesMarshaler) Encoding() string {
- return p.encoding
+func (p *pdataTracesMarshaler) FormatType() string {
+ return p.formatType
}
-func newPdataTracesMarshaler(marshaler ptrace.Marshaler, encoding string, partitionedByTraceID bool) TracesMarshaler {
+func newPdataTracesMarshaler(marshaler ptrace.Marshaler, formatType string, partitionedByTraceID bool) TracesMarshaler {
return &pdataTracesMarshaler{
marshaler: marshaler,
- encoding: encoding,
+ formatType: formatType,
partitionedByTraceID: partitionedByTraceID,
}
}
diff --git a/exporter/kafkaexporter/raw_marshaler.go b/exporter/kafkaexporter/raw_marshaler.go
index 166be57f3e068..1343d05b8aa42 100644
--- a/exporter/kafkaexporter/raw_marshaler.go
+++ b/exporter/kafkaexporter/raw_marshaler.go
@@ -79,6 +79,6 @@ func (r rawMarshaler) interfaceAsBytes(value any) ([]byte, error) {
return res, err
}
-func (r rawMarshaler) Encoding() string {
+func (r rawMarshaler) FormatType() string {
return "raw"
}