From 31670766743568f4ca2fd69424a24c7df89df26a Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Tue, 3 Feb 2015 15:38:43 -0500 Subject: [PATCH] Replace hard-coded NodeID 1 with actual node/store IDs. This makes it possible for multiple stores to address each other with a suitable multiraft.Transport (which will be provided by --- kv/local_sender_test.go | 4 ++-- kv/txn_coord_sender_test.go | 2 +- storage/db_test.go | 2 +- storage/raft.go | 25 +++++++++---------- storage/range.go | 3 ++- storage/range_test.go | 2 +- storage/store.go | 25 ++++++++++++++++++- storage/store_test.go | 48 ++++++++++++++++++++++++++++++++++++- 8 files changed, 91 insertions(+), 20 deletions(-) diff --git a/kv/local_sender_test.go b/kv/local_sender_test.go index b2477a8e7aa2..eeb57b141d2c 100644 --- a/kv/local_sender_test.go +++ b/kv/local_sender_test.go @@ -127,7 +127,7 @@ func TestLocalSenderLookupReplica(t *testing.T) { ls := NewLocalSender() db := client.NewKV(NewTxnCoordSender(ls, clock), nil) store := storage.NewStore(clock, eng, db, nil) - if err := store.Bootstrap(proto.StoreIdent{StoreID: 1}); err != nil { + if err := store.Bootstrap(proto.StoreIdent{NodeID: 1, StoreID: 1}); err != nil { t.Fatal(err) } ls.AddStore(store) @@ -157,7 +157,7 @@ func TestLocalSenderLookupReplica(t *testing.T) { e[i] = engine.NewInMem(proto.Attributes{}, 1<<20) s[i] = storage.NewStore(clock, e[i], db, nil) s[i].Ident.StoreID = rng.storeID - if err := s[i].Bootstrap(proto.StoreIdent{StoreID: rng.storeID}); err != nil { + if err := s[i].Bootstrap(proto.StoreIdent{NodeID: 1, StoreID: rng.storeID}); err != nil { t.Fatal(err) } if err := s[i].Start(); err != nil { diff --git a/kv/txn_coord_sender_test.go b/kv/txn_coord_sender_test.go index a2a70db554b6..7b3e6cc5e98e 100644 --- a/kv/txn_coord_sender_test.go +++ b/kv/txn_coord_sender_test.go @@ -50,7 +50,7 @@ func createTestDB() (db *client.KV, eng engine.Engine, clock *hlc.Clock, db = client.NewKV(sender, nil) db.User = storage.UserRoot store := storage.NewStore(clock, eng, db, g) - if err = store.Bootstrap(proto.StoreIdent{StoreID: 1}); err != nil { + if err = store.Bootstrap(proto.StoreIdent{NodeID: 1, StoreID: 1}); err != nil { return } if err = store.Start(); err != nil { diff --git a/storage/db_test.go b/storage/db_test.go index cbc97d751908..5cd75f1d229f 100644 --- a/storage/db_test.go +++ b/storage/db_test.go @@ -61,7 +61,7 @@ func createTestStoreWithEngine(t *testing.T, eng engine.Engine, clock *hlc.Clock db.User = storage.UserRoot store := storage.NewStore(clock, eng, db, g) if bootstrap { - if err := store.Bootstrap(proto.StoreIdent{StoreID: 1}); err != nil { + if err := store.Bootstrap(proto.StoreIdent{NodeID: 1, StoreID: 1}); err != nil { t.Fatal(err) } } diff --git a/storage/raft.go b/storage/raft.go index c1de8ec86922..0398afc18094 100644 --- a/storage/raft.go +++ b/storage/raft.go @@ -57,7 +57,7 @@ type raftInterface interface { stop() } -type singleNodeRaft struct { +type raftImpl struct { mr *multiraft.MultiRaft mu sync.Mutex // groups is the set of active group IDs. The map itself is @@ -69,9 +69,10 @@ type singleNodeRaft struct { stopper *util.Stopper } -func newSingleNodeRaft(storage multiraft.Storage) *singleNodeRaft { - mr, err := multiraft.NewMultiRaft(1, &multiraft.Config{ - Transport: multiraft.NewLocalRPCTransport(), +func newRaft(nodeID multiraft.NodeID, storage multiraft.Storage, + transport multiraft.Transport) raftInterface { + mr, err := multiraft.NewMultiRaft(nodeID, &multiraft.Config{ + Transport: transport, Storage: storage, TickInterval: time.Millisecond, ElectionTimeoutTicks: 5, @@ -81,7 +82,7 @@ func newSingleNodeRaft(storage multiraft.Storage) *singleNodeRaft { if err != nil { log.Fatal(err) } - snr := &singleNodeRaft{ + snr := &raftImpl{ mr: mr, groups: map[int64]chan struct{}{}, commitCh: make(chan committedCommand, 10), @@ -92,9 +93,9 @@ func newSingleNodeRaft(storage multiraft.Storage) *singleNodeRaft { return snr } -var _ raftInterface = (*singleNodeRaft)(nil) +var _ raftInterface = (*raftImpl)(nil) -func (snr *singleNodeRaft) createGroup(id int64) error { +func (snr *raftImpl) createGroup(id int64) error { snr.mu.Lock() ch, ok := snr.groups[id] if !ok { @@ -110,7 +111,7 @@ func (snr *singleNodeRaft) createGroup(id int64) error { return nil } -func (snr *singleNodeRaft) removeGroup(id int64) error { +func (snr *raftImpl) removeGroup(id int64) error { snr.mu.Lock() if ch, ok := snr.groups[id]; ok { delete(snr.groups, id) @@ -122,7 +123,7 @@ func (snr *singleNodeRaft) removeGroup(id int64) error { return nil } -func (snr *singleNodeRaft) propose(cmdIDKey cmdIDKey, cmd proto.InternalRaftCommand) { +func (snr *raftImpl) propose(cmdIDKey cmdIDKey, cmd proto.InternalRaftCommand) { if cmd.Cmd.GetValue() == nil { panic("proposed a nil command") } @@ -138,15 +139,15 @@ func (snr *singleNodeRaft) propose(cmdIDKey cmdIDKey, cmd proto.InternalRaftComm snr.mr.SubmitCommand(uint64(cmd.RaftID), string(cmdIDKey), data) } -func (snr *singleNodeRaft) committed() <-chan committedCommand { +func (snr *raftImpl) committed() <-chan committedCommand { return snr.commitCh } -func (snr *singleNodeRaft) stop() { +func (snr *raftImpl) stop() { snr.stopper.Stop() } -func (snr *singleNodeRaft) run() { +func (snr *raftImpl) run() { for { select { case e := <-snr.mr.Events: diff --git a/storage/range.go b/storage/range.go index 31507a94ff5b..ed3512f2026c 100644 --- a/storage/range.go +++ b/storage/range.go @@ -140,6 +140,7 @@ type RangeManager interface { Engine() engine.Engine Gossip() *gossip.Gossip StoreID() proto.StoreID + RaftNodeID() multiraft.NodeID // Range manipulation methods. AddRange(rng *Range) error @@ -1474,7 +1475,7 @@ func (r *Range) InitialState() (raft.InitialState, error) { Commit: raftInitialLogIndex, } cs := raftpb.ConfState{ - Nodes: []uint64{1}, + Nodes: []uint64{uint64(r.rm.RaftNodeID())}, } _, err := engine.MVCCGetProto(r.rm.Engine(), engine.RaftStateKey(r.Desc.RaftID), proto.ZeroTimestamp, nil, &hs) diff --git a/storage/range_test.go b/storage/range_test.go index 5c0206dd096a..6e1e70f25299 100644 --- a/storage/range_test.go +++ b/storage/range_test.go @@ -105,7 +105,7 @@ func (tc *testContext) Start(t *testing.T) { if tc.store == nil { tc.store = NewStore(tc.clock, tc.engine, nil, tc.gossip) if !tc.skipBootstrap { - if err := tc.store.Bootstrap(proto.StoreIdent{StoreID: 1}); err != nil { + if err := tc.store.Bootstrap(proto.StoreIdent{NodeID: 1, StoreID: 1}); err != nil { t.Fatal(err) } } diff --git a/storage/store.go b/storage/store.go index a634bc922017..418384d30829 100644 --- a/storage/store.go +++ b/storage/store.go @@ -110,6 +110,24 @@ func verifyKeys(start, end proto.Key) error { return nil } +// makeRaftNodeID packs a NodeID and StoreID into a single uint64 for use in raft. +// Both values are int32s, but we only allocate 8 bits for StoreID so we have +// the option of expanding proto.NodeID and being more "wasteful" of node IDs. +func makeRaftNodeID(n proto.NodeID, s proto.StoreID) multiraft.NodeID { + if n <= 0 || s <= 0 { + // Zeroes are likely the result of incomplete initialization. + panic("NodeID and StoreID must be > 0") + } + if s > 0xff { + panic("StoreID must be <= 0xff") + } + return multiraft.NodeID(n)<<8 | multiraft.NodeID(s) +} + +func decodeRaftNodeID(n multiraft.NodeID) (proto.NodeID, proto.StoreID) { + return proto.NodeID(n >> 8), proto.StoreID(n & 0xff) +} + type rangeAlreadyExists struct { rng *Range } @@ -299,7 +317,7 @@ func (s *Store) Start() error { start := engine.RangeDescriptorKey(engine.KeyMin) end := engine.RangeDescriptorKey(engine.KeyMax) - s.raft = newSingleNodeRaft(s) + s.raft = newRaft(s.RaftNodeID(), s, multiraft.NewLocalRPCTransport()) // Start Raft processing goroutine. go s.processRaft(s.raft, s.closer) @@ -566,6 +584,11 @@ func (s *Store) ClusterID() string { return s.Ident.ClusterID } // StoreID accessor. func (s *Store) StoreID() proto.StoreID { return s.Ident.StoreID } +// RaftNodeID accessor. +func (s *Store) RaftNodeID() multiraft.NodeID { + return makeRaftNodeID(s.Ident.NodeID, s.Ident.StoreID) +} + // Clock accessor. func (s *Store) Clock() *hlc.Clock { return s.clock } diff --git a/storage/store_test.go b/storage/store_test.go index 0dad06fe2704..d2eec228e0bf 100644 --- a/storage/store_test.go +++ b/storage/store_test.go @@ -31,6 +31,7 @@ import ( "code.google.com/p/go-uuid/uuid" "github.com/cockroachdb/cockroach/client" "github.com/cockroachdb/cockroach/gossip" + "github.com/cockroachdb/cockroach/multiraft" "github.com/cockroachdb/cockroach/proto" "github.com/cockroachdb/cockroach/rpc" "github.com/cockroachdb/cockroach/storage/engine" @@ -97,7 +98,7 @@ func createTestStore(t *testing.T) (*Store, *hlc.ManualClock) { clock := hlc.NewClock(manual.UnixNano) eng := engine.NewInMem(proto.Attributes{}, 1<<20) store := NewStore(clock, eng, nil, g) - if err := store.Bootstrap(proto.StoreIdent{StoreID: 1}); err != nil { + if err := store.Bootstrap(proto.StoreIdent{NodeID: 1, StoreID: 1}); err != nil { t.Fatal(err) } store.db = client.NewKV(&testSender{store: store}, nil) @@ -912,3 +913,48 @@ func TestStoreResolveWriteIntentNoTxn(t *testing.T) { t.Errorf("expected transaction aborted error; got %s", err) } } + +func TestRaftNodeID(t *testing.T) { + cases := []struct { + nodeID proto.NodeID + storeID proto.StoreID + expected multiraft.NodeID + }{ + {1, 1, 257}, + {2, 3, 515}, + {math.MaxInt32, 0xff, 549755813887}, + } + for _, c := range cases { + x := makeRaftNodeID(c.nodeID, c.storeID) + if x != c.expected { + t.Errorf("makeRaftNodeID(%v, %v) returned %v; expected %v", + c.nodeID, c.storeID, x, c.expected) + } + n, s := decodeRaftNodeID(x) + if n != c.nodeID || s != c.storeID { + t.Errorf("decodeRaftNodeID(%v) returned %v, %v; expected %v, %v", + x, n, s, c.nodeID, c.storeID) + } + } + + panicCases := []struct { + nodeID proto.NodeID + storeID proto.StoreID + }{ + {0, 1}, + {1, 0}, + {1, 0x100}, + {1, -1}, + {-1, 1}, + } + for _, c := range panicCases { + func() { + defer func() { + recover() + }() + x := makeRaftNodeID(c.nodeID, c.storeID) + t.Errorf("makeRaftNodeID(%v, %v) returned %v; expected panic", + c.nodeID, c.storeID, x) + }() + } +}