Skip to content

Commit

Permalink
Add tests for otelcol.receiver.kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
ptodev committed Oct 24, 2023
1 parent b8c543b commit edfc17e
Show file tree
Hide file tree
Showing 2 changed files with 376 additions and 0 deletions.
375 changes: 375 additions & 0 deletions component/otelcol/receiver/kafka/kafka_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,375 @@
package kafka_test

import (
"testing"
"time"

"github.com/grafana/agent/component/otelcol/receiver/kafka"
"github.com/grafana/river"
"github.com/mitchellh/mapstructure"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver"
"github.com/stretchr/testify/require"
)

func TestArguments_UnmarshalRiver(t *testing.T) {
tests := []struct {
testName string
cfg string
expected kafkareceiver.Config
}{
{
testName: "Defaults",
cfg: `
brokers = ["10.10.10.10:9092"]
protocol_version = "2.0.0"
output {}
`,
expected: kafkareceiver.Config{
Brokers: []string{"10.10.10.10:9092"},
ProtocolVersion: "2.0.0",
Topic: "otlp_spans",
Encoding: "otlp_proto",
GroupID: "otel-collector",
ClientID: "otel-collector",
InitialOffset: "latest",
Metadata: kafkaexporter.Metadata{
Full: true,
Retry: kafkaexporter.MetadataRetry{
Max: 3,
Backoff: 250 * time.Millisecond,
},
},
AutoCommit: kafkareceiver.AutoCommit{
Enable: true,
Interval: 1 * time.Second,
},
HeaderExtraction: kafkareceiver.HeaderExtraction{
ExtractHeaders: false,
Headers: []string{},
},
},
},
{
testName: "ExplicitValues_AuthPlaintext",
cfg: `
brokers = ["10.10.10.10:9092"]
protocol_version = "2.0.0"
topic = "test_topic"
encoding = "test_encoding"
group_id = "test_group_id"
client_id = "test_client_id"
initial_offset = "test_offset"
metadata {
include_all_topics = true
retry {
max_retries = 9
backoff = "11s"
}
}
autocommit {
enable = true
interval = "12s"
}
message_marking {
after_execution = true
include_unsuccessful = true
}
header_extraction {
extract_headers = true
headers = ["foo", "bar"]
}
output {}
`,
expected: kafkareceiver.Config{
Brokers: []string{"10.10.10.10:9092"},
ProtocolVersion: "2.0.0",
Topic: "test_topic",
Encoding: "test_encoding",
GroupID: "test_group_id",
ClientID: "test_client_id",
InitialOffset: "test_offset",
Metadata: kafkaexporter.Metadata{
Full: true,
Retry: kafkaexporter.MetadataRetry{
Max: 9,
Backoff: 11 * time.Second,
},
},
AutoCommit: kafkareceiver.AutoCommit{
Enable: true,
Interval: 12 * time.Second,
},
MessageMarking: kafkareceiver.MessageMarking{
After: true,
OnError: true,
},
HeaderExtraction: kafkareceiver.HeaderExtraction{
ExtractHeaders: true,
Headers: []string{"foo", "bar"},
},
},
},
}

for _, tc := range tests {
t.Run(tc.testName, func(t *testing.T) {
var args kafka.Arguments
err := river.Unmarshal([]byte(tc.cfg), &args)
require.NoError(t, err)

actualPtr, err := args.Convert()
require.NoError(t, err)

actual := actualPtr.(*kafkareceiver.Config)

require.Equal(t, tc.expected, *actual)
})
}
}

