diff --git a/client/options.go b/client/options.go index 26b48ba45f..d0888be4b3 100644 --- a/client/options.go +++ b/client/options.go @@ -49,6 +49,9 @@ const ( // defaultClusterConnectTimeout is the default cluster connect timeout defaultClusterConnectTimeout = 30 * time.Second + // defaultClusterConnectConsistencyLevel is the default cluster connect consistency level + defaultClusterConnectConsistencyLevel = m3db.ConsistencyLevelQuorum + // defaultWriteRequestTimeout is the default write request timeout defaultWriteRequestTimeout = 5 * time.Second @@ -111,6 +114,7 @@ type options struct { minConnectionCount int hostConnectTimeout time.Duration clusterConnectTimeout time.Duration + clusterConnectConsistencyLevel m3db.ConsistencyLevel writeRequestTimeout time.Duration fetchRequestTimeout time.Duration backgroundConnectInterval time.Duration @@ -140,6 +144,7 @@ func NewOptions() m3db.ClientOptions { minConnectionCount: defaultMinConnectionCount, hostConnectTimeout: defaultHostConnectTimeout, clusterConnectTimeout: defaultClusterConnectTimeout, + clusterConnectConsistencyLevel: defaultClusterConnectConsistencyLevel, writeRequestTimeout: defaultWriteRequestTimeout, fetchRequestTimeout: defaultFetchRequestTimeout, backgroundConnectInterval: defaultBackgroundConnectInterval, @@ -277,6 +282,16 @@ func (o *options) GetClusterConnectTimeout() time.Duration { return o.clusterConnectTimeout } +func (o *options) ClusterConnectConsistencyLevel(value m3db.ConsistencyLevel) m3db.ClientOptions { + opts := *o + opts.clusterConnectConsistencyLevel = value + return &opts +} + +func (o *options) GetClusterConnectConsistencyLevel() m3db.ConsistencyLevel { + return o.clusterConnectConsistencyLevel +} + func (o *options) WriteRequestTimeout(value time.Duration) m3db.ClientOptions { opts := *o opts.writeRequestTimeout = value diff --git a/client/session.go b/client/session.go index a48f313404..4b8e99ac9e 100644 --- a/client/session.go +++ b/client/session.go @@ -160,6 +160,9 @@ func (s *session) setTopologyMap(topologyMap m3db.TopologyMap) error { shards := topologyMap.ShardScheme().All().Shards() minConnectionCount := s.opts.GetMinConnectionCount() + replicas := topologyMap.Replicas() + quorum := topologyMap.QuorumReplicas() + connectConsistencyLevel := s.opts.GetClusterConnectConsistencyLevel() for { if s.nowFn().Sub(start) >= s.opts.GetClusterConnectTimeout() { for i := range queues { @@ -168,24 +171,27 @@ func (s *session) setTopologyMap(topologyMap m3db.TopologyMap) error { return ErrClusterConnectTimeout } // Be optimistic - clusterHasMinConnectionsToAllShards := true + var clusterAvailable bool for _, shard := range shards { - // Be optimistic - shardHasMinConnectionsToOneHost := true + shardReplicasAvailable := 0 routeErr := topologyMap.RouteShardForEach(shard, func(idx int, host m3db.Host) { - if queues[idx].GetConnectionCount() < minConnectionCount { - shardHasMinConnectionsToOneHost = false + if queues[idx].GetConnectionCount() >= minConnectionCount { + shardReplicasAvailable++ } }) if routeErr != nil { return routeErr } - if !shardHasMinConnectionsToOneHost { - clusterHasMinConnectionsToAllShards = false - break + switch connectConsistencyLevel { + case m3db.ConsistencyLevelAll: + clusterAvailable = shardReplicasAvailable == replicas + case m3db.ConsistencyLevelQuorum: + clusterAvailable = shardReplicasAvailable >= quorum + case m3db.ConsistencyLevelOne: + clusterAvailable = shardReplicasAvailable > 0 } } - if clusterHasMinConnectionsToAllShards { + if clusterAvailable { // All done break } @@ -197,8 +203,8 @@ func (s *session) setTopologyMap(topologyMap m3db.TopologyMap) error { s.queues = queues s.topoMap = topologyMap prevReplicas := s.replicas - s.replicas = topologyMap.Replicas() - s.quorum = topologyMap.QuorumReplicas() + s.replicas = replicas + s.quorum = quorum if s.fetchBatchOpArrayArrayPool == nil || s.fetchBatchOpArrayArrayPool.Entries() != len(queues) { s.fetchBatchOpArrayArrayPool = newFetchBatchOpArrayArrayPool( diff --git a/client/session_test.go b/client/session_test.go index 89ed6fdee9..e17c95fc26 100644 --- a/client/session_test.go +++ b/client/session_test.go @@ -23,6 +23,7 @@ package client import ( "fmt" "sync" + "sync/atomic" "testing" "github.com/golang/mock/gomock" @@ -73,16 +74,56 @@ func newSessionTestOptions() m3db.ClientOptions { HostShardSets(hostShardSets))) } -func TestSessionClusterConnectTimesOut(t *testing.T) { +func TestSessionClusterConnectConsistencyLevelAll(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() + level := m3db.ConsistencyLevelAll + testSessionClusterConnectConsistencyLevel(t, ctrl, level, 0, outcomeSuccess) + for i := 1; i <= 3; i++ { + testSessionClusterConnectConsistencyLevel(t, ctrl, level, i, outcomeFail) + } +} + +func TestSessionClusterConnectConsistencyLevelQuorum(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + level := m3db.ConsistencyLevelQuorum + for i := 0; i <= 1; i++ { + testSessionClusterConnectConsistencyLevel(t, ctrl, level, i, outcomeSuccess) + } + for i := 2; i <= 3; i++ { + testSessionClusterConnectConsistencyLevel(t, ctrl, level, i, outcomeFail) + } +} + +func TestSessionClusterConnectConsistencyLevelOne(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + level := m3db.ConsistencyLevelOne + for i := 0; i <= 2; i++ { + testSessionClusterConnectConsistencyLevel(t, ctrl, level, i, outcomeSuccess) + } + testSessionClusterConnectConsistencyLevel(t, ctrl, level, 3, outcomeFail) +} + +func testSessionClusterConnectConsistencyLevel( + t *testing.T, + ctrl *gomock.Controller, + level m3db.ConsistencyLevel, + failures int, + expected outcome, +) { opts := newSessionTestOptions() opts = opts.ClusterConnectTimeout(3 * clusterConnectWaitInterval) + opts = opts.ClusterConnectConsistencyLevel(level) s, err := newSession(opts) assert.NoError(t, err) session := s.(*session) + var failingConns int32 session.newHostQueueFn = func( host m3db.Host, writeBatchRequestPool writeBatchRequestPool, @@ -91,14 +132,24 @@ func TestSessionClusterConnectTimesOut(t *testing.T) { ) hostQueue { hostQueue := mocks.NewMockhostQueue(ctrl) hostQueue.EXPECT().Open().Times(1) - hostQueue.EXPECT().GetConnectionCount().Return(0).AnyTimes() - hostQueue.EXPECT().Close().Times(1) + if atomic.AddInt32(&failingConns, 1) <= int32(failures) { + hostQueue.EXPECT().GetConnectionCount().Return(0).AnyTimes() + } else { + min := opts.GetMinConnectionCount() + hostQueue.EXPECT().GetConnectionCount().Return(min).AnyTimes() + } + hostQueue.EXPECT().Close().AnyTimes() return hostQueue } err = session.Open() - assert.Error(t, err) - assert.Equal(t, ErrClusterConnectTimeout, err) + switch expected { + case outcomeSuccess: + assert.NoError(t, err) + case outcomeFail: + assert.Error(t, err) + assert.Equal(t, ErrClusterConnectTimeout, err) + } } func mockHostQueues( diff --git a/client/session_write_test.go b/client/session_write_test.go index 9363c1ed6a..1fe93fabac 100644 --- a/client/session_write_test.go +++ b/client/session_write_test.go @@ -136,9 +136,10 @@ func TestSessionWriteConsistencyLevelAll(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - testWriteConsistencyLevel(t, ctrl, m3db.ConsistencyLevelAll, 0, outcomeSuccess) + level := m3db.ConsistencyLevelAll + testWriteConsistencyLevel(t, ctrl, level, 0, outcomeSuccess) for i := 1; i <= 3; i++ { - testWriteConsistencyLevel(t, ctrl, m3db.ConsistencyLevelAll, i, outcomeFail) + testWriteConsistencyLevel(t, ctrl, level, i, outcomeFail) } } @@ -146,11 +147,12 @@ func TestSessionWriteConsistencyLevelQuorum(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() + level := m3db.ConsistencyLevelQuorum for i := 0; i <= 1; i++ { - testWriteConsistencyLevel(t, ctrl, m3db.ConsistencyLevelQuorum, i, outcomeSuccess) + testWriteConsistencyLevel(t, ctrl, level, i, outcomeSuccess) } for i := 2; i <= 3; i++ { - testWriteConsistencyLevel(t, ctrl, m3db.ConsistencyLevelQuorum, i, outcomeFail) + testWriteConsistencyLevel(t, ctrl, level, i, outcomeFail) } } @@ -158,10 +160,11 @@ func TestSessionWriteConsistencyLevelOne(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() + level := m3db.ConsistencyLevelOne for i := 0; i <= 2; i++ { - testWriteConsistencyLevel(t, ctrl, m3db.ConsistencyLevelOne, i, outcomeSuccess) + testWriteConsistencyLevel(t, ctrl, level, i, outcomeSuccess) } - testWriteConsistencyLevel(t, ctrl, m3db.ConsistencyLevelOne, 3, outcomeFail) + testWriteConsistencyLevel(t, ctrl, level, 3, outcomeFail) } func testWriteConsistencyLevel( diff --git a/interfaces/m3db/options.go b/interfaces/m3db/options.go index 2e3f34d55b..345441ec6d 100644 --- a/interfaces/m3db/options.go +++ b/interfaces/m3db/options.go @@ -254,6 +254,12 @@ type ClientOptions interface { // GetClusterConnectTimeout returns the clusterConnectTimeout GetClusterConnectTimeout() time.Duration + // ClusterConnectConsistencyLevel sets the clusterConnectConsistencyLevel and returns a new ClientOptions + ClusterConnectConsistencyLevel(value ConsistencyLevel) ClientOptions + + // GetClusterConnectConsistencyLevel returns the clusterConnectConsistencyLevel + GetClusterConnectConsistencyLevel() ConsistencyLevel + // WriteRequestTimeout sets the writeRequestTimeout and returns a new ClientOptions WriteRequestTimeout(value time.Duration) ClientOptions diff --git a/mocks/options.go b/mocks/options.go index e48c1900ec..7d468c2d53 100644 --- a/mocks/options.go +++ b/mocks/options.go @@ -796,6 +796,26 @@ func (_mr *_MockClientOptionsRecorder) GetClusterConnectTimeout() *gomock.Call { return _mr.mock.ctrl.RecordCall(_mr.mock, "GetClusterConnectTimeout") } +func (_m *MockClientOptions) ClusterConnectConsistencyLevel(value m3db.ConsistencyLevel) m3db.ClientOptions { + ret := _m.ctrl.Call(_m, "ClusterConnectConsistencyLevel", value) + ret0, _ := ret[0].(m3db.ClientOptions) + return ret0 +} + +func (_mr *_MockClientOptionsRecorder) ClusterConnectConsistencyLevel(arg0 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "ClusterConnectConsistencyLevel", arg0) +} + +func (_m *MockClientOptions) GetClusterConnectConsistencyLevel() m3db.ConsistencyLevel { + ret := _m.ctrl.Call(_m, "GetClusterConnectConsistencyLevel") + ret0, _ := ret[0].(m3db.ConsistencyLevel) + return ret0 +} + +func (_mr *_MockClientOptionsRecorder) GetClusterConnectConsistencyLevel() *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "GetClusterConnectConsistencyLevel") +} + func (_m *MockClientOptions) WriteRequestTimeout(value time.Duration) m3db.ClientOptions { ret := _m.ctrl.Call(_m, "WriteRequestTimeout", value) ret0, _ := ret[0].(m3db.ClientOptions)