Skip to content

Commit

Permalink
[processor/k8sattributes] Add support for profiles signal (#35999)
Browse files Browse the repository at this point in the history
#### Description
Add support for profiles signal to `k8sattributesprocessor`.

#### Link to tracking issue
Fixes #35983

#### Testing
- factory_test.go
- processor_test.go
  • Loading branch information
haoqixu authored Nov 12, 2024
1 parent 0250081 commit 2f2ba8c
Show file tree
Hide file tree
Showing 15 changed files with 440 additions and 46 deletions.
27 changes: 27 additions & 0 deletions .chloggen/f-profiles-k8sattributesprocessor.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: k8sattributesprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for profiles signal

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35983]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
4 changes: 3 additions & 1 deletion processor/k8sattributesprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
<!-- status autogenerated section -->
| Status | |
| ------------- |-----------|
| Stability | [beta]: logs, metrics, traces |
| Stability | [development]: profiles |
| | [beta]: logs, metrics, traces |
| Distributions | [contrib], [k8s] |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aprocessor%2Fk8sattributes%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aprocessor%2Fk8sattributes) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aprocessor%2Fk8sattributes%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aprocessor%2Fk8sattributes) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@dmitryax](https://www.github.com/dmitryax), [@fatsheep9146](https://www.github.com/fatsheep9146), [@TylerHelmuth](https://www.github.com/TylerHelmuth) |
| Emeritus | [@rmfitzpatrick](https://www.github.com/rmfitzpatrick) |

[development]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#development
[beta]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#beta
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
[k8s]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-k8s
Expand Down
276 changes: 255 additions & 21 deletions processor/k8sattributesprocessor/e2e_test.go

Large diffs are not rendered by default.

42 changes: 38 additions & 4 deletions processor/k8sattributesprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumerprofiles"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/processorhelper"
"go.opentelemetry.io/collector/processor/processorhelper/processorhelperprofiles"
"go.opentelemetry.io/collector/processor/processorprofiles"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/kube"
Expand All @@ -22,12 +25,13 @@ var defaultExcludes = ExcludeConfig{Pods: []ExcludePodConfig{{Name: "jaeger-agen

// NewFactory returns a new factory for the k8s processor.
func NewFactory() processor.Factory {
return processor.NewFactory(
return processorprofiles.NewFactory(
metadata.Type,
createDefaultConfig,
processor.WithTraces(createTracesProcessor, metadata.TracesStability),
processor.WithMetrics(createMetricsProcessor, metadata.MetricsStability),
processor.WithLogs(createLogsProcessor, metadata.LogsStability),
processorprofiles.WithTraces(createTracesProcessor, metadata.TracesStability),
processorprofiles.WithMetrics(createMetricsProcessor, metadata.MetricsStability),
processorprofiles.WithLogs(createLogsProcessor, metadata.LogsStability),
processorprofiles.WithProfiles(createProfilesProcessor, metadata.ProfilesStability),
)
}

Expand Down Expand Up @@ -68,6 +72,15 @@ func createMetricsProcessor(
return createMetricsProcessorWithOptions(ctx, params, cfg, nextMetricsConsumer)
}

func createProfilesProcessor(
ctx context.Context,
params processor.Settings,
cfg component.Config,
nextProfilesConsumer consumerprofiles.Profiles,
) (processorprofiles.Profiles, error) {
return createProfilesProcessorWithOptions(ctx, params, cfg, nextProfilesConsumer)
}

func createTracesProcessorWithOptions(
ctx context.Context,
set processor.Settings,
Expand Down Expand Up @@ -128,6 +141,27 @@ func createLogsProcessorWithOptions(
processorhelper.WithShutdown(kp.Shutdown))
}

func createProfilesProcessorWithOptions(
ctx context.Context,
set processor.Settings,
cfg component.Config,
nextProfilesConsumer consumerprofiles.Profiles,
options ...option,
) (processorprofiles.Profiles, error) {
kp := createKubernetesProcessor(set, cfg, options...)

return processorhelperprofiles.NewProfiles(
ctx,
set,
cfg,
nextProfilesConsumer,
kp.processProfiles,
processorhelperprofiles.WithCapabilities(consumerCapabilities),
processorhelperprofiles.WithStart(kp.Start),
processorhelperprofiles.WithShutdown(kp.Shutdown),
)
}

func createKubernetesProcessor(
params processor.Settings,
cfg component.Config,
Expand Down
9 changes: 9 additions & 0 deletions processor/k8sattributesprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/processor/processorprofiles"
"go.opentelemetry.io/collector/processor/processortest"
)

Expand Down Expand Up @@ -41,6 +42,10 @@ func TestCreateProcessor(t *testing.T) {
assert.NotNil(t, lp)
assert.NoError(t, err)

pp, err := factory.(processorprofiles.Factory).CreateProfiles(context.Background(), params, cfg, consumertest.NewNop())
assert.NotNil(t, pp)
assert.NoError(t, err)

oCfg := cfg.(*Config)
oCfg.Passthrough = true

Expand All @@ -56,6 +61,10 @@ func TestCreateProcessor(t *testing.T) {
assert.NotNil(t, lp)
assert.NoError(t, err)

pp, err = factory.(processorprofiles.Factory).CreateProfiles(context.Background(), params, cfg, consumertest.NewNop())
assert.NotNil(t, pp)
assert.NoError(t, err)

// Switch it back so other tests run afterwards will not fail on unexpected state
kubeClientProvider = realClient
}
10 changes: 6 additions & 4 deletions processor/k8sattributesprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,19 @@ require (
go.opentelemetry.io/collector/config/configtelemetry v0.113.0
go.opentelemetry.io/collector/confmap v1.19.0
go.opentelemetry.io/collector/consumer v0.113.0
go.opentelemetry.io/collector/consumer/consumerprofiles v0.113.0
go.opentelemetry.io/collector/consumer/consumertest v0.113.0
go.opentelemetry.io/collector/featuregate v1.19.0
go.opentelemetry.io/collector/pdata v1.19.0
go.opentelemetry.io/collector/pdata/pprofile v0.113.0
go.opentelemetry.io/collector/pipeline v0.113.0
go.opentelemetry.io/collector/pipeline/pipelineprofiles v0.113.0
go.opentelemetry.io/collector/processor v0.113.0
go.opentelemetry.io/collector/processor/processorhelper/processorhelperprofiles v0.113.0
go.opentelemetry.io/collector/processor/processorprofiles v0.113.0
go.opentelemetry.io/collector/processor/processortest v0.113.0
go.opentelemetry.io/collector/receiver/otlpreceiver v0.113.0
go.opentelemetry.io/collector/receiver/receiverprofiles v0.113.0
go.opentelemetry.io/collector/receiver/receivertest v0.113.0
go.opentelemetry.io/collector/semconv v0.113.0
go.opentelemetry.io/otel/metric v1.32.0
Expand Down Expand Up @@ -94,15 +100,11 @@ require (
go.opentelemetry.io/collector/config/configtls v1.19.0 // indirect
go.opentelemetry.io/collector/config/internal v0.113.0 // indirect
go.opentelemetry.io/collector/consumer/consumererror v0.113.0 // indirect
go.opentelemetry.io/collector/consumer/consumerprofiles v0.113.0 // indirect
go.opentelemetry.io/collector/extension v0.113.0 // indirect
go.opentelemetry.io/collector/extension/auth v0.113.0 // indirect
go.opentelemetry.io/collector/internal/sharedcomponent v0.113.0 // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.113.0 // indirect
go.opentelemetry.io/collector/pdata/testdata v0.113.0 // indirect
go.opentelemetry.io/collector/processor/processorprofiles v0.113.0 // indirect
go.opentelemetry.io/collector/receiver v0.113.0 // indirect
go.opentelemetry.io/collector/receiver/receiverprofiles v0.113.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.56.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 // indirect
go.opentelemetry.io/otel v1.32.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions processor/k8sattributesprocessor/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions processor/k8sattributesprocessor/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ status:
class: processor
stability:
beta: [logs, metrics, traces]
development: [profiles]
distributions: [contrib, k8s]
codeowners:
active: [dmitryax, fatsheep9146, TylerHelmuth]
Expand Down
11 changes: 11 additions & 0 deletions processor/k8sattributesprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pprofile"
"go.opentelemetry.io/collector/pdata/ptrace"
conventions "go.opentelemetry.io/collector/semconv/v1.8.0"
"go.uber.org/zap"
Expand Down Expand Up @@ -117,6 +118,16 @@ func (kp *kubernetesprocessor) processLogs(ctx context.Context, ld plog.Logs) (p
return ld, nil
}

// processProfiles process profiles and add k8s metadata using resource IP, hostname or incoming IP as pod origin.
func (kp *kubernetesprocessor) processProfiles(ctx context.Context, pd pprofile.Profiles) (pprofile.Profiles, error) {
rp := pd.ResourceProfiles()
for i := 0; i < rp.Len(); i++ {
kp.processResource(ctx, rp.At(i).Resource())
}

return pd, nil
}

// processResource adds Pod metadata tags to resource based on pod association configuration
func (kp *kubernetesprocessor) processResource(ctx context.Context, resource pcommon.Resource) {
podIdentifierValue := extractPodID(ctx, resource.Attributes(), kp.podAssociations)
Expand Down
Loading

0 comments on commit 2f2ba8c

Please sign in to comment.