Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup after the Otel upgrade #4359

Merged
merged 5 commits into from
Jul 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 48 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Main (unreleased)

- `otelcol.exporter.loki` now includes the instrumentation scope in its output. (@ptodev)

- `otelcol.extension.jaeger_remote_sampling` removes the `\` HTTP endpoint. The `/sampling` endpoint is still functional.
- `otelcol.extension.jaeger_remote_sampling` removes the `/` HTTP endpoint. The `/sampling` endpoint is still functional.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that all breaking changes need to be documented with details and potential migration instructions in the upgrade guide.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, I forgot to update the guide in #3858. I added three entries to it now!

The change was made in PR [#18070](https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/18070) of opentelemetry-collector-contrib. (@ptodev)

- The field `version` and `auth` struct block from `walk_params` in `prometheus.exporter.snmp` and SNMP integration have been removed. The auth block now can be configured at top level, together with `modules` (@marctc)
Expand All @@ -29,6 +29,33 @@ Main (unreleased)
discovers file on the local filesystem, and so it doesn't get confused with
Prometheus' file discovery. (@rfratto)

- In the traces subsystem for Static mode, some metrics are removed and others are renamed. (@ptodev)
- Removed metrics:
- "blackbox_exporter_config_last_reload_success_timestamp_seconds" (gauge)
- "blackbox_exporter_config_last_reload_successful" (gauge)
- "blackbox_module_unknown_total" (counter)
- "traces_processor_tail_sampling_count_traces_sampled" (counter)
- "traces_processor_tail_sampling_new_trace_id_received" (counter)
- "traces_processor_tail_sampling_sampling_decision_latency" (histogram)
- "traces_processor_tail_sampling_sampling_decision_timer_latency" (histogram)
- "traces_processor_tail_sampling_sampling_policy_evaluation_error" (counter)
- "traces_processor_tail_sampling_sampling_trace_dropped_too_early" (counter)
- "traces_processor_tail_sampling_sampling_traces_on_memory" (gauge)
- "traces_receiver_accepted_spans" (counter)
- "traces_receiver_refused_spans" (counter)
- "traces_exporter_enqueue_failed_log_records" (counter)
- "traces_exporter_enqueue_failed_metric_points" (counter)
- "traces_exporter_enqueue_failed_spans" (counter)
- "traces_exporter_queue_capacity" (gauge)
- "traces_exporter_queue_size" (gauge)

- Renamed metrics:
- "traces_receiver_refused_spans" is renamed to "traces_receiver_refused_spans_total"
- "traces_receiver_accepted_spans" is renamed to "traces_receiver_refused_spans_total"
- "traces_exporter_sent_metric_points" is renamed to "traces_exporter_sent_metric_points_total"

- The `remote_sampling` block has been removed from `otelcol.receiver.jaeger`. (@ptodev)

### Features

- The Pyroscope scrape component computes and sends delta profiles automatically when required to reduce bandwidth usage. (@cyriltovena)
Expand Down Expand Up @@ -72,6 +99,14 @@ Main (unreleased)
- `grafana-agent tools prometheus.remote_write` holds a collection of remote
write-specific tools. These have been ported over from the `agentctl` command. (@rfratto)

- A new `action` argument for `otelcol.auth.headers`. (@ptodev)

- New `metadata_keys` and `metadata_cardinality_limit` arguments for `otelcol.processor.batch`. (@ptodev)

- New `boolean_attribute` and `ottl_condition` sampling policies for `otelcol.processor.tail_sampling`. (@ptodev)

- A new `initial_offset` argument for `otelcol.receiver.kafka`. (@ptodev)

### Enhancements

- Attributes and blocks set to their default values will no longer be shown in the Flow UI. (@rfratto)
Expand Down Expand Up @@ -109,6 +144,16 @@ Main (unreleased)

- Allow `prometheus.exporter.snmp` and SNMP integration to be configured passing a YAML block. (@marctc)

- Some metrics have been added to the traces subsystem for Static mode. (@ptodev)
- "traces_processor_batch_batch_send_size" (histogram)
- "traces_processor_batch_batch_size_trigger_send_total" (counter)
- "traces_processor_batch_metadata_cardinality" (gauge)
- "traces_processor_batch_timeout_trigger_send_total" (counter)
- "traces_rpc_server_duration" (histogram)
- "traces_exporter_send_failed_metric_points_total" (counter)
- "traces_exporter_send_failed_spans_total" (counter)
- "traces_exporter_sent_spans_total" (counter)

### Bugfixes

- Add signing region to remote.s3 component for use with custom endpoints so that Authorization Headers work correctly when
Expand Down Expand Up @@ -143,6 +188,8 @@ Main (unreleased)
- Mongodb integration has been re-enabled. (@jcreixell, @marctc)
- Build with go 1.20.6 (@captncraig)

- `otelcol.exporter.jaeger` has been deprecated and will be removed soon. (@ptodev)

v0.34.3 (2023-06-27)
--------------------

Expand Down
11 changes: 2 additions & 9 deletions component/otelcol/auth/bearer/bearer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,9 @@ var DefaultArguments = Arguments{
Scheme: "Bearer",
}

// UnmarshalRiver implements river.Unmarshaler.
func (args *Arguments) UnmarshalRiver(f func(interface{}) error) error {
// SetToDefault implements river.Defaulter.
func (args *Arguments) SetToDefault() {
*args = DefaultArguments

type arguments Arguments
if err := f((*arguments)(args)); err != nil {
return err
}

return nil
}

// Convert implements auth.Arguments.
Expand Down
80 changes: 80 additions & 0 deletions component/otelcol/auth/headers/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@
package headers

import (
"encoding"
"fmt"
"strings"

"github.com/grafana/agent/component"
"github.com/grafana/agent/component/otelcol/auth"
"github.com/grafana/agent/pkg/river"
"github.com/grafana/agent/pkg/river/rivertypes"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/headerssetterextension"
otelcomponent "go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -40,6 +43,11 @@ func (args Arguments) Convert() (otelcomponent.Config, error) {
Key: &h.Key,
}

err := h.Action.Convert(&upstreamHeader)
if err != nil {
return nil, err
}

if h.Value != nil {
upstreamHeader.Value = &h.Value.Value
}
Expand All @@ -65,15 +73,87 @@ func (args Arguments) Exporters() map[otelcomponent.DataType]map[otelcomponent.I
return nil
}

type Action string

const (
ActionInsert Action = "insert"
ActionUpdate Action = "update"
ActionUpsert Action = "upsert"
ActionDelete Action = "delete"
)

var (
_ river.Validator = (*Action)(nil)
_ encoding.TextUnmarshaler = (*Action)(nil)
)

// Validate implements river.Validator.
func (a *Action) Validate() error {
switch *a {
case ActionInsert, ActionUpdate, ActionUpsert, ActionDelete:
// This is a valid value, do not error
default:
return fmt.Errorf("action is set to an invalid value of %q", *a)
}
return nil
}

// Convert the River type to the Otel type.
// TODO: When headerssetterextension.actionValue is made external,
// remove the input parameter and make this output the Otel type.
func (a *Action) Convert(hc *headerssetterextension.HeaderConfig) error {
switch *a {
case ActionInsert:
hc.Action = headerssetterextension.INSERT
case ActionUpdate:
hc.Action = headerssetterextension.UPDATE
case ActionUpsert:
hc.Action = headerssetterextension.UPSERT
case ActionDelete:
hc.Action = headerssetterextension.DELETE
default:
return fmt.Errorf("action is set to an invalid value of %q", *a)
}
return nil
}

func (a *Action) UnmarshalText(text []byte) error {
str := Action(strings.ToLower(string(text)))
switch str {
case ActionInsert, ActionUpdate, ActionUpsert, ActionDelete:
*a = str
return nil
default:
return fmt.Errorf("unknown action %v", str)
}
}
Comment on lines +120 to +129
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to exist or is the Validate method enough?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UnmarshalText needs to be implemented because otherwise River won't be able to unmarshal Action. Since I already had to implement it, I thought I might as well verify if it's correct since it doesn't make sense to unmarshal to an incorrect value.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UnmarshalText needs to be implemented because otherwise River won't be able to unmarshal Action.

Are you sure? Action is just a string.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see, there's text transformation going on when unmarshaling to make it case insensitive. OK, that adds some validity to UnmarshalText but it's still not strictly necessary.

I'm ok with keeping it in both cases (including the tail sampling case being discussed below)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The text transformation isn't the main reason. If I don't implement UnmarshalText, I get this error:

panic: reflect.Set: value of type string is not assignable to type headers.Action [recovered]
	panic: reflect.Set: value of type string is not assignable to type headers.Action

The text transformation was simply so that we match what the Collector does. TBH I'm not 100% sure if I like it or if we should just stick to some Agent convention, but I wasn't sure if we have a convention and thought this way it'll be easier to migrate Collector configs to the Agent.


// Header is an individual Header to send along with requests.
type Header struct {
Key string `river:"key,attr"`
Value *rivertypes.OptionalSecret `river:"value,attr,optional"`
FromContext *string `river:"from_context,attr,optional"`
Action Action `river:"action,attr,optional"`
}

var _ river.Defaulter = &Header{}

var DefaultHeader = Header{
Action: ActionUpsert,
}

// SetToDefault implements river.Defaulter.
func (h *Header) SetToDefault() {
*h = DefaultHeader
}

// Validate implements river.Validator.
func (h *Header) Validate() error {
err := h.Action.Validate()
if err != nil {
return err
}

switch {
case h.Key == "":
return fmt.Errorf("key must be set to a non-empty string")
Expand Down
103 changes: 103 additions & 0 deletions component/otelcol/auth/headers/headers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/grafana/agent/pkg/flow/componenttest"
"github.com/grafana/agent/pkg/river"
"github.com/grafana/agent/pkg/util"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/headerssetterextension"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
extauth "go.opentelemetry.io/collector/extension/auth"
Expand Down Expand Up @@ -78,3 +79,105 @@ func Test(t *testing.T) {
require.NoError(t, err, "HTTP request failed")
require.Equal(t, http.StatusOK, resp.StatusCode)
}

func TestArguments_UnmarshalRiver(t *testing.T) {
tests := []struct {
cfg string
expectedKey string
expectedValue string
expectedAction any
expectUnmarshalError bool
}{
{
cfg: `
header {
key = "X-Scope-Org-ID"
value = "fake"
}
`,
expectedKey: "X-Scope-Org-ID",
expectedValue: "fake",
expectedAction: headerssetterextension.UPSERT,
},
{
cfg: `
header {
key = "X-Scope-Org-ID"
value = "fake"
action = "insert"
}
`,
expectedKey: "X-Scope-Org-ID",
expectedValue: "fake",
expectedAction: headerssetterextension.INSERT,
},
{
cfg: `
header {
key = "X-Scope-Org-ID"
value = "fake"
action = "update"
}
`,
expectedKey: "X-Scope-Org-ID",
expectedValue: "fake",
expectedAction: headerssetterextension.UPDATE,
},
{
cfg: `
header {
key = "X-Scope-Org-ID"
value = "fake"
action = "upsert"
}
`,
expectedKey: "X-Scope-Org-ID",
expectedValue: "fake",
expectedAction: headerssetterextension.UPSERT,
},
{
cfg: `
header {
key = "X-Scope-Org-ID"
value = "fake"
action = "delete"
}
`,
expectedKey: "X-Scope-Org-ID",
expectedValue: "fake",
expectedAction: headerssetterextension.DELETE,
},
{
cfg: `
header {
key = "X-Scope-Org-ID"
value = "fake"
action = "NonExistingAction"
}
`,
expectUnmarshalError: true,
},
}

for _, tc := range tests {
var args headers.Arguments
err := river.Unmarshal([]byte(tc.cfg), &args)

if tc.expectUnmarshalError {
require.Error(t, err)
continue
}
require.NoError(t, err)

ext, err := args.Convert()

require.NoError(t, err)
otelArgs, ok := (ext).(*headerssetterextension.Config)
require.True(t, ok)

require.Equal(t, len(otelArgs.HeadersConfig), 1)
require.Equal(t, *otelArgs.HeadersConfig[0].Key, tc.expectedKey)
require.Equal(t, *otelArgs.HeadersConfig[0].Value, tc.expectedValue)
require.Equal(t, otelArgs.HeadersConfig[0].Action, tc.expectedAction)
}
}
21 changes: 13 additions & 8 deletions component/otelcol/processor/batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ func init() {

// Arguments configures the otelcol.processor.batch component.
type Arguments struct {
Timeout time.Duration `river:"timeout,attr,optional"`
SendBatchSize uint32 `river:"send_batch_size,attr,optional"`
SendBatchMaxSize uint32 `river:"send_batch_max_size,attr,optional"`
Timeout time.Duration `river:"timeout,attr,optional"`
SendBatchSize uint32 `river:"send_batch_size,attr,optional"`
SendBatchMaxSize uint32 `river:"send_batch_max_size,attr,optional"`
MetadataKeys []string `river:"metadata_keys,attr,optional"`
MetadataCardinalityLimit uint32 `river:"metadata_cardinality_limit,attr,optional"`

// Output configures where to send processed data. Required.
Output *otelcol.ConsumerArguments `river:"output,block"`
Expand All @@ -42,8 +44,9 @@ var (

// DefaultArguments holds default settings for Arguments.
var DefaultArguments = Arguments{
Timeout: 200 * time.Millisecond,
SendBatchSize: 8192,
Timeout: 200 * time.Millisecond,
SendBatchSize: 8192,
MetadataCardinalityLimit: 1000,
}

// SetToDefault implements river.Defaulter.
Expand All @@ -62,9 +65,11 @@ func (args *Arguments) Validate() error {
// Convert implements processor.Arguments.
func (args Arguments) Convert() (otelcomponent.Config, error) {
return &batchprocessor.Config{
Timeout: args.Timeout,
SendBatchSize: args.SendBatchSize,
SendBatchMaxSize: args.SendBatchMaxSize,
Timeout: args.Timeout,
SendBatchSize: args.SendBatchSize,
SendBatchMaxSize: args.SendBatchMaxSize,
MetadataKeys: args.MetadataKeys,
MetadataCardinalityLimit: args.MetadataCardinalityLimit,
}, nil
}

Expand Down
Loading