Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add session connect consistency level #29

Merged
merged 2 commits into from
Jul 27, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (

const (
// defaultConsistencyLevel is the default consistency level
defaultConsistencyLevel = m3db.ConsistencyLevelQuorum
defaultConsistencyLevel = m3db.ConsistencyLevelMajority

// defaultMaxConnectionCount is the default max connection count
defaultMaxConnectionCount = 32
Expand All @@ -49,6 +49,9 @@ const (
// defaultClusterConnectTimeout is the default cluster connect timeout
defaultClusterConnectTimeout = 30 * time.Second

// defaultClusterConnectConsistencyLevel is the default cluster connect consistency level
defaultClusterConnectConsistencyLevel = m3db.ConsistencyLevelMajority

// defaultWriteRequestTimeout is the default write request timeout
defaultWriteRequestTimeout = 5 * time.Second

Expand Down Expand Up @@ -111,6 +114,7 @@ type options struct {
minConnectionCount int
hostConnectTimeout time.Duration
clusterConnectTimeout time.Duration
clusterConnectConsistencyLevel m3db.ConsistencyLevel
writeRequestTimeout time.Duration
fetchRequestTimeout time.Duration
backgroundConnectInterval time.Duration
Expand Down Expand Up @@ -140,6 +144,7 @@ func NewOptions() m3db.ClientOptions {
minConnectionCount: defaultMinConnectionCount,
hostConnectTimeout: defaultHostConnectTimeout,
clusterConnectTimeout: defaultClusterConnectTimeout,
clusterConnectConsistencyLevel: defaultClusterConnectConsistencyLevel,
writeRequestTimeout: defaultWriteRequestTimeout,
fetchRequestTimeout: defaultFetchRequestTimeout,
backgroundConnectInterval: defaultBackgroundConnectInterval,
Expand Down Expand Up @@ -277,6 +282,16 @@ func (o *options) GetClusterConnectTimeout() time.Duration {
return o.clusterConnectTimeout
}

func (o *options) ClusterConnectConsistencyLevel(value m3db.ConsistencyLevel) m3db.ClientOptions {
opts := *o
opts.clusterConnectConsistencyLevel = value
return &opts
}

func (o *options) GetClusterConnectConsistencyLevel() m3db.ConsistencyLevel {
return o.clusterConnectConsistencyLevel
}

func (o *options) WriteRequestTimeout(value time.Duration) m3db.ClientOptions {
opts := *o
opts.writeRequestTimeout = value
Expand Down
53 changes: 32 additions & 21 deletions client/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type session struct {
topoMap m3db.TopologyMap
topoMapCh chan m3db.TopologyMap
replicas int
quorum int
majority int
queues []hostQueue
state state
writeOpPool writeOpPool
Expand Down Expand Up @@ -160,6 +160,9 @@ func (s *session) setTopologyMap(topologyMap m3db.TopologyMap) error {

shards := topologyMap.ShardScheme().All().Shards()
minConnectionCount := s.opts.GetMinConnectionCount()
replicas := topologyMap.Replicas()
majority := topologyMap.MajorityReplicas()
connectConsistencyLevel := s.opts.GetClusterConnectConsistencyLevel()
for {
if s.nowFn().Sub(start) >= s.opts.GetClusterConnectTimeout() {
for i := range queues {
Expand All @@ -168,24 +171,32 @@ func (s *session) setTopologyMap(topologyMap m3db.TopologyMap) error {
return ErrClusterConnectTimeout
}
// Be optimistic
clusterHasMinConnectionsToAllShards := true
clusterAvailable := true
for _, shard := range shards {
// Be optimistic
shardHasMinConnectionsToOneHost := true
shardReplicasAvailable := 0
routeErr := topologyMap.RouteShardForEach(shard, func(idx int, host m3db.Host) {
if queues[idx].GetConnectionCount() < minConnectionCount {
shardHasMinConnectionsToOneHost = false
if queues[idx].GetConnectionCount() >= minConnectionCount {
shardReplicasAvailable++
}
})
if routeErr != nil {
return routeErr
}
if !shardHasMinConnectionsToOneHost {
clusterHasMinConnectionsToAllShards = false
var clusterAvailableForShard bool
switch connectConsistencyLevel {
case m3db.ConsistencyLevelAll:
clusterAvailableForShard = shardReplicasAvailable == replicas
case m3db.ConsistencyLevelMajority:
clusterAvailableForShard = shardReplicasAvailable >= majority
case m3db.ConsistencyLevelOne:
clusterAvailableForShard = shardReplicasAvailable > 0
}
if !clusterAvailableForShard {
clusterAvailable = false
break
}
}
if clusterHasMinConnectionsToAllShards {
if clusterAvailable {
// All done
break
}
Expand All @@ -197,8 +208,8 @@ func (s *session) setTopologyMap(topologyMap m3db.TopologyMap) error {
s.queues = queues
s.topoMap = topologyMap
prevReplicas := s.replicas
s.replicas = topologyMap.Replicas()
s.quorum = topologyMap.QuorumReplicas()
s.replicas = replicas
s.majority = majority
if s.fetchBatchOpArrayArrayPool == nil ||
s.fetchBatchOpArrayArrayPool.Entries() != len(queues) {
s.fetchBatchOpArrayArrayPool = newFetchBatchOpArrayArrayPool(
Expand Down Expand Up @@ -271,7 +282,7 @@ func (s *session) Write(id string, t time.Time, value float64, unit xtime.Unit,
resultErrLock sync.Mutex
resultErr error
resultErrs int
quorum int
majority int
)

timeType, timeTypeErr := convert.UnitToTimeType(unit)
Expand Down Expand Up @@ -303,7 +314,7 @@ func (s *session) Write(id string, t time.Time, value float64, unit xtime.Unit,
}

s.RLock()
quorum = s.quorum
majority = s.majority
routeErr := s.topoMap.RouteForEach(id, func(idx int, host m3db.Host) {
wg.Add(1)
enqueued++
Expand All @@ -330,7 +341,7 @@ func (s *session) Write(id string, t time.Time, value float64, unit xtime.Unit,
// Return write to pool
s.writeOpPool.Put(w)

return s.consistencyResult(quorum, enqueued, resultErrs, resultErr)
return s.consistencyResult(majority, enqueued, resultErrs, resultErr)
}

func (s *session) Fetch(id string, startInclusive, endExclusive time.Time) (m3db.SeriesIterator, error) {
Expand All @@ -355,7 +366,7 @@ func (s *session) FetchAll(ids []string, startInclusive, endExclusive time.Time)
resultErrLock sync.Mutex
resultErr error
resultErrs int
quorum int
majority int
fetchBatchOpsByHostIdx [][]*fetchBatchOp
)

Expand Down Expand Up @@ -388,7 +399,7 @@ func (s *session) FetchAll(ids []string, startInclusive, endExclusive time.Time)
}()

s.RLock()
quorum = s.quorum
majority = s.majority
for idx := range ids {
idx := idx

Expand Down Expand Up @@ -425,7 +436,7 @@ func (s *session) FetchAll(ids []string, startInclusive, endExclusive time.Time)
// Requests still pending
return
}
if err := s.consistencyResult(quorum, enqueued, int(errs), firstErr); err != nil {
if err := s.consistencyResult(majority, enqueued, int(errs), firstErr); err != nil {
resultErrLock.Lock()
if resultErr == nil {
resultErr = err
Expand Down Expand Up @@ -509,7 +520,7 @@ func (s *session) FetchAll(ids []string, startInclusive, endExclusive time.Time)
return iters, nil
}

func (s *session) consistencyResult(quorum, enqueued, resultErrs int, resultErr error) error {
func (s *session) consistencyResult(majority, enqueued, resultErrs int, resultErr error) error {
if resultErrs == 0 {
return nil
}
Expand All @@ -525,9 +536,9 @@ func (s *session) consistencyResult(quorum, enqueued, resultErrs int, resultErr
switch s.level {
case m3db.ConsistencyLevelAll:
return reportErr()
case m3db.ConsistencyLevelQuorum:
if success >= quorum {
// Meets quorum
case m3db.ConsistencyLevelMajority:
if success >= majority {
// Meets majority
break
}
return reportErr()
Expand Down
6 changes: 3 additions & 3 deletions client/session_fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,15 +165,15 @@ func TestSessionFetchConsistencyLevelAll(t *testing.T) {
}
}

func TestSessionFetchConsistencyLevelQuorum(t *testing.T) {
func TestSessionFetchConsistencyLevelMajority(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

for i := 0; i <= 1; i++ {
testFetchConsistencyLevel(t, ctrl, m3db.ConsistencyLevelQuorum, i, outcomeSuccess)
testFetchConsistencyLevel(t, ctrl, m3db.ConsistencyLevelMajority, i, outcomeSuccess)
}
for i := 2; i <= 3; i++ {
testFetchConsistencyLevel(t, ctrl, m3db.ConsistencyLevelQuorum, i, outcomeFail)
testFetchConsistencyLevel(t, ctrl, m3db.ConsistencyLevelMajority, i, outcomeFail)
}
}

Expand Down
69 changes: 60 additions & 9 deletions client/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package client
import (
"fmt"
"sync"
"sync/atomic"
"testing"

"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -67,22 +68,62 @@ func newSessionTestOptions() m3db.ClientOptions {
WriteOpPoolSize(0).
FetchBatchOpPoolSize(0).
TopologyType(topology.NewStaticTopologyType(
topology.NewStaticTopologyTypeOptions().
Replicas(sessionTestReplicas).
ShardScheme(shardScheme).
HostShardSets(hostShardSets)))
topology.NewStaticTopologyTypeOptions().
Replicas(sessionTestReplicas).
ShardScheme(shardScheme).
HostShardSets(hostShardSets)))
}

func TestSessionClusterConnectTimesOut(t *testing.T) {
func TestSessionClusterConnectConsistencyLevelAll(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

level := m3db.ConsistencyLevelAll
testSessionClusterConnectConsistencyLevel(t, ctrl, level, 0, outcomeSuccess)
for i := 1; i <= 3; i++ {
testSessionClusterConnectConsistencyLevel(t, ctrl, level, i, outcomeFail)
}
}

func TestSessionClusterConnectConsistencyLevelMajority(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

level := m3db.ConsistencyLevelMajority
for i := 0; i <= 1; i++ {
testSessionClusterConnectConsistencyLevel(t, ctrl, level, i, outcomeSuccess)
}
for i := 2; i <= 3; i++ {
testSessionClusterConnectConsistencyLevel(t, ctrl, level, i, outcomeFail)
}
}

func TestSessionClusterConnectConsistencyLevelOne(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

level := m3db.ConsistencyLevelOne
for i := 0; i <= 2; i++ {
testSessionClusterConnectConsistencyLevel(t, ctrl, level, i, outcomeSuccess)
}
testSessionClusterConnectConsistencyLevel(t, ctrl, level, 3, outcomeFail)
}

func testSessionClusterConnectConsistencyLevel(
t *testing.T,
ctrl *gomock.Controller,
level m3db.ConsistencyLevel,
failures int,
expected outcome,
) {
opts := newSessionTestOptions()
opts = opts.ClusterConnectTimeout(3 * clusterConnectWaitInterval)
opts = opts.ClusterConnectConsistencyLevel(level)
s, err := newSession(opts)
assert.NoError(t, err)
session := s.(*session)

var failingConns int32
session.newHostQueueFn = func(
host m3db.Host,
writeBatchRequestPool writeBatchRequestPool,
Expand All @@ -91,14 +132,24 @@ func TestSessionClusterConnectTimesOut(t *testing.T) {
) hostQueue {
hostQueue := mocks.NewMockhostQueue(ctrl)
hostQueue.EXPECT().Open().Times(1)
hostQueue.EXPECT().GetConnectionCount().Return(0).AnyTimes()
hostQueue.EXPECT().Close().Times(1)
if atomic.AddInt32(&failingConns, 1) <= int32(failures) {
hostQueue.EXPECT().GetConnectionCount().Return(0).AnyTimes()
} else {
min := opts.GetMinConnectionCount()
hostQueue.EXPECT().GetConnectionCount().Return(min).AnyTimes()
}
hostQueue.EXPECT().Close().AnyTimes()
return hostQueue
}

err = session.Open()
assert.Error(t, err)
assert.Equal(t, ErrClusterConnectTimeout, err)
switch expected {
case outcomeSuccess:
assert.NoError(t, err)
case outcomeFail:
assert.Error(t, err)
assert.Equal(t, ErrClusterConnectTimeout, err)
}
}

func mockHostQueues(
Expand Down
17 changes: 10 additions & 7 deletions client/session_write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,32 +136,35 @@ func TestSessionWriteConsistencyLevelAll(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

testWriteConsistencyLevel(t, ctrl, m3db.ConsistencyLevelAll, 0, outcomeSuccess)
level := m3db.ConsistencyLevelAll
testWriteConsistencyLevel(t, ctrl, level, 0, outcomeSuccess)
for i := 1; i <= 3; i++ {
testWriteConsistencyLevel(t, ctrl, m3db.ConsistencyLevelAll, i, outcomeFail)
testWriteConsistencyLevel(t, ctrl, level, i, outcomeFail)
}
}

func TestSessionWriteConsistencyLevelQuorum(t *testing.T) {
func TestSessionWriteConsistencyLevelMajority(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

level := m3db.ConsistencyLevelMajority
for i := 0; i <= 1; i++ {
testWriteConsistencyLevel(t, ctrl, m3db.ConsistencyLevelQuorum, i, outcomeSuccess)
testWriteConsistencyLevel(t, ctrl, level, i, outcomeSuccess)
}
for i := 2; i <= 3; i++ {
testWriteConsistencyLevel(t, ctrl, m3db.ConsistencyLevelQuorum, i, outcomeFail)
testWriteConsistencyLevel(t, ctrl, level, i, outcomeFail)
}
}

func TestSessionWriteConsistencyLevelOne(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

level := m3db.ConsistencyLevelOne
for i := 0; i <= 2; i++ {
testWriteConsistencyLevel(t, ctrl, m3db.ConsistencyLevelOne, i, outcomeSuccess)
testWriteConsistencyLevel(t, ctrl, level, i, outcomeSuccess)
}
testWriteConsistencyLevel(t, ctrl, m3db.ConsistencyLevelOne, 3, outcomeFail)
testWriteConsistencyLevel(t, ctrl, level, 3, outcomeFail)
}

func testWriteConsistencyLevel(
Expand Down
10 changes: 5 additions & 5 deletions interfaces/m3db/consistency.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ const (
consistencyLevelNone ConsistencyLevel = iota

// ConsistencyLevelOne corresponds to a single node participating
// for an operation
// for an operation to succeed
ConsistencyLevelOne

// ConsistencyLevelQuorum corresponds to quorum of nodes participating
// ConsistencyLevelMajority corresponds to the majority of nodes participating
// for an operation to succeed
ConsistencyLevelQuorum
ConsistencyLevelMajority

// ConsistencyLevelAll corresponds to all nodes participating
// for an operation to succeed
Expand All @@ -44,8 +44,8 @@ func (l ConsistencyLevel) String() string {
switch l {
case ConsistencyLevelOne:
return "ConsistencyLevelOne"
case ConsistencyLevelQuorum:
return "ConsistencyLevelQuorum"
case ConsistencyLevelMajority:
return "ConsistencyLevelMajority"
case ConsistencyLevelAll:
return "ConsistencyLevelAll"
}
Expand Down
Loading