Skip to content

Commit

Permalink
Merge pull request #2325 from alvaroaleman/terminal-error
Browse files Browse the repository at this point in the history
✨  Add terminal error
  • Loading branch information
k8s-ci-robot authored May 17, 2023
2 parents 124fac6 + d10ae95 commit c8cf90e
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 24 deletions.
16 changes: 11 additions & 5 deletions pkg/controller/controllertest/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package controllertest

import (
"sync"
"time"

"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -35,28 +36,33 @@ func (ErrorType) GetObjectKind() schema.ObjectKind { return nil }
// DeepCopyObject implements runtime.Object.
func (ErrorType) DeepCopyObject() runtime.Object { return nil }

var _ workqueue.RateLimitingInterface = Queue{}
var _ workqueue.RateLimitingInterface = &Queue{}

// Queue implements a RateLimiting queue as a non-ratelimited queue for testing.
// This helps testing by having functions that use a RateLimiting queue synchronously add items to the queue.
type Queue struct {
workqueue.Interface
AddedRateLimitedLock sync.Mutex
AddedRatelimited []any
}

// AddAfter implements RateLimitingInterface.
func (q Queue) AddAfter(item interface{}, duration time.Duration) {
func (q *Queue) AddAfter(item interface{}, duration time.Duration) {
q.Add(item)
}

// AddRateLimited implements RateLimitingInterface. TODO(community): Implement this.
func (q Queue) AddRateLimited(item interface{}) {
func (q *Queue) AddRateLimited(item interface{}) {
q.AddedRateLimitedLock.Lock()
q.AddedRatelimited = append(q.AddedRatelimited, item)
q.AddedRateLimitedLock.Unlock()
q.Add(item)
}

// Forget implements RateLimitingInterface. TODO(community): Implement this.
func (q Queue) Forget(item interface{}) {}
func (q *Queue) Forget(item interface{}) {}

// NumRequeues implements RateLimitingInterface. TODO(community): Implement this.
func (q Queue) NumRequeues(item interface{}) int {
func (q *Queue) NumRequeues(item interface{}) int {
return 0
}
2 changes: 1 addition & 1 deletion pkg/handler/eventhandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ var _ = Describe("Eventhandler", func() {
var pod *corev1.Pod
var mapper meta.RESTMapper
BeforeEach(func() {
q = controllertest.Queue{Interface: workqueue.New()}
q = &controllertest.Queue{Interface: workqueue.New()}
pod = &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Namespace: "biz", Name: "baz"},
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,11 @@ func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
result, err := c.Reconcile(ctx, req)
switch {
case err != nil:
c.Queue.AddRateLimited(req)
if errors.Is(err, reconcile.TerminalError(nil)) {
ctrlmetrics.TerminalReconcileErrors.WithLabelValues(c.Name).Inc()
} else {
c.Queue.AddRateLimited(req)
}
ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc()
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc()
log.Error(err, "Reconciler error")
Expand Down
25 changes: 24 additions & 1 deletion pkg/internal/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,6 @@ var _ = Describe("controller", func() {
})

It("should requeue a Request if there is an error and continue processing items", func() {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
Expand All @@ -372,6 +371,9 @@ var _ = Describe("controller", func() {
By("Invoking Reconciler which will give an error")
fakeReconcile.AddResult(reconcile.Result{}, fmt.Errorf("expected error: reconcile"))
Expect(<-reconciled).To(Equal(request))
queue.AddedRateLimitedLock.Lock()
Expect(queue.AddedRatelimited).To(Equal([]any{request}))
queue.AddedRateLimitedLock.Unlock()

By("Invoking Reconciler a second time without error")
fakeReconcile.AddResult(reconcile.Result{}, nil)
Expand All @@ -382,6 +384,27 @@ var _ = Describe("controller", func() {
Eventually(func() int { return queue.NumRequeues(request) }, 1.0).Should(Equal(0))
})

It("should not requeue a Request if there is a terminal error", func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
defer GinkgoRecover()
Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
}()

queue.Add(request)

By("Invoking Reconciler which will give an error")
fakeReconcile.AddResult(reconcile.Result{}, reconcile.TerminalError(fmt.Errorf("expected error: reconcile")))
Expect(<-reconciled).To(Equal(request))

queue.AddedRateLimitedLock.Lock()
Expect(queue.AddedRatelimited).To(BeEmpty())
queue.AddedRateLimitedLock.Unlock()

Expect(queue.Len()).Should(Equal(0))
})

// TODO(directxman12): we should ensure that backoff occurrs with error requeue

It("should not reset backoff until there's a non-error result", func() {
Expand Down
8 changes: 8 additions & 0 deletions pkg/internal/controller/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ var (
Help: "Total number of reconciliation errors per controller",
}, []string{"controller"})

// TerminalReconcileErrors is a prometheus counter metrics which holds the total
// number of terminal errors from the Reconciler.
TerminalReconcileErrors = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "controller_runtime_terminal_reconcile_errors_total",
Help: "Total number of terminal reconciliation errors per controller",
}, []string{"controller"})

// ReconcileTime is a prometheus metric which keeps track of the duration
// of reconciliations.
ReconcileTime = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Expand Down Expand Up @@ -67,6 +74,7 @@ func init() {
metrics.Registry.MustRegister(
ReconcileTotal,
ReconcileErrors,
TerminalReconcileErrors,
ReconcileTime,
WorkerCount,
ActiveWorkers,
Expand Down
32 changes: 16 additions & 16 deletions pkg/internal/source/internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ var _ = Describe("Internal", func() {
set = true
},
}
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, funcs, nil)
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, funcs, nil)
})

Describe("EventHandler", func() {
Expand All @@ -99,38 +99,38 @@ var _ = Describe("Internal", func() {
})

It("should used Predicates to filter CreateEvents", func() {
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return false }},
})
set = false
instance.OnAdd(pod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }},
})
instance.OnAdd(pod)
Expect(set).To(BeTrue())

set = false
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }},
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return false }},
})
instance.OnAdd(pod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return false }},
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }},
})
instance.OnAdd(pod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }},
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }},
})
Expand All @@ -157,37 +157,37 @@ var _ = Describe("Internal", func() {

It("should used Predicates to filter UpdateEvents", func() {
set = false
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{UpdateFunc: func(updateEvent event.UpdateEvent) bool { return false }},
})
instance.OnUpdate(pod, newPod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return true }},
})
instance.OnUpdate(pod, newPod)
Expect(set).To(BeTrue())

