diff --git a/reconciler/retries.go b/reconciler/retries.go index f80b7dc..cb3fd50 100644 --- a/reconciler/retries.go +++ b/reconciler/retries.go @@ -86,6 +86,7 @@ func (rq *retries) Top() (*retryItem, bool) { func (rq *retries) Pop() { // Pop the object from the queue, but leave it into the map until // the object is cleared or re-added. + rq.queue[0].index = -1 heap.Pop(&rq.queue) rq.resetTimer() @@ -119,6 +120,7 @@ func (rq *retries) Add(obj any, rev statedb.Revision, delete bool, lastError err if item, ok = rq.items[string(key)]; !ok { item = &retryItem{ numRetries: 0, + index: -1, } rq.items[string(key)] = item } @@ -129,10 +131,18 @@ func (rq *retries) Add(obj any, rev statedb.Revision, delete bool, lastError err item.lastError = lastError duration := rq.backoff.Duration(item.numRetries) item.retryAt = time.Now().Add(duration) - heap.Push(&rq.queue, item) - // New item is at the top of the queue, reset the timer. - rq.resetTimer() + if item.index >= 0 { + // The item was already in the queue, fix up its position. + heap.Fix(&rq.queue, item.index) + } else { + heap.Push(&rq.queue, item) + } + + // Item is at the head of the queue, reset the timer. + if item.index == 0 { + rq.resetTimer() + } } func (rq *retries) Clear(obj any) { diff --git a/reconciler/retries_test.go b/reconciler/retries_test.go index fb0ad21..fb7bf66 100644 --- a/reconciler/retries_test.go +++ b/reconciler/retries_test.go @@ -33,55 +33,74 @@ func TestRetries(t *testing.T) { assert.Len(t, errs, 3) assert.Equal(t, err, errs[0]) + // Adding an item a second time will increment the number of retries and + // recalculate when it should be retried. + rq.Add(obj3, 3, false, err) + <-rq.Wait() - item, ok := rq.Top() + item1, ok := rq.Top() if assert.True(t, ok) { + assert.True(t, item1.retryAt.Before(time.Now()), "expected item to be expired") + assert.Equal(t, item1.object, obj1) + rq.Pop() - rq.Clear(item.object) - assert.True(t, item.retryAt.Before(time.Now()), "expected item to be expired") - assert.Equal(t, item.object, obj1) + rq.Clear(item1.object) } <-rq.Wait() - item, ok = rq.Top() + item2, ok := rq.Top() if assert.True(t, ok) { + assert.True(t, item2.retryAt.Before(time.Now()), "expected item to be expired") + assert.True(t, item2.retryAt.After(item1.retryAt), "expected item to expire later than previous") + assert.Equal(t, item2.object, obj2) + rq.Pop() - rq.Clear(item.object) - assert.True(t, item.retryAt.Before(time.Now()), "expected item to be expired") - assert.Equal(t, item.object, obj2) + rq.Clear(item2.object) } + // Pop the last object. But don't clear its retry count. <-rq.Wait() - item, ok = rq.Top() + item3, ok := rq.Top() if assert.True(t, ok) { + assert.True(t, item3.retryAt.Before(time.Now()), "expected item to be expired") + assert.True(t, item3.retryAt.After(item2.retryAt), "expected item to expire later than previous") + assert.Equal(t, item3.object, obj3) + rq.Pop() - assert.True(t, item.retryAt.Before(time.Now()), "expected item to be expired") - assert.Equal(t, item.object, obj3) } + // Queue should be empty now. + item, ok := rq.Top() + assert.False(t, ok) + // Retry 'obj3' and since it was added back without clearing it'll be retried // later. Add obj1 and check that 'obj3' has later retry time. rq.Add(obj3, 4, false, err) rq.Add(obj1, 5, false, err) <-rq.Wait() - item, ok = rq.Top() + item4, ok := rq.Top() if assert.True(t, ok) { + assert.True(t, item4.retryAt.Before(time.Now()), "expected item to be expired") + assert.Equal(t, item4.object, obj1) + rq.Pop() - rq.Clear(item.object) - assert.True(t, item.retryAt.Before(time.Now()), "expected item to be expired") - assert.Equal(t, item.object, obj1) + rq.Clear(item4.object) } - retryAt1 := item.retryAt <-rq.Wait() - item, ok = rq.Top() + item5, ok := rq.Top() if assert.True(t, ok) { + assert.True(t, item5.retryAt.Before(time.Now()), "expected item to be expired") + assert.True(t, item5.retryAt.After(item4.retryAt), "expected obj1 before obj3") + assert.Equal(t, obj3, item5.object) + + // numRetries is 3 since 'obj3' was added to the queue 3 times and it has not + // been cleared. + assert.Equal(t, 3, item5.numRetries) + rq.Pop() - rq.Clear(item.object) - assert.True(t, retryAt1.Before(item.retryAt), "expected obj1 before obj3") - assert.True(t, item.retryAt.Before(time.Now()), "expected item to be expired") - assert.Equal(t, item.object, obj3) + rq.Clear(item5.object) } _, ok = rq.Top()