diff --git a/kv/coordinator_test.go b/kv/coordinator_test.go
index 380b7a91424b..67d6d48af64c 100644
--- a/kv/coordinator_test.go
+++ b/kv/coordinator_test.go
@@ -35,16 +35,20 @@ func createTestDB(t *testing.T) (*DB, *hlc.Clock, *hlc.ManualClock) {
 	manual := hlc.ManualClock(0)
 	clock := hlc.NewClock(manual.UnixNano)
 	eng := engine.NewInMem(proto.Attributes{}, 1<<20)
-	store := storage.NewStore(clock, eng, nil, nil)
-	store.Ident.StoreID = 1
-	replica := proto.Replica{NodeID: 1, StoreID: 1, RangeID: 1}
-	_, err := store.CreateRange(engine.KeyMin, engine.KeyMax, []proto.Replica{replica})
-	if err != nil {
+	kv := NewLocalKV()
+	db := NewDB(kv, clock)
+	store := storage.NewStore(clock, eng, db, nil)
+	if err := store.Bootstrap(proto.StoreIdent{StoreID: 1}); err != nil {
 		t.Fatal(err)
 	}
-	kv := NewLocalKV()
 	kv.AddStore(store)
-	db := NewDB(kv, clock)
+	_, err := store.CreateRange(store.BootstrapRangeMetadata())
+	if err != nil {
+		t.Fatal(err)
+	}
+	if err := store.Init(); err != nil {
+		t.Fatal(err)
+	}
 	return db, clock, &manual
 }
diff --git a/kv/local_kv_test.go b/kv/local_kv_test.go
index 92e211cb93b3..99062ba4a652 100644
--- a/kv/local_kv_test.go
+++ b/kv/local_kv_test.go
@@ -100,42 +100,59 @@ func TestLocalKVGetStore(t *testing.T) {
 	}
 }
 
-// createTestStore creates a new Store instance with a single range
-// spanning from start to end.
-func createTestStore(storeID int32, start, end engine.Key, t *testing.T) *storage.Store {
+func TestLocalKVLookupReplica(t *testing.T) {
 	manual := hlc.ManualClock(0)
 	clock := hlc.NewClock(manual.UnixNano)
 	eng := engine.NewInMem(proto.Attributes{}, 1<<20)
-	store := storage.NewStore(clock, eng, nil, nil)
-	store.Ident.StoreID = storeID
-	replica := proto.Replica{StoreID: storeID, RangeID: 1}
-	_, err := store.CreateRange(start, end, []proto.Replica{replica})
-	if err != nil {
+	kv := NewLocalKV()
+	db := NewDB(kv, clock)
+	store := storage.NewStore(clock, eng, db, nil)
+	if err := store.Bootstrap(proto.StoreIdent{StoreID: 1}); err != nil {
 		t.Fatal(err)
 	}
-	return store
-}
-
-func TestLocalKVLookupReplica(t *testing.T) {
-	kv := NewLocalKV()
-	s1 := createTestStore(1, engine.Key("a"), engine.Key("c"), t)
-	s2 := createTestStore(2, engine.Key("x"), engine.Key("z"), t)
-	kv.AddStore(s1)
-	kv.AddStore(s2)
+	kv.AddStore(store)
+	meta := store.BootstrapRangeMetadata()
+	meta.StartKey = engine.KeySystemPrefix
+	meta.EndKey = engine.PrefixEndKey(engine.KeySystemPrefix)
+	if _, err := store.CreateRange(meta); err != nil {
+		t.Fatal(err)
+	}
+	if err := store.Init(); err != nil {
+		t.Fatal(err)
+	}
+	// Create two new stores with ranges we care about.
+	var s [2]*storage.Store
+	ranges := []struct {
+		storeID    int32
+		start, end engine.Key
+	}{
+		{2, engine.Key("a"), engine.Key("c")},
+		{3, engine.Key("x"), engine.Key("z")},
+	}
+	for i, rng := range ranges {
+		s[i] = storage.NewStore(clock, eng, db, nil)
+		s[i].Ident.StoreID = rng.storeID
+		replica := proto.Replica{StoreID: rng.storeID}
+		_, err := s[i].CreateRange(store.NewRangeMetadata(rng.start, rng.end, []proto.Replica{replica}))
+		if err != nil {
+			t.Fatal(err)
+		}
+		kv.AddStore(s[i])
+	}
 
-	if r, err := kv.lookupReplica(engine.Key("a"), engine.Key("c")); r.StoreID != s1.Ident.StoreID || err != nil {
-		t.Errorf("expected store %d; got %d: %v", s1.Ident.StoreID, r.StoreID, err)
+	if r, err := kv.lookupReplica(engine.Key("a"), engine.Key("c")); r.StoreID != s[0].Ident.StoreID || err != nil {
+		t.Errorf("expected store %d; got %d: %v", s[0].Ident.StoreID, r.StoreID, err)
 	}
-	if r, err := kv.lookupReplica(engine.Key("b"), nil); r.StoreID != s1.Ident.StoreID || err != nil {
-		t.Errorf("expected store %d; got %d: %v", s1.Ident.StoreID, r.StoreID, err)
+	if r, err := kv.lookupReplica(engine.Key("b"), nil); r.StoreID != s[0].Ident.StoreID || err != nil {
+		t.Errorf("expected store %d; got %d: %v", s[0].Ident.StoreID, r.StoreID, err)
 	}
 	if r, err := kv.lookupReplica(engine.Key("b"), engine.Key("d")); r != nil || err == nil {
 		t.Errorf("expected store 0 and error got %d", r.StoreID)
 	}
-	if r, err := kv.lookupReplica(engine.Key("x"), engine.Key("z")); r.StoreID != s2.Ident.StoreID {
-		t.Errorf("expected store %d; got %d: %v", s2.Ident.StoreID, r.StoreID, err)
+	if r, err := kv.lookupReplica(engine.Key("x"), engine.Key("z")); r.StoreID != s[1].Ident.StoreID {
+		t.Errorf("expected store %d; got %d: %v", s[1].Ident.StoreID, r.StoreID, err)
 	}
-	if r, err := kv.lookupReplica(engine.Key("y"), nil); r.StoreID != s2.Ident.StoreID || err != nil {
-		t.Errorf("expected store %d; got %d: %v", s2.Ident.StoreID, r.StoreID, err)
+	if r, err := kv.lookupReplica(engine.Key("y"), nil); r.StoreID != s[1].Ident.StoreID || err != nil {
+		t.Errorf("expected store %d; got %d: %v", s[1].Ident.StoreID, r.StoreID, err)
 	}
 }
diff --git a/server/node.go b/server/node.go
index 9622cd383471..2ab5bab3ae0a 100644
--- a/server/node.go
+++ b/server/node.go
@@ -126,19 +126,10 @@ func BootstrapCluster(clusterID string, eng engine.Engine) (*kv.DB, error) {
 	}
 
 	// Create first range.
-	replica := proto.Replica{
-		NodeID:  1,
-		StoreID: 1,
-		RangeID: 1,
-		Attrs:   proto.Attributes{},
-	}
-	rng, err := s.CreateRange(engine.KeyMin, engine.KeyMax, []proto.Replica{replica})
+	rng, err := s.CreateRange(s.BootstrapRangeMetadata())
 	if err != nil {
 		return nil, err
 	}
-	if rng.Meta.RangeID != 1 {
-		return nil, util.Errorf("expected range id of 1, got %d", rng.Meta.RangeID)
-	}
 
 	// Create a KV DB with a local KV to directly modify the new range.
 	localKV := kv.NewLocalKV()
diff --git a/simout b/simout
deleted file mode 100755
index 7b1d55d6d832..000000000000
Binary files a/simout and /dev/null differ
diff --git a/storage/engine/keys.go b/storage/engine/keys.go
index 56c6d540b3bb..e87476f879b3 100644
--- a/storage/engine/keys.go
+++ b/storage/engine/keys.go
@@ -177,9 +177,6 @@ var (
 	// KeyLocalIdent stores an immutable identifier for this store,
 	// created when the store is first bootstrapped.
 	KeyLocalIdent = MakeKey(KeyLocalPrefix, Key("store-ident"))
-	// KeyLocalRangeIDGenerator is a range ID generator sequence. Range
-	// IDs must be unique per node ID.
-	KeyLocalRangeIDGenerator = MakeKey(KeyLocalPrefix, Key("range-idgen"))
 	// KeyLocalRangeMetadataPrefix is the prefix for keys storing range metadata.
 	// The value is a struct of type RangeMetadata.
 	KeyLocalRangeMetadataPrefix = MakeKey(KeyLocalPrefix, Key("range-"))
@@ -224,10 +221,12 @@ var (
 	// KeyConfigZonePrefix specifies the key prefix for zone
 	// configurations. The suffix is the affected key prefix.
 	KeyConfigZonePrefix = MakeKey(KeySystemPrefix, Key("zone"))
-	// KeyNodeIDGenerator contains a sequence generator for node IDs.
+	// KeyNodeIDGenerator is the global node ID generator sequence.
 	KeyNodeIDGenerator = MakeKey(KeySystemPrefix, Key("node-idgen"))
-	// KeyRaftIDGenerator is a Raft consensus group ID generator sequence.
+	// KeyRaftIDGenerator is the global Raft consensus group ID generator sequence.
 	KeyRaftIDGenerator = MakeKey(KeySystemPrefix, Key("raft-idgen"))
+	// KeyRangeIDGenerator is the global range ID generator sequence.
+	KeyRangeIDGenerator = MakeKey(KeySystemPrefix, Key("range-idgen"))
 	// KeySchemaPrefix specifies key prefixes for schema definitions.
 	KeySchemaPrefix = MakeKey(KeySystemPrefix, Key("schema"))
 	// KeyStoreIDGeneratorPrefix specifies key prefixes for sequence
diff --git a/storage/id_alloc.go b/storage/id_alloc.go
new file mode 100644
index 000000000000..3b32f18e86e1
--- /dev/null
+++ b/storage/id_alloc.go
@@ -0,0 +1,109 @@
+// Copyright 2014 The Cockroach Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied. See the License for the specific language governing
+// permissions and limitations under the License. See the AUTHORS file
+// for names of contributors.
+//
+// Author: Spencer Kimball (spencer.kimball@gmail.com)
+
+package storage
+
+import (
+	"github.com/cockroachdb/cockroach/proto"
+	"github.com/cockroachdb/cockroach/storage/engine"
+	"github.com/cockroachdb/cockroach/util/log"
+)
+
+const (
+	// allocationTrigger is a special ID which, if encountered,
+	// causes allocation of the next block of IDs.
+	allocationTrigger = 0
+)
+
+type IDAllocator struct {
+	idKey     engine.Key
+	db        DB
+	minID     int64      // Minimum ID to return
+	blockSize int64      // Block allocation size
+	ids       chan int64 // Channel of available IDs
+}
+
+// NewIDAllocator creates a new ID allocator which increments the
+// specified key in allocation blocks of size blockSize, with
+// allocated IDs starting at minID. Allocated IDs are positive
+// integers.
+func NewIDAllocator(idKey engine.Key, db DB, minID int64, blockSize int64) *IDAllocator {
+	if minID <= allocationTrigger {
+		log.Fatalf("minID must be > %d", allocationTrigger)
+	}
+	if blockSize < 1 {
+		log.Fatalf("blockSize must be a positive integer: %d", blockSize)
+	}
+	ia := &IDAllocator{
+		idKey:     idKey,
+		db:        db,
+		minID:     minID,
+		blockSize: blockSize,
+		ids:       make(chan int64, blockSize+blockSize/2+1),
+	}
+	ia.ids <- allocationTrigger
+	return ia
+}
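NewIDAllocator seeds the ids channel with a single allocationTrigger sentinel, so the first Allocate call is what fetches the first block. Below is a minimal sketch of that channel-plus-trigger scheme with a plain in-memory counter standing in for db.Increment; it is an illustration only, not part of the patch, and the counter variable and helper closures are made up.

    package main

    import "fmt"

    const trigger = 0 // mirrors allocationTrigger: a sentinel, never handed out as an ID

    func main() {
    	const blockSize = int64(10)
    	var counter int64 // stands in for the KV key the real allocator increments

    	// Capacity for a block and a half plus the trigger, as in NewIDAllocator.
    	ids := make(chan int64, blockSize+blockSize/2+1)
    	ids <- trigger // the first allocate call kicks off the first block

    	allocateBlock := func() {
    		counter += blockSize // the real code issues an IncrementRequest here
    		start, end := counter-blockSize+1, counter+1
    		for i := start; i < end; i++ {
    			ids <- i
    			if i == (start+end)/2 {
    				ids <- trigger // schedule a refill before the block runs dry
    			}
    		}
    	}

    	allocate := func() int64 {
    		for {
    			if id := <-ids; id != trigger {
    				return id
    			}
    			allocateBlock() // the real allocator runs this in a goroutine
    		}
    	}

    	for i := 0; i < 15; i++ {
    		fmt.Println(allocate()) // prints 1, 2, ..., 15
    	}
    }

Because the trigger is re-queued at the midpoint of each block, the refill normally completes before the remaining IDs are drained, which is how Increment latency stays off the allocation path.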
+
+// Allocate allocates a new ID from the global KV DB. If multiple
+// IDs were pre-allocated by an earlier block allocation, one is
+// returned directly from the local channel without another DB
+// round trip.
+func (ia *IDAllocator) Allocate() int64 {
+	for {
+		id := <-ia.ids
+		if id == allocationTrigger {
+			go ia.allocateBlock(ia.blockSize)
+		} else {
+			return id
+		}
+	}
+}
+
+// allocateBlock allocates a block of IDs using db.Increment and
+// sends all IDs on the ids channel. Midway through the block, a
+// special allocationTrigger ID is inserted which causes allocation
+// to occur before IDs run out to hide Increment latency.
+func (ia *IDAllocator) allocateBlock(incr int64) {
+	ir := <-ia.db.Increment(&proto.IncrementRequest{
+		RequestHeader: proto.RequestHeader{
+			Key:  ia.idKey,
+			User: UserRoot,
+		},
+		Increment: incr,
+	})
+	if ir.Error != nil {
+		log.Errorf("unable to allocate %d %q IDs: %v", incr, ia.idKey, ir.Error)
+	}
+	if ir.NewValue <= ia.minID {
+		log.Warningf("allocator key is currently set at %d; minID is %d; allocating again to skip %d IDs",
+			ir.NewValue, ia.minID, ia.minID-ir.NewValue)
+		ia.allocateBlock(ia.minID - ir.NewValue + ia.blockSize)
+		return
+	}
+
+	// Add all new ids to the channel for consumption.
+	start := ir.NewValue - incr + 1
+	end := ir.NewValue + 1
+	if start < ia.minID {
+		start = ia.minID
+	}
+
+	for i := start; i < end; i++ {
+		ia.ids <- i
+		if i == (start+end)/2 {
+			ia.ids <- allocationTrigger
+		}
+	}
+}
diff --git a/storage/id_alloc_test.go b/storage/id_alloc_test.go
new file mode 100644
index 000000000000..25979febe9e3
--- /dev/null
+++ b/storage/id_alloc_test.go
@@ -0,0 +1,86 @@
+// Copyright 2014 The Cockroach Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied. See the License for the specific language governing
+// permissions and limitations under the License. See the AUTHORS file
+// for names of contributors.
+//
+// Author: Spencer Kimball (spencer.kimball@gmail.com)
+
+package storage
+
+import (
+	"sort"
+	"testing"
+
+	"github.com/cockroachdb/cockroach/storage/engine"
+)
+
+// TestIDAllocator creates an ID allocator which allocates from
+// the Raft ID generator system key in blocks of 10 with a minimum
+// ID value of 2 and then starts up 10 goroutines each allocating
+// 10 IDs. All goroutines deposit the allocated IDs into a final
+// channel, which is queried at the end to ensure that all IDs
+// from 2 to 101 are present.
+func TestIDAllocator(t *testing.T) {
+	store, _ := createTestStore(false, t)
+	allocd := make(chan int, 100)
+	idAlloc := NewIDAllocator(engine.KeyRaftIDGenerator, store.db, 2, 10)
+
+	for i := 0; i < 10; i++ {
+		go func() {
+			for j := 0; j < 10; j++ {
+				allocd <- int(idAlloc.Allocate())
+			}
+		}()
+	}
+
+	// Verify all IDs accounted for.
+	ids := make([]int, 100)
+	for i := 0; i < 100; i++ {
+		ids[i] = <-allocd
+	}
+	sort.Ints(ids)
+	for i := 0; i < 100; i++ {
+		if ids[i] != i+2 {
+			t.Errorf("expected %dth ID to be %d; got %d", i, i+2, ids[i])
+		}
+	}
+
+	// Verify no leftover IDs.
+	select {
+	case id := <-allocd:
+		t.Errorf("there appear to be leftover IDs, starting with %d", id)
+	default:
+		// Expected; noop.
+	}
+}
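For concreteness, here is a small sketch of the arithmetic the negative-value test below exercises, assuming allocateBlock increments the generator key by its incr argument (illustration only, not part of the patch):

    package main

    import "fmt"

    func main() {
    	const (
    		preset    = int64(-1024) // value the test seeds under the generator key
    		blockSize = int64(10)
    		minID     = int64(2)
    	)
    	newValue := preset + blockSize       // first block ends at -1014, still <= minID
    	incr := minID - newValue + blockSize // make-up allocation of 1026
    	newValue += incr                     // second block ends at 12
    	start, end := newValue-incr+1, newValue+1
    	if start < minID {
    		start = minID // raw start is far below minID, so it is clamped to 2
    	}
    	fmt.Println(incr, newValue, start, end-1) // 1026 12 2 12: first ID handed out is 2
    }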
+
+// TestIDAllocatorNegativeValue creates an ID allocator against an
+// increment key which is preset to a negative value. We verify that
+// the id allocator makes a double-alloc to make up the difference
+// and push the id allocation into positive integers.
+func TestIDAllocatorNegativeValue(t *testing.T) {
+	store, _ := createTestStore(false, t)
+	// Increment our key to a negative value.
+	newValue, err := engine.Increment(store.engine, engine.KeyRaftIDGenerator, -1024)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if newValue != -1024 {
+		t.Errorf("expected new value to be -1024; got %d", newValue)
+	}
+	idAlloc := NewIDAllocator(engine.KeyRaftIDGenerator, store.db, 2, 10)
+	value := idAlloc.Allocate()
+	if value != 2 {
+		t.Errorf("expected id allocation to have value 2; got %d", value)
+	}
+}
diff --git a/storage/store.go b/storage/store.go
index d330bef6b3e5..dd323d705c16 100644
--- a/storage/store.go
+++ b/storage/store.go
@@ -26,6 +26,7 @@ import (
 	"sync"
 	"time"
 
+	gogoproto "code.google.com/p/gogoprotobuf/proto"
 	"github.com/cockroachdb/cockroach/gossip"
 	"github.com/cockroachdb/cockroach/proto"
 	"github.com/cockroachdb/cockroach/storage/engine"
@@ -39,8 +40,10 @@ const (
 	// GCResponseCacheExpiration is the expiration duration for response
 	// cache entries.
 	GCResponseCacheExpiration = 1 * time.Hour
-	// rangeIDAllocCount is the number of range IDs to allocate per allocation.
+	// raftIDAllocCount is the number of Raft IDs to allocate per allocation.
 	raftIDAllocCount = 10
+	// rangeIDAllocCount is the number of range IDs to allocate per allocation.
+	rangeIDAllocCount = 10
 )
 
 // rangeMetadataKeyPrefix and hexadecimal-formatted range ID.
@@ -106,25 +109,25 @@ func (s StoreDescriptor) Less(b util.Ordered) bool {
 // A Store maintains a map of ranges by start key. A Store corresponds
 // to one physical device.
 type Store struct {
-	Ident     proto.StoreIdent
-	clock     *hlc.Clock
-	engine    engine.Engine  // The underlying key-value store
-	db        DB             // Cockroach KV DB
-	allocator *allocator     // Makes allocation decisions
-	gossip    *gossip.Gossip // Passed to new ranges
+	Ident        proto.StoreIdent
+	clock        *hlc.Clock
+	engine       engine.Engine  // The underlying key-value store
+	db           DB             // Cockroach KV DB
+	allocator    *allocator     // Makes allocation decisions
+	gossip       *gossip.Gossip // Passed to new ranges
+	raftIDAlloc  *IDAllocator   // Raft ID allocator
+	rangeIDAlloc *IDAllocator   // Range ID allocator
 
 	mu          sync.RWMutex     // Protects variables below...
 	ranges      map[int64]*Range // Map of ranges by range ID
 	rangesByKey RangeSlice       // Sorted slice of ranges by StartKey
-	nextRaftID  int64            // Next available Raft ID
-	lastRaftID  int64            // Last available Raft ID in pre-alloc'd block
 }
 
 // NewStore returns a new instance of a store.
-func NewStore(clock *hlc.Clock, engine engine.Engine, db DB, gossip *gossip.Gossip) *Store {
+func NewStore(clock *hlc.Clock, eng engine.Engine, db DB, gossip *gossip.Gossip) *Store {
 	return &Store{
 		clock:     clock,
-		engine:    engine,
+		engine:    eng,
 		db:        db,
 		allocator: &allocator{},
 		gossip:    gossip,
@@ -150,10 +153,18 @@
 // Init starts the engine, sets the GC and reads the StoreIdent.
 func (s *Store) Init() error {
+	// Close store for idempotency.
+	s.Close()
+
 	// Start engine and set garbage collector.
 	if err := s.engine.Start(); err != nil {
 		return err
 	}
+
+	// Create ID allocators.
+	s.raftIDAlloc = NewIDAllocator(engine.KeyRaftIDGenerator, s.db, 2, raftIDAllocCount)
+	s.rangeIDAlloc = NewIDAllocator(engine.KeyRangeIDGenerator, s.db, 2, rangeIDAllocCount)
+
 	// GCTimeouts method is called each time an engine compaction is
 	// underway. It sets minimum timeouts for transaction records and
 	// response cache entries.
@@ -172,20 +183,35 @@ func (s *Store) Init() error {
 		return &NotBootstrappedError{}
 	}
 
-	// TODO(spencer): scan through all range metadata and instantiate
-	// ranges. Right now we just get range ID hardcoded as 1.
-	var meta proto.RangeMetadata
-	if ok, err = engine.GetProto(s.engine, makeRangeKey(1), &meta); err != nil || !ok {
-		return util.Errorf("unable to read range 1: %v", err)
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	start := engine.KeyLocalRangeMetadataPrefix
+	end := engine.PrefixEndKey(start)
+	const rows = 64
+	for {
+		kvs, err := s.engine.Scan(start, end, rows)
+		if err != nil {
+			return err
+		}
+		for _, kv := range kvs {
+			var meta proto.RangeMetadata
+			if err := gogoproto.Unmarshal(kv.Value, &meta); err != nil {
+				return err
+			}
+			rng := NewRange(&meta, s.clock, s.engine, s.allocator, s.gossip, s)
+			rng.Start()
+			s.ranges[meta.RangeID] = rng
+			s.rangesByKey = append(s.rangesByKey, rng)
+		}
+		if len(kvs) < rows {
+			break
+		}
+		start = engine.NextKey(kvs[rows-1].Key)
 	}
-	rng := NewRange(&meta, s.clock, s.engine, s.allocator, s.gossip, s)
-	rng.Start()
+	// Ensure that ranges are sorted.
+	sort.Sort(s.rangesByKey)
 
-	s.mu.Lock()
-	defer s.mu.Unlock()
-	s.ranges[meta.RangeID] = rng
-	s.rangesByKey = append(s.rangesByKey, rng)
 	return nil
 }
 
@@ -233,58 +259,82 @@ func (s *Store) LookupRange(start, end engine.Key) *Range {
 	return s.rangesByKey[n]
 }
 
-// CreateRange creates a new range by allocating a new range ID and
-// storing range metadata. On success, returns the new range.
-//
-// TODO(spencer): this method is temporary and will need to be
-// removed. In its place, ranges themselves will initiate the
-// creation of a new range. This is done during splits. The range
-// leader calls store.SplitRange, which allocates the raft ID and all
-// range IDs (range IDs will need to be changed from the local ID
-// allocator used here to a global allocator). This way, the complete
-// replica slice will be available for the Raft.InternalSplitRange
-// command to be executed at each of the replicas in the original
-// range, yielding identical results.
-func (s *Store) CreateRange(startKey, endKey engine.Key, replicas []proto.Replica) (*Range, error) {
-	if len(replicas) != 1 {
-		panic("CreateRange can only handle a single replica currently")
-	}
-	raftID, err := s.allocateRaftID()
-	if err != nil {
-		return nil, err
-	}
-	rangeID, err := engine.Increment(s.engine, engine.KeyLocalRangeIDGenerator, 1)
-	if err != nil {
-		return nil, err
-	}
-	if ok, _ := engine.GetProto(s.engine, makeRangeKey(rangeID), nil); ok {
-		return nil, util.Error("range ID already in use")
+// BootstrapRangeMetadata returns a range metadata for the very first range
+// in a cluster.
+func (s *Store) BootstrapRangeMetadata() *proto.RangeMetadata {
+	return &proto.RangeMetadata{
+		ClusterID: s.Ident.ClusterID,
+		RangeDescriptor: proto.RangeDescriptor{
+			RaftID:   1,
+			StartKey: engine.KeyMin,
+			EndKey:   engine.KeyMax,
+			Replicas: []proto.Replica{
+				proto.Replica{
+					NodeID:  1,
+					StoreID: 1,
+					RangeID: 1,
+				},
+			},
+		},
+		RangeID: 1,
 	}
-	// RangeMetadata is stored local to this store only. It is neither
-	// replicated via raft nor available via the global kv store.
+}
+
+// NewRangeMetadata creates a new RangeMetadata based on start and
+// end keys and the supplied proto.Replicas slice. It allocates new
+// Raft and range IDs to fill out the supplied RangeMetadata. Returns
+// the new RangeMetadata.
+func (s *Store) NewRangeMetadata(start, end engine.Key, replicas []proto.Replica) *proto.RangeMetadata {
 	meta := &proto.RangeMetadata{
 		ClusterID: s.Ident.ClusterID,
 		RangeDescriptor: proto.RangeDescriptor{
-			RaftID:   raftID,
-			StartKey: startKey,
-			EndKey:   endKey,
+			RaftID:   s.raftIDAlloc.Allocate(),
+			StartKey: start,
+			EndKey:   end,
 			Replicas: replicas,
 		},
-		RangeID: rangeID,
+		// Note that RangeID is specifically left blank, as it varies
+		// for each replica which belongs to the range.
+	}
+
+	// Allocate a range ID for each replica.
+	for i := range meta.Replicas {
+		meta.Replicas[i].RangeID = s.rangeIDAlloc.Allocate()
+	}
+
+	return meta
+}
+
+// CreateRange creates a new Range using the provided RangeMetadata.
+// It persists the metadata locally and adds the new range to the
+// ranges map and sorted rangesByKey slice for doing range lookups
+// by key.
+func (s *Store) CreateRange(meta *proto.RangeMetadata) (*Range, error) {
+	// Set the RangeID for meta based on the replica which matches this store.
+	for _, repl := range meta.Replicas {
+		if repl.StoreID == s.Ident.StoreID {
+			meta.RangeID = repl.RangeID
+			break
+		}
+	}
+	if meta.RangeID == 0 {
+		return nil, util.Errorf("unable to determine range ID for this range; no replicas match store %d: %s",
+			s.Ident.StoreID, meta.Replicas)
 	}
-	meta.Replicas[0].RangeID = rangeID
-	err = engine.PutProto(s.engine, makeRangeKey(rangeID), meta)
+
+	err := engine.PutProto(s.engine, makeRangeKey(meta.RangeID), meta)
 	if err != nil {
 		return nil, err
 	}
-	rng := NewRange(meta, s.clock, s.engine, s.allocator, s.gossip, s)
-	rng.Start()
+
 	s.mu.Lock()
 	defer s.mu.Unlock()
-	s.ranges[rangeID] = rng
-	// Append new ranges to rangesByKey and keep sorted.
+	rng := NewRange(meta, s.clock, s.engine, s.allocator, s.gossip, s)
+	rng.Start()
+	s.ranges[meta.RangeID] = rng
 	s.rangesByKey = append(s.rangesByKey, rng)
 	sort.Sort(s.rangesByKey)
+
 	return rng, nil
 }
 
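Taken together, NewRangeMetadata and CreateRange split range creation into two steps: the metadata, including the shared Raft ID and one range ID per replica, is built once, and every store then creates its local replica from that same metadata, picking out its own RangeID by StoreID. A sketch of that call pattern; the helper below is hypothetical and not part of the patch.

    package storage

    import (
    	"github.com/cockroachdb/cockroach/proto"
    	"github.com/cockroachdb/cockroach/storage/engine"
    )

    // createRangeOnStores is a hypothetical helper: the coordinating store
    // allocates the Raft ID and the per-replica range IDs once, and each
    // store then persists its own replica from the shared metadata.
    // replicas is expected to hold one entry per store, with matching StoreIDs.
    func createRangeOnStores(coordinator *Store, stores []*Store, start, end engine.Key, replicas []proto.Replica) error {
    	meta := coordinator.NewRangeMetadata(start, end, replicas)
    	for _, s := range stores {
    		// CreateRange picks the RangeID whose replica matches s.Ident.StoreID.
    		if _, err := s.CreateRange(meta); err != nil {
    			return err
    		}
    	}
    	return nil
    }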
@@ -355,58 +405,11 @@ func (s *Store) ExecuteCmd(method string, args proto.Request, reply proto.Respon
 	return rng.ReadWriteCmd(method, args, reply)
 }
 
-// allocateRaftID allocates a new Raft ID from the global KV DB.
-// Each store allocates blocks of IDS in raftIDAllocCount increments
-// for efficiency.
-func (s *Store) allocateRaftID() (int64, error) {
-	// Handle the bootstrapping case where db is nil; bootstrap raft ID is 1.
-	if s.db == nil {
-		return 1, nil
-	}
-
-	s.mu.Lock()
-	// If we have pre-alloc'd Raft IDs available, return one.
-	if s.nextRaftID < s.lastRaftID {
-		id := s.nextRaftID
-		s.nextRaftID++
-		s.mu.Unlock()
-		return id, nil
-	}
-
-	// Unlock mutex in anticipation of increment.
-	s.mu.Unlock()
-	ir := <-s.db.Increment(&proto.IncrementRequest{
-		RequestHeader: proto.RequestHeader{
-			Key:  engine.KeyRaftIDGenerator,
-			User: UserRoot,
-		},
-		Increment: raftIDAllocCount,
-	})
-	if ir.Error != nil {
-		return 0, util.Errorf("unable to allocate raft IDs: %v", ir.Error)
-	}
-	if ir.NewValue <= 1 {
-		return 0, util.Errorf("raft ID allocation returned invalid allocation: %+v", ir)
-	}
-
-	// Re-lock mutex after call to increment.
-	s.mu.Lock()
-	defer s.mu.Unlock()
-	s.lastRaftID = ir.NewValue + 1
-	s.nextRaftID = ir.NewValue - raftIDAllocCount + 1
-	// Raft ID 1 is reserved for the bootstrap raft group.
-	if s.nextRaftID <= 1 {
-		s.nextRaftID = 2
-	}
-	s.nextRaftID++
-	return s.nextRaftID - 1, nil
-
-}
-
 // RangeManager is an interface satisfied by Store through which ranges
 // contained in the store can access the methods required for rebalancing
 // (i.e. splitting and merging) operations.
 // TODO(Tobias): add necessary operations as we need them.
 type RangeManager interface {
-	CreateRange(startKey, endKey engine.Key, replicas []proto.Replica) (*Range, error)
+	NewRangeMetadata(start, end engine.Key, replicas []proto.Replica) *proto.RangeMetadata
+	CreateRange(meta *proto.RangeMetadata) (*Range, error)
 }
diff --git a/storage/store_test.go b/storage/store_test.go
index 906ffec7e75e..7532abda6ebd 100644
--- a/storage/store_test.go
+++ b/storage/store_test.go
@@ -62,8 +62,7 @@ func TestStoreInitAndBootstrap(t *testing.T) {
 	}
 
 	// Create range and fetch.
-	replica := proto.Replica{StoreID: store.Ident.StoreID, RangeID: 1}
-	if _, err := store.CreateRange(engine.KeyMin, engine.KeyMax, []proto.Replica{replica}); err != nil {
+	if _, err := store.CreateRange(store.BootstrapRangeMetadata()); err != nil {
 		t.Errorf("failure to create first range: %v", err)
 	}
 	if _, err := store.GetRange(1); err != nil {
@@ -137,21 +136,29 @@ func createTestStore(createDefaultRange bool, t *testing.T) (*Store, *hlc.Manual
 	clock := hlc.NewClock(manual.UnixNano)
 	eng := engine.NewInMem(proto.Attributes{}, 1<<20)
 	store := NewStore(clock, eng, nil, nil)
-	store.Ident.StoreID = 1
+	if err := store.Bootstrap(proto.StoreIdent{StoreID: 1}); err != nil {
+		t.Fatal(err)
+	}
+	db, _ := newTestDB(store)
+	store.db = db
 	replica := proto.Replica{StoreID: 1, RangeID: 1}
 	// Create system key range for allocations.
-	_, err := store.CreateRange(engine.KeySystemPrefix, engine.PrefixEndKey(engine.KeySystemPrefix), []proto.Replica{replica})
+	meta := store.BootstrapRangeMetadata()
+	meta.StartKey = engine.KeySystemPrefix
+	meta.EndKey = engine.PrefixEndKey(engine.KeySystemPrefix)
+	_, err := store.CreateRange(meta)
 	if err != nil {
 		t.Fatal(err)
 	}
-	// Now that the system key range is available, set store DB so new
+	if err := store.Init(); err != nil {
+		t.Fatal(err)
+	}
+	// Now that the store is initialized and the system key range is available, new
 	// ranges can be allocated as needed for tests.
-	db, _ := newTestDB(store)
-	store.db = db
 	// If requested, create a default range for tests from "a"-"z".
 	if createDefaultRange {
-		replica = proto.Replica{StoreID: 1, RangeID: 2}
-		_, err := store.CreateRange(engine.Key("a"), engine.Key("z"), []proto.Replica{replica})
+		replica = proto.Replica{StoreID: 1}
+		_, err := store.CreateRange(store.NewRangeMetadata(engine.Key("a"), engine.Key("z"), []proto.Replica{replica}))
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -278,7 +285,7 @@ func addTestRange(store *Store, start, end engine.Key, t *testing.T) *Range {
 	replicas := []proto.Replica{
 		proto.Replica{StoreID: store.Ident.StoreID},
 	}
-	r, err := store.CreateRange(start, end, replicas)
+	r, err := store.CreateRange(store.NewRangeMetadata(start, end, replicas))
 	if err != nil {
 		t.Fatal(err)
 	}
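The same setup ordering now appears in createTestDB, TestLocalKVLookupReplica, and createTestStore: bootstrap the store, wire up its DB sender, persist the first range's metadata, and only then call Init, which closes the store and re-reads all range metadata from the engine. A condensed sketch of that sequence; the helper name is made up and it is not part of the patch.

    package kv

    import (
    	"github.com/cockroachdb/cockroach/proto"
    	"github.com/cockroachdb/cockroach/storage"
    	"github.com/cockroachdb/cockroach/storage/engine"
    	"github.com/cockroachdb/cockroach/util/hlc"
    )

    // bootstrapTestStore is a hypothetical helper mirroring the test setup above.
    func bootstrapTestStore() (*DB, *storage.Store, error) {
    	manual := hlc.ManualClock(0)
    	clock := hlc.NewClock(manual.UnixNano)
    	eng := engine.NewInMem(proto.Attributes{}, 1<<20)

    	kv := NewLocalKV()
    	db := NewDB(kv, clock)
    	store := storage.NewStore(clock, eng, db, nil)

    	// Write the store ident, then register the store so the DB can route to it.
    	if err := store.Bootstrap(proto.StoreIdent{StoreID: 1}); err != nil {
    		return nil, nil, err
    	}
    	kv.AddStore(store)

    	// Persist the first range's metadata before Init, so Init's scan over
    	// KeyLocalRangeMetadataPrefix picks it up and starts the range.
    	if _, err := store.CreateRange(store.BootstrapRangeMetadata()); err != nil {
    		return nil, nil, err
    	}
    	if err := store.Init(); err != nil {
    		return nil, nil, err
    	}
    	return db, store, nil
    }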