func TestArguments_Auth(t *testing.T) {
tests := []struct {
testName string
cfg string
expected map[string]interface{}
}{
{
testName: "plain_text",
cfg: `
brokers = ["10.10.10.10:9092"]
protocol_version = "2.0.0"
authentication {
plaintext {
username = "test_username"
password = "test_password"
}
}
output {}
`,
expected: map[string]interface{}{
"brokers": []string{"10.10.10.10:9092"},
"protocol_version": "2.0.0",
"topic": "otlp_spans",
"encoding": "otlp_proto",
"group_id": "otel-collector",
"client_id": "otel-collector",
"initial_offset": "latest",
"metadata": kafkaexporter.Metadata{
Full: true,
Retry: kafkaexporter.MetadataRetry{
Max: 3,
Backoff: 250 * time.Millisecond,
},
},
"autocommit": kafkareceiver.AutoCommit{
Enable: true,
Interval: 1 * time.Second,
},
"header_extraction": kafkareceiver.HeaderExtraction{
ExtractHeaders: false,
Headers: []string{},
},
"auth": map[string]interface{}{
"plain_text": map[string]interface{}{
"username": "test_username",
"password": "test_password",
},
},
},
},
{
testName: "sasl",
cfg: `
brokers = ["10.10.10.10:9092"]
protocol_version = "2.0.0"
authentication {
sasl {
username = "test_username"
password = "test_password"
mechanism = "test_mechanism"
version = 9
aws_msk {
region = "test_region"
broker_addr = "test_broker_addr"
}
}
}
output {}
`,
expected: map[string]interface{}{
"brokers": []string{"10.10.10.10:9092"},
"protocol_version": "2.0.0",
"topic": "otlp_spans",
"encoding": "otlp_proto",
"group_id": "otel-collector",
"client_id": "otel-collector",
"initial_offset": "latest",
"metadata": kafkaexporter.Metadata{
Full: true,
Retry: kafkaexporter.MetadataRetry{
Max: 3,
Backoff: 250 * time.Millisecond,
},
},
"autocommit": kafkareceiver.AutoCommit{
Enable: true,
Interval: 1 * time.Second,
},
"header_extraction": kafkareceiver.HeaderExtraction{
ExtractHeaders: false,
Headers: []string{},
},
"auth": map[string]interface{}{
"sasl": map[string]interface{}{
"username": "test_username",
"password": "test_password",
"mechanism": "test_mechanism",
"version": 9,
"aws_msk": map[string]interface{}{
"region": "test_region",
"broker_addr": "test_broker_addr",
},
},
},
},
},
{
testName: "tls",
cfg: `
brokers = ["10.10.10.10:9092"]
protocol_version = "2.0.0"
authentication {
tls {
insecure = true
insecure_skip_verify = true
server_name = "test_server_name_override"
ca_pem = "test_ca_pem"
cert_pem = "test_cert_pem"
key_pem = "test_key_pem"
min_version = "1.1"
reload_interval = "11s"
}
}
output {}
`,
expected: map[string]interface{}{
"brokers": []string{"10.10.10.10:9092"},
"protocol_version": "2.0.0",
"topic": "otlp_spans",
"encoding": "otlp_proto",
"group_id": "otel-collector",
"client_id": "otel-collector",
"initial_offset": "latest",
"metadata": kafkaexporter.Metadata{
Full: true,
Retry: kafkaexporter.MetadataRetry{
Max: 3,
Backoff: 250 * time.Millisecond,
},
},
"autocommit": kafkareceiver.AutoCommit{
Enable: true,
Interval: 1 * time.Second,
},
"header_extraction": kafkareceiver.HeaderExtraction{
ExtractHeaders: false,
Headers: []string{},
},
"auth": map[string]interface{}{
"tls": map[string]interface{}{
"insecure": true,
"insecure_skip_verify": true,
"server_name_override": "test_server_name_override",
"ca_pem": "test_ca_pem",
"cert_pem": "test_cert_pem",
"key_pem": "test_key_pem",
"min_version": "1.1",
"reload_interval": 11 * time.Second,
},
},
},
},
{
testName: "kerberos",
cfg: `
brokers = ["10.10.10.10:9092"]
protocol_version = "2.0.0"
authentication {
kerberos {
service_name = "test_service_name"
realm = "test_realm"
use_keytab = true
username = "test_username"
password = "test_password"
config_file = "test_config_filem"
keytab_file = "test_keytab_file"
}
}
output {}
`,
expected: map[string]interface{}{
"brokers": []string{"10.10.10.10:9092"},
"protocol_version": "2.0.0",
"topic": "otlp_spans",
"encoding": "otlp_proto",
"group_id": "otel-collector",
"client_id": "otel-collector",
"initial_offset": "latest",
"metadata": kafkaexporter.Metadata{
Full: true,
Retry: kafkaexporter.MetadataRetry{
Max: 3,
Backoff: 250 * time.Millisecond,
},
},
"autocommit": kafkareceiver.AutoCommit{
Enable: true,
Interval: 1 * time.Second,
},
"header_extraction": kafkareceiver.HeaderExtraction{
ExtractHeaders: false,
Headers: []string{},
},
"auth": map[string]interface{}{
"kerberos": map[string]interface{}{
"service_name": "test_service_name",
"realm": "test_realm",
"use_keytab": true,
"username": "test_username",
"password": "test_password",
"config_file": "test_config_filem",
"keytab_file": "test_keytab_file",
},
},
},
},
}

for _, tc := range tests {
t.Run(tc.testName, func(t *testing.T) {
var args kafka.Arguments
err := river.Unmarshal([]byte(tc.cfg), &args)
require.NoError(t, err)

actualPtr, err := args.Convert()
require.NoError(t, err)

actual := actualPtr.(*kafkareceiver.Config)

var expected kafkareceiver.Config
err = mapstructure.Decode(tc.expected, &expected)
require.NoError(t, err)

require.Equal(t, expected, *actual)
})
}
}
1 change: 1 addition & 0 deletions docs/developer/updating-otel.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ Unfortunately, updating Otel dependencies is not straightforward:
* Search the Agent repository for the old version (e.g. "0.87") to find code and
documentation which also needs updating.
* Update the `OTEL_VERSION` parameter in the `docs/sources/_index.md.t` file.
Then run `make generate-versioned-files`, which will update `docs/sources/_index.md`.
5. Some Agent components reuse OpenTelemetry code, but do not import it:
* `otelcol.extension.jaeger_remote_sampling`: a lot of this code has
been copy-pasted from Otel and modified slightly to fit the Agent's needs.
Expand Down

0 comments on commit edfc17e

Please sign in to comment.