Skip to content

Commit

Permalink
Merge pull request #300 from bdarnell/raft-addressing
Browse files Browse the repository at this point in the history
Replace hard-coded NodeID 1 with actual node/store IDs.
  • Loading branch information
bdarnell committed Feb 4, 2015
2 parents 99168a8 + 3167076 commit cc5e4ae
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 20 deletions.
4 changes: 2 additions & 2 deletions kv/local_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func TestLocalSenderLookupReplica(t *testing.T) {
ls := NewLocalSender()
db := client.NewKV(NewTxnCoordSender(ls, clock), nil)
store := storage.NewStore(clock, eng, db, nil)
if err := store.Bootstrap(proto.StoreIdent{StoreID: 1}); err != nil {
if err := store.Bootstrap(proto.StoreIdent{NodeID: 1, StoreID: 1}); err != nil {
t.Fatal(err)
}
ls.AddStore(store)
Expand Down Expand Up @@ -157,7 +157,7 @@ func TestLocalSenderLookupReplica(t *testing.T) {
e[i] = engine.NewInMem(proto.Attributes{}, 1<<20)
s[i] = storage.NewStore(clock, e[i], db, nil)
s[i].Ident.StoreID = rng.storeID
if err := s[i].Bootstrap(proto.StoreIdent{StoreID: rng.storeID}); err != nil {
if err := s[i].Bootstrap(proto.StoreIdent{NodeID: 1, StoreID: rng.storeID}); err != nil {
t.Fatal(err)
}
if err := s[i].Start(); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion kv/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func createTestDB() (db *client.KV, eng engine.Engine, clock *hlc.Clock,
db = client.NewKV(sender, nil)
db.User = storage.UserRoot
store := storage.NewStore(clock, eng, db, g)
if err = store.Bootstrap(proto.StoreIdent{StoreID: 1}); err != nil {
if err = store.Bootstrap(proto.StoreIdent{NodeID: 1, StoreID: 1}); err != nil {
return
}
if err = store.Start(); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion storage/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func createTestStoreWithEngine(t *testing.T, eng engine.Engine, clock *hlc.Clock
db.User = storage.UserRoot
store := storage.NewStore(clock, eng, db, g)
if bootstrap {
if err := store.Bootstrap(proto.StoreIdent{StoreID: 1}); err != nil {
if err := store.Bootstrap(proto.StoreIdent{NodeID: 1, StoreID: 1}); err != nil {
t.Fatal(err)
}
}
Expand Down
25 changes: 13 additions & 12 deletions storage/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type raftInterface interface {
stop()
}

type singleNodeRaft struct {
type raftImpl struct {
mr *multiraft.MultiRaft
mu sync.Mutex
// groups is the set of active group IDs. The map itself is
Expand All @@ -69,9 +69,10 @@ type singleNodeRaft struct {
stopper *util.Stopper
}

func newSingleNodeRaft(storage multiraft.Storage) *singleNodeRaft {
mr, err := multiraft.NewMultiRaft(1, &multiraft.Config{
Transport: multiraft.NewLocalRPCTransport(),
func newRaft(nodeID multiraft.NodeID, storage multiraft.Storage,
transport multiraft.Transport) raftInterface {
mr, err := multiraft.NewMultiRaft(nodeID, &multiraft.Config{
Transport: transport,
Storage: storage,
TickInterval: time.Millisecond,
ElectionTimeoutTicks: 5,
Expand All @@ -81,7 +82,7 @@ func newSingleNodeRaft(storage multiraft.Storage) *singleNodeRaft {
if err != nil {
log.Fatal(err)
}
snr := &singleNodeRaft{
snr := &raftImpl{
mr: mr,
groups: map[int64]chan struct{}{},
commitCh: make(chan committedCommand, 10),
Expand All @@ -92,9 +93,9 @@ func newSingleNodeRaft(storage multiraft.Storage) *singleNodeRaft {
return snr
}

var _ raftInterface = (*singleNodeRaft)(nil)
var _ raftInterface = (*raftImpl)(nil)

func (snr *singleNodeRaft) createGroup(id int64) error {
func (snr *raftImpl) createGroup(id int64) error {
snr.mu.Lock()
ch, ok := snr.groups[id]
if !ok {
Expand All @@ -110,7 +111,7 @@ func (snr *singleNodeRaft) createGroup(id int64) error {
return nil
}

func (snr *singleNodeRaft) removeGroup(id int64) error {
func (snr *raftImpl) removeGroup(id int64) error {
snr.mu.Lock()
if ch, ok := snr.groups[id]; ok {
delete(snr.groups, id)
Expand All @@ -122,7 +123,7 @@ func (snr *singleNodeRaft) removeGroup(id int64) error {
return nil
}

func (snr *singleNodeRaft) propose(cmdIDKey cmdIDKey, cmd proto.InternalRaftCommand) {
func (snr *raftImpl) propose(cmdIDKey cmdIDKey, cmd proto.InternalRaftCommand) {
if cmd.Cmd.GetValue() == nil {
panic("proposed a nil command")
}
Expand All @@ -138,15 +139,15 @@ func (snr *singleNodeRaft) propose(cmdIDKey cmdIDKey, cmd proto.InternalRaftComm
snr.mr.SubmitCommand(uint64(cmd.RaftID), string(cmdIDKey), data)
}

func (snr *singleNodeRaft) committed() <-chan committedCommand {
func (snr *raftImpl) committed() <-chan committedCommand {
return snr.commitCh
}

func (snr *singleNodeRaft) stop() {
func (snr *raftImpl) stop() {
snr.stopper.Stop()
}

func (snr *singleNodeRaft) run() {
func (snr *raftImpl) run() {
for {
select {
case e := <-snr.mr.Events:
Expand Down
3 changes: 2 additions & 1 deletion storage/range.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ type RangeManager interface {
Engine() engine.Engine
Gossip() *gossip.Gossip
StoreID() proto.StoreID
RaftNodeID() multiraft.NodeID

// Range manipulation methods.
AddRange(rng *Range) error
Expand Down Expand Up @@ -1474,7 +1475,7 @@ func (r *Range) InitialState() (raft.InitialState, error) {
Commit: raftInitialLogIndex,
}
cs := raftpb.ConfState{
Nodes: []uint64{1},
Nodes: []uint64{uint64(r.rm.RaftNodeID())},
}
_, err := engine.MVCCGetProto(r.rm.Engine(), engine.RaftStateKey(r.Desc.RaftID),
proto.ZeroTimestamp, nil, &hs)
Expand Down
2 changes: 1 addition & 1 deletion storage/range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (tc *testContext) Start(t *testing.T) {
if tc.store == nil {
tc.store = NewStore(tc.clock, tc.engine, nil, tc.gossip)
if !tc.skipBootstrap {
if err := tc.store.Bootstrap(proto.StoreIdent{StoreID: 1}); err != nil {
if err := tc.store.Bootstrap(proto.StoreIdent{NodeID: 1, StoreID: 1}); err != nil {
t.Fatal(err)
}
}
Expand Down
25 changes: 24 additions & 1 deletion storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,24 @@ func verifyKeys(start, end proto.Key) error {
return nil
}

// makeRaftNodeID packs a NodeID and StoreID into a single uint64 for use in raft.
// Both values are int32s, but we only allocate 8 bits for StoreID so we have
// the option of expanding proto.NodeID and being more "wasteful" of node IDs.
func makeRaftNodeID(n proto.NodeID, s proto.StoreID) multiraft.NodeID {
if n <= 0 || s <= 0 {
// Zeroes are likely the result of incomplete initialization.
panic("NodeID and StoreID must be > 0")
}
if s > 0xff {
panic("StoreID must be <= 0xff")
}
return multiraft.NodeID(n)<<8 | multiraft.NodeID(s)
}

func decodeRaftNodeID(n multiraft.NodeID) (proto.NodeID, proto.StoreID) {
return proto.NodeID(n >> 8), proto.StoreID(n & 0xff)
}

type rangeAlreadyExists struct {
rng *Range
}
Expand Down Expand Up @@ -299,7 +317,7 @@ func (s *Store) Start() error {
start := engine.RangeDescriptorKey(engine.KeyMin)
end := engine.RangeDescriptorKey(engine.KeyMax)

s.raft = newSingleNodeRaft(s)
s.raft = newRaft(s.RaftNodeID(), s, multiraft.NewLocalRPCTransport())
// Start Raft processing goroutine.
go s.processRaft(s.raft, s.closer)

Expand Down Expand Up @@ -566,6 +584,11 @@ func (s *Store) ClusterID() string { return s.Ident.ClusterID }
// StoreID accessor.
func (s *Store) StoreID() proto.StoreID { return s.Ident.StoreID }

// RaftNodeID accessor.
func (s *Store) RaftNodeID() multiraft.NodeID {
return makeRaftNodeID(s.Ident.NodeID, s.Ident.StoreID)
}

// Clock accessor.
func (s *Store) Clock() *hlc.Clock { return s.clock }

Expand Down
48 changes: 47 additions & 1 deletion storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"code.google.com/p/go-uuid/uuid"
"github.com/cockroachdb/cockroach/client"
"github.com/cockroachdb/cockroach/gossip"
"github.com/cockroachdb/cockroach/multiraft"
"github.com/cockroachdb/cockroach/proto"
"github.com/cockroachdb/cockroach/rpc"
"github.com/cockroachdb/cockroach/storage/engine"
Expand Down Expand Up @@ -97,7 +98,7 @@ func createTestStore(t *testing.T) (*Store, *hlc.ManualClock) {
clock := hlc.NewClock(manual.UnixNano)
eng := engine.NewInMem(proto.Attributes{}, 1<<20)
store := NewStore(clock, eng, nil, g)
if err := store.Bootstrap(proto.StoreIdent{StoreID: 1}); err != nil {
if err := store.Bootstrap(proto.StoreIdent{NodeID: 1, StoreID: 1}); err != nil {
t.Fatal(err)
}
store.db = client.NewKV(&testSender{store: store}, nil)
Expand Down Expand Up @@ -912,3 +913,48 @@ func TestStoreResolveWriteIntentNoTxn(t *testing.T) {
t.Errorf("expected transaction aborted error; got %s", err)
}
}

func TestRaftNodeID(t *testing.T) {
cases := []struct {
nodeID proto.NodeID
storeID proto.StoreID
expected multiraft.NodeID
}{
{1, 1, 257},
{2, 3, 515},
{math.MaxInt32, 0xff, 549755813887},
}
for _, c := range cases {
x := makeRaftNodeID(c.nodeID, c.storeID)
if x != c.expected {
t.Errorf("makeRaftNodeID(%v, %v) returned %v; expected %v",
c.nodeID, c.storeID, x, c.expected)
}
n, s := decodeRaftNodeID(x)
if n != c.nodeID || s != c.storeID {
t.Errorf("decodeRaftNodeID(%v) returned %v, %v; expected %v, %v",
x, n, s, c.nodeID, c.storeID)
}
}

panicCases := []struct {
nodeID proto.NodeID
storeID proto.StoreID
}{
{0, 1},
{1, 0},
{1, 0x100},
{1, -1},
{-1, 1},
}
for _, c := range panicCases {
func() {
defer func() {
recover()
}()
x := makeRaftNodeID(c.nodeID, c.storeID)
t.Errorf("makeRaftNodeID(%v, %v) returned %v; expected panic",
c.nodeID, c.storeID, x)
}()
}
}

0 comments on commit cc5e4ae

Please sign in to comment.