Skip to content

Commit

Permalink
feat (processor/k8sattributes): wait for synced when starting k8sattr…
Browse files Browse the repository at this point in the history
…ibutes processor. (open-telemetry#32622)

**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->

When starting `k8sattributes` processor, block until an initial
synchronization has been completed. This solves open-telemetry#32556

**Link to tracking Issue:** <Issue number if applicable>

fix open-telemetry#32556

**Testing:** <Describe what testing was performed and which tests were
added.>

Tested in a cluster with constant high span traffic, no more spans with
unassociated k8s metadata after adding this pr.

**Documentation:** <Describe the documentation added.>

---------

Co-authored-by: Christos Markou <[email protected]>
  • Loading branch information
2 people authored and RutvikS-crest committed Dec 9, 2024
1 parent 7133b98 commit a38569c
Show file tree
Hide file tree
Showing 15 changed files with 238 additions and 109 deletions.
27 changes: 27 additions & 0 deletions .chloggen/k8sattributes-block.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: processor/k8sattributes

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Block when starting until the metadata have been synced, to fix that some data couldn't be associated with metadata when the agent was just started.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32556]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
15 changes: 15 additions & 0 deletions processor/k8sattributesprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,21 @@ the processor associates the received trace to the pod, based on the connection
}
```

By default, the processor will be ready as soon as it starts, even if no metadata has been fetched yet.
If data is sent to this processor before the metadata is synced, there will be no metadata to enrich the data with.

To wait for the metadata to be synced before the processor is ready, set the `wait_for_metadata` option to `true`.
Then the processor will not be ready until the metadata is fully synced. As a result, the start-up of the Collector will be blocked. If the metadata cannot be synced, the Collector will ultimately fail to start.
If a timeout is reached, the processor will fail to start and return an error, which will cause the collector to exit.
The timeout defaults to 10s and can be configured with the `metadata_sync_timeout` option.

example for setting the processor to wait for metadata to be synced before it is ready:

```yaml
wait_for_metadata: true
wait_for_metadata_timeout: 10s
```

## Extracting attributes from pod labels and annotations

The k8sattributesprocessor can also set resource attributes from k8s labels and annotations of pods, namespaces and nodes.
Expand Down
9 changes: 6 additions & 3 deletions processor/k8sattributesprocessor/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package k8sattributesprocessor

import (
"time"

"go.opentelemetry.io/collector/component"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -35,7 +37,7 @@ func selectors() (labels.Selector, fields.Selector) {
}

// newFakeClient instantiates a new FakeClient object and satisfies the ClientProvider type
func newFakeClient(_ component.TelemetrySettings, _ k8sconfig.APIConfig, rules kube.ExtractionRules, filters kube.Filters, associations []kube.Association, _ kube.Excludes, _ kube.APIClientsetProvider, _ kube.InformerProvider, _ kube.InformerProviderNamespace, _ kube.InformerProviderReplicaSet) (kube.Client, error) {
func newFakeClient(_ component.TelemetrySettings, _ k8sconfig.APIConfig, rules kube.ExtractionRules, filters kube.Filters, associations []kube.Association, _ kube.Excludes, _ kube.APIClientsetProvider, _ kube.InformerProvider, _ kube.InformerProviderNamespace, _ kube.InformerProviderReplicaSet, _ bool, _ time.Duration) (kube.Client, error) {
cs := fake.NewSimpleClientset()

ls, fs := selectors()
Expand Down Expand Up @@ -70,10 +72,11 @@ func (f *fakeClient) GetNode(nodeName string) (*kube.Node, bool) {
}

// Start is a noop for FakeClient.
func (f *fakeClient) Start() {
func (f *fakeClient) Start() error {
if f.Informer != nil {
f.Informer.Run(f.StopCh)
go f.Informer.Run(f.StopCh)
}
return nil
}

// Stop is a noop for FakeClient.
Expand Down
7 changes: 7 additions & 0 deletions processor/k8sattributesprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package k8sattributesprocessor // import "github.com/open-telemetry/opentelemetr
import (
"fmt"
"regexp"
"time"

"go.opentelemetry.io/collector/featuregate"
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
Expand Down Expand Up @@ -46,6 +47,12 @@ type Config struct {
// Exclude section allows to define names of pod that should be
// ignored while tagging.
Exclude ExcludeConfig `mapstructure:"exclude"`

// WaitForMetadata is a flag that determines if the processor should wait k8s metadata to be synced when starting.
WaitForMetadata bool `mapstructure:"wait_for_metadata"`

// WaitForMetadataTimeout is the maximum time the processor will wait for the k8s metadata to be synced.
WaitForMetadataTimeout time.Duration `mapstructure:"wait_for_metadata_timeout"`
}

func (cfg *Config) Validate() error {
Expand Down
5 changes: 5 additions & 0 deletions processor/k8sattributesprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package k8sattributesprocessor
import (
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -34,6 +35,7 @@ func TestLoadConfig(t *testing.T) {
Extract: ExtractConfig{
Metadata: enabledAttributes(),
},
WaitForMetadataTimeout: 10 * time.Second,
},
},
{
Expand Down Expand Up @@ -105,6 +107,7 @@ func TestLoadConfig(t *testing.T) {
{Name: "jaeger-collector"},
},
},
WaitForMetadataTimeout: 10 * time.Second,
},
},
{
Expand All @@ -127,6 +130,7 @@ func TestLoadConfig(t *testing.T) {
{Name: "jaeger-collector"},
},
},
WaitForMetadataTimeout: 10 * time.Second,
},
},
{
Expand All @@ -149,6 +153,7 @@ func TestLoadConfig(t *testing.T) {
{Name: "jaeger-collector"},
},
},
WaitForMetadataTimeout: 10 * time.Second,
},
},
{
Expand Down
7 changes: 7 additions & 0 deletions processor/k8sattributesprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package k8sattributesprocessor // import "github.com/open-telemetry/opentelemetr

import (
"context"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
Expand Down Expand Up @@ -44,6 +45,7 @@ func createDefaultConfig() component.Config {
Extract: ExtractConfig{
Metadata: enabledAttributes(),
},
WaitForMetadataTimeout: 10 * time.Second,
}
}

Expand Down Expand Up @@ -202,5 +204,10 @@ func createProcessorOpts(cfg component.Config) []option {

opts = append(opts, withExcludes(oCfg.Exclude))

opts = append(opts, withWaitForMetadataTimeout(oCfg.WaitForMetadataTimeout))
if oCfg.WaitForMetadata {
opts = append(opts, withWaitForMetadata(true))
}

return opts
}
38 changes: 0 additions & 38 deletions processor/k8sattributesprocessor/generated_component_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

97 changes: 66 additions & 31 deletions processor/k8sattributesprocessor/internal/kube/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package kube // import "github.com/open-telemetry/opentelemetry-collector-contri

import (
"context"
"errors"
"fmt"
"regexp"
"strings"
Expand Down Expand Up @@ -39,18 +40,20 @@ var enableRFC3339Timestamp = featuregate.GlobalRegistry().MustRegister(

// WatchClient is the main interface provided by this package to a kubernetes cluster.
type WatchClient struct {
m sync.RWMutex
deleteMut sync.Mutex
logger *zap.Logger
kc kubernetes.Interface
informer cache.SharedInformer
namespaceInformer cache.SharedInformer
nodeInformer cache.SharedInformer
replicasetInformer cache.SharedInformer
replicasetRegex *regexp.Regexp
cronJobRegex *regexp.Regexp
deleteQueue []deleteRequest
stopCh chan struct{}
m sync.RWMutex
deleteMut sync.Mutex
logger *zap.Logger
kc kubernetes.Interface
informer cache.SharedInformer
namespaceInformer cache.SharedInformer
nodeInformer cache.SharedInformer
replicasetInformer cache.SharedInformer
replicasetRegex *regexp.Regexp
cronJobRegex *regexp.Regexp
deleteQueue []deleteRequest
stopCh chan struct{}
waitForMetadata bool
waitForMetadataTimeout time.Duration

// A map containing Pod related data, used to associate them with resources.
// Key can be either an IP address or Pod UID
Expand Down Expand Up @@ -84,21 +87,36 @@ var rRegex = regexp.MustCompile(`^(.*)-[0-9a-zA-Z]+$`)
var cronJobRegex = regexp.MustCompile(`^(.*)-[0-9]+$`)

// New initializes a new k8s Client.
func New(set component.TelemetrySettings, apiCfg k8sconfig.APIConfig, rules ExtractionRules, filters Filters, associations []Association, exclude Excludes, newClientSet APIClientsetProvider, newInformer InformerProvider, newNamespaceInformer InformerProviderNamespace, newReplicaSetInformer InformerProviderReplicaSet) (Client, error) {
func New(
set component.TelemetrySettings,
apiCfg k8sconfig.APIConfig,
rules ExtractionRules,
filters Filters,
associations []Association,
exclude Excludes,
newClientSet APIClientsetProvider,
newInformer InformerProvider,
newNamespaceInformer InformerProviderNamespace,
newReplicaSetInformer InformerProviderReplicaSet,
waitForMetadata bool,
waitForMetadataTimeout time.Duration,
) (Client, error) {
telemetryBuilder, err := metadata.NewTelemetryBuilder(set)
if err != nil {
return nil, err
}
c := &WatchClient{
logger: set.Logger,
Rules: rules,
Filters: filters,
Associations: associations,
Exclude: exclude,
replicasetRegex: rRegex,
cronJobRegex: cronJobRegex,
stopCh: make(chan struct{}),
telemetryBuilder: telemetryBuilder,
logger: set.Logger,
Rules: rules,
Filters: filters,
Associations: associations,
Exclude: exclude,
replicasetRegex: rRegex,
cronJobRegex: cronJobRegex,
stopCh: make(chan struct{}),
telemetryBuilder: telemetryBuilder,
waitForMetadata: waitForMetadata,
waitForMetadataTimeout: waitForMetadataTimeout,
}
go c.deleteLoop(time.Second*30, defaultPodDeleteGracePeriod)

Expand Down Expand Up @@ -189,50 +207,67 @@ func New(set component.TelemetrySettings, apiCfg k8sconfig.APIConfig, rules Extr
}

// Start registers pod event handlers and starts watching the kubernetes cluster for pod changes.
func (c *WatchClient) Start() {
_, err := c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
func (c *WatchClient) Start() error {
synced := make([]cache.InformerSynced, 0)
reg, err := c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.handlePodAdd,
UpdateFunc: c.handlePodUpdate,
DeleteFunc: c.handlePodDelete,
})
if err != nil {
c.logger.Error("error adding event handler to pod informer", zap.Error(err))
return err
}
synced = append(synced, reg.HasSynced)
go c.informer.Run(c.stopCh)

_, err = c.namespaceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
reg, err = c.namespaceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.handleNamespaceAdd,
UpdateFunc: c.handleNamespaceUpdate,
DeleteFunc: c.handleNamespaceDelete,
})
if err != nil {
c.logger.Error("error adding event handler to namespace informer", zap.Error(err))
return err
}
synced = append(synced, reg.HasSynced)
go c.namespaceInformer.Run(c.stopCh)

if c.Rules.DeploymentName || c.Rules.DeploymentUID {
_, err = c.replicasetInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
reg, err = c.replicasetInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.handleReplicaSetAdd,
UpdateFunc: c.handleReplicaSetUpdate,
DeleteFunc: c.handleReplicaSetDelete,
})
if err != nil {
c.logger.Error("error adding event handler to replicaset informer", zap.Error(err))
return err
}
synced = append(synced, reg.HasSynced)
go c.replicasetInformer.Run(c.stopCh)
}

if c.nodeInformer != nil {
_, err = c.nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
reg, err = c.nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.handleNodeAdd,
UpdateFunc: c.handleNodeUpdate,
DeleteFunc: c.handleNodeDelete,
})
if err != nil {
c.logger.Error("error adding event handler to node informer", zap.Error(err))
return err
}
synced = append(synced, reg.HasSynced)
go c.nodeInformer.Run(c.stopCh)
}

if c.waitForMetadata {
timeoutCh := make(chan struct{})
t := time.AfterFunc(c.waitForMetadataTimeout, func() {
close(timeoutCh)
})
defer t.Stop()
if !cache.WaitForCacheSync(timeoutCh, synced...) {
return errors.New("failed to wait for caches to sync")
}
}
return nil
}

// Stop signals the the k8s watcher/informer to stop watching for new events.
Expand Down
Loading

0 comments on commit a38569c

Please sign in to comment.