Skip to content

Commit

Permalink
Add session connect consistency level
Browse files Browse the repository at this point in the history
  • Loading branch information
Rob Skillington committed Jul 24, 2016
1 parent dc94a8d commit c07cebc
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 22 deletions.
15 changes: 15 additions & 0 deletions client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ const (
// defaultClusterConnectTimeout is the default cluster connect timeout
defaultClusterConnectTimeout = 30 * time.Second

// defaultClusterConnectConsistencyLevel is the default cluster connect consistency level
defaultClusterConnectConsistencyLevel = m3db.ConsistencyLevelQuorum

// defaultWriteRequestTimeout is the default write request timeout
defaultWriteRequestTimeout = 5 * time.Second

Expand Down Expand Up @@ -111,6 +114,7 @@ type options struct {
minConnectionCount int
hostConnectTimeout time.Duration
clusterConnectTimeout time.Duration
clusterConnectConsistencyLevel m3db.ConsistencyLevel
writeRequestTimeout time.Duration
fetchRequestTimeout time.Duration
backgroundConnectInterval time.Duration
Expand Down Expand Up @@ -140,6 +144,7 @@ func NewOptions() m3db.ClientOptions {
minConnectionCount: defaultMinConnectionCount,
hostConnectTimeout: defaultHostConnectTimeout,
clusterConnectTimeout: defaultClusterConnectTimeout,
clusterConnectConsistencyLevel: defaultClusterConnectConsistencyLevel,
writeRequestTimeout: defaultWriteRequestTimeout,
fetchRequestTimeout: defaultFetchRequestTimeout,
backgroundConnectInterval: defaultBackgroundConnectInterval,
Expand Down Expand Up @@ -277,6 +282,16 @@ func (o *options) GetClusterConnectTimeout() time.Duration {
return o.clusterConnectTimeout
}

func (o *options) ClusterConnectConsistencyLevel(value m3db.ConsistencyLevel) m3db.ClientOptions {
opts := *o
opts.clusterConnectConsistencyLevel = value
return &opts
}

func (o *options) GetClusterConnectConsistencyLevel() m3db.ConsistencyLevel {
return o.clusterConnectConsistencyLevel
}

func (o *options) WriteRequestTimeout(value time.Duration) m3db.ClientOptions {
opts := *o
opts.writeRequestTimeout = value
Expand Down
28 changes: 17 additions & 11 deletions client/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ func (s *session) setTopologyMap(topologyMap m3db.TopologyMap) error {

shards := topologyMap.ShardScheme().All().Shards()
minConnectionCount := s.opts.GetMinConnectionCount()
replicas := topologyMap.Replicas()
quorum := topologyMap.QuorumReplicas()
connectConsistencyLevel := s.opts.GetClusterConnectConsistencyLevel()
for {
if s.nowFn().Sub(start) >= s.opts.GetClusterConnectTimeout() {
for i := range queues {
Expand All @@ -168,24 +171,27 @@ func (s *session) setTopologyMap(topologyMap m3db.TopologyMap) error {
return ErrClusterConnectTimeout
}
// Be optimistic
clusterHasMinConnectionsToAllShards := true
var clusterAvailable bool
for _, shard := range shards {
// Be optimistic
shardHasMinConnectionsToOneHost := true
shardReplicasAvailable := 0
routeErr := topologyMap.RouteShardForEach(shard, func(idx int, host m3db.Host) {
if queues[idx].GetConnectionCount() < minConnectionCount {
shardHasMinConnectionsToOneHost = false
if queues[idx].GetConnectionCount() >= minConnectionCount {
shardReplicasAvailable++
}
})
if routeErr != nil {
return routeErr
}
if !shardHasMinConnectionsToOneHost {
clusterHasMinConnectionsToAllShards = false
break
switch connectConsistencyLevel {
case m3db.ConsistencyLevelAll:
clusterAvailable = shardReplicasAvailable == replicas
case m3db.ConsistencyLevelQuorum:
clusterAvailable = shardReplicasAvailable >= quorum
case m3db.ConsistencyLevelOne:
clusterAvailable = shardReplicasAvailable > 0
}
}
if clusterHasMinConnectionsToAllShards {
if clusterAvailable {
// All done
break
}
Expand All @@ -197,8 +203,8 @@ func (s *session) setTopologyMap(topologyMap m3db.TopologyMap) error {
s.queues = queues
s.topoMap = topologyMap
prevReplicas := s.replicas
s.replicas = topologyMap.Replicas()
s.quorum = topologyMap.QuorumReplicas()
s.replicas = replicas
s.quorum = quorum
if s.fetchBatchOpArrayArrayPool == nil ||
s.fetchBatchOpArrayArrayPool.Entries() != len(queues) {
s.fetchBatchOpArrayArrayPool = newFetchBatchOpArrayArrayPool(
Expand Down
61 changes: 56 additions & 5 deletions client/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package client
import (
"fmt"
"sync"
"sync/atomic"
"testing"

"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -73,16 +74,56 @@ func newSessionTestOptions() m3db.ClientOptions {
HostShardSets(hostShardSets)))
}

func TestSessionClusterConnectTimesOut(t *testing.T) {
func TestSessionClusterConnectConsistencyLevelAll(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

level := m3db.ConsistencyLevelAll
testSessionClusterConnectConsistencyLevel(t, ctrl, level, 0, outcomeSuccess)
for i := 1; i <= 3; i++ {
testSessionClusterConnectConsistencyLevel(t, ctrl, level, i, outcomeFail)
}
}

func TestSessionClusterConnectConsistencyLevelQuorum(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

level := m3db.ConsistencyLevelQuorum
for i := 0; i <= 1; i++ {
testSessionClusterConnectConsistencyLevel(t, ctrl, level, i, outcomeSuccess)
}
for i := 2; i <= 3; i++ {
testSessionClusterConnectConsistencyLevel(t, ctrl, level, i, outcomeFail)
}
}

func TestSessionClusterConnectConsistencyLevelOne(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

level := m3db.ConsistencyLevelOne
for i := 0; i <= 2; i++ {
testSessionClusterConnectConsistencyLevel(t, ctrl, level, i, outcomeSuccess)
}
testSessionClusterConnectConsistencyLevel(t, ctrl, level, 3, outcomeFail)
}

func testSessionClusterConnectConsistencyLevel(
t *testing.T,
ctrl *gomock.Controller,
level m3db.ConsistencyLevel,
failures int,
expected outcome,
) {
opts := newSessionTestOptions()
opts = opts.ClusterConnectTimeout(3 * clusterConnectWaitInterval)
opts = opts.ClusterConnectConsistencyLevel(level)
s, err := newSession(opts)
assert.NoError(t, err)
session := s.(*session)

var failingConns int32
session.newHostQueueFn = func(
host m3db.Host,
writeBatchRequestPool writeBatchRequestPool,
Expand All @@ -91,14 +132,24 @@ func TestSessionClusterConnectTimesOut(t *testing.T) {
) hostQueue {
hostQueue := mocks.NewMockhostQueue(ctrl)
hostQueue.EXPECT().Open().Times(1)
hostQueue.EXPECT().GetConnectionCount().Return(0).AnyTimes()
hostQueue.EXPECT().Close().Times(1)
if atomic.AddInt32(&failingConns, 1) <= int32(failures) {
hostQueue.EXPECT().GetConnectionCount().Return(0).AnyTimes()
} else {
min := opts.GetMinConnectionCount()
hostQueue.EXPECT().GetConnectionCount().Return(min).AnyTimes()
}
hostQueue.EXPECT().Close().AnyTimes()
return hostQueue
}

err = session.Open()
assert.Error(t, err)
assert.Equal(t, ErrClusterConnectTimeout, err)
switch expected {
case outcomeSuccess:
assert.NoError(t, err)
case outcomeFail:
assert.Error(t, err)
assert.Equal(t, ErrClusterConnectTimeout, err)
}
}

func mockHostQueues(
Expand Down
15 changes: 9 additions & 6 deletions client/session_write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,32 +136,35 @@ func TestSessionWriteConsistencyLevelAll(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

testWriteConsistencyLevel(t, ctrl, m3db.ConsistencyLevelAll, 0, outcomeSuccess)
level := m3db.ConsistencyLevelAll
testWriteConsistencyLevel(t, ctrl, level, 0, outcomeSuccess)
for i := 1; i <= 3; i++ {
testWriteConsistencyLevel(t, ctrl, m3db.ConsistencyLevelAll, i, outcomeFail)
testWriteConsistencyLevel(t, ctrl, level, i, outcomeFail)
}
}

func TestSessionWriteConsistencyLevelQuorum(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

level := m3db.ConsistencyLevelQuorum
for i := 0; i <= 1; i++ {
testWriteConsistencyLevel(t, ctrl, m3db.ConsistencyLevelQuorum, i, outcomeSuccess)
testWriteConsistencyLevel(t, ctrl, level, i, outcomeSuccess)
}
for i := 2; i <= 3; i++ {
testWriteConsistencyLevel(t, ctrl, m3db.ConsistencyLevelQuorum, i, outcomeFail)
testWriteConsistencyLevel(t, ctrl, level, i, outcomeFail)
}
}

func TestSessionWriteConsistencyLevelOne(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

level := m3db.ConsistencyLevelOne
for i := 0; i <= 2; i++ {
testWriteConsistencyLevel(t, ctrl, m3db.ConsistencyLevelOne, i, outcomeSuccess)
testWriteConsistencyLevel(t, ctrl, level, i, outcomeSuccess)
}
testWriteConsistencyLevel(t, ctrl, m3db.ConsistencyLevelOne, 3, outcomeFail)
testWriteConsistencyLevel(t, ctrl, level, 3, outcomeFail)
}

func testWriteConsistencyLevel(
Expand Down
6 changes: 6 additions & 0 deletions interfaces/m3db/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,12 @@ type ClientOptions interface {
// GetClusterConnectTimeout returns the clusterConnectTimeout
GetClusterConnectTimeout() time.Duration

// ClusterConnectConsistencyLevel sets the clusterConnectConsistencyLevel and returns a new ClientOptions
ClusterConnectConsistencyLevel(value ConsistencyLevel) ClientOptions

// GetClusterConnectConsistencyLevel returns the clusterConnectConsistencyLevel
GetClusterConnectConsistencyLevel() ConsistencyLevel

// WriteRequestTimeout sets the writeRequestTimeout and returns a new ClientOptions
WriteRequestTimeout(value time.Duration) ClientOptions

Expand Down
20 changes: 20 additions & 0 deletions mocks/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,26 @@ func (_mr *_MockClientOptionsRecorder) GetClusterConnectTimeout() *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "GetClusterConnectTimeout")
}

func (_m *MockClientOptions) ClusterConnectConsistencyLevel(value m3db.ConsistencyLevel) m3db.ClientOptions {
ret := _m.ctrl.Call(_m, "ClusterConnectConsistencyLevel", value)
ret0, _ := ret[0].(m3db.ClientOptions)
return ret0
}

func (_mr *_MockClientOptionsRecorder) ClusterConnectConsistencyLevel(arg0 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "ClusterConnectConsistencyLevel", arg0)
}

func (_m *MockClientOptions) GetClusterConnectConsistencyLevel() m3db.ConsistencyLevel {
ret := _m.ctrl.Call(_m, "GetClusterConnectConsistencyLevel")
ret0, _ := ret[0].(m3db.ConsistencyLevel)
return ret0
}

func (_mr *_MockClientOptionsRecorder) GetClusterConnectConsistencyLevel() *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "GetClusterConnectConsistencyLevel")
}

func (_m *MockClientOptions) WriteRequestTimeout(value time.Duration) m3db.ClientOptions {
ret := _m.ctrl.Call(_m, "WriteRequestTimeout", value)
ret0, _ := ret[0].(m3db.ClientOptions)
Expand Down

0 comments on commit c07cebc

Please sign in to comment.