Skip to content

Commit

Permalink
Upgrade Agent to Collector 0.87 (grafana#5529)
Browse files Browse the repository at this point in the history
* Upgrade Agent to Collector 0.87

* Parametrize the OTel version in docs.

* Document another batch processor metric

* Don't accept routing keys for metrics.

* Add tests for otelcol.receiver.kafka

---------

Co-authored-by: Mischa Thompson <[email protected]>
  • Loading branch information
ptodev and spartan0x117 authored Oct 24, 2023
1 parent 4c5ae43 commit b307c02
Show file tree
Hide file tree
Showing 28 changed files with 1,000 additions and 491 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ Main (unreleased)

- Fix validation issue with ServiceMonitors when scrape timeout is greater than interval. (@captncraig)

- Static mode's spanmetrics processor will now prune histograms when the dimension cache is pruned.
Dimension cache was always pruned but histograms were not being pruned. This caused metric series
created by the spanmetrics processor to grow unbounded. Only static mode has this issue. Flow mode's
`otelcol.connector.spanmetrics` does not have this bug. (@nijave)

### Enhancements

- The `loki.write` WAL now has snappy compression enabled by default. (@thepalbi)
Expand All @@ -75,6 +80,11 @@ Main (unreleased)

- The `loki.source.docker` component now allows connecting to Docker daemons
over HTTP(S) and setting up TLS credentials. (@tpaschalis)

- Upgrade OpenTelemetry Collector packages to version 0.87 (@ptodev):
- `otelcol.receiver.kafka` has a new `header_extraction` block to extract headers from Kafka records.
- `otelcol.receiver.kafka` has a new `version` argument to change the version of
the SASL Protocol for SASL authentication.

v0.37.2 (2023-10-16)
-----------------
Expand Down
4 changes: 4 additions & 0 deletions component/otelcol/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ func (a *Auth) Update(args component.Arguments) error {

TracerProvider: a.opts.Tracer,
MeterProvider: metric.NewMeterProvider(metric.WithReader(promExporter)),

ReportComponentStatus: func(*otelcomponent.StatusEvent) error {
return nil
},
},

BuildInfo: otelcomponent.BuildInfo{
Expand Down
4 changes: 4 additions & 0 deletions component/otelcol/connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ func (p *Connector) Update(args component.Arguments) error {

TracerProvider: p.opts.Tracer,
MeterProvider: metric.NewMeterProvider(metric.WithReader(promExporter)),

ReportComponentStatus: func(*otelcomponent.StatusEvent) error {
return nil
},
},

BuildInfo: otelcomponent.BuildInfo{
Expand Down
2 changes: 1 addition & 1 deletion component/otelcol/connector/spanmetrics/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const (
// The unit is a private type in an internal Otel package,
// so we need to convert it to a map and then back to the internal type.
// ConvertMetricUnit matches the Unit type in this internal package:
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.85.0/connector/spanmetricsconnector/internal/metrics/unit.go
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.87.0/connector/spanmetricsconnector/internal/metrics/unit.go
func ConvertMetricUnit(unit string) (map[string]interface{}, error) {
switch unit {
case MetricsUnitMilliseconds:
Expand Down
4 changes: 4 additions & 0 deletions component/otelcol/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ func (e *Exporter) Update(args component.Arguments) error {

TracerProvider: e.opts.Tracer,
MeterProvider: metric.NewMeterProvider(metricOpts...),

ReportComponentStatus: func(*otelcomponent.StatusEvent) error {
return nil
},
},

BuildInfo: otelcomponent.BuildInfo{
Expand Down
18 changes: 18 additions & 0 deletions component/otelcol/exporter/loadbalancing/loadbalancing.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package loadbalancing

import (
"fmt"
"time"

"github.com/alecthomas/units"
Expand Down Expand Up @@ -48,6 +49,7 @@ type Arguments struct {
var (
_ exporter.Arguments = Arguments{}
_ river.Defaulter = &Arguments{}
_ river.Validator = &Arguments{}
)

var (
Expand All @@ -72,6 +74,22 @@ func (args *Arguments) SetToDefault() {
*args = DefaultArguments
}

// Validate implements river.Validator.
func (args *Arguments) Validate() error {
//TODO(ptodev): Add support for "resource" and "metric" routing keys later.
// The reason we can't add them yet is that otelcol.exporter.loadbalancing
// is labeled as "beta", but those routing keys are experimental.
// We need a way to label otelcol.exporter.loadbalancing as "beta"
// for logs and traces, but "experimental" for metrics.
switch args.RoutingKey {
case "service", "traceID":
// The routing key is valid.
default:
return fmt.Errorf("invalid routing key %q", args.RoutingKey)
}
return nil
}

// Convert implements exporter.Arguments.
func (args Arguments) Convert() (otelcomponent.Config, error) {
return &loadbalancingexporter.Config{
Expand Down
4 changes: 4 additions & 0 deletions component/otelcol/extension/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ func (e *Extension) Update(args component.Arguments) error {

TracerProvider: e.opts.Tracer,
MeterProvider: metric.NewMeterProvider(metric.WithReader(promExporter)),

ReportComponentStatus: func(*otelcomponent.StatusEvent) error {
return nil
},
},

BuildInfo: otelcomponent.BuildInfo{
Expand Down
4 changes: 4 additions & 0 deletions component/otelcol/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ func (p *Processor) Update(args component.Arguments) error {

TracerProvider: p.opts.Tracer,
MeterProvider: metric.NewMeterProvider(metric.WithReader(promExporter)),

ReportComponentStatus: func(*otelcomponent.StatusEvent) error {
return nil
},
},

BuildInfo: otelcomponent.BuildInfo{
Expand Down
125 changes: 77 additions & 48 deletions component/otelcol/receiver/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/grafana/agent/component/otelcol/receiver"
otel_service "github.com/grafana/agent/service/otel"
"github.com/grafana/river/rivertypes"
"github.com/mitchellh/mapstructure"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver"
otelcomponent "go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -38,10 +39,11 @@ type Arguments struct {
ClientID string `river:"client_id,attr,optional"`
InitialOffset string `river:"initial_offset,attr,optional"`

Authentication AuthenticationArguments `river:"authentication,block,optional"`
Metadata MetadataArguments `river:"metadata,block,optional"`
AutoCommit AutoCommitArguments `river:"autocommit,block,optional"`
MessageMarking MessageMarkingArguments `river:"message_marking,block,optional"`
Authentication AuthenticationArguments `river:"authentication,block,optional"`
Metadata MetadataArguments `river:"metadata,block,optional"`
AutoCommit AutoCommitArguments `river:"autocommit,block,optional"`
MessageMarking MessageMarkingArguments `river:"message_marking,block,optional"`
HeaderExtraction HeaderExtraction `river:"header_extraction,block,optional"`

// DebugMetrics configures component internal metrics. Optional.
DebugMetrics otelcol.DebugMetricsArguments `river:"debug_metrics,block,optional"`
Expand Down Expand Up @@ -79,6 +81,10 @@ var DefaultArguments = Arguments{
AfterExecution: false,
IncludeUnsuccessful: false,
},
HeaderExtraction: HeaderExtraction{
ExtractHeaders: false,
Headers: []string{},
},
}

// SetToDefault implements river.Defaulter.
Expand All @@ -88,20 +94,28 @@ func (args *Arguments) SetToDefault() {

// Convert implements receiver.Arguments.
func (args Arguments) Convert() (otelcomponent.Config, error) {
return &kafkareceiver.Config{
Brokers: args.Brokers,
ProtocolVersion: args.ProtocolVersion,
Topic: args.Topic,
Encoding: args.Encoding,
GroupID: args.GroupID,
ClientID: args.ClientID,
InitialOffset: args.InitialOffset,

Authentication: args.Authentication.Convert(),
Metadata: args.Metadata.Convert(),
AutoCommit: args.AutoCommit.Convert(),
MessageMarking: args.MessageMarking.Convert(),
}, nil
input := make(map[string]interface{})
input["auth"] = args.Authentication.Convert()

var result kafkareceiver.Config
err := mapstructure.Decode(input, &result)
if err != nil {
return nil, err
}

result.Brokers = args.Brokers
result.ProtocolVersion = args.ProtocolVersion
result.Topic = args.Topic
result.Encoding = args.Encoding
result.GroupID = args.GroupID
result.ClientID = args.ClientID
result.InitialOffset = args.InitialOffset
result.Metadata = args.Metadata.Convert()
result.AutoCommit = args.AutoCommit.Convert()
result.MessageMarking = args.MessageMarking.Convert()
result.HeaderExtraction = args.HeaderExtraction.Convert()

return &result, nil
}

// Extensions implements receiver.Arguments.
Expand All @@ -128,26 +142,26 @@ type AuthenticationArguments struct {
}

// Convert converts args into the upstream type.
func (args AuthenticationArguments) Convert() kafkaexporter.Authentication {
var res kafkaexporter.Authentication
func (args AuthenticationArguments) Convert() map[string]interface{} {
auth := make(map[string]interface{})

if args.Plaintext != nil {
conv := args.Plaintext.Convert()
res.PlainText = &conv
auth["plain_text"] = &conv
}
if args.SASL != nil {
conv := args.SASL.Convert()
res.SASL = &conv
auth["sasl"] = &conv
}
if args.TLS != nil {
res.TLS = args.TLS.Convert()
auth["tls"] = args.TLS.Convert()
}
if args.Kerberos != nil {
conv := args.Kerberos.Convert()
res.Kerberos = &conv
auth["kerberos"] = &conv
}

return res
return auth
}

// PlaintextArguments configures plaintext authentication against the Kafka
Expand All @@ -158,10 +172,10 @@ type PlaintextArguments struct {
}

// Convert converts args into the upstream type.
func (args PlaintextArguments) Convert() kafkaexporter.PlainTextConfig {
return kafkaexporter.PlainTextConfig{
Username: args.Username,
Password: string(args.Password),
func (args PlaintextArguments) Convert() map[string]interface{} {
return map[string]interface{}{
"username": args.Username,
"password": string(args.Password),
}
}

Expand All @@ -170,16 +184,18 @@ type SASLArguments struct {
Username string `river:"username,attr"`
Password rivertypes.Secret `river:"password,attr"`
Mechanism string `river:"mechanism,attr"`
Version int `river:"version,attr,optional"`
AWSMSK AWSMSKArguments `river:"aws_msk,block,optional"`
}

// Convert converts args into the upstream type.
func (args SASLArguments) Convert() kafkaexporter.SASLConfig {
return kafkaexporter.SASLConfig{
Username: args.Username,
Password: string(args.Password),
Mechanism: args.Mechanism,
AWSMSK: args.AWSMSK.Convert(),
func (args SASLArguments) Convert() map[string]interface{} {
return map[string]interface{}{
"username": args.Username,
"password": string(args.Password),
"mechanism": args.Mechanism,
"version": args.Version,
"aws_msk": args.AWSMSK.Convert(),
}
}

Expand All @@ -191,10 +207,10 @@ type AWSMSKArguments struct {
}

// Convert converts args into the upstream type.
func (args AWSMSKArguments) Convert() kafkaexporter.AWSMSKConfig {
return kafkaexporter.AWSMSKConfig{
Region: args.Region,
BrokerAddr: args.BrokerAddr,
func (args AWSMSKArguments) Convert() map[string]interface{} {
return map[string]interface{}{
"region": args.Region,
"broker_addr": args.BrokerAddr,
}
}

Expand All @@ -211,15 +227,15 @@ type KerberosArguments struct {
}

// Convert converts args into the upstream type.
func (args KerberosArguments) Convert() kafkaexporter.KerberosConfig {
return kafkaexporter.KerberosConfig{
ServiceName: args.ServiceName,
Realm: args.Realm,
UseKeyTab: args.UseKeyTab,
Username: args.Username,
Password: string(args.Password),
ConfigPath: args.ConfigPath,
KeyTabPath: args.KeyTabPath,
func (args KerberosArguments) Convert() map[string]interface{} {
return map[string]interface{}{
"service_name": args.ServiceName,
"realm": args.Realm,
"use_keytab": args.UseKeyTab,
"username": args.Username,
"password": string(args.Password),
"config_file": args.ConfigPath,
"keytab_file": args.KeyTabPath,
}
}

Expand Down Expand Up @@ -283,6 +299,19 @@ func (args MessageMarkingArguments) Convert() kafkareceiver.MessageMarking {
}
}

type HeaderExtraction struct {
ExtractHeaders bool `river:"extract_headers,attr,optional"`
Headers []string `river:"headers,attr,optional"`
}

// Convert converts HeaderExtraction into the upstream type.
func (h HeaderExtraction) Convert() kafkareceiver.HeaderExtraction {
return kafkareceiver.HeaderExtraction{
ExtractHeaders: h.ExtractHeaders,
Headers: h.Headers,
}
}

// DebugMetricsConfig implements receiver.Arguments.
func (args Arguments) DebugMetricsConfig() otelcol.DebugMetricsArguments {
return args.DebugMetrics
Expand Down
Loading

0 comments on commit b307c02

Please sign in to comment.