Skip to content

Commit

Permalink
[release-15.0] VReplication Last Error: retry error if it happens aft…
Browse files Browse the repository at this point in the history
…er timeout (#12114) (#12302)

* Retry error if it happens after timeout

Signed-off-by: Rohit Nayak <[email protected]>

* Address review comments

Signed-off-by: Rohit Nayak <[email protected]>

* Address review comments

Signed-off-by: Rohit Nayak <[email protected]>

* Improve unit tests

Signed-off-by: Rohit Nayak <[email protected]>

---------

Signed-off-by: Rohit Nayak <[email protected]>
Co-authored-by: Rohit Nayak <[email protected]>
  • Loading branch information
vitess-bot[bot] and rohit-nayak-ps authored Mar 27, 2023
1 parent b632186 commit 6998097
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 35 deletions.
14 changes: 7 additions & 7 deletions go/vt/vttablet/tabletmanager/vreplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,12 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor
}

ct := &controller{
vre: vre,
dbClientFactory: dbClientFactory,
mysqld: mysqld,
blpStats: blpStats,
done: make(chan struct{}),
source: &binlogdatapb.BinlogSource{},
lastWorkflowError: newLastError("VReplication Controller", maxTimeToRetryError),
vre: vre,
dbClientFactory: dbClientFactory,
mysqld: mysqld,
blpStats: blpStats,
done: make(chan struct{}),
source: &binlogdatapb.BinlogSource{},
}
log.Infof("creating controller with cell: %v, tabletTypes: %v, and params: %v", cell, tabletTypesStr, params)

Expand All @@ -89,6 +88,7 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor
}
ct.id = uint32(id)
ct.workflow = params["workflow"]
ct.lastWorkflowError = newLastError(fmt.Sprintf("VReplication controller %d for workflow %q", ct.id, ct.workflow), maxTimeToRetryError)

state := params["state"]
blpStats.State.Set(state)
Expand Down
18 changes: 16 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/last_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,20 @@ import (

/*
* lastError tracks the most recent error for any ongoing process and how long it has persisted.
* The err field should be a vterror so as to ensure we have meaningful error codes, causes, stack
* The err field should be a vterror to ensure we have meaningful error codes, causes, stack
* traces, etc.
*/
type lastError struct {
name string
err error
firstSeen time.Time
lastSeen time.Time
mu sync.Mutex
maxTimeInError time.Duration // if error persists for this long, shouldRetry() will return false
}

func newLastError(name string, maxTimeInError time.Duration) *lastError {
log.Infof("Created last error: %s, with maxTimeInError: %s", name, maxTimeInError)
return &lastError{
name: name,
maxTimeInError: maxTimeInError,
Expand All @@ -48,15 +50,27 @@ func (le *lastError) record(err error) {
le.mu.Lock()
defer le.mu.Unlock()
if err == nil {
log.Infof("Resetting last error: %s", le.name)
le.err = nil
le.firstSeen = time.Time{}
le.lastSeen = time.Time{}
return
}
if !vterrors.Equals(err, le.err) {
log.Infof("Got new last error %+v for %s, was %+v", err, le.name, le.err)
le.firstSeen = time.Now()
le.lastSeen = time.Now()
le.err = err
} else {
// same error seen
log.Infof("Got the same last error for %q: %+v ; first seen at %s and last seen %dms ago", le.name, le.err, le.firstSeen, int(time.Since(le.lastSeen).Milliseconds()))
if time.Since(le.lastSeen) > le.maxTimeInError {
// reset firstSeen, since it has been long enough since the last time we saw this error
log.Infof("Resetting firstSeen for %s, since it is too long since the last one", le.name)
le.firstSeen = time.Now()
}
le.lastSeen = time.Now()
}
// The error is unchanged so we don't need to do anything
}

func (le *lastError) shouldRetry() bool {
Expand Down
83 changes: 57 additions & 26 deletions go/vt/vttablet/tabletmanager/vreplication/last_error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,32 +24,63 @@ import (
"github.com/stretchr/testify/require"
)

func TestLastError(t *testing.T) {
le := newLastError("test", 100*time.Millisecond)
const shortWait = 1 * time.Millisecond
const longWait = 150 * time.Millisecond
const maxTimeInError = 100 * time.Millisecond

t.Run("long running error", func(t *testing.T) {
err1 := fmt.Errorf("test1")
// TestLastErrorZeroMaxTime tests maxTimeInError = 0, should always retry
func TestLastErrorZeroMaxTime(t *testing.T) {
le := newLastError("test", 0)
err1 := fmt.Errorf("error1")
le.record(err1)
require.True(t, le.shouldRetry())
time.Sleep(shortWait)
require.True(t, le.shouldRetry())
time.Sleep(longWait)
require.True(t, le.shouldRetry())
}

// TestLastErrorNoError ensures that an uninitialized lastError always retries
func TestLastErrorNoError(t *testing.T) {
le := newLastError("test", maxTimeInError)
require.True(t, le.shouldRetry())
err1 := fmt.Errorf("error1")
le.record(err1)
require.True(t, le.shouldRetry())
le.record(nil)
require.True(t, le.shouldRetry())
}

// TestLastErrorOneError validates that we retry an error if happening within the maxTimeInError, but not after
func TestLastErrorOneError(t *testing.T) {
le := newLastError("test", maxTimeInError)
err1 := fmt.Errorf("error1")
le.record(err1)
require.True(t, le.shouldRetry())
time.Sleep(shortWait)
require.True(t, le.shouldRetry())
time.Sleep(shortWait)
require.True(t, le.shouldRetry())
time.Sleep(longWait)
require.False(t, le.shouldRetry())
}

// TestLastErrorRepeatedError confirms that if same error is repeated we don't retry
// unless it happens after maxTimeInError
func TestLastErrorRepeatedError(t *testing.T) {
le := newLastError("test", maxTimeInError)
err1 := fmt.Errorf("error1")
le.record(err1)
require.True(t, le.shouldRetry())
for i := 1; i < 10; i++ {
le.record(err1)
require.True(t, le.shouldRetry())
time.Sleep(150 * time.Millisecond)
require.False(t, le.shouldRetry())
})

t.Run("new long running error", func(t *testing.T) {
err2 := fmt.Errorf("test2")
le.record(err2)
require.True(t, le.shouldRetry())
for i := 1; i < 10; i++ {
le.record(err2)
}
require.True(t, le.shouldRetry())
time.Sleep(150 * time.Millisecond)
le.record(err2)
require.False(t, le.shouldRetry())
})

t.Run("no error", func(t *testing.T) {
le.record(nil)
require.True(t, le.shouldRetry())
})
time.Sleep(shortWait)
}
require.True(t, le.shouldRetry())

// same error happens after maxTimeInError, so it should retry
time.Sleep(longWait)
require.False(t, le.shouldRetry())
le.record(err1)
require.True(t, le.shouldRetry())
}

0 comments on commit 6998097

Please sign in to comment.