Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add session connect consistency level #29

Merged
merged 2 commits into from
Jul 27, 2016
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ const (
// defaultClusterConnectTimeout is the default cluster connect timeout
defaultClusterConnectTimeout = 30 * time.Second

// defaultClusterConnectConsistencyLevel is the default cluster connect consistency level
defaultClusterConnectConsistencyLevel = m3db.ConsistencyLevelQuorum

// defaultWriteRequestTimeout is the default write request timeout
defaultWriteRequestTimeout = 5 * time.Second

Expand Down Expand Up @@ -111,6 +114,7 @@ type options struct {
minConnectionCount int
hostConnectTimeout time.Duration
clusterConnectTimeout time.Duration
clusterConnectConsistencyLevel m3db.ConsistencyLevel
writeRequestTimeout time.Duration
fetchRequestTimeout time.Duration
backgroundConnectInterval time.Duration
Expand Down Expand Up @@ -140,6 +144,7 @@ func NewOptions() m3db.ClientOptions {
minConnectionCount: defaultMinConnectionCount,
hostConnectTimeout: defaultHostConnectTimeout,
clusterConnectTimeout: defaultClusterConnectTimeout,
clusterConnectConsistencyLevel: defaultClusterConnectConsistencyLevel,
writeRequestTimeout: defaultWriteRequestTimeout,
fetchRequestTimeout: defaultFetchRequestTimeout,
backgroundConnectInterval: defaultBackgroundConnectInterval,
Expand Down Expand Up @@ -277,6 +282,16 @@ func (o *options) GetClusterConnectTimeout() time.Duration {
return o.clusterConnectTimeout
}

func (o *options) ClusterConnectConsistencyLevel(value m3db.ConsistencyLevel) m3db.ClientOptions {
opts := *o
opts.clusterConnectConsistencyLevel = value
return &opts
}

func (o *options) GetClusterConnectConsistencyLevel() m3db.ConsistencyLevel {
return o.clusterConnectConsistencyLevel
}

func (o *options) WriteRequestTimeout(value time.Duration) m3db.ClientOptions {
opts := *o
opts.writeRequestTimeout = value
Expand Down
28 changes: 17 additions & 11 deletions client/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ func (s *session) setTopologyMap(topologyMap m3db.TopologyMap) error {

shards := topologyMap.ShardScheme().All().Shards()
minConnectionCount := s.opts.GetMinConnectionCount()
replicas := topologyMap.Replicas()
quorum := topologyMap.QuorumReplicas()
connectConsistencyLevel := s.opts.GetClusterConnectConsistencyLevel()
for {
if s.nowFn().Sub(start) >= s.opts.GetClusterConnectTimeout() {
for i := range queues {
Expand All @@ -168,24 +171,27 @@ func (s *session) setTopologyMap(topologyMap m3db.TopologyMap) error {
return ErrClusterConnectTimeout
}
// Be optimistic
clusterHasMinConnectionsToAllShards := true
var clusterAvailable bool
for _, shard := range shards {
// Be optimistic
shardHasMinConnectionsToOneHost := true
shardReplicasAvailable := 0
routeErr := topologyMap.RouteShardForEach(shard, func(idx int, host m3db.Host) {
if queues[idx].GetConnectionCount() < minConnectionCount {
shardHasMinConnectionsToOneHost = false
if queues[idx].GetConnectionCount() >= minConnectionCount {
shardReplicasAvailable++
}
})
if routeErr != nil {
return routeErr
}
if !shardHasMinConnectionsToOneHost {
clusterHasMinConnectionsToAllShards = false
break
switch connectConsistencyLevel {
case m3db.ConsistencyLevelAll:
clusterAvailable = shardReplicasAvailable == replicas

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems clusterAvailable gets overridden in each iteration? should we break when clusterAvailable is false and only continue otherwise?

case m3db.ConsistencyLevelQuorum:
clusterAvailable = shardReplicasAvailable >= quorum
case m3db.ConsistencyLevelOne:
clusterAvailable = shardReplicasAvailable > 0
}
}
if clusterHasMinConnectionsToAllShards {
if clusterAvailable {
// All done
break
}
Expand All @@ -197,8 +203,8 @@ func (s *session) setTopologyMap(topologyMap m3db.TopologyMap) error {
s.queues = queues
s.topoMap = topologyMap
prevReplicas := s.replicas
s.replicas = topologyMap.Replicas()
s.quorum = topologyMap.QuorumReplicas()
s.replicas = replicas
s.quorum = quorum
if s.fetchBatchOpArrayArrayPool == nil ||
s.fetchBatchOpArrayArrayPool.Entries() != len(queues) {
s.fetchBatchOpArrayArrayPool = newFetchBatchOpArrayArrayPool(
Expand Down
61 changes: 56 additions & 5 deletions client/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package client
import (
"fmt"
"sync"
"sync/atomic"
"testing"

"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -73,16 +74,56 @@ func newSessionTestOptions() m3db.ClientOptions {
HostShardSets(hostShardSets)))
}

func TestSessionClusterConnectTimesOut(t *testing.T) {
func TestSessionClusterConnectConsistencyLevelAll(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

level := m3db.ConsistencyLevelAll
testSessionClusterConnectConsistencyLevel(t, ctrl, level, 0, outcomeSuccess)
for i := 1; i <= 3; i++ {
testSessionClusterConnectConsistencyLevel(t, ctrl, level, i, outcomeFail)
}
}

func TestSessionClusterConnectConsistencyLevelQuorum(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

level := m3db.ConsistencyLevelQuorum
for i := 0; i <= 1; i++ {
testSessionClusterConnectConsistencyLevel(t, ctrl, level, i, outcomeSuccess)
}
for i := 2; i <= 3; i++ {
testSessionClusterConnectConsistencyLevel(t, ctrl, level, i, outcomeFail)
}
}

func TestSessionClusterConnectConsistencyLevelOne(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

level := m3db.ConsistencyLevelOne
for i := 0; i <= 2; i++ {
testSessionClusterConnectConsistencyLevel(t, ctrl, level, i, outcomeSuccess)
}
testSessionClusterConnectConsistencyLevel(t, ctrl, level, 3, outcomeFail)
}

func testSessionClusterConnectConsistencyLevel(
t *testing.T,
ctrl *gomock.Controller,
level m3db.ConsistencyLevel,
failures int,
expected outcome,
) {
opts := newSessionTestOptions()
opts = opts.ClusterConnectTimeout(3 * clusterConnectWaitInterval)
opts = opts.ClusterConnectConsistencyLevel(level)
s, err := newSession(opts)
assert.NoError(t, err)
session := s.(*session)

var failingConns int32
session.newHostQueueFn = func(
host m3db.Host,
writeBatchRequestPool writeBatchRequestPool,
Expand All @@ -91,14 +132,24 @@ func TestSessionClusterConnectTimesOut(t *testing.T) {
) hostQueue {
hostQueue := mocks.NewMockhostQueue(ctrl)
hostQueue.EXPECT().Open().Times(1)
hostQueue.EXPECT().GetConnectionCount().Return(0).AnyTimes()
hostQueue.EXPECT().Close().Times(1)
if atomic.AddInt32(&failingConns, 1) <= int32(failures) {
hostQueue.EXPECT().GetConnectionCount().Return(0).AnyTimes()
} else {
min := opts.GetMinConnectionCount()
hostQueue.EXPECT().GetConnectionCount().Return(min).AnyTimes()
}
hostQueue.EXPECT().Close().AnyTimes()
return hostQueue
}

err = session.Open()
assert.Error(t, err)
assert.Equal(t, ErrClusterConnectTimeout, err)
switch expected {
case outcomeSuccess:
assert.NoError(t, err)
case outcomeFail:
assert.Error(t, err)
assert.Equal(t, ErrClusterConnectTimeout, err)
}
}

func mockHostQueues(
Expand Down
15 changes: 9 additions & 6 deletions client/session_write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,32 +136,35 @@ func TestSessionWriteConsistencyLevelAll(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

testWriteConsistencyLevel(t, ctrl, m3db.ConsistencyLevelAll, 0, outcomeSuccess)
level := m3db.ConsistencyLevelAll
testWriteConsistencyLevel(t, ctrl, level, 0, outcomeSuccess)
for i := 1; i <= 3; i++ {
testWriteConsistencyLevel(t, ctrl, m3db.ConsistencyLevelAll, i, outcomeFail)
testWriteConsistencyLevel(t, ctrl, level, i, outcomeFail)
}
}

func TestSessionWriteConsistencyLevelQuorum(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

level := m3db.ConsistencyLevelQuorum
for i := 0; i <= 1; i++ {
testWriteConsistencyLevel(t, ctrl, m3db.ConsistencyLevelQuorum, i, outcomeSuccess)
testWriteConsistencyLevel(t, ctrl, level, i, outcomeSuccess)
}
for i := 2; i <= 3; i++ {
testWriteConsistencyLevel(t, ctrl, m3db.ConsistencyLevelQuorum, i, outcomeFail)
testWriteConsistencyLevel(t, ctrl, level, i, outcomeFail)
}
}

func TestSessionWriteConsistencyLevelOne(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

level := m3db.ConsistencyLevelOne
for i := 0; i <= 2; i++ {
testWriteConsistencyLevel(t, ctrl, m3db.ConsistencyLevelOne, i, outcomeSuccess)
testWriteConsistencyLevel(t, ctrl, level, i, outcomeSuccess)
}
testWriteConsistencyLevel(t, ctrl, m3db.ConsistencyLevelOne, 3, outcomeFail)
testWriteConsistencyLevel(t, ctrl, level, 3, outcomeFail)
}

func testWriteConsistencyLevel(
Expand Down
6 changes: 6 additions & 0 deletions interfaces/m3db/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,12 @@ type ClientOptions interface {
// GetClusterConnectTimeout returns the clusterConnectTimeout
GetClusterConnectTimeout() time.Duration

// ClusterConnectConsistencyLevel sets the clusterConnectConsistencyLevel and returns a new ClientOptions
ClusterConnectConsistencyLevel(value ConsistencyLevel) ClientOptions

// GetClusterConnectConsistencyLevel returns the clusterConnectConsistencyLevel
GetClusterConnectConsistencyLevel() ConsistencyLevel

// WriteRequestTimeout sets the writeRequestTimeout and returns a new ClientOptions
WriteRequestTimeout(value time.Duration) ClientOptions

Expand Down
20 changes: 20 additions & 0 deletions mocks/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,26 @@ func (_mr *_MockClientOptionsRecorder) GetClusterConnectTimeout() *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "GetClusterConnectTimeout")
}

func (_m *MockClientOptions) ClusterConnectConsistencyLevel(value m3db.ConsistencyLevel) m3db.ClientOptions {
ret := _m.ctrl.Call(_m, "ClusterConnectConsistencyLevel", value)
ret0, _ := ret[0].(m3db.ClientOptions)
return ret0
}

func (_mr *_MockClientOptionsRecorder) ClusterConnectConsistencyLevel(arg0 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "ClusterConnectConsistencyLevel", arg0)
}

func (_m *MockClientOptions) GetClusterConnectConsistencyLevel() m3db.ConsistencyLevel {
ret := _m.ctrl.Call(_m, "GetClusterConnectConsistencyLevel")
ret0, _ := ret[0].(m3db.ConsistencyLevel)
return ret0
}

func (_mr *_MockClientOptionsRecorder) GetClusterConnectConsistencyLevel() *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "GetClusterConnectConsistencyLevel")
}

func (_m *MockClientOptions) WriteRequestTimeout(value time.Duration) m3db.ClientOptions {
ret := _m.ctrl.Call(_m, "WriteRequestTimeout", value)
ret0, _ := ret[0].(m3db.ClientOptions)
Expand Down