Skip to content

Commit

Permalink
reconciler: Fix retries.Add() when called multiple times
Browse files Browse the repository at this point in the history
The Add() function did not properly handle the case when
an item was Add()'d that was still in the queue. This happened
via the full reconciliation where it was queueing retries for
failed refreshes.

The effect of the bug was that the same item could end up in
the retry queue multiple times. This combined with the miscalculation
of the retryAt time eventually caused an item to be retried without
any backoff.

Signed-off-by: Jussi Maki <[email protected]>
  • Loading branch information
joamaki committed Jul 3, 2024
1 parent 8705921 commit 10fcaf7
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 24 deletions.
16 changes: 13 additions & 3 deletions reconciler/retries.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func (rq *retries) Top() (*retryItem, bool) {
func (rq *retries) Pop() {
// Pop the object from the queue, but leave it into the map until
// the object is cleared or re-added.
rq.queue[0].index = -1
heap.Pop(&rq.queue)

rq.resetTimer()
Expand Down Expand Up @@ -119,6 +120,7 @@ func (rq *retries) Add(obj any, rev statedb.Revision, delete bool, lastError err
if item, ok = rq.items[string(key)]; !ok {
item = &retryItem{
numRetries: 0,
index: -1,
}
rq.items[string(key)] = item
}
Expand All @@ -129,10 +131,18 @@ func (rq *retries) Add(obj any, rev statedb.Revision, delete bool, lastError err
item.lastError = lastError
duration := rq.backoff.Duration(item.numRetries)
item.retryAt = time.Now().Add(duration)
heap.Push(&rq.queue, item)

// New item is at the top of the queue, reset the timer.
rq.resetTimer()
if item.index >= 0 {
// The item was already in the queue, fix up its position.
heap.Fix(&rq.queue, item.index)
} else {
heap.Push(&rq.queue, item)
}

// Item is at the head of the queue, reset the timer.
if item.index == 0 {
rq.resetTimer()
}
}

func (rq *retries) Clear(obj any) {
Expand Down
61 changes: 40 additions & 21 deletions reconciler/retries_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,55 +33,74 @@ func TestRetries(t *testing.T) {
assert.Len(t, errs, 3)
assert.Equal(t, err, errs[0])

// Adding an item a second time will increment the number of retries and
// recalculate when it should be retried.
rq.Add(obj3, 3, false, err)

<-rq.Wait()
item, ok := rq.Top()
item1, ok := rq.Top()
if assert.True(t, ok) {
assert.True(t, item1.retryAt.Before(time.Now()), "expected item to be expired")
assert.Equal(t, item1.object, obj1)

rq.Pop()
rq.Clear(item.object)
assert.True(t, item.retryAt.Before(time.Now()), "expected item to be expired")
assert.Equal(t, item.object, obj1)
rq.Clear(item1.object)
}

<-rq.Wait()
item, ok = rq.Top()
item2, ok := rq.Top()
if assert.True(t, ok) {
assert.True(t, item2.retryAt.Before(time.Now()), "expected item to be expired")
assert.True(t, item2.retryAt.After(item1.retryAt), "expected item to expire later than previous")
assert.Equal(t, item2.object, obj2)

rq.Pop()
rq.Clear(item.object)
assert.True(t, item.retryAt.Before(time.Now()), "expected item to be expired")
assert.Equal(t, item.object, obj2)
rq.Clear(item2.object)
}

// Pop the last object. But don't clear its retry count.
<-rq.Wait()
item, ok = rq.Top()
item3, ok := rq.Top()
if assert.True(t, ok) {
assert.True(t, item3.retryAt.Before(time.Now()), "expected item to be expired")
assert.True(t, item3.retryAt.After(item2.retryAt), "expected item to expire later than previous")
assert.Equal(t, item3.object, obj3)

rq.Pop()
assert.True(t, item.retryAt.Before(time.Now()), "expected item to be expired")
assert.Equal(t, item.object, obj3)
}

// Queue should be empty now.
item, ok := rq.Top()
assert.False(t, ok)

// Retry 'obj3' and since it was added back without clearing it'll be retried
// later. Add obj1 and check that 'obj3' has later retry time.
rq.Add(obj3, 4, false, err)
rq.Add(obj1, 5, false, err)

<-rq.Wait()
item, ok = rq.Top()
item4, ok := rq.Top()
if assert.True(t, ok) {
assert.True(t, item4.retryAt.Before(time.Now()), "expected item to be expired")
assert.Equal(t, item4.object, obj1)

rq.Pop()
rq.Clear(item.object)
assert.True(t, item.retryAt.Before(time.Now()), "expected item to be expired")
assert.Equal(t, item.object, obj1)
rq.Clear(item4.object)
}
retryAt1 := item.retryAt

<-rq.Wait()
item, ok = rq.Top()
item5, ok := rq.Top()
if assert.True(t, ok) {
assert.True(t, item5.retryAt.Before(time.Now()), "expected item to be expired")
assert.True(t, item5.retryAt.After(item4.retryAt), "expected obj1 before obj3")
assert.Equal(t, obj3, item5.object)

// numRetries is 3 since 'obj3' was added to the queue 3 times and it has not
// been cleared.
assert.Equal(t, 3, item5.numRetries)

rq.Pop()
rq.Clear(item.object)
assert.True(t, retryAt1.Before(item.retryAt), "expected obj1 before obj3")
assert.True(t, item.retryAt.Before(time.Now()), "expected item to be expired")
assert.Equal(t, item.object, obj3)
rq.Clear(item5.object)
}

_, ok = rq.Top()
Expand Down

0 comments on commit 10fcaf7

Please sign in to comment.