Skip to content

Commit

Permalink
Merge pull request #68 from cockroachdb/spencerkimball/address-bens-c…
Browse files Browse the repository at this point in the history
…omments

Created new block ID allocator & put it into use.
  • Loading branch information
spencerkimball committed Sep 18, 2014
2 parents 7271660 + 478c6c2 commit 7861260
Show file tree
Hide file tree
Showing 9 changed files with 382 additions and 166 deletions.
18 changes: 11 additions & 7 deletions kv/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,20 @@ func createTestDB(t *testing.T) (*DB, *hlc.Clock, *hlc.ManualClock) {
manual := hlc.ManualClock(0)
clock := hlc.NewClock(manual.UnixNano)
eng := engine.NewInMem(proto.Attributes{}, 1<<20)
store := storage.NewStore(clock, eng, nil, nil)
store.Ident.StoreID = 1
replica := proto.Replica{NodeID: 1, StoreID: 1, RangeID: 1}
_, err := store.CreateRange(engine.KeyMin, engine.KeyMax, []proto.Replica{replica})
if err != nil {
kv := NewLocalKV()
db := NewDB(kv, clock)
store := storage.NewStore(clock, eng, db, nil)
if err := store.Bootstrap(proto.StoreIdent{StoreID: 1}); err != nil {
t.Fatal(err)
}
kv := NewLocalKV()
kv.AddStore(store)
db := NewDB(kv, clock)
_, err := store.CreateRange(store.BootstrapRangeMetadata())
if err != nil {
t.Fatal(err)
}
if err := store.Init(); err != nil {
t.Fatal(err)
}
return db, clock, &manual
}

Expand Down
67 changes: 42 additions & 25 deletions kv/local_kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,42 +100,59 @@ func TestLocalKVGetStore(t *testing.T) {
}
}

