Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle DeletedFinalStateUnknown in k8s OnDelete #23419

Merged
merged 6 commits into from
Jan 12, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix typo in config docs {pull}23185[23185]
- Fix `nested` subfield handling in generated Elasticsearch templates. {issue}23178[23178] {pull}23183[23183]
- Fix CPU usage metrics on VMs with dynamic CPU config {pull}23154[23154]
- Fix panic due to unhandled DeletedFinalStateUnknown in k8s OnDelete {pull}23419[23419]

*Auditbeat*

Expand Down
17 changes: 16 additions & 1 deletion libbeat/autodiscover/providers/kubernetes/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
k8s "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"

"github.com/elastic/beats/v7/libbeat/autodiscover/builder"
"github.com/elastic/beats/v7/libbeat/common"
Expand Down Expand Up @@ -114,7 +115,21 @@ func (n *node) OnUpdate(obj interface{}) {
// OnDelete ensures processing of node objects that are deleted
func (n *node) OnDelete(obj interface{}) {
n.logger.Debugf("Watcher Node delete: %+v", obj)
time.AfterFunc(n.config.CleanupTimeout, func() { n.emit(obj.(*kubernetes.Node), "stop") })
node, isNode := obj.(*kubernetes.Node)
// We can get DeletedFinalStateUnknown instead of *kubernetes.Node here and we need to handle that correctly. #23385
if !isNode {
deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
Copy link
Member

@jsoriano jsoriano Jan 12, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is exposing details from the watcher and the client cache in the handlers. DeleteFinalStateUnknown happens when a informer misses the delete event, and it sees in a resync that an object is not there anymore. As it doesn't know what was the final state, it includes the last known state, that in our case is probably enough to remove configurations.

Also, handling this in the handler level forces us to repeat some code in each handler we add.

Wdyt about trying to handle it in the watcher directly?

For example watcher.enqueue() already does something related to DeletedFinalStateUnknown when it calls cache.DeletionHandlingMetaNamespaceKeyFunc. We could maybe modify enqueue to do something like this:

func (w *watcher) enqueue(obj interface{}, state string) {
        // DeletionHandlingMetaNamespaceKeyFunc that we get a key only if the resource's state is not Unknown.
        key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
        if err != nil {
                return
        }
        if deleted, ok := obj.(cache.DeletedFinalStateUnknown); ok {
                w.logger.Debugf(.....)
                obj = deleted.Obj
        }
        w.queue.Add(&item{key, obj, state})
}

if !ok {
n.logger.Errorf("Received unexpected object: %+v", obj)
return
}
node, ok = deletedState.Obj.(*kubernetes.Node)
if !ok {
n.logger.Errorf("DeletedFinalStateUnknown contained non-Node object: %+v", deletedState.Obj)
return
}
}
time.AfterFunc(n.config.CleanupTimeout, func() { n.emit(node, "stop") })
}

// GenerateHints creates hints needed for hints builder
Expand Down
190 changes: 190 additions & 0 deletions libbeat/autodiscover/providers/kubernetes/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"

"github.com/elastic/beats/v7/libbeat/autodiscover/template"
"github.com/elastic/beats/v7/libbeat/common"
Expand Down Expand Up @@ -345,6 +346,195 @@ func TestEmitEvent_Node(t *testing.T) {
}
}