set = false
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return true }},
predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return false }},
})
instance.OnUpdate(pod, newPod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return false }},
predicate.Funcs{UpdateFunc: func(event.UpdateEvent) bool { return true }},
})
instance.OnUpdate(pod, newPod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }},
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }},
})
Expand Down Expand Up @@ -215,37 +215,37 @@ var _ = Describe("Internal", func() {

It("should used Predicates to filter DeleteEvents", func() {
set = false
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return false }},
})
instance.OnDelete(pod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }},
})
instance.OnDelete(pod)
Expect(set).To(BeTrue())

set = false
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }},
predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return false }},
})
instance.OnDelete(pod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return false }},
predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }},
})
instance.OnDelete(pod)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, controllertest.Queue{}, setfuncs, []predicate.Predicate{
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }},
predicate.Funcs{DeleteFunc: func(event.DeleteEvent) bool { return true }},
})
Expand Down
24 changes: 24 additions & 0 deletions pkg/reconcile/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package reconcile

import (
"context"
"errors"
"time"

"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -100,3 +101,26 @@ var _ Reconciler = Func(nil)

// Reconcile implements Reconciler.
func (r Func) Reconcile(ctx context.Context, o Request) (Result, error) { return r(ctx, o) }

// TerminalError is an error that will not be retried but still be logged
// and recorded in metrics.
func TerminalError(wrapped error) error {
return &terminalError{err: wrapped}
}

type terminalError struct {
err error
}

func (te *terminalError) Unwrap() error {
return te.err
}

func (te *terminalError) Error() string {
return "terminal error: " + te.err.Error()
}

func (te *terminalError) Is(target error) bool {
tp := &terminalError{}
return errors.As(target, &tp)
}
8 changes: 8 additions & 0 deletions pkg/reconcile/reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
Expand Down Expand Up @@ -88,5 +89,12 @@ var _ = Describe("reconcile", func() {
Expect(actualResult).To(Equal(result))
Expect(actualErr).To(Equal(err))
})

It("should allow unwrapping inner error from terminal error", func() {
inner := apierrors.NewGone("")
terminalError := reconcile.TerminalError(inner)

Expect(apierrors.IsGone(terminalError)).To(BeTrue())
})
})
})

0 comments on commit c8cf90e

Please sign in to comment.