// createTestStore creates a new Store instance with a single range
// spanning from start to end.
func createTestStore(storeID int32, start, end engine.Key, t *testing.T) *storage.Store {
func TestLocalKVLookupReplica(t *testing.T) {
manual := hlc.ManualClock(0)
clock := hlc.NewClock(manual.UnixNano)
eng := engine.NewInMem(proto.Attributes{}, 1<<20)
store := storage.NewStore(clock, eng, nil, nil)
store.Ident.StoreID = storeID
replica := proto.Replica{StoreID: storeID, RangeID: 1}
_, err := store.CreateRange(start, end, []proto.Replica{replica})
if err != nil {
kv := NewLocalKV()
db := NewDB(kv, clock)
store := storage.NewStore(clock, eng, db, nil)
if err := store.Bootstrap(proto.StoreIdent{StoreID: 1}); err != nil {
t.Fatal(err)
}
return store
}

func TestLocalKVLookupReplica(t *testing.T) {
kv := NewLocalKV()
s1 := createTestStore(1, engine.Key("a"), engine.Key("c"), t)
s2 := createTestStore(2, engine.Key("x"), engine.Key("z"), t)
kv.AddStore(s1)
kv.AddStore(s2)
kv.AddStore(store)
meta := store.BootstrapRangeMetadata()
meta.StartKey = engine.KeySystemPrefix
meta.EndKey = engine.PrefixEndKey(engine.KeySystemPrefix)
if _, err := store.CreateRange(meta); err != nil {
t.Fatal(err)
}
if err := store.Init(); err != nil {
t.Fatal(err)
}
// Create two new stores with ranges we care about.
var s [2]*storage.Store
ranges := []struct {
storeID int32
start, end engine.Key
}{
{2, engine.Key("a"), engine.Key("c")},
{3, engine.Key("x"), engine.Key("z")},
}
for i, rng := range ranges {
s[i] = storage.NewStore(clock, eng, db, nil)
s[i].Ident.StoreID = rng.storeID
replica := proto.Replica{StoreID: rng.storeID}
_, err := s[i].CreateRange(store.NewRangeMetadata(rng.start, rng.end, []proto.Replica{replica}))
if err != nil {
t.Fatal(err)
}
kv.AddStore(s[i])
}

if r, err := kv.lookupReplica(engine.Key("a"), engine.Key("c")); r.StoreID != s1.Ident.StoreID || err != nil {
t.Errorf("expected store %d; got %d: %v", s1.Ident.StoreID, r.StoreID, err)
if r, err := kv.lookupReplica(engine.Key("a"), engine.Key("c")); r.StoreID != s[0].Ident.StoreID || err != nil {
t.Errorf("expected store %d; got %d: %v", s[0].Ident.StoreID, r.StoreID, err)
}
if r, err := kv.lookupReplica(engine.Key("b"), nil); r.StoreID != s1.Ident.StoreID || err != nil {
t.Errorf("expected store %d; got %d: %v", s1.Ident.StoreID, r.StoreID, err)
if r, err := kv.lookupReplica(engine.Key("b"), nil); r.StoreID != s[0].Ident.StoreID || err != nil {
t.Errorf("expected store %d; got %d: %v", s[0].Ident.StoreID, r.StoreID, err)
}
if r, err := kv.lookupReplica(engine.Key("b"), engine.Key("d")); r != nil || err == nil {
t.Errorf("expected store 0 and error got %d", r.StoreID)
}
if r, err := kv.lookupReplica(engine.Key("x"), engine.Key("z")); r.StoreID != s2.Ident.StoreID {
t.Errorf("expected store %d; got %d: %v", s2.Ident.StoreID, r.StoreID, err)
if r, err := kv.lookupReplica(engine.Key("x"), engine.Key("z")); r.StoreID != s[1].Ident.StoreID {
t.Errorf("expected store %d; got %d: %v", s[1].Ident.StoreID, r.StoreID, err)
}
if r, err := kv.lookupReplica(engine.Key("y"), nil); r.StoreID != s2.Ident.StoreID || err != nil {
t.Errorf("expected store %d; got %d: %v", s2.Ident.StoreID, r.StoreID, err)
if r, err := kv.lookupReplica(engine.Key("y"), nil); r.StoreID != s[1].Ident.StoreID || err != nil {
t.Errorf("expected store %d; got %d: %v", s[1].Ident.StoreID, r.StoreID, err)
}
}
11 changes: 1 addition & 10 deletions server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,19 +126,10 @@ func BootstrapCluster(clusterID string, eng engine.Engine) (*kv.DB, error) {
}

// Create first range.
replica := proto.Replica{
NodeID: 1,
StoreID: 1,
RangeID: 1,
Attrs: proto.Attributes{},
}
rng, err := s.CreateRange(engine.KeyMin, engine.KeyMax, []proto.Replica{replica})
rng, err := s.CreateRange(s.BootstrapRangeMetadata())
if err != nil {
return nil, err
}
if rng.Meta.RangeID != 1 {
return nil, util.Errorf("expected range id of 1, got %d", rng.Meta.RangeID)
}

// Create a KV DB with a local KV to directly modify the new range.
localKV := kv.NewLocalKV()
Expand Down
Binary file removed simout
Binary file not shown.
9 changes: 4 additions & 5 deletions storage/engine/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,6 @@ var (
// KeyLocalIdent stores an immutable identifier for this store,
// created when the store is first bootstrapped.
KeyLocalIdent = MakeKey(KeyLocalPrefix, Key("store-ident"))
// KeyLocalRangeIDGenerator is a range ID generator sequence. Range
// IDs must be unique per node ID.
KeyLocalRangeIDGenerator = MakeKey(KeyLocalPrefix, Key("range-idgen"))
// KeyLocalRangeMetadataPrefix is the prefix for keys storing range metadata.
// The value is a struct of type RangeMetadata.
KeyLocalRangeMetadataPrefix = MakeKey(KeyLocalPrefix, Key("range-"))
Expand Down Expand Up @@ -224,10 +221,12 @@ var (
// KeyConfigZonePrefix specifies the key prefix for zone
// configurations. The suffix is the affected key prefix.
KeyConfigZonePrefix = MakeKey(KeySystemPrefix, Key("zone"))
// KeyNodeIDGenerator contains a sequence generator for node IDs.
// KeyNodeIDGenerator is the global node ID generator sequence.
KeyNodeIDGenerator = MakeKey(KeySystemPrefix, Key("node-idgen"))
// KeyRaftIDGenerator is a Raft consensus group ID generator sequence.
// KeyRaftIDGenerator is the global Raft consensus group ID generator sequence.
KeyRaftIDGenerator = MakeKey(KeySystemPrefix, Key("raft-idgen"))
// KeyRangeIDGenerator is the global range ID generator sequence.
KeyRangeIDGenerator = MakeKey(KeySystemPrefix, Key("range-idgen"))
// KeySchemaPrefix specifies key prefixes for schema definitions.
KeySchemaPrefix = MakeKey(KeySystemPrefix, Key("schema"))
// KeyStoreIDGeneratorPrefix specifies key prefixes for sequence
Expand Down
109 changes: 109 additions & 0 deletions storage/id_alloc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2014 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License. See the AUTHORS file
// for names of contributors.
//
// Author: Spencer Kimball ([email protected])

package storage

import (
"github.com/cockroachdb/cockroach/proto"
"github.com/cockroachdb/cockroach/storage/engine"
"github.com/cockroachdb/cockroach/util/log"
)

const (
// allocationTrigger is a special ID which if encountered,
// causes allocation of the next block of IDs.
allocationTrigger = 0
)

type IDAllocator struct {
idKey engine.Key
db DB
minID int64 // Minimum ID to return
blockSize int64 // Block allocation size
ids chan int64 // Channel of available IDs
}

// NewIDAllocator creates a new ID allocator which increments the
// specified key in allocation blocks of size blockSize, with
// allocated IDs starting at minID. Allocated IDs are positive
// integers.
func NewIDAllocator(idKey engine.Key, db DB, minID int64, blockSize int64) *IDAllocator {
if minID <= allocationTrigger {
log.Fatalf("minID must be > %d", allocationTrigger)
}
if blockSize < 1 {
log.Fatalf("blockSize must be a positive integer: %d", blockSize)
}
ia := &IDAllocator{
idKey: idKey,
db: db,
minID: minID,
blockSize: blockSize,
ids: make(chan int64, blockSize+blockSize/2+1),
}
ia.ids <- allocationTrigger
return ia
}

// Allocate allocates a new ID from the global KV DB. If multiple
func (ia *IDAllocator) Allocate() int64 {
for {
id := <-ia.ids
if id == allocationTrigger {
go ia.allocateBlock(ia.blockSize)
} else {
return id
}
}
}

// allocateBlock allocates a block of IDs using db.Increment and
// sends all IDs on the ids channel. Midway through the block, a
// special allocationTrigger ID is inserted which causes allocation
// to occur before IDs run out to hide Increment latency.
func (ia *IDAllocator) allocateBlock(incr int64) {
ir := <-ia.db.Increment(&proto.IncrementRequest{
RequestHeader: proto.RequestHeader{
Key: ia.idKey,
User: UserRoot,
},
Increment: ia.blockSize,
})
if ir.Error != nil {
log.Errorf("unable to allocate %d %q IDs: %v", ia.blockSize, ia.idKey, ir.Error)
}
if ir.NewValue <= ia.minID {
log.Warningf("allocator key is currently set at %d; minID is %d; allocating again to skip %d IDs",
ir.NewValue, ia.minID, ia.minID-ir.NewValue)
ia.allocateBlock(ia.minID - ir.NewValue + ia.blockSize)
return
}

// Add all new ids to the channel for consumption.
start := ir.NewValue - ia.blockSize + 1
end := ir.NewValue + 1
if start < ia.minID {
start = ia.minID
}

for i := start; i < end; i++ {
ia.ids <- i
if i == (start+end)/2 {
ia.ids <- allocationTrigger
}
}
}
86 changes: 86 additions & 0 deletions storage/id_alloc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2014 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License. See the AUTHORS file
// for names of contributors.
//
// Author: Spencer Kimball ([email protected])

package storage

import (
"sort"
"testing"

"github.com/cockroachdb/cockroach/storage/engine"
)

// TestIDAllocator creates an ID allocator which allocates from
// the Raft ID generator system key in blocks of 10 with a minimum
// ID value of 2 and then starts up 10 goroutines each allocating
// 10 IDs. All goroutines deposit the allocated IDs into a final
// channel, which is queried at the end to ensure that all IDs
// from 2 to 101 are present.
func TestIDAllocator(t *testing.T) {
store, _ := createTestStore(false, t)
allocd := make(chan int, 100)
idAlloc := NewIDAllocator(engine.KeyRaftIDGenerator, store.db, 2, 10)

for i := 0; i < 10; i++ {
go func() {
for j := 0; j < 10; j++ {
allocd <- int(idAlloc.Allocate())
}
}()
}

// Verify all IDs accounted for.
ids := make([]int, 100)
for i := 0; i < 100; i++ {
ids[i] = <-allocd
}
sort.Ints(ids)
for i := 0; i < 100; i++ {
if ids[i] != i+2 {
t.Error("expected \"%d\"th ID to be %d; got %d", i, i+2, ids[i])
}
}

// Verify no leftover IDs.
select {
case id := <-allocd:
t.Error("there appear to be leftover IDs, starting with %d", id)
default:
// Expected; noop.
}
}

// TestIDAllocatorNegativeValue creates an ID allocator against an
// increment key which is preset to a negative value. We verify that
// the id allocator makes a double-alloc to make up the difference
// and push the id allocation into positive integers.
func TestIDAllocatorNegativeValue(t *testing.T) {
store, _ := createTestStore(false, t)
// Increment our key to a negative value.
newValue, err := engine.Increment(store.engine, engine.KeyRaftIDGenerator, -1024)
if err != nil {
t.Fatal(err)
}
if newValue != -1024 {
t.Errorf("expected new value to be -1024; got %d", newValue)
}
idAlloc := NewIDAllocator(engine.KeyRaftIDGenerator, store.db, 2, 10)
value := idAlloc.Allocate()
if value != 2 {
t.Errorf("expected id allocation to have value 2; got %d", value)
}
}
Loading

0 comments on commit 7861260

Please sign in to comment.