Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid infinite loop in Kubernetes watcher #6353

Merged
merged 2 commits into from
Feb 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ https://github.com/elastic/beats/compare/v6.2.0...6.2[Check the HEAD diff]

*Affecting all Beats*

- Fix infinite loop when event unmarshal fails in Kubernetes pod watcher. {pull}6353[6353]

*Auditbeat*

*Filebeat*
Expand Down
34 changes: 30 additions & 4 deletions libbeat/common/kubernetes/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kubernetes
import (
"context"
"errors"
"io"
"sync"
"time"

Expand All @@ -13,6 +14,9 @@ import (
corev1 "github.com/ericchiang/k8s/api/v1"
)

// Max back off time for retries
const maxBackoff = 30 * time.Second

// Watcher reads Kubernetes events and keeps a list of known pods
type Watcher interface {
// Start watching Kubernetes API for new containers
Expand Down Expand Up @@ -129,27 +133,41 @@ func (p *podWatcher) Start() error {
}

func (p *podWatcher) watch() {
// Failures counter, do exponential backoff on retries
var failures uint

for {
logp.Info("kubernetes: %s", "Watching API for pod events")
watcher, err := p.client.WatchPods(p.ctx, "", p.nodeFilter, k8s.ResourceVersion(p.lastResourceVersion))
if err != nil {
//watch pod failures should be logged and gracefully failed over as metadata retrieval
//should never stop.
logp.Err("kubernetes: Watching API error %v", err)
time.Sleep(time.Second)
backoff(failures)
failures++
continue
}

for {
_, apiPod, err := watcher.Next()
if err != nil {
logp.Err("kubernetes: Watching API error %v", err)
watcher.Close()
break

// In case of EOF, stop watching and restart the process
if err == io.EOF || err == io.ErrUnexpectedEOF {
watcher.Close()
backoff(failures)
failures++
break
}

// Otherwise, this is probably an unknown event (unmarshal error), ignore it
continue
}

// Update last resource version
// Update last resource version and reset failure counter
p.lastResourceVersion = apiPod.Metadata.GetResourceVersion()
failures = 0

pod := GetPod(apiPod)
if pod.Metadata.DeletionTimestamp != "" {
Expand Down Expand Up @@ -190,6 +208,14 @@ func (p *podWatcher) watch() {
}
}

func backoff(failures uint) {
wait := 1 << failures * time.Second
if wait > maxBackoff {
wait = maxBackoff
}
time.Sleep(wait)
}

// Check annotations flagged as deleted for their last access time, fully delete
// the ones older than p.cleanupTimeout
func (p *podWatcher) cleanupWorker() {
Expand Down