Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle host not available scenario during peer bootstrap more gracefully #1677

Merged
merged 10 commits into from
Jun 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/dbnode/client/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ const (

var (
errConnectionPoolClosed = errors.New("connection pool closed")
errConnectionPoolHasNoConnections = errors.New("connection pool has no connections")
errConnectionPoolHasNoConnections = newHostNotAvailableError(errors.New("connection pool has no connections"))
)

type connPool struct {
Expand Down
21 changes: 21 additions & 0 deletions src/dbnode/client/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,27 @@ func NumError(err error) int {
return 0
}

type hostNotAvailableError struct {
err error
}

func (h hostNotAvailableError) Error() string {
return h.err.Error()
}

func newHostNotAvailableError(err error) error {
return xerrors.NewNonRetryableError(hostNotAvailableError{err: err})
}

func isHostNotAvailableError(err error) bool {
inner := xerrors.GetInnerNonRetryableError(err)
if inner == nil {
return false
}
_, ok := inner.(hostNotAvailableError)
return ok
}

type consistencyResultError interface {
error

Expand Down
57 changes: 39 additions & 18 deletions src/dbnode/client/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ import (
"github.com/m3db/m3/src/dbnode/digest"
"github.com/m3db/m3/src/dbnode/encoding"
"github.com/m3db/m3/src/dbnode/generated/thrift/rpc"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/network/server/tchannelthrift/convert"
"github.com/m3db/m3/src/dbnode/runtime"
"github.com/m3db/m3/src/dbnode/storage/block"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/result"
"github.com/m3db/m3/src/dbnode/storage/index"
idxconvert "github.com/m3db/m3/src/dbnode/storage/index/convert"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/topology"
"github.com/m3db/m3/src/dbnode/ts"
"github.com/m3db/m3/src/dbnode/x/xio"
Expand Down Expand Up @@ -71,6 +71,8 @@ const (
gaugeReportInterval = 500 * time.Millisecond
blockMetadataChBufSize = 4096
shardResultCapacity = 4096
hostNotAvailableMinSleepInterval = 1 * time.Millisecond
hostNotAvailableMaxSleepInterval = 100 * time.Millisecond
)

type resultTypeEnum string
Expand Down Expand Up @@ -103,7 +105,7 @@ var (
// the connect consistency level specified is not recognized
errSessionInvalidConnectClusterConnectConsistencyLevel = errors.New("session has invalid connect consistency level specified")
// errSessionHasNoHostQueueForHost is raised when host queue requested for a missing host
errSessionHasNoHostQueueForHost = errors.New("session has no host queue for host")
errSessionHasNoHostQueueForHost = newHostNotAvailableError(errors.New("session has no host queue for host"))
// errUnableToEncodeTags is raised when the server is unable to encode provided tags
// to be sent over the wire.
errUnableToEncodeTags = errors.New("unable to include tags")
Expand Down Expand Up @@ -2112,21 +2114,26 @@ func (s *session) streamBlocksMetadataFromPeers(
// returned it will likely not be nil, this lets us restart fetching
// if we need to (if consistency has not been achieved yet) without
// losing place in the pagination.
currPageToken pageToken
currPageToken pageToken
currHostNotAvailableSleepInterval = hostNotAvailableMinSleepInterval
)
condition := func() bool {
if firstAttempt {
// Always attempt at least once
firstAttempt = false
return true
}
currLevel := level.value()
majority := int(majority)
enqueued := int(enqueued)
success := int(atomic.LoadInt32(&success))

doRetry := !topology.ReadConsistencyAchieved(currLevel, majority, enqueued, success) &&
errs.getAbortError() == nil
var (
currLevel = level.value()
majority = int(majority)
enqueued = int(enqueued)
success = int(atomic.LoadInt32(&success))
)
metReadConsistency := topology.ReadConsistencyAchieved(
currLevel, majority, enqueued, success)
doRetry := !metReadConsistency && errs.getAbortError() == nil

if doRetry {
// Track that we are reattempting the fetch metadata
// pagination from a peer
Expand All @@ -2138,19 +2145,33 @@ func (s *session) streamBlocksMetadataFromPeers(
var err error
currPageToken, err = s.streamBlocksMetadataFromPeer(namespace, shardID,
peer, start, end, currPageToken, metadataCh, resultOpts, progress)

// Set error or success if err is nil
errs.setError(idx, err)

// Check exit criteria
// hostNotAvailable is a NonRetryableError for the purposes of short-circuiting
// the automatic retry functionality, but in this case the client should avoid
// aborting and continue retrying at this level until consistency can be reached.
if isHostNotAvailableError(err) {
// Prevent the loop from spinning too aggressively in the short-circuiting case.
time.Sleep(currHostNotAvailableSleepInterval)
currHostNotAvailableSleepInterval = minDuration(
currHostNotAvailableSleepInterval*2,
hostNotAvailableMaxSleepInterval,
)
continue
}

if err != nil && xerrors.IsNonRetryableError(err) {
errs.setAbortError(err)
return // Cannot recover from this error, so we break from the loop
}

if err == nil {
atomic.AddInt32(&success, 1)
return
}

// There was a retryable error, continue looping.
}
}()
}
Expand Down Expand Up @@ -2833,13 +2854,6 @@ func (s *session) streamBlocksBatchFromPeer(
result, attemptErr = client.FetchBlocksRaw(tctx, req)
})
err := xerrors.FirstError(borrowErr, attemptErr)
// Do not retry if cannot borrow the connection or
// if the connection pool has no connections
switch err {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deleted this since this will be non-retryable by default

case errSessionHasNoHostQueueForHost,
errConnectionPoolHasNoConnections:
err = xerrors.NewNonRetryableError(err)
}
return err
}); err != nil {
blocksErr := fmt.Errorf(
Expand Down Expand Up @@ -3883,3 +3897,10 @@ func histogramWithDurationBuckets(scope tally.Scope, name string) tally.Histogra
})
return sub.Histogram(name, histogramDurationBuckets())
}

func minDuration(x, y time.Duration) time.Duration {
if x < y {
return x
}
return y
}
Loading