[exporter/kafkaexporter] Add encoding extensions support
thmshmm committed Sep 6, 2024
1 parent 7ec6396 commit eca44da
Showing 14 changed files with 266 additions and 116 deletions.
27 changes: 27 additions & 0 deletions .chloggen/kafkaexporter-encoding-extensions.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: kafkaexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for encoding extensions in the Kafka exporter.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34384]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
Breaking: Renames the configuration field `encoding` to `format`, and reuses the `encoding` field to specify an encoding extension.
# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
3 changes: 2 additions & 1 deletion exporter/kafkaexporter/README.md
@@ -26,7 +26,8 @@ The following settings can be optionally configured:
- `client_id` (default = "sarama"): The client ID to configure the Sarama Kafka client with. The client ID will be used for all produce requests.
- `topic` (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): The name of the kafka topic to export to.
- `topic_from_attribute` (default = ""): Specify the resource attribute whose value should be used as the message's topic. This option, when set, will take precedence over the default topic. If `topic_from_attribute` is not set, the message's topic will be set to the value of the configuration option `topic` instead.
- `encoding` (default = otlp_proto): The encoding of the traces sent to kafka. All available encodings:
- `encoding` (default = none): if specified, uses the named encoding extension to encode telemetry data, taking precedence over `format` (see the extension sketch after this section).
- `format` (default = otlp_proto): The format used to encode telemetry data sent to Kafka. All available formats:
- `otlp_proto`: payload is Protobuf serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs.
- `otlp_json`: payload is JSON serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs.
- The following encodings are valid *only* for **traces**.
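For reference, the `encoding` option names an encoding extension registered with the collector host. The following is a minimal, hypothetical sketch (not part of this commit; package and type names are invented) of a traces encoding extension: any extension whose type also implements `ptrace.Marshaler` can be resolved by the exporter at start-up.

```go
package myencodingextension

import (
	"context"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/pdata/ptrace"
)

// myEncoding is a hypothetical encoding extension. Because it implements both
// component.Component and ptrace.Marshaler, the Kafka exporter can retrieve it
// from host.GetExtensions() when the `encoding` option names this extension.
type myEncoding struct{}

var (
	_ component.Component = (*myEncoding)(nil)
	_ ptrace.Marshaler    = (*myEncoding)(nil)
)

func (*myEncoding) Start(context.Context, component.Host) error { return nil }
func (*myEncoding) Shutdown(context.Context) error              { return nil }

// MarshalTraces produces the payload bytes for a Kafka message; this sketch
// simply delegates to the OTLP/JSON marshaler as an illustration.
func (*myEncoding) MarshalTraces(td ptrace.Traces) ([]byte, error) {
	return (&ptrace.JSONMarshaler{}).MarshalTraces(td)
}
```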
7 changes: 5 additions & 2 deletions exporter/kafkaexporter/config.go
@@ -43,8 +43,11 @@ type Config struct {
// TopicFromAttribute is the name of the attribute to use as the topic name.
TopicFromAttribute string `mapstructure:"topic_from_attribute"`

// Encoding of messages (default "otlp_proto")
Encoding string `mapstructure:"encoding"`
// Encoding is the encoding extension used to marshal messages. When set, it takes precedence over FormatType (default: unset)
Encoding *component.ID `mapstructure:"encoding"`

// FormatType is the format used to marshal messages (default "otlp_proto")
FormatType string `mapstructure:"format"`

// PartitionTracesByID sets the message key of outgoing trace messages to the trace ID.
// Please note: does not have any effect on Jaeger encoding exporters since Jaeger exporters include
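Since `Encoding` is now a `*component.ID`, the value given for `encoding` in the collector configuration is parsed as an extension identifier of the form `type[/name]`. A small illustration (the identifier below is made up) of how that string becomes the key used for the extension lookup shown later in this commit:

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/collector/component"
)

func main() {
	// Parse a config value such as "my_encoding/traces" into a component.ID;
	// the exporter later uses this ID as the key into host.GetExtensions().
	var id component.ID
	if err := id.UnmarshalText([]byte("my_encoding/traces")); err != nil {
		panic(err)
	}
	fmt.Printf("type=%s name=%s\n", id.Type(), id.Name()) // type=my_encoding name=traces
}
```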
8 changes: 5 additions & 3 deletions exporter/kafkaexporter/config_test.go
@@ -56,7 +56,8 @@ func TestLoadConfig(t *testing.T) {
QueueSize: 10,
},
Topic: "spans",
Encoding: "otlp_proto",
Encoding: nil,
FormatType: "otlp_proto",
PartitionTracesByID: true,
PartitionMetricsByResourceAttributes: true,
PartitionLogsByResourceAttributes: true,
@@ -112,7 +113,8 @@ func TestLoadConfig(t *testing.T) {
QueueSize: 10,
},
Topic: "spans",
Encoding: "otlp_proto",
Encoding: nil,
FormatType: "otlp_proto",
PartitionTracesByID: true,
PartitionMetricsByResourceAttributes: true,
PartitionLogsByResourceAttributes: true,
@@ -167,7 +169,7 @@ func TestLoadConfig(t *testing.T) {
QueueSize: 10,
},
Topic: "spans",
Encoding: "otlp_proto",
FormatType: "otlp_proto",
PartitionTracesByID: true,
PartitionMetricsByResourceAttributes: true,
PartitionLogsByResourceAttributes: true,
11 changes: 6 additions & 5 deletions exporter/kafkaexporter/factory.go
@@ -21,7 +21,7 @@ const (
defaultTracesTopic = "otlp_spans"
defaultMetricsTopic = "otlp_metrics"
defaultLogsTopic = "otlp_logs"
defaultEncoding = "otlp_proto"
defaultFormatType = "otlp_proto"
defaultBroker = "localhost:9092"
defaultClientID = "sarama"
// default from sarama.NewConfig()
@@ -71,7 +71,8 @@ func createDefaultConfig() component.Config {
ClientID: defaultClientID,
// using an empty topic to track when it has not been set by user, default is based on traces or metrics.
Topic: "",
Encoding: defaultEncoding,
Encoding: nil,
FormatType: defaultFormatType,
PartitionMetricsByResourceAttributes: defaultPartitionMetricsByResourceAttributesEnabled,
PartitionLogsByResourceAttributes: defaultPartitionLogsByResourceAttributesEnabled,
Metadata: Metadata{
@@ -102,7 +103,7 @@ func (f *kafkaExporterFactory) createTracesExporter(
if oCfg.Topic == "" {
oCfg.Topic = defaultTracesTopic
}
if oCfg.Encoding == "otlp_json" {
if oCfg.FormatType == "otlp_json" {
set.Logger.Info("otlp_json is considered experimental and should not be used in a production environment")
}
exp, err := newTracesExporter(oCfg, set)
@@ -133,7 +134,7 @@ func (f *kafkaExporterFactory) createMetricsExporter(
if oCfg.Topic == "" {
oCfg.Topic = defaultMetricsTopic
}
if oCfg.Encoding == "otlp_json" {
if oCfg.FormatType == "otlp_json" {
set.Logger.Info("otlp_json is considered experimental and should not be used in a production environment")
}
exp, err := newMetricsExporter(oCfg, set)
@@ -164,7 +165,7 @@ func (f *kafkaExporterFactory) createLogsExporter(
if oCfg.Topic == "" {
oCfg.Topic = defaultLogsTopic
}
if oCfg.Encoding == "otlp_json" {
if oCfg.FormatType == "otlp_json" {
set.Logger.Info("otlp_json is considered experimental and should not be used in a production environment")
}
exp, err := newLogsExporter(oCfg, set)
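To make the split between the two fields concrete, here is a hedged sketch (not taken from this commit; field and constructor usage assumed from the public package API) of configuring the exporter programmatically through the factory. Only `FormatType` is set, so the built-in marshalers are used and no encoding extension is looked up at start-up:

```go
package main

import (
	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"
)

func main() {
	factory := kafkaexporter.NewFactory()
	cfg := factory.CreateDefaultConfig().(*kafkaexporter.Config)

	// FormatType selects one of the built-in formats; "otlp_json" triggers the
	// experimental-format warning logged by the create*Exporter functions above.
	cfg.FormatType = "otlp_json"
	cfg.Brokers = []string{"localhost:9092"}

	// cfg.Encoding stays nil, so no encoding extension is resolved.
	_ = cfg
}
```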
12 changes: 6 additions & 6 deletions exporter/kafkaexporter/factory_test.go
@@ -61,11 +61,11 @@ func TestCreateMetricExporter(t *testing.T) {
err: &net.DNSError{},
},
{
name: "default_encoding",
name: "default_formattype",
conf: applyConfigOption(func(conf *Config) {
// Disabling broker check to ensure the format type works
conf.Metadata.Full = false
conf.Encoding = defaultEncoding
conf.FormatType = defaultFormatType
}),
marshalers: nil,
err: nil,
@@ -126,11 +126,11 @@ func TestCreateLogExporter(t *testing.T) {
err: &net.DNSError{},
},
{
name: "default_encoding",
name: "default_formattype",
conf: applyConfigOption(func(conf *Config) {
// Disabling broker check to ensure the format type works
conf.Metadata.Full = false
conf.Encoding = defaultEncoding
conf.FormatType = defaultFormatType
}),
marshalers: nil,
err: nil,
@@ -191,11 +191,11 @@ func TestCreateTraceExporter(t *testing.T) {
err: &net.DNSError{},
},
{
name: "default_encoding",
name: "default_formattype",
conf: applyConfigOption(func(conf *Config) {
// Disabling broker check to ensure the format type works
conf.Metadata.Full = false
conf.Encoding = defaultEncoding
conf.FormatType = defaultFormatType
}),
marshalers: nil,
err: nil,
10 changes: 5 additions & 5 deletions exporter/kafkaexporter/jaeger_marshaler.go
@@ -49,13 +49,13 @@ func (j jaegerMarshaler) Marshal(traces ptrace.Traces, topic string) ([]*sarama.
return messages, errs
}

func (j jaegerMarshaler) Encoding() string {
return j.marshaler.encoding()
func (j jaegerMarshaler) FormatType() string {
return j.marshaler.formatType()
}

type jaegerSpanMarshaler interface {
marshal(span *jaegerproto.Span) ([]byte, error)
encoding() string
formatType() string
}

type jaegerProtoSpanMarshaler struct {
@@ -67,7 +67,7 @@ func (p jaegerProtoSpanMarshaler) marshal(span *jaegerproto.Span) ([]byte, error
return span.Marshal()
}

func (p jaegerProtoSpanMarshaler) encoding() string {
func (p jaegerProtoSpanMarshaler) formatType() string {
return "jaeger_proto"
}

@@ -89,6 +89,6 @@ func (p jaegerJSONSpanMarshaler) marshal(span *jaegerproto.Span) ([]byte, error)
return out.Bytes(), err
}

func (p jaegerJSONSpanMarshaler) encoding() string {
func (p jaegerJSONSpanMarshaler) formatType() string {
return "jaeger_json"
}
2 changes: 1 addition & 1 deletion exporter/kafkaexporter/jaeger_marshaler_test.go
@@ -65,7 +65,7 @@ func TestJaegerMarshaler(t *testing.T) {
messages, err := test.unmarshaler.Marshal(td, "topic")
require.NoError(t, err)
assert.Equal(t, test.messages, messages)
assert.Equal(t, test.encoding, test.unmarshaler.Encoding())
assert.Equal(t, test.encoding, test.unmarshaler.FormatType())
})
}
}
55 changes: 50 additions & 5 deletions exporter/kafkaexporter/kafka_exporter.go
@@ -21,7 +21,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
)

var errUnrecognizedEncoding = fmt.Errorf("unrecognized encoding")
var errUnrecognizedFormatType = fmt.Errorf("unrecognized format type")

// kafkaTracesProducer uses sarama to produce trace messages to Kafka.
type kafkaTracesProducer struct {
@@ -65,7 +65,22 @@ func (e *kafkaTracesProducer) Close(context.Context) error {
return e.producer.Close()
}

func (e *kafkaTracesProducer) start(_ context.Context, _ component.Host) error {
func (e *kafkaTracesProducer) start(_ context.Context, host component.Host) error {
// the encoding extension takes precedence over the format type and overrides the marshaler
if e.cfg.Encoding != nil {
encoding := host.GetExtensions()[*e.cfg.Encoding]
if encoding == nil {
return fmt.Errorf("unknown encoding %q", e.cfg.Encoding)
}
marshaler, ok := encoding.(ptrace.Marshaler)
if !ok {
return fmt.Errorf("extension %q is not a trace marshaler", e.cfg.Encoding)
}
e.marshaler = &tracesEncodingMarshaler{
marshaler: marshaler,
formatType: e.cfg.Encoding.String(),
}
}
producer, err := newSaramaProducer(e.cfg)
if err != nil {
return err
@@ -107,7 +122,22 @@ func (e *kafkaMetricsProducer) Close(context.Context) error {
return e.producer.Close()
}

func (e *kafkaMetricsProducer) start(_ context.Context, _ component.Host) error {
func (e *kafkaMetricsProducer) start(_ context.Context, host component.Host) error {
// the encoding extension takes precedence over the format type and overrides the marshaler
if e.cfg.Encoding != nil {
encoding := host.GetExtensions()[*e.cfg.Encoding]
if encoding == nil {
return fmt.Errorf("unknown encoding %q", e.cfg.Encoding)
}
marshaler, ok := encoding.(pmetric.Marshaler)
if !ok {
return fmt.Errorf("extension %q is not a metric marshaler", e.cfg.Encoding)
}
e.marshaler = &metricsEncodingMarshaler{
marshaler: marshaler,
formatType: e.cfg.Encoding.String(),
}
}
producer, err := newSaramaProducer(e.cfg)
if err != nil {
return err
@@ -149,7 +179,22 @@ func (e *kafkaLogsProducer) Close(context.Context) error {
return e.producer.Close()
}

func (e *kafkaLogsProducer) start(_ context.Context, _ component.Host) error {
func (e *kafkaLogsProducer) start(_ context.Context, host component.Host) error {
// the encoding extension takes precedence over the format type and overrides the marshaler
if e.cfg.Encoding != nil {
encoding := host.GetExtensions()[*e.cfg.Encoding]
if encoding == nil {
return fmt.Errorf("unknown encoding %q", e.cfg.Encoding)
}
marshaler, ok := encoding.(plog.Marshaler)
if !ok {
return fmt.Errorf("extension %q is not a log marshaler", e.cfg.Encoding)
}
e.marshaler = &logsEncodingMarshaler{
marshaler: marshaler,
formatType: e.cfg.Encoding.String(),
}
}
producer, err := newSaramaProducer(e.cfg)
if err != nil {
return err
@@ -211,7 +256,7 @@ func newMetricsExporter(config Config, set exporter.Settings) (*kafkaMetricsProd
}

if marshaler == nil {
return nil, errUnrecognizedEncoding
return nil, errUnrecognizedFormatType
}

return &kafkaMetricsProducer{
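The `tracesEncodingMarshaler`, `metricsEncodingMarshaler`, and `logsEncodingMarshaler` types referenced by the `start()` functions live in a file that is not shown in this excerpt. As a hedged sketch of the idea (details assumed rather than copied from the commit), the traces variant adapts a generic `ptrace.Marshaler` obtained from an extension to the exporter's traces marshaler interface roughly as follows:

```go
package kafkaexporter

import (
	"fmt"

	"github.com/IBM/sarama"
	"go.opentelemetry.io/collector/pdata/ptrace"
)

// tracesEncodingMarshaler wraps a ptrace.Marshaler supplied by an encoding
// extension so that it satisfies the exporter's traces marshaler interface.
type tracesEncodingMarshaler struct {
	marshaler  ptrace.Marshaler
	formatType string
}

// Marshal serializes the traces with the extension's marshaler and wraps the
// resulting bytes in a single Kafka producer message for the given topic.
func (t *tracesEncodingMarshaler) Marshal(traces ptrace.Traces, topic string) ([]*sarama.ProducerMessage, error) {
	data, err := t.marshaler.MarshalTraces(traces)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal traces: %w", err)
	}
	return []*sarama.ProducerMessage{{
		Topic: topic,
		Value: sarama.ByteEncoder(data),
	}}, nil
}

// FormatType reports the configured encoding extension ID as the format name.
func (t *tracesEncodingMarshaler) FormatType() string {
	return t.formatType
}
```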