func TestNode_OnDelete(t *testing.T) {
name := "metricbeat"
nameUnknown := "metricbeat-unknown"
nodeIP := "192.168.0.1"
uid := "005f3b90-4b9d-12f8-acf0-31020a840133"
UUID, err := uuid.NewV4()

typeMeta := metav1.TypeMeta{
Kind: "Node",
APIVersion: "v1",
}
if err != nil {
t.Fatal(err)
}

tests := []struct {
Message string
Flag string
Node *kubernetes.Node
Expected bus.Event
}{
{
Message: "Test node stop",
Node: &kubernetes.Node{
ObjectMeta: metav1.ObjectMeta{
Name: name,
UID: types.UID(uid),
Labels: map[string]string{},
Annotations: map[string]string{},
},
TypeMeta: typeMeta,
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{
Type: v1.NodeExternalIP,
Address: nodeIP,
},
{
Type: v1.NodeInternalIP,
Address: "1.2.3.4",
},
{
Type: v1.NodeHostName,
Address: "node1",
},
},
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
},
},
},
},
Expected: bus.Event{
"stop": true,
"host": "192.168.0.1",
"id": uid,
"provider": UUID,
"kubernetes": common.MapStr{
"node": common.MapStr{
"name": "metricbeat",
"uid": "005f3b90-4b9d-12f8-acf0-31020a840133",
"hostname": "node1",
},
"annotations": common.MapStr{},
},
"meta": common.MapStr{
"kubernetes": common.MapStr{
"node": common.MapStr{
"name": "metricbeat",
"uid": "005f3b90-4b9d-12f8-acf0-31020a840133",
"hostname": "node1",
},
},
},
"config": []*common.Config{},
},
},
{
Message: "Test node stop with DeletedFinalStateUnknown",
Node: &kubernetes.Node{
ObjectMeta: metav1.ObjectMeta{
Name: nameUnknown,
UID: types.UID(uid),
Labels: map[string]string{},
Annotations: map[string]string{},
},
TypeMeta: typeMeta,
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{
Type: v1.NodeExternalIP,
Address: nodeIP,
},
{
Type: v1.NodeInternalIP,
Address: "1.2.3.4",
},
{
Type: v1.NodeHostName,
Address: "node1",
},
},
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
},
},
},
},
Expected: bus.Event{
"stop": true,
"host": "192.168.0.1",
"id": uid,
"provider": UUID,
"kubernetes": common.MapStr{
"node": common.MapStr{
"name": nameUnknown,
"uid": "005f3b90-4b9d-12f8-acf0-31020a840133",
"hostname": "node1",
},
"annotations": common.MapStr{},
},
"meta": common.MapStr{
"kubernetes": common.MapStr{
"node": common.MapStr{
"name": nameUnknown,
"uid": "005f3b90-4b9d-12f8-acf0-31020a840133",
"hostname": "node1",
},
},
},
"config": []*common.Config{},
},
},
}

for _, test := range tests {
t.Run(test.Message, func(t *testing.T) {
mapper, err := template.NewConfigMapper(nil, nil, nil)
if err != nil {
t.Fatal(err)
}

metaGen := metadata.NewNodeMetadataGenerator(common.NewConfig(), nil)
config := defaultConfig()
p := &Provider{
config: config,
bus: bus.New(logp.NewLogger("bus"), "test"),
templates: mapper,
logger: logp.NewLogger("kubernetes"),
}

no := &node{
metagen: metaGen,
config: defaultConfig(),
publish: p.publish,
uuid: UUID,
logger: logp.NewLogger("kubernetes.no"),
}
no.config.CleanupTimeout = 1 * time.Second
p.eventManager = NewMockNodeEventerManager(no)

listener := p.bus.Subscribe()

if test.Node.Name == nameUnknown {
deletedState := cache.DeletedFinalStateUnknown{
Key: "testnode",
Obj: test.Node,
}
no.OnDelete(deletedState)
} else {
no.OnDelete(test.Node)
}

select {
case event := <-listener.Events():
assert.Equal(t, test.Expected, event, test.Message)
case <-time.After(4 * time.Second):
if test.Expected != nil {
t.Fatal("Timeout while waiting for event")
}
}
})
}
}

func NewMockNodeEventerManager(no *node) EventManager {
em := &eventerManager{}
em.eventer = no
Expand Down
16 changes: 15 additions & 1 deletion libbeat/autodiscover/providers/kubernetes/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,21 @@ func (p *pod) OnUpdate(obj interface{}) {
// OnDelete stops pod objects that are deleted
func (p *pod) OnDelete(obj interface{}) {
p.logger.Debugf("Watcher Pod delete: %+v", obj)
time.AfterFunc(p.config.CleanupTimeout, func() { p.emit(obj.(*kubernetes.Pod), "stop") })
pod, isPod := obj.(*kubernetes.Pod)
// We can get DeletedFinalStateUnknown instead of *kubernetes.Pod here and we need to handle that correctly. #23385
if !isPod {
deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
p.logger.Debugf("Received unexpected object: %+v", obj)
return
}
pod, ok = deletedState.Obj.(*kubernetes.Pod)
if !ok {
p.logger.Debugf("DeletedFinalStateUnknown contained non-Pod object: %+v", deletedState.Obj)
return
}
}
time.AfterFunc(p.config.CleanupTimeout, func() { p.emit(pod, "stop") })
}

// GenerateHints creates hints needed for hints builder
Expand Down
Loading