Add session connect consistency level (#29)
* Add session connect consistency level

* Update terminology to reflect that consistency guarantees are made on a majority of replicas, not a quorum
robskillington authored Jul 27, 2016
1 parent ee299dd commit 7d663c8
Showing 11 changed files with 159 additions and 53 deletions.
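For orientation before the file-by-file diff: this commit adds a `ClusterConnectConsistencyLevel` client option that controls how many replicas per shard must be connected before `session.Open()` succeeds. A minimal sketch of how a caller might set it; the option methods are taken from this diff, while the import paths and standalone `main` are assumptions based on this repository's layout:

```go
package main

import (
    "time"

    "github.com/m3db/m3db/client"
    "github.com/m3db/m3db/interfaces/m3db"
)

func main() {
    // Wait at most 30s for hosts; consider the cluster connected once
    // a majority of replicas per shard report the minimum connection
    // count (the new default level). ConsistencyLevelOne and
    // ConsistencyLevelAll are the other accepted levels.
    opts := client.NewOptions().
        ClusterConnectTimeout(30 * time.Second).
        ClusterConnectConsistencyLevel(m3db.ConsistencyLevelMajority)
    _ = opts // pass to session/client construction
}
```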
17 changes: 16 additions & 1 deletion client/options.go
@@ -35,7 +35,7 @@ import (

const (
// defaultConsistencyLevel is the default consistency level
-    defaultConsistencyLevel = m3db.ConsistencyLevelQuorum
+    defaultConsistencyLevel = m3db.ConsistencyLevelMajority

// defaultMaxConnectionCount is the default max connection count
defaultMaxConnectionCount = 32
@@ -49,6 +49,9 @@ const (
// defaultClusterConnectTimeout is the default cluster connect timeout
defaultClusterConnectTimeout = 30 * time.Second

+    // defaultClusterConnectConsistencyLevel is the default cluster connect consistency level
+    defaultClusterConnectConsistencyLevel = m3db.ConsistencyLevelMajority
+
// defaultWriteRequestTimeout is the default write request timeout
defaultWriteRequestTimeout = 5 * time.Second

@@ -111,6 +114,7 @@ type options struct {
minConnectionCount int
hostConnectTimeout time.Duration
clusterConnectTimeout time.Duration
+    clusterConnectConsistencyLevel m3db.ConsistencyLevel
writeRequestTimeout time.Duration
fetchRequestTimeout time.Duration
backgroundConnectInterval time.Duration
@@ -140,6 +144,7 @@ func NewOptions() m3db.ClientOptions {
minConnectionCount: defaultMinConnectionCount,
hostConnectTimeout: defaultHostConnectTimeout,
clusterConnectTimeout: defaultClusterConnectTimeout,
+        clusterConnectConsistencyLevel: defaultClusterConnectConsistencyLevel,
writeRequestTimeout: defaultWriteRequestTimeout,
fetchRequestTimeout: defaultFetchRequestTimeout,
backgroundConnectInterval: defaultBackgroundConnectInterval,
@@ -277,6 +282,16 @@ func (o *options) GetClusterConnectTimeout() time.Duration {
return o.clusterConnectTimeout
}

+func (o *options) ClusterConnectConsistencyLevel(value m3db.ConsistencyLevel) m3db.ClientOptions {
+    opts := *o
+    opts.clusterConnectConsistencyLevel = value
+    return &opts
+}
+
+func (o *options) GetClusterConnectConsistencyLevel() m3db.ConsistencyLevel {
+    return o.clusterConnectConsistencyLevel
+}
+
func (o *options) WriteRequestTimeout(value time.Duration) m3db.ClientOptions {
opts := *o
opts.writeRequestTimeout = value
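A note on the setter above: like the existing setters in `client/options.go`, `ClusterConnectConsistencyLevel` copies the receiver by value and returns a pointer to the copy, so a base configuration is never mutated and can be branched safely. A standalone sketch of that copy-on-write idiom; the type and field names here are illustrative, not repo code:

```go
package main

import "fmt"

// options illustrates the copy-on-write setter style used by the
// client options: setters never mutate the receiver.
type options struct {
    connectTimeoutSec int
}

// ConnectTimeoutSec returns a modified copy of the options.
func (o *options) ConnectTimeoutSec(value int) *options {
    opts := *o // shallow copy of the receiver
    opts.connectTimeoutSec = value
    return &opts
}

func main() {
    base := &options{connectTimeoutSec: 30}
    fast := base.ConnectTimeoutSec(5)
    fmt.Println(base.connectTimeoutSec, fast.connectTimeoutSec) // 30 5
}
```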
53 changes: 32 additions & 21 deletions client/session.go
@@ -68,7 +68,7 @@ type session struct {
topoMap m3db.TopologyMap
topoMapCh chan m3db.TopologyMap
replicas int
-    quorum   int
+    majority int
queues []hostQueue
state state
writeOpPool writeOpPool
@@ -160,6 +160,9 @@ func (s *session) setTopologyMap(topologyMap m3db.TopologyMap) error {

shards := topologyMap.ShardScheme().All().Shards()
minConnectionCount := s.opts.GetMinConnectionCount()
+    replicas := topologyMap.Replicas()
+    majority := topologyMap.MajorityReplicas()
+    connectConsistencyLevel := s.opts.GetClusterConnectConsistencyLevel()
for {
if s.nowFn().Sub(start) >= s.opts.GetClusterConnectTimeout() {
for i := range queues {
@@ -168,24 +171,32 @@ func (s *session) setTopologyMap(topologyMap m3db.TopologyMap) error {
return ErrClusterConnectTimeout
}
    // Be optimistic
-    clusterHasMinConnectionsToAllShards := true
+    clusterAvailable := true
    for _, shard := range shards {
-        // Be optimistic
-        shardHasMinConnectionsToOneHost := true
+        shardReplicasAvailable := 0
        routeErr := topologyMap.RouteShardForEach(shard, func(idx int, host m3db.Host) {
-            if queues[idx].GetConnectionCount() < minConnectionCount {
-                shardHasMinConnectionsToOneHost = false
+            if queues[idx].GetConnectionCount() >= minConnectionCount {
+                shardReplicasAvailable++
            }
        })
        if routeErr != nil {
            return routeErr
        }
-        if !shardHasMinConnectionsToOneHost {
-            clusterHasMinConnectionsToAllShards = false
+        var clusterAvailableForShard bool
+        switch connectConsistencyLevel {
+        case m3db.ConsistencyLevelAll:
+            clusterAvailableForShard = shardReplicasAvailable == replicas
+        case m3db.ConsistencyLevelMajority:
+            clusterAvailableForShard = shardReplicasAvailable >= majority
+        case m3db.ConsistencyLevelOne:
+            clusterAvailableForShard = shardReplicasAvailable > 0
+        }
+        if !clusterAvailableForShard {
+            clusterAvailable = false
            break
        }
    }
-    if clusterHasMinConnectionsToAllShards {
+    if clusterAvailable {
        // All done
        break
    }
Expand All @@ -197,8 +208,8 @@ func (s *session) setTopologyMap(topologyMap m3db.TopologyMap) error {
s.queues = queues
s.topoMap = topologyMap
prevReplicas := s.replicas
-    s.replicas = topologyMap.Replicas()
-    s.quorum = topologyMap.QuorumReplicas()
+    s.replicas = replicas
+    s.majority = majority
if s.fetchBatchOpArrayArrayPool == nil ||
s.fetchBatchOpArrayArrayPool.Entries() != len(queues) {
s.fetchBatchOpArrayArrayPool = newFetchBatchOpArrayArrayPool(
@@ -271,7 +282,7 @@ func (s *session) Write(id string, t time.Time, value float64, unit xtime.Unit,
resultErrLock sync.Mutex
resultErr error
resultErrs int
-        quorum   int
+        majority int
)

timeType, timeTypeErr := convert.UnitToTimeType(unit)
@@ -303,7 +314,7 @@ func (s *session) Write(id string, t time.Time, value float64, unit xtime.Unit,
}

s.RLock()
-    quorum = s.quorum
+    majority = s.majority
routeErr := s.topoMap.RouteForEach(id, func(idx int, host m3db.Host) {
wg.Add(1)
enqueued++
@@ -330,7 +341,7 @@ func (s *session) Write(id string, t time.Time, value float64, unit xtime.Unit,
// Return write to pool
s.writeOpPool.Put(w)

-    return s.consistencyResult(quorum, enqueued, resultErrs, resultErr)
+    return s.consistencyResult(majority, enqueued, resultErrs, resultErr)
}

func (s *session) Fetch(id string, startInclusive, endExclusive time.Time) (m3db.SeriesIterator, error) {
@@ -355,7 +366,7 @@ func (s *session) FetchAll(ids []string, startInclusive, endExclusive time.Time)
resultErrLock sync.Mutex
resultErr error
resultErrs int
-        quorum   int
+        majority int
fetchBatchOpsByHostIdx [][]*fetchBatchOp
)

@@ -388,7 +399,7 @@ func (s *session) FetchAll(ids []string, startInclusive, endExclusive time.Time)
}()

s.RLock()
-    quorum = s.quorum
+    majority = s.majority
for idx := range ids {
idx := idx

@@ -425,7 +436,7 @@ func (s *session) FetchAll(ids []string, startInclusive, endExclusive time.Time)
// Requests still pending
return
}
-        if err := s.consistencyResult(quorum, enqueued, int(errs), firstErr); err != nil {
+        if err := s.consistencyResult(majority, enqueued, int(errs), firstErr); err != nil {
resultErrLock.Lock()
if resultErr == nil {
resultErr = err
@@ -509,7 +520,7 @@ func (s *session) FetchAll(ids []string, startInclusive, endExclusive time.Time)
return iters, nil
}

-func (s *session) consistencyResult(quorum, enqueued, resultErrs int, resultErr error) error {
+func (s *session) consistencyResult(majority, enqueued, resultErrs int, resultErr error) error {
if resultErrs == 0 {
return nil
}
@@ -525,9 +536,9 @@ func (s *session) consistencyResult(majority, enqueued, resultErrs int, resultErr
switch s.level {
case m3db.ConsistencyLevelAll:
return reportErr()
-    case m3db.ConsistencyLevelQuorum:
-        if success >= quorum {
-            // Meets quorum
+    case m3db.ConsistencyLevelMajority:
+        if success >= majority {
+            // Meets majority
break
}
return reportErr()
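For the arithmetic behind the new checks: `MajorityReplicas()` is expected to return a strict majority, i.e. `replicas/2 + 1`, and `consistencyResult` counts an operation as meeting the level when successes (enqueued minus errors) reach that threshold. A standalone sketch under those assumptions; the helper names are illustrative, not repo code:

```go
package main

import "fmt"

// majorityReplicas assumes the usual strict-majority formula,
// e.g. 2 of 3 replicas, or 3 of 5.
func majorityReplicas(replicas int) int {
    return replicas/2 + 1
}

// meetsMajority mirrors the consistencyResult check at
// ConsistencyLevelMajority: successes are enqueued ops minus errors.
func meetsMajority(enqueued, resultErrs, majority int) bool {
    return enqueued-resultErrs >= majority
}

func main() {
    m := majorityReplicas(3)            // 2
    fmt.Println(meetsMajority(3, 1, m)) // true: 1 failure of 3 still succeeds
    fmt.Println(meetsMajority(3, 2, m)) // false: 2 failures of 3 fails
}
```

With 3 replicas this gives exactly the outcomes asserted in the majority-level tests below: 0 or 1 failures succeed, 2 or 3 fail.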
6 changes: 3 additions & 3 deletions client/session_fetch_test.go
@@ -165,15 +165,15 @@ func TestSessionFetchConsistencyLevelAll(t *testing.T) {
}
}

-func TestSessionFetchConsistencyLevelQuorum(t *testing.T) {
+func TestSessionFetchConsistencyLevelMajority(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

for i := 0; i <= 1; i++ {
-        testFetchConsistencyLevel(t, ctrl, m3db.ConsistencyLevelQuorum, i, outcomeSuccess)
+        testFetchConsistencyLevel(t, ctrl, m3db.ConsistencyLevelMajority, i, outcomeSuccess)
}
for i := 2; i <= 3; i++ {
-        testFetchConsistencyLevel(t, ctrl, m3db.ConsistencyLevelQuorum, i, outcomeFail)
+        testFetchConsistencyLevel(t, ctrl, m3db.ConsistencyLevelMajority, i, outcomeFail)
}
}

69 changes: 60 additions & 9 deletions client/session_test.go
@@ -23,6 +23,7 @@ package client
import (
"fmt"
"sync"
"sync/atomic"
"testing"

"github.com/golang/mock/gomock"
@@ -67,22 +68,62 @@ func newSessionTestOptions() m3db.ClientOptions {
WriteOpPoolSize(0).
FetchBatchOpPoolSize(0).
TopologyType(topology.NewStaticTopologyType(
-        topology.NewStaticTopologyTypeOptions().
-        Replicas(sessionTestReplicas).
-        ShardScheme(shardScheme).
-        HostShardSets(hostShardSets)))
+            topology.NewStaticTopologyTypeOptions().
+                Replicas(sessionTestReplicas).
+                ShardScheme(shardScheme).
+                HostShardSets(hostShardSets)))
}

-func TestSessionClusterConnectTimesOut(t *testing.T) {
+func TestSessionClusterConnectConsistencyLevelAll(t *testing.T) {
    ctrl := gomock.NewController(t)
    defer ctrl.Finish()

+    level := m3db.ConsistencyLevelAll
+    testSessionClusterConnectConsistencyLevel(t, ctrl, level, 0, outcomeSuccess)
+    for i := 1; i <= 3; i++ {
+        testSessionClusterConnectConsistencyLevel(t, ctrl, level, i, outcomeFail)
+    }
+}
+
+func TestSessionClusterConnectConsistencyLevelMajority(t *testing.T) {
+    ctrl := gomock.NewController(t)
+    defer ctrl.Finish()
+
+    level := m3db.ConsistencyLevelMajority
+    for i := 0; i <= 1; i++ {
+        testSessionClusterConnectConsistencyLevel(t, ctrl, level, i, outcomeSuccess)
+    }
+    for i := 2; i <= 3; i++ {
+        testSessionClusterConnectConsistencyLevel(t, ctrl, level, i, outcomeFail)
+    }
+}
+
+func TestSessionClusterConnectConsistencyLevelOne(t *testing.T) {
+    ctrl := gomock.NewController(t)
+    defer ctrl.Finish()
+
+    level := m3db.ConsistencyLevelOne
+    for i := 0; i <= 2; i++ {
+        testSessionClusterConnectConsistencyLevel(t, ctrl, level, i, outcomeSuccess)
+    }
+    testSessionClusterConnectConsistencyLevel(t, ctrl, level, 3, outcomeFail)
+}
+
+func testSessionClusterConnectConsistencyLevel(
+    t *testing.T,
+    ctrl *gomock.Controller,
+    level m3db.ConsistencyLevel,
+    failures int,
+    expected outcome,
+) {
    opts := newSessionTestOptions()
    opts = opts.ClusterConnectTimeout(3 * clusterConnectWaitInterval)
+    opts = opts.ClusterConnectConsistencyLevel(level)
    s, err := newSession(opts)
    assert.NoError(t, err)
    session := s.(*session)

+    var failingConns int32
session.newHostQueueFn = func(
host m3db.Host,
writeBatchRequestPool writeBatchRequestPool,
@@ -91,14 +132,24 @@ func TestSessionClusterConnectTimesOut(t *testing.T) {
) hostQueue {
hostQueue := mocks.NewMockhostQueue(ctrl)
hostQueue.EXPECT().Open().Times(1)
-        hostQueue.EXPECT().GetConnectionCount().Return(0).AnyTimes()
-        hostQueue.EXPECT().Close().Times(1)
+        if atomic.AddInt32(&failingConns, 1) <= int32(failures) {
+            hostQueue.EXPECT().GetConnectionCount().Return(0).AnyTimes()
+        } else {
+            min := opts.GetMinConnectionCount()
+            hostQueue.EXPECT().GetConnectionCount().Return(min).AnyTimes()
+        }
+        hostQueue.EXPECT().Close().AnyTimes()
return hostQueue
}

err = session.Open()
-    assert.Error(t, err)
-    assert.Equal(t, ErrClusterConnectTimeout, err)
+    switch expected {
+    case outcomeSuccess:
+        assert.NoError(t, err)
+    case outcomeFail:
+        assert.Error(t, err)
+        assert.Equal(t, ErrClusterConnectTimeout, err)
+    }
}

func mockHostQueues(
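The helper above drives all three outcomes from one mock setup: an atomic counter, incremented as each host queue is constructed, marks the first `failures` hosts as never reaching the minimum connection count while the rest report it immediately. A condensed standalone sketch of that gating logic, without gomock; the names here are illustrative:

```go
package main

import (
    "fmt"
    "sync/atomic"
)

// newConnCount decides at host-queue construction time whether this
// host is one of the first `failures` created (reports zero
// connections forever) or healthy (always reports the minimum),
// mirroring the atomic gating in testSessionClusterConnectConsistencyLevel.
func newConnCount(created *int32, failures, min int) func() int {
    if atomic.AddInt32(created, 1) <= int32(failures) {
        return func() int { return 0 }
    }
    return func() int { return min }
}

func main() {
    var created int32
    for i := 0; i < 4; i++ {
        connCount := newConnCount(&created, 2, 3) // first two hosts fail
        fmt.Println(connCount())                  // 0, 0, 3, 3
    }
}
```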
17 changes: 10 additions & 7 deletions client/session_write_test.go
@@ -136,32 +136,35 @@ func TestSessionWriteConsistencyLevelAll(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

-    testWriteConsistencyLevel(t, ctrl, m3db.ConsistencyLevelAll, 0, outcomeSuccess)
+    level := m3db.ConsistencyLevelAll
+    testWriteConsistencyLevel(t, ctrl, level, 0, outcomeSuccess)
    for i := 1; i <= 3; i++ {
-        testWriteConsistencyLevel(t, ctrl, m3db.ConsistencyLevelAll, i, outcomeFail)
+        testWriteConsistencyLevel(t, ctrl, level, i, outcomeFail)
    }
}

-func TestSessionWriteConsistencyLevelQuorum(t *testing.T) {
+func TestSessionWriteConsistencyLevelMajority(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

+    level := m3db.ConsistencyLevelMajority
    for i := 0; i <= 1; i++ {
-        testWriteConsistencyLevel(t, ctrl, m3db.ConsistencyLevelQuorum, i, outcomeSuccess)
+        testWriteConsistencyLevel(t, ctrl, level, i, outcomeSuccess)
    }
    for i := 2; i <= 3; i++ {
-        testWriteConsistencyLevel(t, ctrl, m3db.ConsistencyLevelQuorum, i, outcomeFail)
+        testWriteConsistencyLevel(t, ctrl, level, i, outcomeFail)
    }
}

func TestSessionWriteConsistencyLevelOne(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

+    level := m3db.ConsistencyLevelOne
    for i := 0; i <= 2; i++ {
-        testWriteConsistencyLevel(t, ctrl, m3db.ConsistencyLevelOne, i, outcomeSuccess)
+        testWriteConsistencyLevel(t, ctrl, level, i, outcomeSuccess)
    }
-    testWriteConsistencyLevel(t, ctrl, m3db.ConsistencyLevelOne, 3, outcomeFail)
+    testWriteConsistencyLevel(t, ctrl, level, 3, outcomeFail)
}

func testWriteConsistencyLevel(
10 changes: 5 additions & 5 deletions interfaces/m3db/consistency.go
@@ -27,12 +27,12 @@ const (
consistencyLevelNone ConsistencyLevel = iota

// ConsistencyLevelOne corresponds to a single node participating
-    // for an operation
+    // for an operation to succeed
ConsistencyLevelOne

-    // ConsistencyLevelQuorum corresponds to quorum of nodes participating
+    // ConsistencyLevelMajority corresponds to the majority of nodes participating
    // for an operation to succeed
-    ConsistencyLevelQuorum
+    ConsistencyLevelMajority

// ConsistencyLevelAll corresponds to all nodes participating
// for an operation to succeed
@@ -44,8 +44,8 @@ func (l ConsistencyLevel) String() string {
switch l {
case ConsistencyLevelOne:
return "ConsistencyLevelOne"
-    case ConsistencyLevelQuorum:
-        return "ConsistencyLevelQuorum"
+    case ConsistencyLevelMajority:
+        return "ConsistencyLevelMajority"
case ConsistencyLevelAll:
return "ConsistencyLevelAll"
}
(Diffs for the remaining 5 of the 11 changed files were not loaded on this page.)