diff --git a/client/options.go b/client/options.go index 26b48ba45f..311512405b 100644 --- a/client/options.go +++ b/client/options.go @@ -35,7 +35,7 @@ import ( const ( // defaultConsistencyLevel is the default consistency level - defaultConsistencyLevel = m3db.ConsistencyLevelQuorum + defaultConsistencyLevel = m3db.ConsistencyLevelMajority // defaultMaxConnectionCount is the default max connection count defaultMaxConnectionCount = 32 @@ -49,6 +49,9 @@ const ( // defaultClusterConnectTimeout is the default cluster connect timeout defaultClusterConnectTimeout = 30 * time.Second + // defaultClusterConnectConsistencyLevel is the default cluster connect consistency level + defaultClusterConnectConsistencyLevel = m3db.ConsistencyLevelMajority + // defaultWriteRequestTimeout is the default write request timeout defaultWriteRequestTimeout = 5 * time.Second @@ -111,6 +114,7 @@ type options struct { minConnectionCount int hostConnectTimeout time.Duration clusterConnectTimeout time.Duration + clusterConnectConsistencyLevel m3db.ConsistencyLevel writeRequestTimeout time.Duration fetchRequestTimeout time.Duration backgroundConnectInterval time.Duration @@ -140,6 +144,7 @@ func NewOptions() m3db.ClientOptions { minConnectionCount: defaultMinConnectionCount, hostConnectTimeout: defaultHostConnectTimeout, clusterConnectTimeout: defaultClusterConnectTimeout, + clusterConnectConsistencyLevel: defaultClusterConnectConsistencyLevel, writeRequestTimeout: defaultWriteRequestTimeout, fetchRequestTimeout: defaultFetchRequestTimeout, backgroundConnectInterval: defaultBackgroundConnectInterval, @@ -277,6 +282,16 @@ func (o *options) GetClusterConnectTimeout() time.Duration { return o.clusterConnectTimeout } +func (o *options) ClusterConnectConsistencyLevel(value m3db.ConsistencyLevel) m3db.ClientOptions { + opts := *o + opts.clusterConnectConsistencyLevel = value + return &opts +} + +func (o *options) GetClusterConnectConsistencyLevel() m3db.ConsistencyLevel { + return o.clusterConnectConsistencyLevel +} + func (o *options) WriteRequestTimeout(value time.Duration) m3db.ClientOptions { opts := *o opts.writeRequestTimeout = value diff --git a/client/session.go b/client/session.go index a48f313404..e81f3bf319 100644 --- a/client/session.go +++ b/client/session.go @@ -68,7 +68,7 @@ type session struct { topoMap m3db.TopologyMap topoMapCh chan m3db.TopologyMap replicas int - quorum int + majority int queues []hostQueue state state writeOpPool writeOpPool @@ -160,6 +160,9 @@ func (s *session) setTopologyMap(topologyMap m3db.TopologyMap) error { shards := topologyMap.ShardScheme().All().Shards() minConnectionCount := s.opts.GetMinConnectionCount() + replicas := topologyMap.Replicas() + majority := topologyMap.MajorityReplicas() + connectConsistencyLevel := s.opts.GetClusterConnectConsistencyLevel() for { if s.nowFn().Sub(start) >= s.opts.GetClusterConnectTimeout() { for i := range queues { @@ -168,24 +171,32 @@ func (s *session) setTopologyMap(topologyMap m3db.TopologyMap) error { return ErrClusterConnectTimeout } // Be optimistic - clusterHasMinConnectionsToAllShards := true + clusterAvailable := true for _, shard := range shards { - // Be optimistic - shardHasMinConnectionsToOneHost := true + shardReplicasAvailable := 0 routeErr := topologyMap.RouteShardForEach(shard, func(idx int, host m3db.Host) { - if queues[idx].GetConnectionCount() < minConnectionCount { - shardHasMinConnectionsToOneHost = false + if queues[idx].GetConnectionCount() >= minConnectionCount { + shardReplicasAvailable++ } }) if routeErr != nil { return routeErr } - if !shardHasMinConnectionsToOneHost { - clusterHasMinConnectionsToAllShards = false + var clusterAvailableForShard bool + switch connectConsistencyLevel { + case m3db.ConsistencyLevelAll: + clusterAvailableForShard = shardReplicasAvailable == replicas + case m3db.ConsistencyLevelMajority: + clusterAvailableForShard = shardReplicasAvailable >= majority + case m3db.ConsistencyLevelOne: + clusterAvailableForShard = shardReplicasAvailable > 0 + } + if !clusterAvailableForShard { + clusterAvailable = false break } } - if clusterHasMinConnectionsToAllShards { + if clusterAvailable { // All done break } @@ -197,8 +208,8 @@ func (s *session) setTopologyMap(topologyMap m3db.TopologyMap) error { s.queues = queues s.topoMap = topologyMap prevReplicas := s.replicas - s.replicas = topologyMap.Replicas() - s.quorum = topologyMap.QuorumReplicas() + s.replicas = replicas + s.majority = majority if s.fetchBatchOpArrayArrayPool == nil || s.fetchBatchOpArrayArrayPool.Entries() != len(queues) { s.fetchBatchOpArrayArrayPool = newFetchBatchOpArrayArrayPool( @@ -271,7 +282,7 @@ func (s *session) Write(id string, t time.Time, value float64, unit xtime.Unit, resultErrLock sync.Mutex resultErr error resultErrs int - quorum int + majority int ) timeType, timeTypeErr := convert.UnitToTimeType(unit) @@ -303,7 +314,7 @@ func (s *session) Write(id string, t time.Time, value float64, unit xtime.Unit, } s.RLock() - quorum = s.quorum + majority = s.majority routeErr := s.topoMap.RouteForEach(id, func(idx int, host m3db.Host) { wg.Add(1) enqueued++ @@ -330,7 +341,7 @@ func (s *session) Write(id string, t time.Time, value float64, unit xtime.Unit, // Return write to pool s.writeOpPool.Put(w) - return s.consistencyResult(quorum, enqueued, resultErrs, resultErr) + return s.consistencyResult(majority, enqueued, resultErrs, resultErr) } func (s *session) Fetch(id string, startInclusive, endExclusive time.Time) (m3db.SeriesIterator, error) { @@ -355,7 +366,7 @@ func (s *session) FetchAll(ids []string, startInclusive, endExclusive time.Time) resultErrLock sync.Mutex resultErr error resultErrs int - quorum int + majority int fetchBatchOpsByHostIdx [][]*fetchBatchOp ) @@ -388,7 +399,7 @@ func (s *session) FetchAll(ids []string, startInclusive, endExclusive time.Time) }() s.RLock() - quorum = s.quorum + majority = s.majority for idx := range ids { idx := idx @@ -425,7 +436,7 @@ func (s *session) FetchAll(ids []string, startInclusive, endExclusive time.Time) // Requests still pending return } - if err := s.consistencyResult(quorum, enqueued, int(errs), firstErr); err != nil { + if err := s.consistencyResult(majority, enqueued, int(errs), firstErr); err != nil { resultErrLock.Lock() if resultErr == nil { resultErr = err @@ -509,7 +520,7 @@ func (s *session) FetchAll(ids []string, startInclusive, endExclusive time.Time) return iters, nil } -func (s *session) consistencyResult(quorum, enqueued, resultErrs int, resultErr error) error { +func (s *session) consistencyResult(majority, enqueued, resultErrs int, resultErr error) error { if resultErrs == 0 { return nil } @@ -525,9 +536,9 @@ func (s *session) consistencyResult(quorum, enqueued, resultErrs int, resultErr switch s.level { case m3db.ConsistencyLevelAll: return reportErr() - case m3db.ConsistencyLevelQuorum: - if success >= quorum { - // Meets quorum + case m3db.ConsistencyLevelMajority: + if success >= majority { + // Meets majority break } return reportErr() diff --git a/client/session_fetch_test.go b/client/session_fetch_test.go index f044c37d15..a8fd4069ae 100644 --- a/client/session_fetch_test.go +++ b/client/session_fetch_test.go @@ -165,15 +165,15 @@ func TestSessionFetchConsistencyLevelAll(t *testing.T) { } } -func TestSessionFetchConsistencyLevelQuorum(t *testing.T) { +func TestSessionFetchConsistencyLevelMajority(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() for i := 0; i <= 1; i++ { - testFetchConsistencyLevel(t, ctrl, m3db.ConsistencyLevelQuorum, i, outcomeSuccess) + testFetchConsistencyLevel(t, ctrl, m3db.ConsistencyLevelMajority, i, outcomeSuccess) } for i := 2; i <= 3; i++ { - testFetchConsistencyLevel(t, ctrl, m3db.ConsistencyLevelQuorum, i, outcomeFail) + testFetchConsistencyLevel(t, ctrl, m3db.ConsistencyLevelMajority, i, outcomeFail) } } diff --git a/client/session_test.go b/client/session_test.go index 89ed6fdee9..49ce926b96 100644 --- a/client/session_test.go +++ b/client/session_test.go @@ -23,6 +23,7 @@ package client import ( "fmt" "sync" + "sync/atomic" "testing" "github.com/golang/mock/gomock" @@ -67,22 +68,62 @@ func newSessionTestOptions() m3db.ClientOptions { WriteOpPoolSize(0). FetchBatchOpPoolSize(0). TopologyType(topology.NewStaticTopologyType( - topology.NewStaticTopologyTypeOptions(). - Replicas(sessionTestReplicas). - ShardScheme(shardScheme). - HostShardSets(hostShardSets))) + topology.NewStaticTopologyTypeOptions(). + Replicas(sessionTestReplicas). + ShardScheme(shardScheme). + HostShardSets(hostShardSets))) } -func TestSessionClusterConnectTimesOut(t *testing.T) { +func TestSessionClusterConnectConsistencyLevelAll(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() + level := m3db.ConsistencyLevelAll + testSessionClusterConnectConsistencyLevel(t, ctrl, level, 0, outcomeSuccess) + for i := 1; i <= 3; i++ { + testSessionClusterConnectConsistencyLevel(t, ctrl, level, i, outcomeFail) + } +} + +func TestSessionClusterConnectConsistencyLevelMajority(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + level := m3db.ConsistencyLevelMajority + for i := 0; i <= 1; i++ { + testSessionClusterConnectConsistencyLevel(t, ctrl, level, i, outcomeSuccess) + } + for i := 2; i <= 3; i++ { + testSessionClusterConnectConsistencyLevel(t, ctrl, level, i, outcomeFail) + } +} + +func TestSessionClusterConnectConsistencyLevelOne(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + level := m3db.ConsistencyLevelOne + for i := 0; i <= 2; i++ { + testSessionClusterConnectConsistencyLevel(t, ctrl, level, i, outcomeSuccess) + } + testSessionClusterConnectConsistencyLevel(t, ctrl, level, 3, outcomeFail) +} + +func testSessionClusterConnectConsistencyLevel( + t *testing.T, + ctrl *gomock.Controller, + level m3db.ConsistencyLevel, + failures int, + expected outcome, +) { opts := newSessionTestOptions() opts = opts.ClusterConnectTimeout(3 * clusterConnectWaitInterval) + opts = opts.ClusterConnectConsistencyLevel(level) s, err := newSession(opts) assert.NoError(t, err) session := s.(*session) + var failingConns int32 session.newHostQueueFn = func( host m3db.Host, writeBatchRequestPool writeBatchRequestPool, @@ -91,14 +132,24 @@ func TestSessionClusterConnectTimesOut(t *testing.T) { ) hostQueue { hostQueue := mocks.NewMockhostQueue(ctrl) hostQueue.EXPECT().Open().Times(1) - hostQueue.EXPECT().GetConnectionCount().Return(0).AnyTimes() - hostQueue.EXPECT().Close().Times(1) + if atomic.AddInt32(&failingConns, 1) <= int32(failures) { + hostQueue.EXPECT().GetConnectionCount().Return(0).AnyTimes() + } else { + min := opts.GetMinConnectionCount() + hostQueue.EXPECT().GetConnectionCount().Return(min).AnyTimes() + } + hostQueue.EXPECT().Close().AnyTimes() return hostQueue } err = session.Open() - assert.Error(t, err) - assert.Equal(t, ErrClusterConnectTimeout, err) + switch expected { + case outcomeSuccess: + assert.NoError(t, err) + case outcomeFail: + assert.Error(t, err) + assert.Equal(t, ErrClusterConnectTimeout, err) + } } func mockHostQueues( diff --git a/client/session_write_test.go b/client/session_write_test.go index 9363c1ed6a..60a7b37204 100644 --- a/client/session_write_test.go +++ b/client/session_write_test.go @@ -136,21 +136,23 @@ func TestSessionWriteConsistencyLevelAll(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - testWriteConsistencyLevel(t, ctrl, m3db.ConsistencyLevelAll, 0, outcomeSuccess) + level := m3db.ConsistencyLevelAll + testWriteConsistencyLevel(t, ctrl, level, 0, outcomeSuccess) for i := 1; i <= 3; i++ { - testWriteConsistencyLevel(t, ctrl, m3db.ConsistencyLevelAll, i, outcomeFail) + testWriteConsistencyLevel(t, ctrl, level, i, outcomeFail) } } -func TestSessionWriteConsistencyLevelQuorum(t *testing.T) { +func TestSessionWriteConsistencyLevelMajority(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() + level := m3db.ConsistencyLevelMajority for i := 0; i <= 1; i++ { - testWriteConsistencyLevel(t, ctrl, m3db.ConsistencyLevelQuorum, i, outcomeSuccess) + testWriteConsistencyLevel(t, ctrl, level, i, outcomeSuccess) } for i := 2; i <= 3; i++ { - testWriteConsistencyLevel(t, ctrl, m3db.ConsistencyLevelQuorum, i, outcomeFail) + testWriteConsistencyLevel(t, ctrl, level, i, outcomeFail) } } @@ -158,10 +160,11 @@ func TestSessionWriteConsistencyLevelOne(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() + level := m3db.ConsistencyLevelOne for i := 0; i <= 2; i++ { - testWriteConsistencyLevel(t, ctrl, m3db.ConsistencyLevelOne, i, outcomeSuccess) + testWriteConsistencyLevel(t, ctrl, level, i, outcomeSuccess) } - testWriteConsistencyLevel(t, ctrl, m3db.ConsistencyLevelOne, 3, outcomeFail) + testWriteConsistencyLevel(t, ctrl, level, 3, outcomeFail) } func testWriteConsistencyLevel( diff --git a/interfaces/m3db/consistency.go b/interfaces/m3db/consistency.go index 7358bdbc06..1092a2d366 100644 --- a/interfaces/m3db/consistency.go +++ b/interfaces/m3db/consistency.go @@ -27,12 +27,12 @@ const ( consistencyLevelNone ConsistencyLevel = iota // ConsistencyLevelOne corresponds to a single node participating - // for an operation + // for an operation to succeed ConsistencyLevelOne - // ConsistencyLevelQuorum corresponds to quorum of nodes participating + // ConsistencyLevelMajority corresponds to the majority of nodes participating // for an operation to succeed - ConsistencyLevelQuorum + ConsistencyLevelMajority // ConsistencyLevelAll corresponds to all nodes participating // for an operation to succeed @@ -44,8 +44,8 @@ func (l ConsistencyLevel) String() string { switch l { case ConsistencyLevelOne: return "ConsistencyLevelOne" - case ConsistencyLevelQuorum: - return "ConsistencyLevelQuorum" + case ConsistencyLevelMajority: + return "ConsistencyLevelMajority" case ConsistencyLevelAll: return "ConsistencyLevelAll" } diff --git a/interfaces/m3db/options.go b/interfaces/m3db/options.go index 2e3f34d55b..345441ec6d 100644 --- a/interfaces/m3db/options.go +++ b/interfaces/m3db/options.go @@ -254,6 +254,12 @@ type ClientOptions interface { // GetClusterConnectTimeout returns the clusterConnectTimeout GetClusterConnectTimeout() time.Duration + // ClusterConnectConsistencyLevel sets the clusterConnectConsistencyLevel and returns a new ClientOptions + ClusterConnectConsistencyLevel(value ConsistencyLevel) ClientOptions + + // GetClusterConnectConsistencyLevel returns the clusterConnectConsistencyLevel + GetClusterConnectConsistencyLevel() ConsistencyLevel + // WriteRequestTimeout sets the writeRequestTimeout and returns a new ClientOptions WriteRequestTimeout(value time.Duration) ClientOptions diff --git a/interfaces/m3db/topology.go b/interfaces/m3db/topology.go index a2e70e0428..549229bf9b 100644 --- a/interfaces/m3db/topology.go +++ b/interfaces/m3db/topology.go @@ -84,8 +84,8 @@ type TopologyMap interface { // Replicas returns the number of replicas in the topology Replicas() int - // QuorumReplicas returns the number of replicas to establish quorum in the topology - QuorumReplicas() int + // MajorityReplicas returns the number of replicas to establish majority in the topology + MajorityReplicas() int } // RouteForEachFn is a function to execute for each routed to host diff --git a/mocks/options.go b/mocks/options.go index e48c1900ec..7d468c2d53 100644 --- a/mocks/options.go +++ b/mocks/options.go @@ -796,6 +796,26 @@ func (_mr *_MockClientOptionsRecorder) GetClusterConnectTimeout() *gomock.Call { return _mr.mock.ctrl.RecordCall(_mr.mock, "GetClusterConnectTimeout") } +func (_m *MockClientOptions) ClusterConnectConsistencyLevel(value m3db.ConsistencyLevel) m3db.ClientOptions { + ret := _m.ctrl.Call(_m, "ClusterConnectConsistencyLevel", value) + ret0, _ := ret[0].(m3db.ClientOptions) + return ret0 +} + +func (_mr *_MockClientOptionsRecorder) ClusterConnectConsistencyLevel(arg0 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "ClusterConnectConsistencyLevel", arg0) +} + +func (_m *MockClientOptions) GetClusterConnectConsistencyLevel() m3db.ConsistencyLevel { + ret := _m.ctrl.Call(_m, "GetClusterConnectConsistencyLevel") + ret0, _ := ret[0].(m3db.ConsistencyLevel) + return ret0 +} + +func (_mr *_MockClientOptionsRecorder) GetClusterConnectConsistencyLevel() *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "GetClusterConnectConsistencyLevel") +} + func (_m *MockClientOptions) WriteRequestTimeout(value time.Duration) m3db.ClientOptions { ret := _m.ctrl.Call(_m, "WriteRequestTimeout", value) ret0, _ := ret[0].(m3db.ClientOptions) diff --git a/topology/host.go b/topology/host.go index b39aacd71f..583812e622 100644 --- a/topology/host.go +++ b/topology/host.go @@ -26,7 +26,7 @@ import ( "github.com/m3db/m3db/interfaces/m3db" ) -func quorum(replicas int) int { +func majority(replicas int) int { return int(math.Ceil(0.5 * float64(replicas+1))) } diff --git a/topology/static.go b/topology/static.go index 09f3cfd832..108d8ff51f 100644 --- a/topology/static.go +++ b/topology/static.go @@ -77,7 +77,7 @@ type staticTopologyMap struct { hostsByShard [][]m3db.Host orderedHostsByShard [][]orderedHost replicas int - quorum int + majority int } func newStaticTopologyMap(opts m3db.TopologyTypeOptions) staticTopologyMap { @@ -89,7 +89,7 @@ func newStaticTopologyMap(opts m3db.TopologyTypeOptions) staticTopologyMap { hostsByShard: make([][]m3db.Host, totalShards), orderedHostsByShard: make([][]orderedHost, totalShards), replicas: opts.GetReplicas(), - quorum: quorum(opts.GetReplicas()), + majority: majority(opts.GetReplicas()), } for idx, hostShardSet := range hostShardSets { @@ -158,6 +158,6 @@ func (t *staticTopologyMap) Replicas() int { return t.replicas } -func (t *staticTopologyMap) QuorumReplicas() int { - return t.quorum +func (t *staticTopologyMap) MajorityReplicas() int { + return t.majority }