From 202378cf1fe23bdd8fc125afedbaa01d62521ef0 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Wed, 29 May 2019 19:53:15 -0500 Subject: [PATCH 01/10] Dont retry hostNotAvailableErrors --- src/dbnode/client/connection_pool.go | 2 +- src/dbnode/client/errors.go | 17 +++++++++++++++++ src/dbnode/client/session.go | 25 ++++++++++++++++++++----- 3 files changed, 38 insertions(+), 6 deletions(-) diff --git a/src/dbnode/client/connection_pool.go b/src/dbnode/client/connection_pool.go index eed6612a65..47d7a7d0b5 100644 --- a/src/dbnode/client/connection_pool.go +++ b/src/dbnode/client/connection_pool.go @@ -46,7 +46,7 @@ const ( var ( errConnectionPoolClosed = errors.New("connection pool closed") - errConnectionPoolHasNoConnections = errors.New("connection pool has no connections") + errConnectionPoolHasNoConnections = newHostNotAvailableError(errors.New("connection pool has no connections")) ) type connPool struct { diff --git a/src/dbnode/client/errors.go b/src/dbnode/client/errors.go index 2fa0559372..f5ad4ae834 100644 --- a/src/dbnode/client/errors.go +++ b/src/dbnode/client/errors.go @@ -94,6 +94,23 @@ func NumError(err error) int { return 0 } +type hostNotAvailableError struct { + err error +} + +func (h hostNotAvailableError) Error() string { + return h.err.Error() +} + +func newHostNotAvailableError(err error) hostNotAvailableError { + return hostNotAvailableError{err: err} +} + +func isHostNotAvailableError(e error) bool { + _, ok := e.(hostNotAvailableError) + return ok +} + type consistencyResultError interface { error diff --git a/src/dbnode/client/session.go b/src/dbnode/client/session.go index b7f5b7866c..c7b6317d19 100644 --- a/src/dbnode/client/session.go +++ b/src/dbnode/client/session.go @@ -36,13 +36,13 @@ import ( "github.com/m3db/m3/src/dbnode/digest" "github.com/m3db/m3/src/dbnode/encoding" "github.com/m3db/m3/src/dbnode/generated/thrift/rpc" + "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/network/server/tchannelthrift/convert" "github.com/m3db/m3/src/dbnode/runtime" "github.com/m3db/m3/src/dbnode/storage/block" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/storage/index" idxconvert "github.com/m3db/m3/src/dbnode/storage/index/convert" - "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/topology" "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/dbnode/x/xio" @@ -103,7 +103,7 @@ var ( // the connect consistency level specified is not recognized errSessionInvalidConnectClusterConnectConsistencyLevel = errors.New("session has invalid connect consistency level specified") // errSessionHasNoHostQueueForHost is raised when host queue requested for a missing host - errSessionHasNoHostQueueForHost = errors.New("session has no host queue for host") + errSessionHasNoHostQueueForHost = newHostNotAvailableError(errors.New("session has no host queue for host")) // errUnableToEncodeTags is raised when the server is unable to encode provided tags // to be sent over the wire. errUnableToEncodeTags = errors.New("unable to include tags") @@ -2319,11 +2319,28 @@ func (s *session) streamBlocksMetadataFromPeer( fetchFn := func() error { borrowErr := peer.BorrowConnection(checkedAttemptFn) + // Don't retry if the host is not available to prevent exponential + // backoff from causing this to take a long time when subsequent + // requests will fail. 
+ if isHostNotAvailableError(borrowErr) { + borrowErr = xerrors.NewNonRetryableError(borrowErr) + } return xerrors.FirstError(borrowErr, attemptErr) } for moreResults { if err := s.streamBlocksRetrier.Attempt(fetchFn); err != nil { + // Check if the error was a hostNotAvailableError wrapped in a + // nonRetryableError. If so, return the hostNotAvailableError + // instead of the non-retryable error since the error is technically + // retryable, it was only wrapped to prevent the exponential backoff + // in the actual retrier. + if xerrors.IsNonRetryableError(err) { + inner := xerrors.GetInnerNonRetryableError(err) + if isHostNotAvailableError(inner) { + err = inner + } + } return startPageToken, err } } @@ -2835,9 +2852,7 @@ func (s *session) streamBlocksBatchFromPeer( err := xerrors.FirstError(borrowErr, attemptErr) // Do not retry if cannot borrow the connection or // if the connection pool has no connections - switch err { - case errSessionHasNoHostQueueForHost, - errConnectionPoolHasNoConnections: + if isHostNotAvailableError(err) { err = xerrors.NewNonRetryableError(err) } return err From d655d8292321b359a4ad2d17011ce51db3eab015 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Sun, 2 Jun 2019 14:06:06 -0500 Subject: [PATCH 02/10] wip with wip test --- src/dbnode/client/session.go | 1 - .../client/session_fetch_bulk_blocks_test.go | 145 +++++++++++++++++- 2 files changed, 144 insertions(+), 2 deletions(-) diff --git a/src/dbnode/client/session.go b/src/dbnode/client/session.go index c7b6317d19..75af71b42f 100644 --- a/src/dbnode/client/session.go +++ b/src/dbnode/client/session.go @@ -2138,7 +2138,6 @@ func (s *session) streamBlocksMetadataFromPeers( var err error currPageToken, err = s.streamBlocksMetadataFromPeer(namespace, shardID, peer, start, end, currPageToken, metadataCh, resultOpts, progress) - // Set error or success if err is nil errs.setError(idx, err) diff --git a/src/dbnode/client/session_fetch_bulk_blocks_test.go b/src/dbnode/client/session_fetch_bulk_blocks_test.go index ea807fbd9e..a82e07f623 100644 --- a/src/dbnode/client/session_fetch_bulk_blocks_test.go +++ b/src/dbnode/client/session_fetch_bulk_blocks_test.go @@ -36,10 +36,10 @@ import ( "github.com/m3db/m3/src/dbnode/encoding" "github.com/m3db/m3/src/dbnode/encoding/m3tsz" "github.com/m3db/m3/src/dbnode/generated/thrift/rpc" + "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/retention" "github.com/m3db/m3/src/dbnode/storage/block" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" - "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/topology" "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/dbnode/x/xio" @@ -279,6 +279,149 @@ func TestFetchBootstrapBlocksAllPeersSucceedV2(t *testing.T) { assert.NoError(t, session.Close()) } +// TestFetchBootstrapBlocksDontRetryHostNotAvailableInRetrier was added as a regression test +// to ensure that in the scenario where a peer is not available (hard down) but the others are +// available the streamBlocksMetadataFromPeers does not wait for all of the exponential retries +// to the downed host to fail before continuing. This is important because if the client waits for +// all the retries to the downed host to complete it can block each metadata fetch for up to 30 seconds +// due to the exponential backoff logic in the retrier. 
+func TestFetchBootstrapBlocksDontRetryHostNotAvailableInRetrier(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + opts := newSessionTestAdminOptions() + s, err := newSession(opts) + require.NoError(t, err) + session := s.(*session) + + mockHostQueues, mockClients := mockHostQueuesAndClientsForFetchBootstrapBlocks(ctrl, opts) + session.newHostQueueFn = mockHostQueues.newHostQueueFn() + + // Don't drain the peer blocks queue, explicitly drain ourselves to + // avoid unpredictable batches being retrieved from peers + var ( + qs []*peerBlocksQueue + qsMutex sync.RWMutex + ) + session.newPeerBlocksQueueFn = func( + peer peer, + maxQueueSize int, + _ time.Duration, + workers xsync.WorkerPool, + processFn processFn, + ) *peerBlocksQueue { + qsMutex.Lock() + defer qsMutex.Unlock() + q := newPeerBlocksQueue(peer, maxQueueSize, 0, workers, processFn) + qs = append(qs, q) + return q + } + + require.NoError(t, session.Open()) + + batchSize := opts.FetchSeriesBlocksBatchSize() + + start := time.Now().Truncate(blockSize).Add(blockSize * -(24 - 1)) + + blocks := []testBlocks{ + { + id: fooID, + blocks: []testBlock{ + { + start: start.Add(blockSize * 1), + segments: &testBlockSegments{merged: &testBlockSegment{ + head: []byte{1, 2}, + tail: []byte{3}, + }}, + }, + }, + }, + { + id: barID, + blocks: []testBlock{ + { + start: start.Add(blockSize * 2), + segments: &testBlockSegments{merged: &testBlockSegment{ + head: []byte{4, 5}, + tail: []byte{6}, + }}, + }, + }, + }, + { + id: bazID, + blocks: []testBlock{ + { + start: start.Add(blockSize * 3), + segments: &testBlockSegments{merged: &testBlockSegment{ + head: []byte{7, 8}, + tail: []byte{9}, + }}, + }, + }, + }, + } + + // Expect the fetch metadata calls + metadataResult := resultMetadataFromBlocks(blocks) + // Skip the first client which is the client for the origin + mockClients[1:3].expectFetchMetadataAndReturn(metadataResult, opts) + mockClients[3].EXPECT(). + FetchBlocksMetadataRawV2(gomock.Any(), gomock.Any()). 
+ Return(ret, nil) + + // Expect the fetch blocks calls + participating := len(mockClients) - 1 + blocksExpectedReqs, blocksResult := expectedReqsAndResultFromBlocks(t, + blocks, batchSize, participating, + func(blockIdx int) (clientIdx int) { + // Round robin to match the best peer selection algorithm + return blockIdx % participating + }) + // Skip the first client which is the client for the origin + for i, client := range mockClients[1:] { + expectFetchBlocksAndReturn(client, blocksExpectedReqs[i], blocksResult[i]) + } + + // Make sure peer selection is round robin to match our expected + // peer fetch calls + session.pickBestPeerFn = newRoundRobinPickBestPeerFn() + + // Fetch blocks + go func() { + // Trigger peer queues to drain explicitly when all work enqueued + for { + qsMutex.RLock() + assigned := 0 + for _, q := range qs { + assigned += int(atomic.LoadUint64(&q.assigned)) + } + qsMutex.RUnlock() + if assigned == len(blocks) { + qsMutex.Lock() + defer qsMutex.Unlock() + for _, q := range qs { + q.drain() + } + return + } + time.Sleep(10 * time.Millisecond) + } + }() + rangeStart := start + rangeEnd := start.Add(blockSize * (24 - 1)) + bootstrapOpts := newResultTestOptions() + result, err := session.FetchBootstrapBlocksFromPeers( + testsNsMetadata(t), 0, rangeStart, rangeEnd, bootstrapOpts) + assert.NoError(t, err) + assert.NotNil(t, result) + + // Assert result + assertFetchBootstrapBlocksResult(t, blocks, result) + + assert.NoError(t, session.Close()) +} + type fetchBlocksFromPeersTestScenarioGenerator func( peerIdx int, numPeers int, From 6932eb465015547687f33e97dc139cde935fff26 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Mon, 3 Jun 2019 17:44:46 -0400 Subject: [PATCH 03/10] working regression test --- src/dbnode/client/session.go | 16 ++- .../client/session_fetch_bulk_blocks_test.go | 112 +++++++++++++----- 2 files changed, 95 insertions(+), 33 deletions(-) diff --git a/src/dbnode/client/session.go b/src/dbnode/client/session.go index 75af71b42f..678b417cad 100644 --- a/src/dbnode/client/session.go +++ b/src/dbnode/client/session.go @@ -2120,13 +2120,17 @@ func (s *session) streamBlocksMetadataFromPeers( firstAttempt = false return true } - currLevel := level.value() - majority := int(majority) - enqueued := int(enqueued) - success := int(atomic.LoadInt32(&success)) - doRetry := !topology.ReadConsistencyAchieved(currLevel, majority, enqueued, success) && - errs.getAbortError() == nil + var ( + currLevel = level.value() + majority = int(majority) + enqueued = int(enqueued) + success = int(atomic.LoadInt32(&success)) + metReadConsistency = topology.ReadConsistencyAchieved( + currLevel, majority, enqueued, success) + doRetry = !metReadConsistency && errs.getAbortError() == nil + ) + if doRetry { // Track that we are reattempting the fetch metadata // pagination from a peer diff --git a/src/dbnode/client/session_fetch_bulk_blocks_test.go b/src/dbnode/client/session_fetch_bulk_blocks_test.go index a82e07f623..adf9bda253 100644 --- a/src/dbnode/client/session_fetch_bulk_blocks_test.go +++ b/src/dbnode/client/session_fetch_bulk_blocks_test.go @@ -289,12 +289,61 @@ func TestFetchBootstrapBlocksDontRetryHostNotAvailableInRetrier(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - opts := newSessionTestAdminOptions() + opts := newSessionTestAdminOptions(). + // Set bootstrap consistency level to unstrict majority because there are only 3 nodes in the + // cluster. 
The first one will not return data because it is the origin and the last node will + // return an error. + SetBootstrapConsistencyLevel(topology.ReadConsistencyLevelUnstrictMajority). + // Configure the stream blocks retrier such that if the short-circuit logic did not work the + // test would timeout. + SetStreamBlocksRetrier(xretry.NewRetrier( + xretry.NewOptions(). + SetBackoffFactor(10). + SetMaxRetries(10). + SetInitialBackoff(30 * time.Second). + SetJitter(true), + )) s, err := newSession(opts) require.NoError(t, err) session := s.(*session) - mockHostQueues, mockClients := mockHostQueuesAndClientsForFetchBootstrapBlocks(ctrl, opts) + var ( + mockHostQueues MockHostQueues + mockClients MockTChanNodes + ) + hostShardSets := sessionTestHostAndShards(sessionTestShardSet()) + for i := 0; i < len(hostShardSets)-1; i++ { + host := hostShardSets[i].Host() + hostQueue, client := defaultHostAndClientWithExpect(ctrl, host, opts) + mockHostQueues = append(mockHostQueues, hostQueue) + mockClients = append(mockClients, client) + } + + // Construct the last hostQueue with a connection pool that will error out. + host := hostShardSets[len(hostShardSets)-1].Host() + connectionPool := NewMockconnectionPool(ctrl) + connectionPool.EXPECT(). + NextClient(). + Return(nil, errConnectionPoolHasNoConnections). + AnyTimes() + hostQueue := NewMockhostQueue(ctrl) + hostQueue.EXPECT().Open() + hostQueue.EXPECT().Host().Return(host).AnyTimes() + hostQueue.EXPECT(). + ConnectionCount(). + Return(opts.MinConnectionCount()). + Times(sessionTestShards) + hostQueue.EXPECT(). + ConnectionPool(). + Return(connectionPool). + AnyTimes() + hostQueue.EXPECT(). + BorrowConnection(gomock.Any()). + Return(errConnectionPoolHasNoConnections). + AnyTimes() + hostQueue.EXPECT().Close() + mockHostQueues = append(mockHostQueues, hostQueue) + session.newHostQueueFn = mockHostQueues.newHostQueueFn() // Don't drain the peer blocks queue, explicitly drain ourselves to @@ -362,34 +411,32 @@ func TestFetchBootstrapBlocksDontRetryHostNotAvailableInRetrier(t *testing.T) { }, } - // Expect the fetch metadata calls + // Expect the fetch metadata calls. metadataResult := resultMetadataFromBlocks(blocks) - // Skip the first client which is the client for the origin - mockClients[1:3].expectFetchMetadataAndReturn(metadataResult, opts) - mockClients[3].EXPECT(). - FetchBlocksMetadataRawV2(gomock.Any(), gomock.Any()). - Return(ret, nil) + // Skip the first client which is the client for the origin. Skip the last + // client because its going to return an error. + mockClients[1:].expectFetchMetadataAndReturn(metadataResult, opts) - // Expect the fetch blocks calls + // Expect the fetch blocks calls. participating := len(mockClients) - 1 blocksExpectedReqs, blocksResult := expectedReqsAndResultFromBlocks(t, blocks, batchSize, participating, func(blockIdx int) (clientIdx int) { - // Round robin to match the best peer selection algorithm + // Round robin to match the best peer selection algorithm. return blockIdx % participating }) - // Skip the first client which is the client for the origin + // Skip the first client which is the client for the origin. for i, client := range mockClients[1:] { expectFetchBlocksAndReturn(client, blocksExpectedReqs[i], blocksResult[i]) } // Make sure peer selection is round robin to match our expected - // peer fetch calls + // peer fetch calls. session.pickBestPeerFn = newRoundRobinPickBestPeerFn() - // Fetch blocks + // Fetch blocks. 
go func() { - // Trigger peer queues to drain explicitly when all work enqueued + // Trigger peer queues to drain explicitly when all work enqueued. for { qsMutex.RLock() assigned := 0 @@ -416,7 +463,7 @@ func TestFetchBootstrapBlocksDontRetryHostNotAvailableInRetrier(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, result) - // Assert result + // Assert result. assertFetchBootstrapBlocksResult(t, blocks, result) assert.NoError(t, session.Close()) @@ -1940,24 +1987,35 @@ func mockHostQueuesAndClientsForFetchBootstrapBlocks( hostShardSets := sessionTestHostAndShards(sessionTestShardSet()) for i := 0; i < len(hostShardSets); i++ { host := hostShardSets[i].Host() - client := rpc.NewMockTChanNode(ctrl) - connectionPool := NewMockconnectionPool(ctrl) - connectionPool.EXPECT().NextClient().Return(client, nil).AnyTimes() - hostQueue := NewMockhostQueue(ctrl) - hostQueue.EXPECT().Open() - hostQueue.EXPECT().Host().Return(host).AnyTimes() - hostQueue.EXPECT().ConnectionCount().Return(opts.MinConnectionCount()).Times(sessionTestShards) - hostQueue.EXPECT().ConnectionPool().Return(connectionPool).AnyTimes() - hostQueue.EXPECT().BorrowConnection(gomock.Any()).Do(func(fn withConnectionFn) { - fn(client) - }).Return(nil).AnyTimes() - hostQueue.EXPECT().Close() + hostQueue, client := defaultHostAndClientWithExpect(ctrl, host, opts) hostQueues = append(hostQueues, hostQueue) clients = append(clients, client) } return hostQueues, clients } +func defaultHostAndClientWithExpect( + ctrl *gomock.Controller, + host topology.Host, + opts AdminOptions, +) (*MockhostQueue, *rpc.MockTChanNode) { + client := rpc.NewMockTChanNode(ctrl) + connectionPool := NewMockconnectionPool(ctrl) + connectionPool.EXPECT().NextClient().Return(client, nil).AnyTimes() + + hostQueue := NewMockhostQueue(ctrl) + hostQueue.EXPECT().Open() + hostQueue.EXPECT().Host().Return(host).AnyTimes() + hostQueue.EXPECT().ConnectionCount().Return(opts.MinConnectionCount()).Times(sessionTestShards) + hostQueue.EXPECT().ConnectionPool().Return(connectionPool).AnyTimes() + hostQueue.EXPECT().BorrowConnection(gomock.Any()).Do(func(fn withConnectionFn) { + fn(client) + }).Return(nil).AnyTimes() + hostQueue.EXPECT().Close() + + return hostQueue, client +} + func resultMetadataFromBlocks( blocks []testBlocks, ) []testBlocksMetadata { From 1a3e75608cafc5ad5c91ed863d650ce0aa97f3dd Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Mon, 3 Jun 2019 17:53:13 -0400 Subject: [PATCH 04/10] make hostNotAvailable non-retryable by default --- src/dbnode/client/errors.go | 12 ++++++++---- src/dbnode/client/session.go | 30 +++++++++--------------------- 2 files changed, 17 insertions(+), 25 deletions(-) diff --git a/src/dbnode/client/errors.go b/src/dbnode/client/errors.go index f5ad4ae834..50eb9796c9 100644 --- a/src/dbnode/client/errors.go +++ b/src/dbnode/client/errors.go @@ -102,12 +102,16 @@ func (h hostNotAvailableError) Error() string { return h.err.Error() } -func newHostNotAvailableError(err error) hostNotAvailableError { - return hostNotAvailableError{err: err} +func newHostNotAvailableError(err error) error { + return xerrors.NewNonRetryableError(hostNotAvailableError{err: err}) } -func isHostNotAvailableError(e error) bool { - _, ok := e.(hostNotAvailableError) +func isHostNotAvailableError(err error) bool { + inner := xerrors.GetInnerNonRetryableError(err) + if inner == nil { + return false + } + _, ok := inner.(hostNotAvailableError) return ok } diff --git a/src/dbnode/client/session.go b/src/dbnode/client/session.go index 678b417cad..9ac27b220d 
100644 --- a/src/dbnode/client/session.go +++ b/src/dbnode/client/session.go @@ -2322,27 +2322,20 @@ func (s *session) streamBlocksMetadataFromPeer( fetchFn := func() error { borrowErr := peer.BorrowConnection(checkedAttemptFn) - // Don't retry if the host is not available to prevent exponential - // backoff from causing this to take a long time when subsequent - // requests will fail. - if isHostNotAvailableError(borrowErr) { - borrowErr = xerrors.NewNonRetryableError(borrowErr) - } return xerrors.FirstError(borrowErr, attemptErr) } for moreResults { if err := s.streamBlocksRetrier.Attempt(fetchFn); err != nil { - // Check if the error was a hostNotAvailableError wrapped in a - // nonRetryableError. If so, return the hostNotAvailableError - // instead of the non-retryable error since the error is technically - // retryable, it was only wrapped to prevent the exponential backoff - // in the actual retrier. - if xerrors.IsNonRetryableError(err) { - inner := xerrors.GetInnerNonRetryableError(err) - if isHostNotAvailableError(inner) { - err = inner - } + // Check if the error was a hostNotAvailableError. If so, return the + // raw (retryable) hostNotAvailableError since the error is technically + // retryable but was wrapped to prevent the exponential backoff in the + // actual retrier. + if isHostNotAvailableError(err) { + // hostNotAvailable is non-retryable by default, but in this case we want + // to return a retryable error so that the caller can continue retrying + // indefinitely until consistency is reached. + err = xerrors.GetInnerNonRetryableError(err) } return startPageToken, err } @@ -2853,11 +2846,6 @@ func (s *session) streamBlocksBatchFromPeer( result, attemptErr = client.FetchBlocksRaw(tctx, req) }) err := xerrors.FirstError(borrowErr, attemptErr) - // Do not retry if cannot borrow the connection or - // if the connection pool has no connections - if isHostNotAvailableError(err) { - err = xerrors.NewNonRetryableError(err) - } return err }); err != nil { blocksErr := fmt.Errorf( From 20e5ac84e27a3fcc680d2b1cca9b3be6b70c1ae5 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Mon, 3 Jun 2019 18:03:30 -0400 Subject: [PATCH 05/10] refactor test --- .../client/session_fetch_bulk_blocks_test.go | 151 +++++++----------- 1 file changed, 60 insertions(+), 91 deletions(-) diff --git a/src/dbnode/client/session_fetch_bulk_blocks_test.go b/src/dbnode/client/session_fetch_bulk_blocks_test.go index adf9bda253..31fa327dbe 100644 --- a/src/dbnode/client/session_fetch_bulk_blocks_test.go +++ b/src/dbnode/client/session_fetch_bulk_blocks_test.go @@ -145,45 +145,8 @@ func newRoundRobinPickBestPeerFn() pickBestPeerFn { } } -func TestFetchBootstrapBlocksAllPeersSucceedV2(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - opts := newSessionTestAdminOptions() - s, err := newSession(opts) - require.NoError(t, err) - session := s.(*session) - - mockHostQueues, mockClients := mockHostQueuesAndClientsForFetchBootstrapBlocks(ctrl, opts) - session.newHostQueueFn = mockHostQueues.newHostQueueFn() - - // Don't drain the peer blocks queue, explicitly drain ourselves to - // avoid unpredictable batches being retrieved from peers - var ( - qs []*peerBlocksQueue - qsMutex sync.RWMutex - ) - session.newPeerBlocksQueueFn = func( - peer peer, - maxQueueSize int, - _ time.Duration, - workers xsync.WorkerPool, - processFn processFn, - ) *peerBlocksQueue { - qsMutex.Lock() - defer qsMutex.Unlock() - q := newPeerBlocksQueue(peer, maxQueueSize, 0, workers, processFn) - qs = append(qs, q) - 
return q - } - - require.NoError(t, session.Open()) - - batchSize := opts.FetchSeriesBlocksBatchSize() - - start := time.Now().Truncate(blockSize).Add(blockSize * -(24 - 1)) - - blocks := []testBlocks{ +func newTestBlocks(start time.Time) []testBlocks { + return []testBlocks{ { id: fooID, blocks: []testBlock{ @@ -221,6 +184,46 @@ func TestFetchBootstrapBlocksAllPeersSucceedV2(t *testing.T) { }, }, } +} +func TestFetchBootstrapBlocksAllPeersSucceedV2(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + opts := newSessionTestAdminOptions() + s, err := newSession(opts) + require.NoError(t, err) + session := s.(*session) + + mockHostQueues, mockClients := mockHostQueuesAndClientsForFetchBootstrapBlocks(ctrl, opts) + session.newHostQueueFn = mockHostQueues.newHostQueueFn() + + // Don't drain the peer blocks queue, explicitly drain ourselves to + // avoid unpredictable batches being retrieved from peers + var ( + qs []*peerBlocksQueue + qsMutex sync.RWMutex + ) + session.newPeerBlocksQueueFn = func( + peer peer, + maxQueueSize int, + _ time.Duration, + workers xsync.WorkerPool, + processFn processFn, + ) *peerBlocksQueue { + qsMutex.Lock() + defer qsMutex.Unlock() + q := newPeerBlocksQueue(peer, maxQueueSize, 0, workers, processFn) + qs = append(qs, q) + return q + } + + require.NoError(t, session.Open()) + + var ( + batchSize = opts.FetchSeriesBlocksBatchSize() + start = time.Now().Truncate(blockSize).Add(blockSize * -(24 - 1)) + blocks = newTestBlocks(start) + ) // Expect the fetch metadata calls metadataResult := resultMetadataFromBlocks(blocks) @@ -310,13 +313,19 @@ func TestFetchBootstrapBlocksDontRetryHostNotAvailableInRetrier(t *testing.T) { var ( mockHostQueues MockHostQueues mockClients MockTChanNodes + hostShardSets = sessionTestHostAndShards(sessionTestShardSet()) ) - hostShardSets := sessionTestHostAndShards(sessionTestShardSet()) + // Skip the last one because it is going to be manually configured to return an error. for i := 0; i < len(hostShardSets)-1; i++ { host := hostShardSets[i].Host() hostQueue, client := defaultHostAndClientWithExpect(ctrl, host, opts) mockHostQueues = append(mockHostQueues, hostQueue) - mockClients = append(mockClients, client) + + if i != 0 { + // Skip creating a client for the origin because it will never be called so we want + // to avoid setting up expectations for it. + mockClients = append(mockClients, client) + } } // Construct the last hostQueue with a connection pool that will error out. 
@@ -365,60 +374,20 @@ func TestFetchBootstrapBlocksDontRetryHostNotAvailableInRetrier(t *testing.T) { qs = append(qs, q) return q } - require.NoError(t, session.Open()) - batchSize := opts.FetchSeriesBlocksBatchSize() - - start := time.Now().Truncate(blockSize).Add(blockSize * -(24 - 1)) - - blocks := []testBlocks{ - { - id: fooID, - blocks: []testBlock{ - { - start: start.Add(blockSize * 1), - segments: &testBlockSegments{merged: &testBlockSegment{ - head: []byte{1, 2}, - tail: []byte{3}, - }}, - }, - }, - }, - { - id: barID, - blocks: []testBlock{ - { - start: start.Add(blockSize * 2), - segments: &testBlockSegments{merged: &testBlockSegment{ - head: []byte{4, 5}, - tail: []byte{6}, - }}, - }, - }, - }, - { - id: bazID, - blocks: []testBlock{ - { - start: start.Add(blockSize * 3), - segments: &testBlockSegments{merged: &testBlockSegment{ - head: []byte{7, 8}, - tail: []byte{9}, - }}, - }, - }, - }, - } + var ( + batchSize = opts.FetchSeriesBlocksBatchSize() + start = time.Now().Truncate(blockSize).Add(blockSize * -(24 - 1)) + blocks = newTestBlocks(start) - // Expect the fetch metadata calls. - metadataResult := resultMetadataFromBlocks(blocks) - // Skip the first client which is the client for the origin. Skip the last - // client because its going to return an error. - mockClients[1:].expectFetchMetadataAndReturn(metadataResult, opts) + // Expect the fetch metadata calls. + metadataResult = resultMetadataFromBlocks(blocks) + ) + mockClients.expectFetchMetadataAndReturn(metadataResult, opts) // Expect the fetch blocks calls. - participating := len(mockClients) - 1 + participating := len(mockClients) blocksExpectedReqs, blocksResult := expectedReqsAndResultFromBlocks(t, blocks, batchSize, participating, func(blockIdx int) (clientIdx int) { @@ -426,7 +395,7 @@ func TestFetchBootstrapBlocksDontRetryHostNotAvailableInRetrier(t *testing.T) { return blockIdx % participating }) // Skip the first client which is the client for the origin. - for i, client := range mockClients[1:] { + for i, client := range mockClients { expectFetchBlocksAndReturn(client, blocksExpectedReqs[i], blocksResult[i]) } From afdf3a7db872493ebde6a41a463c4142f546a281 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Mon, 3 Jun 2019 18:28:18 -0400 Subject: [PATCH 06/10] add default sleep to prevent aggressive looping --- src/dbnode/client/session.go | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/src/dbnode/client/session.go b/src/dbnode/client/session.go index 9ac27b220d..09607376cd 100644 --- a/src/dbnode/client/session.go +++ b/src/dbnode/client/session.go @@ -66,11 +66,12 @@ import ( ) const ( - clusterConnectWaitInterval = 10 * time.Millisecond - blocksMetadataChannelInitialCapacity = 4096 - gaugeReportInterval = 500 * time.Millisecond - blockMetadataChBufSize = 4096 - shardResultCapacity = 4096 + clusterConnectWaitInterval = 10 * time.Millisecond + blocksMetadataChannelInitialCapacity = 4096 + gaugeReportInterval = 500 * time.Millisecond + blockMetadataChBufSize = 4096 + shardResultCapacity = 4096 + streamBlocksMetadataFromPeerErrorSleepInterval = 1 * time.Millisecond ) type resultTypeEnum string @@ -2154,6 +2155,12 @@ func (s *session) streamBlocksMetadataFromPeers( atomic.AddInt32(&success, 1) return } + + // Prevent the loop from spinning too aggressively if + // streamBlocksMetadataFromPeer is short-circuiting (which can happen in + // the situation that the peer is hard-down / has no connections available + // for it in the connection pool). 
+ time.Sleep(streamBlocksMetadataFromPeerErrorSleepInterval) } }() } From 7525d5a0f56a60969bf4244f6ee99e4b55cb98b6 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Wed, 5 Jun 2019 21:37:23 -0400 Subject: [PATCH 07/10] refactor and simplify --- src/dbnode/client/session.go | 56 ++++++++++++++++++++---------------- 1 file changed, 32 insertions(+), 24 deletions(-) diff --git a/src/dbnode/client/session.go b/src/dbnode/client/session.go index 09607376cd..32b5f88e8e 100644 --- a/src/dbnode/client/session.go +++ b/src/dbnode/client/session.go @@ -66,12 +66,13 @@ import ( ) const ( - clusterConnectWaitInterval = 10 * time.Millisecond - blocksMetadataChannelInitialCapacity = 4096 - gaugeReportInterval = 500 * time.Millisecond - blockMetadataChBufSize = 4096 - shardResultCapacity = 4096 - streamBlocksMetadataFromPeerErrorSleepInterval = 1 * time.Millisecond + clusterConnectWaitInterval = 10 * time.Millisecond + blocksMetadataChannelInitialCapacity = 4096 + gaugeReportInterval = 500 * time.Millisecond + blockMetadataChBufSize = 4096 + shardResultCapacity = 4096 + hostNotAvailableMinSleepInterval = 1 * time.Millisecond + hostNotAvailableMaxSleepInterval = 10 * time.Millisecond ) type resultTypeEnum string @@ -2113,7 +2114,8 @@ func (s *session) streamBlocksMetadataFromPeers( // returned it will likely not be nil, this lets us restart fetching // if we need to (if consistency has not been achieved yet) without // losing place in the pagination. - currPageToken pageToken + currPageToken pageToken + currHostNotAvailableSleepInterval = hostNotAvailableMinSleepInterval ) condition := func() bool { if firstAttempt { @@ -2146,21 +2148,30 @@ func (s *session) streamBlocksMetadataFromPeers( // Set error or success if err is nil errs.setError(idx, err) - // Check exit criteria - if err != nil && xerrors.IsNonRetryableError(err) { + // hostNotAvailable is a NonRetryableError for the purposes of short-circuiting + // the automatic retry functionality, but in this case the client should avoid + // aborting and continue retrying at this level until consistency can be reached. + if isHostNotAvailableError(err) { + // Prevent the loop from spinning too aggressively in the short-circuiting case. + time.Sleep(currHostNotAvailableSleepInterval) + currHostNotAvailableMinSleepInterval = minDuration( + currHostNotAvailableSleepInterval*2, + hostNotAvailableMaxSleepInterval, + ) + continue + } + + if isNonRetryableErr && xerrors.IsNonRetryableError(err) { errs.setAbortError(err) return // Cannot recover from this error, so we break from the loop } + if err == nil { atomic.AddInt32(&success, 1) return } - // Prevent the loop from spinning too aggressively if - // streamBlocksMetadataFromPeer is short-circuiting (which can happen in - // the situation that the peer is hard-down / has no connections available - // for it in the connection pool). - time.Sleep(streamBlocksMetadataFromPeerErrorSleepInterval) + // There was a retryable error, continue looping. } }() } @@ -2334,16 +2345,6 @@ func (s *session) streamBlocksMetadataFromPeer( for moreResults { if err := s.streamBlocksRetrier.Attempt(fetchFn); err != nil { - // Check if the error was a hostNotAvailableError. If so, return the - // raw (retryable) hostNotAvailableError since the error is technically - // retryable but was wrapped to prevent the exponential backoff in the - // actual retrier. 
- if isHostNotAvailableError(err) { - // hostNotAvailable is non-retryable by default, but in this case we want - // to return a retryable error so that the caller can continue retrying - // indefinitely until consistency is reached. - err = xerrors.GetInnerNonRetryableError(err) - } return startPageToken, err } } @@ -3896,3 +3897,10 @@ func histogramWithDurationBuckets(scope tally.Scope, name string) tally.Histogra }) return sub.Histogram(name, histogramDurationBuckets()) } + +func minDuration(x, y time.Duration) { + if x < y { + return x + } + return y +} From f5e35ad718621feac207beda8a3322de1bbe4285 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Wed, 5 Jun 2019 21:40:47 -0400 Subject: [PATCH 08/10] fix compilation issues --- src/dbnode/client/session.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/dbnode/client/session.go b/src/dbnode/client/session.go index 32b5f88e8e..619a627f63 100644 --- a/src/dbnode/client/session.go +++ b/src/dbnode/client/session.go @@ -2154,14 +2154,14 @@ func (s *session) streamBlocksMetadataFromPeers( if isHostNotAvailableError(err) { // Prevent the loop from spinning too aggressively in the short-circuiting case. time.Sleep(currHostNotAvailableSleepInterval) - currHostNotAvailableMinSleepInterval = minDuration( + currHostNotAvailableSleepInterval = minDuration( currHostNotAvailableSleepInterval*2, hostNotAvailableMaxSleepInterval, ) continue } - if isNonRetryableErr && xerrors.IsNonRetryableError(err) { + if err != nil && xerrors.IsNonRetryableError(err) { errs.setAbortError(err) return // Cannot recover from this error, so we break from the loop } @@ -3898,7 +3898,7 @@ func histogramWithDurationBuckets(scope tally.Scope, name string) tally.Histogra return sub.Histogram(name, histogramDurationBuckets()) } -func minDuration(x, y time.Duration) { +func minDuration(x, y time.Duration) time.Duration { if x < y { return x } From 7884e396522335ce9f8a6dd3098a197d74e56920 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Wed, 5 Jun 2019 21:42:13 -0400 Subject: [PATCH 09/10] fix nit --- src/dbnode/client/session.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/dbnode/client/session.go b/src/dbnode/client/session.go index 619a627f63..c231aa7b04 100644 --- a/src/dbnode/client/session.go +++ b/src/dbnode/client/session.go @@ -2125,14 +2125,14 @@ func (s *session) streamBlocksMetadataFromPeers( } var ( - currLevel = level.value() - majority = int(majority) - enqueued = int(enqueued) - success = int(atomic.LoadInt32(&success)) - metReadConsistency = topology.ReadConsistencyAchieved( - currLevel, majority, enqueued, success) - doRetry = !metReadConsistency && errs.getAbortError() == nil + currLevel = level.value() + majority = int(majority) + enqueued = int(enqueued) + success = int(atomic.LoadInt32(&success)) ) + metReadConsistency := topology.ReadConsistencyAchieved( + currLevel, majority, enqueued, success) + doRetry := !metReadConsistency && errs.getAbortError() == nil if doRetry { // Track that we are reattempting the fetch metadata From b57b8d3b24f13b38b20083e493c6ee045e829229 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Wed, 5 Jun 2019 21:53:53 -0400 Subject: [PATCH 10/10] set max sleep to 100ms --- src/dbnode/client/session.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dbnode/client/session.go b/src/dbnode/client/session.go index c231aa7b04..cf13aea403 100644 --- a/src/dbnode/client/session.go +++ b/src/dbnode/client/session.go @@ -72,7 +72,7 @@ const ( 
 	blockMetadataChBufSize               = 4096
 	shardResultCapacity                  = 4096
 	hostNotAvailableMinSleepInterval     = 1 * time.Millisecond
-	hostNotAvailableMaxSleepInterval     = 10 * time.Millisecond
+	hostNotAvailableMaxSleepInterval     = 100 * time.Millisecond
 )

 type resultTypeEnum string
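The pattern this series converges on is easiest to see in isolation: a hostNotAvailableError is wrapped as non-retryable so the stream-blocks retrier short-circuits instead of walking its exponential backoff (with a 30s initial backoff and a backoff factor of 10, as configured in the regression test, even one full retry cycle would take minutes), while the outer consistency loop paces itself with a small sleep that doubles from hostNotAvailableMinSleepInterval up to hostNotAvailableMaxSleepInterval. Below is a minimal, self-contained Go sketch of that control flow; the nonRetryableError type and the fetchOnce helper are simplified stand-ins introduced only for this sketch, not m3's actual x/errors API or session internals.

package main

import (
	"errors"
	"fmt"
	"time"
)

// hostNotAvailableError mirrors the new client error type: it wraps the
// underlying cause so callers can detect a hard-down peer by type.
type hostNotAvailableError struct{ err error }

func (h hostNotAvailableError) Error() string { return h.err.Error() }

// nonRetryableError is a simplified stand-in for the wrapper returned by
// xerrors.NewNonRetryableError; it exists only to make this sketch runnable.
type nonRetryableError struct{ inner error }

func (n nonRetryableError) Error() string { return n.inner.Error() }

// newHostNotAvailableError returns the error pre-wrapped as non-retryable,
// matching the final shape of errors.go in this series.
func newHostNotAvailableError(err error) error {
	return nonRetryableError{inner: hostNotAvailableError{err: err}}
}

// isHostNotAvailableError unwraps the non-retryable wrapper and checks the
// inner error's type, analogous to what session.go does via
// xerrors.GetInnerNonRetryableError.
func isHostNotAvailableError(err error) bool {
	wrapped, ok := err.(nonRetryableError)
	if !ok {
		return false
	}
	_, ok = wrapped.inner.(hostNotAvailableError)
	return ok
}

func minDuration(x, y time.Duration) time.Duration {
	if x < y {
		return x
	}
	return y
}

func main() {
	// fetchOnce is a hypothetical stand-in for one streamBlocksMetadataFromPeer
	// attempt against a peer whose connection pool has no connections.
	fetchOnce := func() error {
		return newHostNotAvailableError(errors.New("connection pool has no connections"))
	}

	const (
		minSleep = 1 * time.Millisecond   // hostNotAvailableMinSleepInterval
		maxSleep = 100 * time.Millisecond // hostNotAvailableMaxSleepInterval
	)
	sleep := minSleep
	for attempt := 0; attempt < 5; attempt++ {
		err := fetchOnce()
		if isHostNotAvailableError(err) {
			// Skip the retrier's exponential backoff entirely, but pace the
			// outer consistency loop with a small, capped sleep so it does
			// not spin aggressively while the peer stays down.
			fmt.Printf("attempt %d: host not available, sleeping %v\n", attempt, sleep)
			time.Sleep(sleep)
			sleep = minDuration(sleep*2, maxSleep)
			continue
		}
		if err != nil {
			// Any other non-retryable error aborts the fetch.
			fmt.Println("aborting:", err)
			return
		}
		fmt.Println("success")
		return
	}
}

Run as-is, the sketch simply demonstrates the capped doubling (1ms, 2ms, 4ms, 8ms, 16ms) and exits; in the real session the loop instead keeps attempting until read consistency is achieved against the remaining peers or an abort error is set, which is exactly the behaviour the regression test exercises with one permanently unavailable host queue.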