Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP range splitting; #109

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions kv/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ var transactionalActions = map[string]struct{}{
storage.Put: struct{}{},
storage.ReapQueue: struct{}{},
storage.Scan: struct{}{},
storage.InternalSplit: struct{}{},
}

func isTransactional(method string) bool {
Expand Down
2 changes: 1 addition & 1 deletion kv/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func createTestDB(t *testing.T) (*DB, *hlc.Clock, *hlc.ManualClock) {
return db, clock, &manual
}

// creatPutRequest returns a ready-made request using the
// createPutRequest returns a ready-made request using the
// specified key, value & txn ID.
func createPutRequest(key engine.Key, value, txnID []byte) *proto.PutRequest {
return &proto.PutRequest{
Expand Down
44 changes: 44 additions & 0 deletions kv/split_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2014 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License. See the AUTHORS file
// for names of contributors.
//
// Author: Tobias Schottdorf ([email protected])

package kv

import (
"testing"

"github.com/cockroachdb/cockroach/storage/engine"
)

// TestRangeCoordinateSplit looks up range 1 in store 1 of a freshly created
// test DB and checks that coordinating a split at key "m" reports no error.
func TestRangeCoordinateSplit(t *testing.T) {
	db, _, _ := createTestDB(t)
	defer db.Close()

	localKV := db.kv.(*LocalKV)
	store, storeErr := localKV.GetStore(1)
	if storeErr != nil {
		t.Fatal(storeErr)
	}

	rng, rangeErr := store.GetRange(1)
	if rangeErr != nil {
		t.Fatal(rangeErr)
	}

	if splitErr := rng.CoordinateSplit(engine.Key("m")); splitErr != nil {
		t.Fatal(splitErr)
	}
}
14 changes: 7 additions & 7 deletions kv/txn_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,13 @@ func runTransaction(db *DB, opts *storage.TransactionOptions, retryable func(db
// - Re-create transaction on TransactionAbortedError
func (tdb *txnDB) executeCmd(method string, args proto.Request, replyChan interface{}) {
tdb.Lock()
// If the transaction hasn't yet been created, create now, using
// this command's key as the base key.
if tdb.txn == nil {
tdb.txn = storage.NewTransaction(tdb.Name, args.Header().Key,
tdb.UserPriority, tdb.Isolation, tdb.clock)
tdb.timestamp = tdb.txn.Timestamp
}
if method == storage.EndTransaction {
// For EndTransaction, make sure key is set to txn ID.
args.Header().Key = engine.MakeLocalKey(engine.KeyLocalTransactionPrefix, tdb.txn.ID)
Expand All @@ -136,13 +143,6 @@ func (tdb *txnDB) executeCmd(method string, args proto.Request, replyChan interf
tdb.Unlock()
return
}
// If the transaction hasn't yet been created, create now, using
// this command's key as the base key.
if tdb.txn == nil {
tdb.txn = storage.NewTransaction(tdb.Name, args.Header().Key,
tdb.UserPriority, tdb.Isolation, tdb.clock)
tdb.timestamp = tdb.txn.Timestamp
}
// Set args.Timestamp & args.Txn to reflect current values.
txnCopy := *tdb.txn
args.Header().User = tdb.User
Expand Down
5 changes: 3 additions & 2 deletions proto/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ message RequestHeader {
// defaults to 1.
optional int32 user_priority = 7 [default = 1];
// Txn is set non-nil if a transaction is underway. If set, the value
// of Priority is ignored.
// of UserPriority is ignored.
optional Transaction txn = 8;
}

Expand Down Expand Up @@ -435,7 +435,8 @@ message InternalSnapshotCopyResponse {
// An InternalSplitRequest is arguments to the InternalSplit() method.
message InternalSplitRequest {
optional RequestHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
optional bytes split_key = 2 [(gogoproto.nullable) = false];
optional RangeDescriptor range_descriptor = 2 [(gogoproto.nullable) = false];
optional bytes split_key = 3 [(gogoproto.nullable) = false];
}

// An InternalSplitResponse is the return value from the InternalSplit()
Expand Down
8 changes: 8 additions & 0 deletions storage/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type DB interface {
InternalPushTxn(args *proto.InternalPushTxnRequest) <-chan *proto.InternalPushTxnResponse
InternalResolveIntent(args *proto.InternalResolveIntentRequest) <-chan *proto.InternalResolveIntentResponse
InternalSnapshotCopy(args *proto.InternalSnapshotCopyRequest) <-chan *proto.InternalSnapshotCopyResponse
InternalSplit(args *proto.InternalSplitRequest) <-chan *proto.InternalSplitResponse

RunTransaction(opts *TransactionOptions, retryable func(db DB) error) error
}
Expand Down Expand Up @@ -246,6 +247,13 @@ func (db *BaseDB) InternalSnapshotCopy(args *proto.InternalSnapshotCopyRequest)
return replyChan
}

// InternalSplit hands an InternalSplit command to the DB's executor on a new
// goroutine and returns the channel on which the reply is expected. It is
// called by a range leader coordinating a split of its range.
func (db *BaseDB) InternalSplit(args *proto.InternalSplitRequest) <-chan *proto.InternalSplitResponse {
	// The reply channel is buffered with capacity one.
	responses := make(chan *proto.InternalSplitResponse, 1)
	go db.Executor(InternalSplit, args, responses)
	return responses
}

// RunTransaction executes retryable in the context of a distributed
// transaction. The transaction is automatically aborted if retryable
// returns any error aside from a txnDBError, and is automatically
Expand Down
8 changes: 7 additions & 1 deletion storage/engine/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ func (mvcc *MVCC) ResolveWriteIntent(key Key, txn *proto.Transaction) error {
// write intents specified by start and end keys for a given
// txn. ResolveWriteIntentRange will skip write intents of other
// txns. Specify max=0 for unbounded resolves.
func (mvcc *MVCC) ResolveWriteIntentRange(key Key, endKey Key, max int64, txn *proto.Transaction) (int64, error) {
func (mvcc *MVCC) ResolveWriteIntentRange(key, endKey Key, max int64, txn *proto.Transaction) (int64, error) {
if txn == nil {
return 0, util.Error("no txn specified")
}
Expand Down Expand Up @@ -572,6 +572,12 @@ func (mvcc *MVCC) ResolveWriteIntentRange(key Key, endKey Key, max int64, txn *p
return num, nil
}

// ApproximateSize reports the approximate size of the span from key to endKey
// as computed by the underlying engine. Both keys are binary-encoded before
// being passed to the engine.
func (mvcc *MVCC) ApproximateSize(key, endKey Key) (uint64, error) {
	encodedStart := encoding.EncodeBinary(nil, key)
	encodedEnd := encoding.EncodeBinary(nil, endKey)
	return mvcc.engine.ApproximateSize(encodedStart, encodedEnd)
}

// a splitSampleItem wraps a key along with an aggregate over key range
// preceding it.
type splitSampleItem struct {
Expand Down
156 changes: 140 additions & 16 deletions storage/range.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ const (
ttlClusterIDGossip = 30 * time.Second
)

// splitCheckInterval is the duration to wait between periodic checks of
// whether the range should split.
// It is a variable rather than a constant to allow for dynamic adaptation
// (for instance based on write load) and easier testing.
var splitCheckInterval = 1 * time.Minute

// configPrefixes describes administrative configuration maps
// affecting ranges of the key-value map by key prefix.
var configPrefixes = []struct {
Expand Down Expand Up @@ -215,10 +221,7 @@ type Range struct {
respCache *ResponseCache // Provides idempotence for retries
}

// NewRange initializes the range using the given metadata. The range will have
// no knowledge of a possible store that contains it and thus all rebalancing
// operations will fail. Use NewRangeFromStore() instead to create ranges
// contained within a store.
// NewRange initializes the range using the given metadata.
//
// TODO(spencer): need to give range just a single instance of Range manager
// to contain clock, mvcc, engine, allocator & gossip. This will reduce
Expand Down Expand Up @@ -253,6 +256,7 @@ func (r *Range) Start() {
if r.IsFirstRange() {
go r.startGossip()
}
go r.maybeSplit()
}

// Stop ends the log processing loop.
Expand Down Expand Up @@ -1189,30 +1193,141 @@ func (r *Range) InternalSnapshotCopy(args *proto.InternalSnapshotCopyRequest, re
reply.SetGoError(err)
}

// shouldSplit returns whether the range thinks it should split, based on its
// approximate size and the lowest maximal range size among the zone configs
// visited for the range's key span.
//
// It returns false (never splits) when the range has no gossip instance, when
// the size or the zone configuration cannot be obtained, when visiting the
// zone prefixes fails, or when no positive size limit was found.
func (r *Range) shouldSplit() bool {
	if r.gossip == nil {
		return false
	}
	desc := r.Meta.RangeDescriptor
	// Named "size" rather than "bytes" so the bytes package is not shadowed.
	size, err := r.mvcc.ApproximateSize(desc.StartKey, desc.EndKey)
	if err != nil {
		log.Warning(err)
		return false
	}
	zonePrefixConfigMap, err := r.gossip.GetInfo(gossip.KeyConfigZone)
	if err != nil {
		log.Warning(err)
		return false
	}
	if zonePrefixConfigMap == nil {
		log.Warning("zone configuration map is empty")
		return false
	}

	// Track the smallest RangeMaxBytes across all visited zones; the most
	// restrictive zone decides whether the range is too large.
	var maxBytes int64
	found := false
	err = zonePrefixConfigMap.(PrefixConfigMap).VisitPrefixes(desc.StartKey, desc.EndKey,
		func(s, e engine.Key, conf interface{}) error {
			limit := conf.(*proto.ZoneConfig).RangeMaxBytes
			if !found || limit < maxBytes {
				maxBytes = limit
				found = true
			}
			return nil
		})
	if err != nil {
		log.Warning(err)
		return false
	}
	// Without a positive limit there is nothing to compare against: a zero
	// limit would make every non-empty range split, and a negative one would
	// wrap around in the unsigned conversion below.
	if !found || maxBytes <= 0 {
		return false
	}
	return size > uint64(maxBytes)
}

// maybeSplit regularly checks whether the range wants to split. If so, it
// initiates the split with the correct key.
//
// It loops until r.closer fires, waking up every splitCheckInterval. On each
// tick it does nothing unless this range is the leader and shouldSplit
// reports true; otherwise it takes a snapshot, asks MVCC for a split key over
// the range's key span, releases the snapshot and calls CoordinateSplit.
// Failures are logged and the loop waits for the next tick.
//
// TODO(Tobias): make sure there can't be concurrent split ops.
// TODO(Tobias): The algorithm should take into account the zone configuration,
// figuring out exactly which subrange violates its zone-defined maximal size
// "most", and preferably split that subrange instead of the whole range.
func (r *Range) maybeSplit() {
	ticker := time.NewTicker(splitCheckInterval)
	// Stop the ticker when the loop exits so its resources are released.
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			if !r.IsLeader() || !r.shouldSplit() {
				continue
			}
			desc := r.Meta.RangeDescriptor
			snapshotID, err := r.createSnapshot()
			if err != nil {
				log.Warning(err)
				continue
			}
			splitKey, err := r.mvcc.FindSplitKey(desc.StartKey, desc.EndKey, snapshotID)
			// Release the snapshot before looking at err so that it is not
			// leaked when finding the split key fails.
			if releaseErr := r.engine.ReleaseSnapshot(snapshotID); releaseErr != nil {
				log.Warning(releaseErr)
			}
			if err != nil {
				log.Warning(err)
				continue
			}
			if err := r.CoordinateSplit(splitKey); err != nil {
				log.Errorf("error splitting range: %v", err)
			}
		case <-r.closer:
			return
		}
	}
}

// CoordinateSplit drives a split of this range at splitKey. It fails if the
// range (or its range manager) is nil or if splitKey is not contained in the
// range. Otherwise it prepares metadata for a new range spanning splitKey to
// the current end key, and sends an InternalSplit request carrying the new
// range's descriptor inside a serializable transaction that is attempted
// once. Any error from the request or the transaction is returned.
func (r *Range) CoordinateSplit(splitKey engine.Key) error {
	switch {
	case r == nil || r.rm == nil:
		return util.Error("cannot split without an underlying store")
	case !r.ContainsKey(splitKey):
		return util.Errorf("split key not contained in range")
	}

	// Initialize metadata for the new range. In particular, new rangeIDs are
	// generated inside the replicas contained in the metadata.
	current := r.Meta.RangeDescriptor
	splitMeta := r.rm.NewRangeMetadata(splitKey, current.EndKey, current.Replicas)

	// sendSplit issues the InternalSplit request within the transaction.
	sendSplit := func(txn DB) error {
		req := &proto.InternalSplitRequest{
			RequestHeader:   proto.RequestHeader{Key: splitKey},
			RangeDescriptor: splitMeta.RangeDescriptor,
		}
		reply := <-txn.InternalSplit(req)
		if splitErr := reply.GoError(); splitErr != nil {
			return splitErr
		}
		// TODO(mrtracy): Transactionally update the Meta{1,2} keys so that both
		// ranges have their correct key spaces reflected.
		// Special case: This range contains Meta{1,2} keys (-> possible deadlock)
		return nil
	}

	return r.rm.RunTransaction(&TransactionOptions{
		Name:      fmt.Sprintf("Split Range %d", r.Meta.RangeID),
		User:      UserRoot,
		Isolation: proto.SERIALIZABLE,
		Retry: &util.RetryOptions{
			MaxAttempts: 1,
		},
	}, sendSplit)
}

// InternalSplit shrinks the given range, using args.SplitKey as its new end.
// A new range is created, containing the remaining range from args.SplitKey to
// the original end key of the first range. The split is only carried out if
// args.Key and args.EndKey match precisely the split key and EndKey of the
// original range, respectively.
// TODO(Tobias): decide which key range we actually want to block. It is
// important that we catch all of the response cache at the precise moment the
// split comes into action, which is
//
// TODO(Tobias): This version, as is, is provisional. A correct implementation
// is much more subtle and requires a transaction. See:
// https://github.com/cockroachdb/cockroach/pull/64/files#r17667926
// See https://github.com/cockroachdb/cockroach/pull/64/files#r17667926
func (r *Range) InternalSplit(args *proto.InternalSplitRequest, reply *proto.InternalSplitResponse) {
var err error
if r.rm == nil {
reply.SetGoError(util.Error("cannot split without an underlying store"))
return
}
desc := r.Meta.RangeDescriptor

// InternalSplit affects the whole range, but in fact only needs to block
// everything that is to be moved into the new range beginning at the split
// key. To make sure that the range is protecting those and only those
// keys, we check that args.Key == splitKey and args.EndKey = desc.EndKey.
if !bytes.Equal(args.Key, args.SplitKey) || !bytes.Equal(args.EndKey, desc.EndKey) {
reply.SetGoError(util.Error("key range indicated does not match range"))
return
}
//if !bytes.Equal(args.Key, args.SplitKey) || !bytes.Equal(args.EndKey, desc.EndKey) {
// reply.SetGoError(util.Error("key range indicated does not match range"))
// return
//}
splitKey := args.SplitKey
if !r.ContainsKey(splitKey) {
reply.SetGoError(util.Errorf("split key not contained in range"))
Expand All @@ -1229,8 +1344,20 @@ func (r *Range) InternalSplit(args *proto.InternalSplitRequest, reply *proto.Int
// data which now belongs to the new queue. Since caching a little
// redundant data does not matter, we simply clone the response cache of
// the old range.
meta := &proto.RangeMetadata{
ClusterID: r.Meta.ClusterID,
RangeDescriptor: args.RangeDescriptor,
}

rangeID, err := r.rm.MatchRangeID(meta)
if err != nil {
reply.SetGoError(err)
return
}
// DropRange is a noop for nonexisting ranges, so this makes sure
// this function is idempotent and can be retried.
r.rm.DropRange(rangeID)

meta := r.rm.NewRangeMetadata(splitKey, desc.EndKey, desc.Replicas)
newRange, err := r.rm.CreateRange(meta)
if err != nil {
reply.SetGoError(err)
Expand Down Expand Up @@ -1263,7 +1390,4 @@ func (r *Range) InternalSplit(args *proto.InternalSplitRequest, reply *proto.Int
}
return
}
// TODO(mrtracy): Transactionally update the Meta{1,2} keys so that both
// ranges have their correct key spaces reflected.
// Special case: This range contains Meta{1,2} keys (-> possible deadlock)
}
Loading