From 9d0ae19e94047a1fb33888df7e4107c6f447fa71 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Fri, 1 Mar 2019 16:56:41 -0800 Subject: [PATCH 01/11] Update raft lib to use master --- conn/node.go | 4 +- conn/node_test.go | 4 +- conn/raft_server.go | 2 +- dgraph/cmd/zero/assign.go | 6 +- dgraph/cmd/zero/raft.go | 4 +- raftwal/storage.go | 4 +- raftwal/storage_test.go | 4 +- .../coreos => go.etcd.io}/etcd/LICENSE | 0 .../coreos => go.etcd.io}/etcd/NOTICE | 0 vendor/go.etcd.io/etcd/raft/OWNERS | 19 + .../coreos => go.etcd.io}/etcd/raft/README.md | 13 +- .../coreos => go.etcd.io}/etcd/raft/design.md | 0 .../coreos => go.etcd.io}/etcd/raft/doc.go | 8 +- .../coreos => go.etcd.io}/etcd/raft/log.go | 24 +- .../etcd/raft/log_unstable.go | 4 +- .../coreos => go.etcd.io}/etcd/raft/logger.go | 2 +- .../coreos => go.etcd.io}/etcd/raft/node.go | 120 ++++-- .../etcd/raft/progress.go | 0 .../coreos => go.etcd.io}/etcd/raft/raft.go | 351 +++++++++++++----- .../etcd/raft/raftpb/raft.pb.go | 0 .../etcd/raft/raftpb/raft.proto | 0 .../etcd/raft/rawnode.go | 61 ++- .../etcd/raft/read_only.go | 2 +- .../coreos => go.etcd.io}/etcd/raft/status.go | 34 +- .../etcd/raft/storage.go | 2 +- .../coreos => go.etcd.io}/etcd/raft/util.go | 23 +- vendor/vendor.json | 36 +- worker/draft.go | 4 +- worker/draft_test.go | 2 +- worker/snapshot.go | 2 +- 30 files changed, 528 insertions(+), 207 deletions(-) rename vendor/{github.com/coreos => go.etcd.io}/etcd/LICENSE (100%) rename vendor/{github.com/coreos => go.etcd.io}/etcd/NOTICE (100%) create mode 100644 vendor/go.etcd.io/etcd/raft/OWNERS rename vendor/{github.com/coreos => go.etcd.io}/etcd/raft/README.md (96%) rename vendor/{github.com/coreos => go.etcd.io}/etcd/raft/design.md (100%) rename vendor/{github.com/coreos => go.etcd.io}/etcd/raft/doc.go (97%) rename vendor/{github.com/coreos => go.etcd.io}/etcd/raft/log.go (93%) rename vendor/{github.com/coreos => go.etcd.io}/etcd/raft/log_unstable.go (97%) rename vendor/{github.com/coreos => go.etcd.io}/etcd/raft/logger.go (99%) rename vendor/{github.com/coreos => go.etcd.io}/etcd/raft/node.go (83%) rename vendor/{github.com/coreos => go.etcd.io}/etcd/raft/progress.go (100%) rename vendor/{github.com/coreos => go.etcd.io}/etcd/raft/raft.go (80%) rename vendor/{github.com/coreos => go.etcd.io}/etcd/raft/raftpb/raft.pb.go (100%) rename vendor/{github.com/coreos => go.etcd.io}/etcd/raft/raftpb/raft.proto (100%) rename vendor/{github.com/coreos => go.etcd.io}/etcd/raft/rawnode.go (81%) rename vendor/{github.com/coreos => go.etcd.io}/etcd/raft/read_only.go (98%) rename vendor/{github.com/coreos => go.etcd.io}/etcd/raft/status.go (84%) rename vendor/{github.com/coreos => go.etcd.io}/etcd/raft/util.go (86%) diff --git a/conn/node.go b/conn/node.go index c7ee041ccb8..ca3b111646a 100644 --- a/conn/node.go +++ b/conn/node.go @@ -27,14 +27,14 @@ import ( "sync" "time" - "github.com/coreos/etcd/raft" - "github.com/coreos/etcd/raft/raftpb" "github.com/dgraph-io/badger/y" "github.com/dgraph-io/dgo/protos/api" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/raftwal" "github.com/dgraph-io/dgraph/x" "github.com/golang/glog" + "go.etcd.io/etcd/raft" + "go.etcd.io/etcd/raft/raftpb" "golang.org/x/net/context" ) diff --git a/conn/node_test.go 
b/conn/node_test.go index ebc67391d5b..1e094e6ba9e 100644 --- a/conn/node_test.go +++ b/conn/node_test.go @@ -25,12 +25,12 @@ import ( "testing" "time" - "github.com/coreos/etcd/raft" - "github.com/coreos/etcd/raft/raftpb" "github.com/dgraph-io/badger" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/raftwal" "github.com/stretchr/testify/require" + "go.etcd.io/etcd/raft" + "go.etcd.io/etcd/raft/raftpb" "golang.org/x/net/context" ) diff --git a/conn/raft_server.go b/conn/raft_server.go index ac79f9ed9be..12d31261bd5 100644 --- a/conn/raft_server.go +++ b/conn/raft_server.go @@ -23,11 +23,11 @@ import ( "sync" "time" - "github.com/coreos/etcd/raft/raftpb" "github.com/dgraph-io/dgo/protos/api" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/x" "github.com/golang/glog" + "go.etcd.io/etcd/raft/raftpb" ) type sendmsg struct { diff --git a/dgraph/cmd/zero/assign.go b/dgraph/cmd/zero/assign.go index 7e0d66784d3..03a85703d0f 100644 --- a/dgraph/cmd/zero/assign.go +++ b/dgraph/cmd/zero/assign.go @@ -66,9 +66,9 @@ func (s *Server) lease(ctx context.Context, num *pb.Num, txn bool) (*pb.Assigned // TODO: Fix when we move to linearizable reads, need to check if we are the leader, might be // based on leader leases. If this node gets partitioned and unless checkquorum is enabled, this // node would still think that it's the leader. - if !node.AmLeader() { - return &emptyAssignedIds, x.Errorf("Assigning IDs is only allowed on leader.") - } + // if !node.AmLeader() { + // return &emptyAssignedIds, x.Errorf("Assigning IDs is only allowed on leader.") + // } if num.Val == 0 && !num.ReadOnly { return &emptyAssignedIds, x.Errorf("Nothing to be leased") diff --git a/dgraph/cmd/zero/raft.go b/dgraph/cmd/zero/raft.go index 50c01ec3768..ef22d605dfb 100644 --- a/dgraph/cmd/zero/raft.go +++ b/dgraph/cmd/zero/raft.go @@ -27,8 +27,6 @@ import ( otrace "go.opencensus.io/trace" - "github.com/coreos/etcd/raft" - "github.com/coreos/etcd/raft/raftpb" "github.com/dgraph-io/badger/y" "github.com/dgraph-io/dgraph/conn" "github.com/dgraph-io/dgraph/protos/pb" @@ -36,6 +34,8 @@ import ( farm "github.com/dgryski/go-farm" "github.com/golang/glog" "github.com/google/uuid" + "go.etcd.io/etcd/raft" + "go.etcd.io/etcd/raft/raftpb" "golang.org/x/net/context" ) diff --git a/raftwal/storage.go b/raftwal/storage.go index 7d76ed26c10..5ab2377a30a 100644 --- a/raftwal/storage.go +++ b/raftwal/storage.go @@ -23,10 +23,10 @@ import ( "math" "sync" - "github.com/coreos/etcd/raft" - pb "github.com/coreos/etcd/raft/raftpb" "github.com/dgraph-io/badger" "github.com/golang/glog" + "go.etcd.io/etcd/raft" + pb "go.etcd.io/etcd/raft/raftpb" "golang.org/x/net/trace" "github.com/dgraph-io/dgraph/x" diff --git a/raftwal/storage_test.go b/raftwal/storage_test.go index 46dafab26a5..02fd95c5776 100644 --- a/raftwal/storage_test.go +++ b/raftwal/storage_test.go @@ -39,10 +39,10 @@ import ( "reflect" "testing" - "github.com/coreos/etcd/raft" - pb "github.com/coreos/etcd/raft/raftpb" "github.com/dgraph-io/badger" "github.com/stretchr/testify/require" + "go.etcd.io/etcd/raft" + pb "go.etcd.io/etcd/raft/raftpb" ) func openBadger(dir string) (*badger.DB, error) { diff --git a/vendor/github.com/coreos/etcd/LICENSE b/vendor/go.etcd.io/etcd/LICENSE similarity index 100% rename from 
vendor/github.com/coreos/etcd/LICENSE rename to vendor/go.etcd.io/etcd/LICENSE diff --git a/vendor/github.com/coreos/etcd/NOTICE b/vendor/go.etcd.io/etcd/NOTICE similarity index 100% rename from vendor/github.com/coreos/etcd/NOTICE rename to vendor/go.etcd.io/etcd/NOTICE diff --git a/vendor/go.etcd.io/etcd/raft/OWNERS b/vendor/go.etcd.io/etcd/raft/OWNERS new file mode 100644 index 00000000000..ab781066e23 --- /dev/null +++ b/vendor/go.etcd.io/etcd/raft/OWNERS @@ -0,0 +1,19 @@ +approvers: +- heyitsanthony +- philips +- fanminshi +- gyuho +- mitake +- jpbetz +- xiang90 +- bdarnell +reviewers: +- heyitsanthony +- philips +- fanminshi +- gyuho +- mitake +- jpbetz +- xiang90 +- bdarnell +- tschottdorf diff --git a/vendor/github.com/coreos/etcd/raft/README.md b/vendor/go.etcd.io/etcd/raft/README.md similarity index 96% rename from vendor/github.com/coreos/etcd/raft/README.md rename to vendor/go.etcd.io/etcd/raft/README.md index fde22b16519..a78e5f720ba 100644 --- a/vendor/github.com/coreos/etcd/raft/README.md +++ b/vendor/go.etcd.io/etcd/raft/README.md @@ -13,7 +13,7 @@ To keep the codebase small as well as provide flexibility, the library only impl In order to easily test the Raft library, its behavior should be deterministic. To achieve this determinism, the library models Raft as a state machine. The state machine takes a `Message` as input. A message can either be a local timer update or a network message sent from a remote peer. The state machine's output is a 3-tuple `{[]Messages, []LogEntries, NextState}` consisting of an array of `Messages`, `log entries`, and `Raft state changes`. For state machines with the same state, the same state machine input should always generate the same state machine output. -A simple example application, _raftexample_, is also available to help illustrate how to use this package in practice: https://github.com/coreos/etcd/tree/master/contrib/raftexample +A simple example application, _raftexample_, is also available to help illustrate how to use this package in practice: https://github.com/etcd-io/etcd/tree/master/contrib/raftexample # Features @@ -21,7 +21,7 @@ This raft implementation is a full feature implementation of Raft protocol. 
Feat - Leader election - Log replication -- Log compaction + - Log compaction - Membership changes - Leadership transfer extension - Efficient linearizable read-only queries served by both the leader and followers @@ -40,13 +40,14 @@ This raft implementation also includes a few optional enhancements: - Batching log entries to reduce disk synchronized I/O - Writing to leader's disk in parallel - Internal proposal redirection from followers to leader -- Automatic stepping down when the leader loses quorum + - Automatic stepping down when the leader loses quorum + Protection against unbounded log growth when quorum is lost ## Notable Users - [cockroachdb](https://github.com/cockroachdb/cockroach) A Scalable, Survivable, Strongly-Consistent SQL Database - [dgraph](https://github.com/dgraph-io/dgraph) A Scalable, Distributed, Low Latency, High Throughput Graph Database -- [etcd](https://github.com/coreos/etcd) A distributed reliable key-value store +- [etcd](https://github.com/etcd-io/etcd) A distributed reliable key-value store - [tikv](https://github.com/pingcap/tikv) A Distributed transactional key value database powered by Rust and Raft - [swarmkit](https://github.com/docker/swarmkit) A toolkit for orchestrating distributed systems at any scale. - [chain core](https://github.com/chain/chain) Software for operating permissioned, multi-asset blockchain networks @@ -140,7 +141,7 @@ The total state machine handling loop will look something like this: case <-s.Ticker: n.Tick() case rd := <-s.Node.Ready(): - saveToStorage(rd.State, rd.Entries, rd.Snapshot) + saveToStorage(rd.HardState, rd.Entries, rd.Snapshot) send(rd.Messages) if !raft.IsEmptySnap(rd.Snapshot) { processSnapshot(rd.Snapshot) @@ -166,7 +167,7 @@ To propose changes to the state machine from the node to take application data, n.Propose(ctx, data) ``` -If the proposal is committed, data will appear in committed entries with type raftpb.EntryNormal. There is no guarantee that a proposed command will be committed; the command may have to be reproposed after a timeout. +If the proposal is committed, data will appear in committed entries with type raftpb.EntryNormal. There is no guarantee that a proposed command will be committed; the command may have to be reproposed after a timeout. To add or remove node in a cluster, build ConfChange struct 'cc' and call: diff --git a/vendor/github.com/coreos/etcd/raft/design.md b/vendor/go.etcd.io/etcd/raft/design.md similarity index 100% rename from vendor/github.com/coreos/etcd/raft/design.md rename to vendor/go.etcd.io/etcd/raft/design.md diff --git a/vendor/github.com/coreos/etcd/raft/doc.go b/vendor/go.etcd.io/etcd/raft/doc.go similarity index 97% rename from vendor/github.com/coreos/etcd/raft/doc.go rename to vendor/go.etcd.io/etcd/raft/doc.go index b55c591ff5d..c30d88445f2 100644 --- a/vendor/github.com/coreos/etcd/raft/doc.go +++ b/vendor/go.etcd.io/etcd/raft/doc.go @@ -23,7 +23,7 @@ For more details on Raft, see "In Search of an Understandable Consensus Algorith A simple example application, _raftexample_, is also available to help illustrate how to use this package in practice: -https://github.com/coreos/etcd/tree/master/contrib/raftexample +https://github.com/etcd-io/etcd/tree/master/contrib/raftexample Usage @@ -87,7 +87,7 @@ large). Note: Marshalling messages is not thread-safe; it is important that you make sure that no new entries are persisted while marshalling. 
-The easiest way to achieve this is to serialise the messages directly inside +The easiest way to achieve this is to serialize the messages directly inside your main raft loop. 3. Apply Snapshot (if any) and CommittedEntries to the state machine. @@ -153,7 +153,7 @@ If the proposal is committed, data will appear in committed entries with type raftpb.EntryNormal. There is no guarantee that a proposed command will be committed; you may have to re-propose after a timeout. -To add or remove node in a cluster, build ConfChange struct 'cc' and call: +To add or remove a node in a cluster, build ConfChange struct 'cc' and call: n.ProposeConfChange(ctx, cc) @@ -260,7 +260,7 @@ stale log entries: 'MsgPreVote' and 'MsgPreVoteResp' are used in an optional two-phase election protocol. When Config.PreVote is true, a pre-election is carried out first (using the same rules as a regular election), and no node increases its term - number unless the pre-election indicates that the campaigining node would win. + number unless the pre-election indicates that the campaigning node would win. This minimizes disruption when a partitioned node rejoins the cluster. 'MsgSnap' requests to install a snapshot message. When a node has just diff --git a/vendor/github.com/coreos/etcd/raft/log.go b/vendor/go.etcd.io/etcd/raft/log.go similarity index 93% rename from vendor/github.com/coreos/etcd/raft/log.go rename to vendor/go.etcd.io/etcd/raft/log.go index c3036d3c90d..03f83e61c42 100644 --- a/vendor/github.com/coreos/etcd/raft/log.go +++ b/vendor/go.etcd.io/etcd/raft/log.go @@ -18,7 +18,7 @@ import ( "fmt" "log" - pb "github.com/coreos/etcd/raft/raftpb" + pb "go.etcd.io/etcd/raft/raftpb" ) type raftLog struct { @@ -38,17 +38,29 @@ type raftLog struct { applied uint64 logger Logger + + // maxNextEntsSize is the maximum number aggregate byte size of the messages + // returned from calls to nextEnts. + maxNextEntsSize uint64 } -// newLog returns log using the given storage. It recovers the log to the state -// that it just commits and applies the latest snapshot. +// newLog returns log using the given storage and default options. It +// recovers the log to the state that it just commits and applies the +// latest snapshot. func newLog(storage Storage, logger Logger) *raftLog { + return newLogWithSize(storage, logger, noLimit) +} + +// newLogWithSize returns a log using the given storage and max +// message size. 
+func newLogWithSize(storage Storage, logger Logger, maxNextEntsSize uint64) *raftLog { if storage == nil { log.Panic("storage must not be nil") } log := &raftLog{ - storage: storage, - logger: logger, + storage: storage, + logger: logger, + maxNextEntsSize: maxNextEntsSize, } firstIndex, err := storage.FirstIndex() if err != nil { @@ -139,7 +151,7 @@ func (l *raftLog) unstableEntries() []pb.Entry { func (l *raftLog) nextEnts() (ents []pb.Entry) { off := max(l.applied+1, l.firstIndex()) if l.committed+1 > off { - ents, err := l.slice(off, l.committed+1, noLimit) + ents, err := l.slice(off, l.committed+1, l.maxNextEntsSize) if err != nil { l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err) } diff --git a/vendor/github.com/coreos/etcd/raft/log_unstable.go b/vendor/go.etcd.io/etcd/raft/log_unstable.go similarity index 97% rename from vendor/github.com/coreos/etcd/raft/log_unstable.go rename to vendor/go.etcd.io/etcd/raft/log_unstable.go index 263af9ce405..1005bf65cc5 100644 --- a/vendor/github.com/coreos/etcd/raft/log_unstable.go +++ b/vendor/go.etcd.io/etcd/raft/log_unstable.go @@ -14,7 +14,7 @@ package raft -import pb "github.com/coreos/etcd/raft/raftpb" +import pb "go.etcd.io/etcd/raft/raftpb" // unstable.entries[i] has raft log position i+unstable.offset. // Note that unstable.offset may be less than the highest log @@ -147,7 +147,7 @@ func (u *unstable) slice(lo uint64, hi uint64) []pb.Entry { return u.entries[lo-u.offset : hi-u.offset] } -// u.offset <= lo <= hi <= u.offset+len(u.offset) +// u.offset <= lo <= hi <= u.offset+len(u.entries) func (u *unstable) mustCheckOutOfBounds(lo, hi uint64) { if lo > hi { u.logger.Panicf("invalid unstable.slice %d > %d", lo, hi) diff --git a/vendor/github.com/coreos/etcd/raft/logger.go b/vendor/go.etcd.io/etcd/raft/logger.go similarity index 99% rename from vendor/github.com/coreos/etcd/raft/logger.go rename to vendor/go.etcd.io/etcd/raft/logger.go index 92e55b373e1..426a77d3445 100644 --- a/vendor/github.com/coreos/etcd/raft/logger.go +++ b/vendor/go.etcd.io/etcd/raft/logger.go @@ -114,7 +114,7 @@ func (l *DefaultLogger) Fatalf(format string, v ...interface{}) { } func (l *DefaultLogger) Panic(v ...interface{}) { - l.Logger.Panic(v) + l.Logger.Panic(v...) } func (l *DefaultLogger) Panicf(format string, v ...interface{}) { diff --git a/vendor/github.com/coreos/etcd/raft/node.go b/vendor/go.etcd.io/etcd/raft/node.go similarity index 83% rename from vendor/github.com/coreos/etcd/raft/node.go rename to vendor/go.etcd.io/etcd/raft/node.go index 33a9db84001..749db98758b 100644 --- a/vendor/github.com/coreos/etcd/raft/node.go +++ b/vendor/go.etcd.io/etcd/raft/node.go @@ -18,7 +18,7 @@ import ( "context" "errors" - pb "github.com/coreos/etcd/raft/raftpb" + pb "go.etcd.io/etcd/raft/raftpb" ) type SnapshotStatus int @@ -109,6 +109,19 @@ func (rd Ready) containsUpdates() bool { len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 || len(rd.ReadStates) != 0 } +// appliedCursor extracts from the Ready the highest index the client has +// applied (once the Ready is confirmed via Advance). If no information is +// contained in the Ready, returns zero. +func (rd Ready) appliedCursor() uint64 { + if n := len(rd.CommittedEntries); n > 0 { + return rd.CommittedEntries[n-1].Index + } + if index := rd.Snapshot.Metadata.Index; index > 0 { + return index + } + return 0 +} + // Node represents a node in a raft cluster. 
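Aside on appliedCursor: this helper is what lets Advance move the applied index by what was actually handed out in Ready.CommittedEntries, which may trail HardState.Commit once committed entries are paginated by size (maxNextEntsSize in log.go above). A minimal consumer-loop sketch written against that behavior; ticker, saveToStorage, send, and process are placeholders, not part of this patch:

    for {
        select {
        case <-ticker.C:
            n.Tick()
        case rd := <-n.Ready():
            // Persist unstable state before acting on it.
            saveToStorage(rd.HardState, rd.Entries, rd.Snapshot)
            send(rd.Messages)
            for _, ent := range rd.CommittedEntries {
                process(ent) // possibly just one size-limited page of committed entries
            }
            // Advance marks the entries in rd as applied; the next Ready
            // resumes from the cursor, not from HardState.Commit.
            n.Advance()
        }
    }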
type Node interface { // Tick increments the internal logical clock for the Node by a single tick. Election @@ -116,7 +129,8 @@ type Node interface { Tick() // Campaign causes the Node to transition to candidate state and start campaigning to become leader. Campaign(ctx context.Context) error - // Propose proposes that data be appended to the log. + // Propose proposes that data be appended to the log. Note that proposals can be lost without + // notice, therefore it is user's job to ensure proposal retries. Propose(ctx context.Context, data []byte) error // ProposeConfChange proposes config change. // At most one ConfChange can be in the process of going through consensus. @@ -161,7 +175,16 @@ type Node interface { Status() Status // ReportUnreachable reports the given node is not reachable for the last send. ReportUnreachable(id uint64) - // ReportSnapshot reports the status of the sent snapshot. + // ReportSnapshot reports the status of the sent snapshot. The id is the raft ID of the follower + // who is meant to receive the snapshot, and the status is SnapshotFinish or SnapshotFailure. + // Calling ReportSnapshot with SnapshotFinish is a no-op. But, any failure in applying a + // snapshot (for e.g., while streaming it from leader to follower), should be reported to the + // leader with SnapshotFailure. When leader sends a snapshot to a follower, it pauses any raft + // log probes until the follower can apply the snapshot and advance its state. If the follower + // can't do that, for e.g., due to a crash, it could end up in a limbo, never getting any + // updates from the leader. Therefore, it is crucial that the application ensures that any + // failure in snapshot sending is caught and reported back to the leader; so it can resume raft + // log probing in the follower. ReportSnapshot(id uint64, status SnapshotStatus) // Stop performs any necessary termination of the Node. Stop() @@ -224,9 +247,14 @@ func RestartNode(c *Config) Node { return &n } +type msgWithResult struct { + m pb.Message + result chan error +} + // node is the canonical implementation of the Node interface type node struct { - propc chan pb.Message + propc chan msgWithResult recvc chan pb.Message confc chan pb.ConfChange confstatec chan pb.ConfState @@ -242,7 +270,7 @@ type node struct { func newNode() node { return node{ - propc: make(chan pb.Message), + propc: make(chan msgWithResult), recvc: make(chan pb.Message), confc: make(chan pb.ConfChange), confstatec: make(chan pb.ConfState), @@ -271,12 +299,13 @@ func (n *node) Stop() { } func (n *node) run(r *raft) { - var propc chan pb.Message + var propc chan msgWithResult var readyc chan Ready var advancec chan struct{} var prevLastUnstablei, prevLastUnstablet uint64 var havePrevLastUnstablei bool var prevSnapi uint64 + var applyingToI uint64 var rd Ready lead := None @@ -314,19 +343,25 @@ func (n *node) run(r *raft) { // TODO: maybe buffer the config propose if there exists one (the way // described in raft dissertation) // Currently it is dropped in Step silently. - case m := <-propc: + case pm := <-propc: + m := pm.m m.From = r.id - r.Step(m) + err := r.Step(m) + if pm.result != nil { + pm.result <- err + close(pm.result) + } case m := <-n.recvc: // filter out response message from unknown From. 
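Aside on the expanded ReportSnapshot contract above: if snapshot streaming fails and nothing is reported, the leader keeps its log probes for that follower paused indefinitely. A hedged sketch of the reporting side; sendSnapshotOverTransport stands in for the application's transport and is not part of this patch:

    if err := sendSnapshotOverTransport(followerID, snap); err != nil {
        // Tell the leader, so it resumes raft log probing for this follower.
        n.ReportSnapshot(followerID, raft.SnapshotFailure)
    } else {
        n.ReportSnapshot(followerID, raft.SnapshotFinish) // documented above as a no-op
    }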
if pr := r.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) { - r.Step(m) // raft never returns an error + r.Step(m) } case cc := <-n.confc: if cc.NodeID == None { - r.resetPendingConf() select { - case n.confstatec <- pb.ConfState{Nodes: r.nodes()}: + case n.confstatec <- pb.ConfState{ + Nodes: r.nodes(), + Learners: r.learnerNodes()}: case <-n.done: } break @@ -344,12 +379,13 @@ func (n *node) run(r *raft) { } r.removeNode(cc.NodeID) case pb.ConfChangeUpdateNode: - r.resetPendingConf() default: panic("unexpected conf type") } select { - case n.confstatec <- pb.ConfState{Nodes: r.nodes()}: + case n.confstatec <- pb.ConfState{ + Nodes: r.nodes(), + Learners: r.learnerNodes()}: case <-n.done: } case <-n.tickc: @@ -369,13 +405,18 @@ func (n *node) run(r *raft) { if !IsEmptySnap(rd.Snapshot) { prevSnapi = rd.Snapshot.Metadata.Index } + if index := rd.appliedCursor(); index != 0 { + applyingToI = index + } r.msgs = nil r.readStates = nil + r.reduceUncommittedSize(rd.CommittedEntries) advancec = n.advancec case <-advancec: - if prevHardSt.Commit != 0 { - r.raftLog.appliedTo(prevHardSt.Commit) + if applyingToI != 0 { + r.raftLog.appliedTo(applyingToI) + applyingToI = 0 } if havePrevLastUnstablei { r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet) @@ -406,7 +447,7 @@ func (n *node) Tick() { func (n *node) Campaign(ctx context.Context) error { return n.step(ctx, pb.Message{Type: pb.MsgHup}) } func (n *node) Propose(ctx context.Context, data []byte) error { - return n.step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}}) + return n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}}) } func (n *node) Step(ctx context.Context, m pb.Message) error { @@ -426,22 +467,53 @@ func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChange) error { return n.Step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange, Data: data}}}) } +func (n *node) step(ctx context.Context, m pb.Message) error { + return n.stepWithWaitOption(ctx, m, false) +} + +func (n *node) stepWait(ctx context.Context, m pb.Message) error { + return n.stepWithWaitOption(ctx, m, true) +} + // Step advances the state machine using msgs. The ctx.Err() will be returned, // if any. 
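Aside on the msgWithResult plumbing above: Propose now waits for the outcome of stepping the proposal, so callers can observe ErrProposalDropped (added to raft.go later in this patch) instead of losing proposals without notice. A sketch of caller-side handling; the retry policy is an assumption, not something this patch prescribes:

    if err := n.Propose(ctx, data); err != nil {
        if err == raft.ErrProposalDropped {
            // Explicit drop (no leader, leadership transfer in progress, or the
            // uncommitted-size limit was hit): back off and re-propose.
        }
        // Otherwise: ctx.Err() or raft.ErrStopped.
        return err
    }
    // nil only means the proposal entered the log path; it may still never commit.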
-func (n *node) step(ctx context.Context, m pb.Message) error { - ch := n.recvc - if m.Type == pb.MsgProp { - ch = n.propc +func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error { + if m.Type != pb.MsgProp { + select { + case n.recvc <- m: + return nil + case <-ctx.Done(): + return ctx.Err() + case <-n.done: + return ErrStopped + } + } + ch := n.propc + pm := msgWithResult{m: m} + if wait { + pm.result = make(chan error, 1) } - select { - case ch <- m: - return nil + case ch <- pm: + if !wait { + return nil + } + case <-ctx.Done(): + return ctx.Err() + case <-n.done: + return ErrStopped + } + select { + case rsp := <-pm.result: + if rsp != nil { + return rsp + } case <-ctx.Done(): return ctx.Err() case <-n.done: return ErrStopped } + return nil } func (n *node) Ready() <-chan Ready { return n.readyc } @@ -523,7 +595,7 @@ func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready { if len(r.readStates) != 0 { rd.ReadStates = r.readStates } - rd.MustSync = MustSync(rd.HardState, prevHardSt, len(rd.Entries)) + rd.MustSync = MustSync(r.hardState(), prevHardSt, len(rd.Entries)) return rd } diff --git a/vendor/github.com/coreos/etcd/raft/progress.go b/vendor/go.etcd.io/etcd/raft/progress.go similarity index 100% rename from vendor/github.com/coreos/etcd/raft/progress.go rename to vendor/go.etcd.io/etcd/raft/progress.go diff --git a/vendor/github.com/coreos/etcd/raft/raft.go b/vendor/go.etcd.io/etcd/raft/raft.go similarity index 80% rename from vendor/github.com/coreos/etcd/raft/raft.go rename to vendor/go.etcd.io/etcd/raft/raft.go index b4c0f0248ca..e1e6a16d4a0 100644 --- a/vendor/github.com/coreos/etcd/raft/raft.go +++ b/vendor/go.etcd.io/etcd/raft/raft.go @@ -25,7 +25,7 @@ import ( "sync" "time" - pb "github.com/coreos/etcd/raft/raftpb" + pb "go.etcd.io/etcd/raft/raftpb" ) // None is a placeholder node ID used when there is no leader. @@ -67,9 +67,13 @@ const ( campaignTransfer CampaignType = "CampaignTransfer" ) +// ErrProposalDropped is returned when the proposal is ignored by some cases, +// so that the proposer can be notified and fail fast. +var ErrProposalDropped = errors.New("raft proposal dropped") + // lockedRand is a small wrapper around rand.Rand to provide -// synchronization. Only the methods needed by the code are exposed -// (e.g. Intn). +// synchronization among multiple raft groups. Only the methods needed +// by the code are exposed (e.g. Intn). type lockedRand struct { mu sync.Mutex rand *rand.Rand @@ -116,8 +120,9 @@ type Config struct { // used for testing right now. peers []uint64 - // learners contains the IDs of all leaner nodes (including self if the local node is a leaner) in the raft cluster. - // learners only receives entries from the leader node. It does not vote or promote itself. + // learners contains the IDs of all learner nodes (including self if the + // local node is a learner) in the raft cluster. learners only receives + // entries from the leader node. It does not vote or promote itself. learners []uint64 // ElectionTick is the number of Node.Tick invocations that must pass between @@ -143,12 +148,20 @@ type Config struct { // applied entries. This is a very application dependent configuration. Applied uint64 - // MaxSizePerMsg limits the max size of each append message. Smaller value - // lowers the raft recovery cost(initial probing and message lost during normal - // operation). 
On the other side, it might affect the throughput during normal - // replication. Note: math.MaxUint64 for unlimited, 0 for at most one entry per - // message. + // MaxSizePerMsg limits the max byte size of each append message. Smaller + // value lowers the raft recovery cost(initial probing and message lost + // during normal operation). On the other side, it might affect the + // throughput during normal replication. Note: math.MaxUint64 for unlimited, + // 0 for at most one entry per message. MaxSizePerMsg uint64 + // MaxCommittedSizePerReady limits the size of the committed entries which + // can be applied. + MaxCommittedSizePerReady uint64 + // MaxUncommittedEntriesSize limits the aggregate byte size of the + // uncommitted entries that may be appended to a leader's log. Once this + // limit is exceeded, proposals will begin to return ErrProposalDropped + // errors. Note: 0 for no limit. + MaxUncommittedEntriesSize uint64 // MaxInflightMsgs limits the max number of in-flight append messages during // optimistic replication phase. The application transportation layer usually // has its own sending buffer over TCP/UDP. Setting MaxInflightMsgs to avoid @@ -187,7 +200,7 @@ type Config struct { // this feature would be in a situation where the Raft leader is used to // compute the data of a proposal, for example, adding a timestamp from a // hybrid logical clock to data in a monotonically increasing way. Forwarding - // should be disabled to prevent a follower with an innaccurate hybrid + // should be disabled to prevent a follower with an inaccurate hybrid // logical clock from assigning the timestamp and then forwarding the data // to the leader. DisableProposalForwarding bool @@ -210,6 +223,16 @@ func (c *Config) validate() error { return errors.New("storage cannot be nil") } + if c.MaxUncommittedEntriesSize == 0 { + c.MaxUncommittedEntriesSize = noLimit + } + + // default MaxCommittedSizePerReady to MaxSizePerMsg because they were + // previously the same parameter. + if c.MaxCommittedSizePerReady == 0 { + c.MaxCommittedSizePerReady = c.MaxSizePerMsg + } + if c.MaxInflightMsgs <= 0 { return errors.New("max inflight messages must be greater than 0") } @@ -236,10 +259,12 @@ type raft struct { // the log raftLog *raftLog - maxInflight int - maxMsgSize uint64 - prs map[uint64]*Progress - learnerPrs map[uint64]*Progress + maxMsgSize uint64 + maxUncommittedSize uint64 + maxInflight int + prs map[uint64]*Progress + learnerPrs map[uint64]*Progress + matchBuf uint64Slice state StateType @@ -255,8 +280,17 @@ type raft struct { // leadTransferee is id of the leader transfer target when its value is not zero. // Follow the procedure defined in raft thesis 3.10. leadTransferee uint64 - // New configuration is ignored if there exists unapplied configuration. - pendingConf bool + // Only one conf change may be pending (in the log, but not yet + // applied) at a time. This is enforced via pendingConfIndex, which + // is set to a value >= the log index of the latest pending + // configuration change (if any). Config changes are only allowed to + // be proposed if the leader's applied index is greater than this + // value. + pendingConfIndex uint64 + // an estimate of the size of the uncommitted tail of the Raft log. Used to + // prevent unbounded log growth. Only maintained by the leader. Reset on + // term changes. 
+ uncommittedSize uint64 readOnly *readOnly @@ -291,7 +325,7 @@ func newRaft(c *Config) *raft { if err := c.validate(); err != nil { panic(err.Error()) } - raftlog := newLog(c.Storage, c.Logger) + raftlog := newLogWithSize(c.Storage, c.Logger, c.MaxCommittedSizePerReady) hs, cs, err := c.Storage.InitialState() if err != nil { panic(err) // TODO(bdarnell) @@ -315,6 +349,7 @@ func newRaft(c *Config) *raft { raftLog: raftlog, maxMsgSize: c.MaxSizePerMsg, maxInflight: c.MaxInflightMsgs, + maxUncommittedSize: c.MaxUncommittedEntriesSize, prs: make(map[uint64]*Progress), learnerPrs: make(map[uint64]*Progress), electionTimeout: c.ElectionTick, @@ -371,10 +406,16 @@ func (r *raft) hardState() pb.HardState { func (r *raft) quorum() int { return len(r.prs)/2 + 1 } func (r *raft) nodes() []uint64 { - nodes := make([]uint64, 0, len(r.prs)+len(r.learnerPrs)) + nodes := make([]uint64, 0, len(r.prs)) for id := range r.prs { nodes = append(nodes, id) } + sort.Sort(uint64Slice(nodes)) + return nodes +} + +func (r *raft) learnerNodes() []uint64 { + nodes := make([]uint64, 0, len(r.learnerPrs)) for id := range r.learnerPrs { nodes = append(nodes, id) } @@ -424,22 +465,35 @@ func (r *raft) getProgress(id uint64) *Progress { return r.learnerPrs[id] } -// sendAppend sends RPC, with entries to the given peer. +// sendAppend sends an append RPC with new entries (if any) and the +// current commit index to the given peer. func (r *raft) sendAppend(to uint64) { + r.maybeSendAppend(to, true) +} + +// maybeSendAppend sends an append RPC with new entries to the given peer, +// if necessary. Returns true if a message was sent. The sendIfEmpty +// argument controls whether messages with no entries will be sent +// ("empty" messages are useful to convey updated Commit indexes, but +// are undesirable when we're sending multiple messages in a batch). +func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { pr := r.getProgress(to) if pr.IsPaused() { - return + return false } m := pb.Message{} m.To = to term, errt := r.raftLog.term(pr.Next - 1) ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize) + if len(ents) == 0 && !sendIfEmpty { + return false + } if errt != nil || erre != nil { // send snapshot if we failed to get term or entries if !pr.RecentActive { r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to) - return + return false } m.Type = pb.MsgSnap @@ -447,7 +501,7 @@ func (r *raft) sendAppend(to uint64) { if err != nil { if err == ErrSnapshotTemporarilyUnavailable { r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to) - return + return false } panic(err) // TODO(bdarnell) } @@ -481,9 +535,10 @@ func (r *raft) sendAppend(to uint64) { } } r.send(m) + return true } -// sendHeartbeat sends an empty MsgApp +// sendHeartbeat sends a heartbeat RPC to the given peer. func (r *raft) sendHeartbeat(to uint64, ctx []byte) { // Attach the commit as min(to.matched, r.committed). // When the leader sends out heartbeat message, @@ -547,13 +602,19 @@ func (r *raft) bcastHeartbeatWithCtx(ctx []byte) { // the commit index changed (in which case the caller should call // r.bcastAppend). func (r *raft) maybeCommit() bool { - // TODO(bmizerany): optimize.. Currently naive - mis := make(uint64Slice, 0, len(r.prs)) + // Preserving matchBuf across calls is an optimization + // used to avoid allocating a new slice on each call. 
+ if cap(r.matchBuf) < len(r.prs) { + r.matchBuf = make(uint64Slice, len(r.prs)) + } + mis := r.matchBuf[:len(r.prs)] + idx := 0 for _, p := range r.prs { - mis = append(mis, p.Match) + mis[idx] = p.Match + idx++ } - sort.Sort(sort.Reverse(mis)) - mci := mis[r.quorum()-1] + sort.Sort(mis) + mci := mis[len(mis)-r.quorum()] return r.raftLog.maybeCommit(mci, r.Term) } @@ -578,20 +639,32 @@ func (r *raft) reset(term uint64) { } }) - r.pendingConf = false + r.pendingConfIndex = 0 + r.uncommittedSize = 0 r.readOnly = newReadOnly(r.readOnly.option) } -func (r *raft) appendEntry(es ...pb.Entry) { +func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) { li := r.raftLog.lastIndex() for i := range es { es[i].Term = r.Term es[i].Index = li + 1 + uint64(i) } - r.raftLog.append(es...) - r.getProgress(r.id).maybeUpdate(r.raftLog.lastIndex()) + // Track the size of this uncommitted proposal. + if !r.increaseUncommittedSize(es) { + r.logger.Debugf( + "%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal", + r.id, + ) + // Drop the proposal. + return false + } + // use latest "last" index after truncate/append + li = r.raftLog.append(es...) + r.getProgress(r.id).maybeUpdate(li) // Regardless of maybeCommit's return, our caller will call bcastAppend. r.maybeCommit() + return true } // tickElection is run by followers and candidates after r.electionTimeout. @@ -663,6 +736,7 @@ func (r *raft) becomePreCandidate() { r.step = stepCandidate r.votes = make(map[uint64]bool) r.tick = r.tickElection + r.lead = None r.state = StatePreCandidate r.logger.Infof("%x became pre-candidate at term %d", r.id, r.Term) } @@ -677,20 +751,29 @@ func (r *raft) becomeLeader() { r.tick = r.tickHeartbeat r.lead = r.id r.state = StateLeader - ents, err := r.raftLog.entries(r.raftLog.committed+1, noLimit) - if err != nil { - r.logger.Panicf("unexpected error getting uncommitted entries (%v)", err) - } - - nconf := numOfPendingConf(ents) - if nconf > 1 { - panic("unexpected multiple uncommitted config entry") - } - if nconf == 1 { - r.pendingConf = true - } - - r.appendEntry(pb.Entry{Data: nil}) + // Followers enter replicate mode when they've been successfully probed + // (perhaps after having received a snapshot as a result). The leader is + // trivially in this state. Note that r.reset() has initialized this + // progress with the last index already. + r.prs[r.id].becomeReplicate() + + // Conservatively set the pendingConfIndex to the last index in the + // log. There may or may not be a pending config change, but it's + // safe to delay any future proposals until we commit all our + // pending log entries, and scanning the entire tail of the log + // could be expensive. + r.pendingConfIndex = r.raftLog.lastIndex() + + emptyEnt := pb.Entry{Data: nil} + if !r.appendEntry(emptyEnt) { + // This won't happen because we just called reset() above. + r.logger.Panic("empty entry was dropped") + } + // As a special case, don't count the initial empty entry towards the + // uncommitted log quota. This is because we want to preserve the + // behavior of allowing one entry larger than quota if the current + // usage is zero. 
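Aside on the maybeCommit rewrite earlier in this hunk: it now sorts match indexes ascending and reads mis[len(mis)-quorum], rather than reverse-sorting and reading mis[quorum-1], reusing matchBuf to avoid a per-call allocation. A worked example, not from the patch, to sanity-check the index arithmetic:

    // Match indexes {5, 5, 4, 3, 1} across five voters; quorum = 5/2+1 = 3.
    mis := []uint64{5, 5, 4, 3, 1}
    sort.Slice(mis, func(i, j int) bool { return mis[i] < mis[j] }) // [1 3 4 5 5]
    mci := mis[len(mis)-3] // mis[2] == 4: three of five peers hold index 4
    // Index 4 can be committed once raftLog confirms its term equals r.Term.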
+ r.reduceUncommittedSize([]pb.Entry{emptyEnt}) r.logger.Infof("%x became leader at term %d", r.id, r.Term) } @@ -786,7 +869,7 @@ func (r *raft) Step(m pb.Message) error { } case m.Term < r.Term: - if r.checkQuorum && (m.Type == pb.MsgHeartbeat || m.Type == pb.MsgApp) { + if (r.checkQuorum || r.preVote) && (m.Type == pb.MsgHeartbeat || m.Type == pb.MsgApp) { // We have received messages from a leader at a lower term. It is possible // that these messages were simply delayed in the network, but this could // also mean that this node has advanced its term number during a network @@ -799,8 +882,23 @@ func (r *raft) Step(m pb.Message) error { // nodes that have been removed from the cluster's configuration: a // removed node will send MsgVotes (or MsgPreVotes) which will be ignored, // but it will not receive MsgApp or MsgHeartbeat, so it will not create - // disruptive term increases + // disruptive term increases, by notifying leader of this node's activeness. + // The above comments also true for Pre-Vote + // + // When follower gets isolated, it soon starts an election ending + // up with a higher term than leader, although it won't receive enough + // votes to win the election. When it regains connectivity, this response + // with "pb.MsgAppResp" of higher term would force leader to step down. + // However, this disruption is inevitable to free this stuck node with + // fresh election. This can be prevented with Pre-Vote phase. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp}) + } else if m.Type == pb.MsgPreVote { + // Before Pre-Vote enable, there may have candidate with higher term, + // but less log. After update to Pre-Vote, the cluster may deadlock if + // we drop messages with a lower term. + r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d", + r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term) + r.send(pb.Message{To: m.From, Term: r.Term, Type: pb.MsgPreVoteResp, Reject: true}) } else { // ignore other cases r.logger.Infof("%x [term: %d] ignored a %s message with lower term from %x [term: %d]", @@ -838,9 +936,14 @@ func (r *raft) Step(m pb.Message) error { r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term) return nil } - // The m.Term > r.Term clause is for MsgPreVote. For MsgVote m.Term should - // always equal r.Term. - if (r.Vote == None || m.Term > r.Term || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) { + // We can vote if this is a repeat of a vote we've already cast... + canVote := r.Vote == m.From || + // ...we haven't voted and we don't think there's a leader yet in this term... + (r.Vote == None && r.lead == None) || + // ...or this is a PreVote for a future term... + (m.Type == pb.MsgPreVote && m.Term > r.Term) + // ...and we believe the candidate is up to date. 
+ if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) { r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d", r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term) // When responding to Msg{Pre,}Vote messages we include the term @@ -865,25 +968,28 @@ func (r *raft) Step(m pb.Message) error { } default: - r.step(r, m) + err := r.step(r, m) + if err != nil { + return err + } } return nil } -type stepFunc func(r *raft, m pb.Message) +type stepFunc func(r *raft, m pb.Message) error -func stepLeader(r *raft, m pb.Message) { +func stepLeader(r *raft, m pb.Message) error { // These message types do not require any progress for m.From. switch m.Type { case pb.MsgBeat: r.bcastHeartbeat() - return + return nil case pb.MsgCheckQuorum: if !r.checkQuorumActive() { r.logger.Warningf("%x stepped down to follower since quorum is not active", r.id) r.becomeFollower(r.Term, None) } - return + return nil case pb.MsgProp: if len(m.Entries) == 0 { r.logger.Panicf("%x stepped empty MsgProp", r.id) @@ -892,30 +998,35 @@ func stepLeader(r *raft, m pb.Message) { // If we are not currently a member of the range (i.e. this node // was removed from the configuration while serving as leader), // drop any new proposals. - return + return ErrProposalDropped } if r.leadTransferee != None { r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee) - return + return ErrProposalDropped } for i, e := range m.Entries { if e.Type == pb.EntryConfChange { - if r.pendingConf { - r.logger.Infof("propose conf %s ignored since pending unapplied configuration", e.String()) + if r.pendingConfIndex > r.raftLog.applied { + r.logger.Infof("propose conf %s ignored since pending unapplied configuration [index %d, applied %d]", + e.String(), r.pendingConfIndex, r.raftLog.applied) m.Entries[i] = pb.Entry{Type: pb.EntryNormal} + } else { + r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1 } - r.pendingConf = true } } - r.appendEntry(m.Entries...) + + if !r.appendEntry(m.Entries...) { + return ErrProposalDropped + } r.bcastAppend() - return + return nil case pb.MsgReadIndex: if r.quorum() > 1 { if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term { // Reject read only request when this leader has not committed any log entry at its term. - return + return nil } // thinking: use an interally defined context instead of the user given context. @@ -937,14 +1048,14 @@ func stepLeader(r *raft, m pb.Message) { r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data}) } - return + return nil } // All other message types require a progress for m.From (pr). pr := r.getProgress(m.From) if pr == nil { r.logger.Debugf("%x no progress available for %x", r.id, m.From) - return + return nil } switch m.Type { case pb.MsgAppResp: @@ -968,7 +1079,13 @@ func stepLeader(r *raft, m pb.Message) { pr.becomeReplicate() case pr.State == ProgressStateSnapshot && pr.needSnapshotAbort(): r.logger.Debugf("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr) + // Transition back to replicating state via probing state + // (which takes the snapshot into account). If we didn't + // move to replicating state, that would only happen with + // the next round of appends (but there may not be a next + // round for a while, exposing an inconsistent RaftStatus). 
pr.becomeProbe() + pr.becomeReplicate() case pr.State == ProgressStateReplicate: pr.ins.freeTo(m.Index) } @@ -976,10 +1093,18 @@ func stepLeader(r *raft, m pb.Message) { if r.maybeCommit() { r.bcastAppend() } else if oldPaused { - // update() reset the wait state on this node. If we had delayed sending - // an update before, send it now. + // If we were paused before, this node may be missing the + // latest commit index, so send it. r.sendAppend(m.From) } + // We've updated flow control information above, which may + // allow us to send multiple (size-limited) in-flight messages + // at once (such as when transitioning from probe to + // replicate, or when freeTo() covers multiple messages). If + // we have more entries to send, send as many messages as we + // can (without sending empty messages for the commit index) + for r.maybeSendAppend(m.From, false) { + } // Transfer leadership is in progress. if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() { r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From) @@ -1000,12 +1125,12 @@ func stepLeader(r *raft, m pb.Message) { } if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 { - return + return nil } ackCount := r.readOnly.recvAck(m) if ackCount < r.quorum() { - return + return nil } rss := r.readOnly.advance(m) @@ -1019,7 +1144,7 @@ func stepLeader(r *raft, m pb.Message) { } case pb.MsgSnapStatus: if pr.State != ProgressStateSnapshot { - return + return nil } if !m.Reject { pr.becomeProbe() @@ -1043,7 +1168,7 @@ func stepLeader(r *raft, m pb.Message) { case pb.MsgTransferLeader: if pr.IsLearner { r.logger.Debugf("%x is learner. Ignored transferring leadership", r.id) - return + return nil } leadTransferee := m.From lastLeadTransferee := r.leadTransferee @@ -1051,14 +1176,14 @@ func stepLeader(r *raft, m pb.Message) { if lastLeadTransferee == leadTransferee { r.logger.Infof("%x [term %d] transfer leadership to %x is in progress, ignores request to same node %x", r.id, r.Term, leadTransferee, leadTransferee) - return + return nil } r.abortLeaderTransfer() r.logger.Infof("%x [term %d] abort previous transferring leadership to %x", r.id, r.Term, lastLeadTransferee) } if leadTransferee == r.id { r.logger.Debugf("%x is already leader. Ignored transferring leadership to self", r.id) - return + return nil } // Transfer leadership to third party. r.logger.Infof("%x [term %d] starts to transfer leadership to %x", r.id, r.Term, leadTransferee) @@ -1072,11 +1197,12 @@ func stepLeader(r *raft, m pb.Message) { r.sendAppend(leadTransferee) } } + return nil } // stepCandidate is shared by StateCandidate and StatePreCandidate; the difference is // whether they respond to MsgVoteResp or MsgPreVoteResp. -func stepCandidate(r *raft, m pb.Message) { +func stepCandidate(r *raft, m pb.Message) error { // Only handle vote responses corresponding to our candidacy (while in // StateCandidate, we may get stale MsgPreVoteResp messages in this term from // our pre-candidate state). 
@@ -1089,15 +1215,15 @@ func stepCandidate(r *raft, m pb.Message) { switch m.Type { case pb.MsgProp: r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term) - return + return ErrProposalDropped case pb.MsgApp: - r.becomeFollower(r.Term, m.From) + r.becomeFollower(m.Term, m.From) // always m.Term == r.Term r.handleAppendEntries(m) case pb.MsgHeartbeat: - r.becomeFollower(r.Term, m.From) + r.becomeFollower(m.Term, m.From) // always m.Term == r.Term r.handleHeartbeat(m) case pb.MsgSnap: - r.becomeFollower(m.Term, m.From) + r.becomeFollower(m.Term, m.From) // always m.Term == r.Term r.handleSnapshot(m) case myVoteRespType: gr := r.poll(m.From, m.Type, !m.Reject) @@ -1111,22 +1237,25 @@ func stepCandidate(r *raft, m pb.Message) { r.bcastAppend() } case len(r.votes) - gr: + // pb.MsgPreVoteResp contains future term of pre-candidate + // m.Term > r.Term; reuse r.Term r.becomeFollower(r.Term, None) } case pb.MsgTimeoutNow: r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From) } + return nil } -func stepFollower(r *raft, m pb.Message) { +func stepFollower(r *raft, m pb.Message) error { switch m.Type { case pb.MsgProp: if r.lead == None { r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term) - return + return ErrProposalDropped } else if r.disableProposalForwarding { r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term) - return + return ErrProposalDropped } m.To = r.lead r.send(m) @@ -1145,7 +1274,7 @@ func stepFollower(r *raft, m pb.Message) { case pb.MsgTransferLeader: if r.lead == None { r.logger.Infof("%x no leader at term %d; dropping leader transfer msg", r.id, r.Term) - return + return nil } m.To = r.lead r.send(m) @@ -1162,17 +1291,18 @@ func stepFollower(r *raft, m pb.Message) { case pb.MsgReadIndex: if r.lead == None { r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term) - return + return nil } m.To = r.lead r.send(m) case pb.MsgReadIndexResp: if len(m.Entries) != 1 { r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries)) - return + return nil } r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data}) } + return nil } func (r *raft) handleAppendEntries(m pb.Message) { @@ -1270,14 +1400,13 @@ func (r *raft) addLearner(id uint64) { } func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) { - r.pendingConf = false pr := r.getProgress(id) if pr == nil { r.setProgress(id, 0, r.raftLog.lastIndex()+1, isLearner) } else { if isLearner && !pr.IsLearner { // can only change Learner to Voter - r.logger.Infof("%x ignored addLeaner: do not support changing %x from raft peer to learner.", r.id, id) + r.logger.Infof("%x ignored addLearner: do not support changing %x from raft peer to learner.", r.id, id) return } @@ -1306,7 +1435,6 @@ func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) { func (r *raft) removeNode(id uint64) { r.delProgress(id) - r.pendingConf = false // do not try to commit or abort transferring if there is no nodes in the cluster. 
if len(r.prs) == 0 && len(r.learnerPrs) == 0 { @@ -1324,8 +1452,6 @@ } } -func (r *raft) resetPendingConf() { r.pendingConf = false } - func (r *raft) setProgress(id, match, next uint64, isLearner bool) { if !isLearner { delete(r.learnerPrs, id) @@ -1395,6 +1521,49 @@ func (r *raft) abortLeaderTransfer() { r.leadTransferee = None } +// increaseUncommittedSize computes the size of the proposed entries and +// determines whether they would push leader over its maxUncommittedSize limit. +// If the new entries would exceed the limit, the method returns false. If not, +// the increase in uncommitted entry size is recorded and the method returns +// true. +func (r *raft) increaseUncommittedSize(ents []pb.Entry) bool { + var s uint64 + for _, e := range ents { + s += uint64(PayloadSize(e)) + } + + if r.uncommittedSize > 0 && r.uncommittedSize+s > r.maxUncommittedSize { + // If the uncommitted tail of the Raft log is empty, allow any size + // proposal. Otherwise, limit the size of the uncommitted tail of the + // log and drop any proposal that would push the size over the limit. + return false + } + r.uncommittedSize += s + return true +} + +// reduceUncommittedSize accounts for the newly committed entries by decreasing +// the uncommitted entry size limit. +func (r *raft) reduceUncommittedSize(ents []pb.Entry) { + if r.uncommittedSize == 0 { + // Fast-path for followers, who do not track or enforce the limit. + return + } + + var s uint64 + for _, e := range ents { + s += uint64(PayloadSize(e)) + } + if s > r.uncommittedSize { + // uncommittedSize may underestimate the size of the uncommitted Raft + // log tail but will never overestimate it. Saturate at 0 instead of + // allowing overflow. + r.uncommittedSize = 0 + } else { + r.uncommittedSize -= s + } +} + func numOfPendingConf(ents []pb.Entry) int { n := 0 for i := range ents { diff --git a/vendor/github.com/coreos/etcd/raft/raftpb/raft.pb.go b/vendor/go.etcd.io/etcd/raft/raftpb/raft.pb.go similarity index 100% rename from vendor/github.com/coreos/etcd/raft/raftpb/raft.pb.go rename to vendor/go.etcd.io/etcd/raft/raftpb/raft.pb.go diff --git a/vendor/github.com/coreos/etcd/raft/raftpb/raft.proto b/vendor/go.etcd.io/etcd/raft/raftpb/raft.proto similarity index 100% rename from vendor/github.com/coreos/etcd/raft/raftpb/raft.proto rename to vendor/go.etcd.io/etcd/raft/raftpb/raft.proto diff --git a/vendor/github.com/coreos/etcd/raft/rawnode.go b/vendor/go.etcd.io/etcd/raft/rawnode.go similarity index 81% rename from vendor/github.com/coreos/etcd/raft/rawnode.go rename to vendor/go.etcd.io/etcd/raft/rawnode.go index 925cb851c4a..d7a272d1435 100644 --- a/vendor/github.com/coreos/etcd/raft/rawnode.go +++ b/vendor/go.etcd.io/etcd/raft/rawnode.go @@ -17,7 +17,7 @@ package raft import ( "errors" - pb "github.com/coreos/etcd/raft/raftpb" + pb "go.etcd.io/etcd/raft/raftpb" ) // ErrStepLocalMsg is returned when try to step a local raft message @@ -47,18 +47,15 @@ func (rn *RawNode) commitReady(rd Ready) { if !IsEmptyHardState(rd.HardState) { rn.prevHardSt = rd.HardState } - if rn.prevHardSt.Commit != 0 { - // In most cases, prevHardSt and rd.HardState will be the same - // because when there are new entries to apply we just sent a - // HardState with an updated Commit value. 
However, on initial - // startup the two are different because we don't send a HardState - // until something changes, but we do send any un-applied but - // committed entries (and previously-committed entries may be - // incorporated into the snapshot, even if rd.CommittedEntries is - // empty). Therefore we mark all committed entries as applied - // whether they were included in rd.HardState or not. - rn.raft.raftLog.appliedTo(rn.prevHardSt.Commit) + + // If entries were applied (or a snapshot), update our cursor for + // the next Ready. Note that if the current HardState contains a + // new Commit index, this does not mean that we're also applying + // all of the new entries due to commit pagination by size. + if index := rd.appliedCursor(); index > 0 { + rn.raft.raftLog.appliedTo(index) } + if len(rd.Entries) > 0 { e := rd.Entries[len(rd.Entries)-1] rn.raft.raftLog.stableTo(e.Index, e.Term) @@ -169,8 +166,7 @@ func (rn *RawNode) ProposeConfChange(cc pb.ConfChange) error { // ApplyConfChange applies a config change to the local node. func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState { if cc.NodeID == None { - rn.raft.resetPendingConf() - return &pb.ConfState{Nodes: rn.raft.nodes()} + return &pb.ConfState{Nodes: rn.raft.nodes(), Learners: rn.raft.learnerNodes()} } switch cc.Type { case pb.ConfChangeAddNode: @@ -180,11 +176,10 @@ func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState { case pb.ConfChangeRemoveNode: rn.raft.removeNode(cc.NodeID) case pb.ConfChangeUpdateNode: - rn.raft.resetPendingConf() default: panic("unexpected conf type") } - return &pb.ConfState{Nodes: rn.raft.nodes()} + return &pb.ConfState{Nodes: rn.raft.nodes(), Learners: rn.raft.learnerNodes()} } // Step advances the state machine using the given message. @@ -203,6 +198,7 @@ func (rn *RawNode) Step(m pb.Message) error { func (rn *RawNode) Ready() Ready { rd := rn.newReady() rn.raft.msgs = nil + rn.raft.reduceUncommittedSize(rd.CommittedEntries) return rd } @@ -240,6 +236,39 @@ func (rn *RawNode) Status() *Status { return &status } +// StatusWithoutProgress returns a Status without populating the Progress field +// (and returns the Status as a value to avoid forcing it onto the heap). This +// is more performant if the Progress is not required. See WithProgress for an +// allocation-free way to introspect the Progress. +func (rn *RawNode) StatusWithoutProgress() Status { + return getStatusWithoutProgress(rn.raft) +} + +// ProgressType indicates the type of replica a Progress corresponds to. +type ProgressType byte + +const ( + // ProgressTypePeer accompanies a Progress for a regular peer replica. + ProgressTypePeer ProgressType = iota + // ProgressTypeLearner accompanies a Progress for a learner replica. + ProgressTypeLearner +) + +// WithProgress is a helper to introspect the Progress for this node and its +// peers. +func (rn *RawNode) WithProgress(visitor func(id uint64, typ ProgressType, pr Progress)) { + for id, pr := range rn.raft.prs { + pr := *pr + pr.ins = nil + visitor(id, ProgressTypePeer, pr) + } + for id, pr := range rn.raft.learnerPrs { + pr := *pr + pr.ins = nil + visitor(id, ProgressTypeLearner, pr) + } +} + // ReportUnreachable reports the given node is not reachable for the last send. 
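Aside on StatusWithoutProgress and WithProgress above: the former skips building the per-peer Progress map, and the latter hands the visitor copies of each Progress with ins cleared, so callers should treat them as read-only snapshots. A small usage sketch; the log lines are illustrative only:

    basic := rn.StatusWithoutProgress() // value type, no Progress map allocated
    fmt.Printf("id=%x state=%s commit=%d\n", basic.ID, basic.RaftState, basic.Commit)
    rn.WithProgress(func(id uint64, typ raft.ProgressType, pr raft.Progress) {
        fmt.Printf("peer=%x type=%d match=%d next=%d\n", id, typ, pr.Match, pr.Next)
    })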
func (rn *RawNode) ReportUnreachable(id uint64) { _ = rn.raft.Step(pb.Message{Type: pb.MsgUnreachable, From: id}) diff --git a/vendor/github.com/coreos/etcd/raft/read_only.go b/vendor/go.etcd.io/etcd/raft/read_only.go similarity index 98% rename from vendor/github.com/coreos/etcd/raft/read_only.go rename to vendor/go.etcd.io/etcd/raft/read_only.go index ae746fa73eb..aecc6b291a7 100644 --- a/vendor/github.com/coreos/etcd/raft/read_only.go +++ b/vendor/go.etcd.io/etcd/raft/read_only.go @@ -14,7 +14,7 @@ package raft -import pb "github.com/coreos/etcd/raft/raftpb" +import pb "go.etcd.io/etcd/raft/raftpb" // ReadState provides state for read only query. // It's caller's responsibility to call ReadIndex first before getting diff --git a/vendor/github.com/coreos/etcd/raft/status.go b/vendor/go.etcd.io/etcd/raft/status.go similarity index 84% rename from vendor/github.com/coreos/etcd/raft/status.go rename to vendor/go.etcd.io/etcd/raft/status.go index f4d3d86a4e3..9feca7c03b8 100644 --- a/vendor/github.com/coreos/etcd/raft/status.go +++ b/vendor/go.etcd.io/etcd/raft/status.go @@ -17,7 +17,7 @@ package raft import ( "fmt" - pb "github.com/coreos/etcd/raft/raftpb" + pb "go.etcd.io/etcd/raft/raftpb" ) type Status struct { @@ -32,29 +32,35 @@ type Status struct { LeadTransferee uint64 } -// getStatus gets a copy of the current raft status. -func getStatus(r *raft) Status { +func getProgressCopy(r *raft) map[uint64]Progress { + prs := make(map[uint64]Progress) + for id, p := range r.prs { + prs[id] = *p + } + + for id, p := range r.learnerPrs { + prs[id] = *p + } + return prs +} + +func getStatusWithoutProgress(r *raft) Status { s := Status{ ID: r.id, LeadTransferee: r.leadTransferee, } - s.HardState = r.hardState() s.SoftState = *r.softState() - s.Applied = r.raftLog.applied + return s +} +// getStatus gets a copy of the current raft status. 
+func getStatus(r *raft) Status { + s := getStatusWithoutProgress(r) if s.RaftState == StateLeader { - s.Progress = make(map[uint64]Progress) - for id, p := range r.prs { - s.Progress[id] = *p - } - - for id, p := range r.learnerPrs { - s.Progress[id] = *p - } + s.Progress = getProgressCopy(r) } - return s } diff --git a/vendor/github.com/coreos/etcd/raft/storage.go b/vendor/go.etcd.io/etcd/raft/storage.go similarity index 99% rename from vendor/github.com/coreos/etcd/raft/storage.go rename to vendor/go.etcd.io/etcd/raft/storage.go index 69c3a7d9033..14ad6860831 100644 --- a/vendor/github.com/coreos/etcd/raft/storage.go +++ b/vendor/go.etcd.io/etcd/raft/storage.go @@ -18,7 +18,7 @@ import ( "errors" "sync" - pb "github.com/coreos/etcd/raft/raftpb" + pb "go.etcd.io/etcd/raft/raftpb" ) // ErrCompacted is returned by Storage.Entries/Compact when a requested diff --git a/vendor/github.com/coreos/etcd/raft/util.go b/vendor/go.etcd.io/etcd/raft/util.go similarity index 86% rename from vendor/github.com/coreos/etcd/raft/util.go rename to vendor/go.etcd.io/etcd/raft/util.go index f4141fe65dd..c145d26dd7f 100644 --- a/vendor/github.com/coreos/etcd/raft/util.go +++ b/vendor/go.etcd.io/etcd/raft/util.go @@ -18,7 +18,7 @@ import ( "bytes" "fmt" - pb "github.com/coreos/etcd/raft/raftpb" + pb "go.etcd.io/etcd/raft/raftpb" ) func (st StateType) MarshalJSON() ([]byte, error) { @@ -77,10 +77,7 @@ func DescribeMessage(m pb.Message, f EntryFormatter) string { var buf bytes.Buffer fmt.Fprintf(&buf, "%x->%x %v Term:%d Log:%d/%d", m.From, m.To, m.Type, m.Term, m.LogTerm, m.Index) if m.Reject { - fmt.Fprintf(&buf, " Rejected") - if m.RejectHint != 0 { - fmt.Fprintf(&buf, "(Hint:%d)", m.RejectHint) - } + fmt.Fprintf(&buf, " Rejected (Hint: %d)", m.RejectHint) } if m.Commit != 0 { fmt.Fprintf(&buf, " Commit:%d", m.Commit) @@ -101,6 +98,12 @@ func DescribeMessage(m pb.Message, f EntryFormatter) string { return buf.String() } +// PayloadSize is the size of the payload of this Entry. Notably, it does not +// depend on its Index or Term. +func PayloadSize(e pb.Entry) int { + return len(e.Data) +} + // DescribeEntry returns a concise human-readable description of an // Entry for debugging. func DescribeEntry(e pb.Entry, f EntryFormatter) string { @@ -113,6 +116,16 @@ func DescribeEntry(e pb.Entry, f EntryFormatter) string { return fmt.Sprintf("%d/%d %s %s", e.Term, e.Index, e.Type, formatted) } +// DescribeEntries calls DescribeEntry for each Entry, adding a newline to +// each. 
+func DescribeEntries(ents []pb.Entry, f EntryFormatter) string { + var buf bytes.Buffer + for _, e := range ents { + _, _ = buf.WriteString(DescribeEntry(e, f) + "\n") + } + return buf.String() +} + func limitSize(ents []pb.Entry, maxSize uint64) []pb.Entry { if len(ents) == 0 { return ents diff --git a/vendor/vendor.json b/vendor/vendor.json index 9d1c7e5b589..920134b34e9 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -398,22 +398,6 @@ "revision": "3a0bb77429bd3a61596f5e8a3172445844342120", "revisionTime": "2016-10-10T02:54:55Z" }, - { - "checksumSHA1": "4VUg2Be1lkd0wm8iVTkoMTa58Ow=", - "path": "github.com/coreos/etcd/raft", - "revision": "27fc7e2296f506182f58ce846e48f36b34fe6842", - "revisionTime": "2018-10-10T17:17:54Z", - "version": "v3.3.10", - "versionExact": "v3.3.10" - }, - { - "checksumSHA1": "cwEnAGl7uzwDepjDZcIocMVEVEE=", - "path": "github.com/coreos/etcd/raft/raftpb", - "revision": "27fc7e2296f506182f58ce846e48f36b34fe6842", - "revisionTime": "2018-10-10T17:17:54Z", - "version": "v3.3.10", - "versionExact": "v3.3.10" - }, { "checksumSHA1": "Lf3uUXTkKK5DJ37BxQvxO1Fq+K8=", "path": "github.com/davecgh/go-spew/spew", @@ -593,8 +577,8 @@ { "checksumSHA1": "Y2MOwzNZfl4NRNDbLCZa6sgx7O0=", "path": "github.com/golang/protobuf/proto", - "revision": "c823c79ea1570fb5ff454033735a8e68575d1d0f", - "revisionTime": "2019-02-05T22:20:52Z" + "revision": "b5d812f8a3706043e23a9cd5babf2e5423744d30", + "revisionTime": "2019-02-28T15:19:29Z" }, { "checksumSHA1": "z4copNgeTN77OymdDKqLaIK/vSI=", @@ -908,6 +892,22 @@ "revision": "71fa2377963fc761fc6556dd5895dff5816d4e8c", "revisionTime": "2018-10-14T16:12:41Z" }, + { + "checksumSHA1": "G5GOx28aUKrnslEBisXvZtlQdmE=", + "path": "go.etcd.io/etcd/raft", + "revision": "a943ad0ee4c98d83f2d5115f16ca0de321246ddc", + "revisionTime": "2019-02-28T17:01:38Z", + "version": "master", + "versionExact": "master" + }, + { + "checksumSHA1": "0ZI+YcuXXszApIhI+HcsmIcSVgg=", + "path": "go.etcd.io/etcd/raft/raftpb", + "revision": "a943ad0ee4c98d83f2d5115f16ca0de321246ddc", + "revisionTime": "2019-02-28T17:01:38Z", + "version": "master", + "versionExact": "master" + }, { "checksumSHA1": "KuIPOjsnzI5EZHQjUFK3sTiLCTM=", "path": "go.opencensus.io", diff --git a/worker/draft.go b/worker/draft.go index e72c5e2a902..0bfed7a2455 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -27,9 +27,9 @@ import ( "sync/atomic" "time" - "github.com/coreos/etcd/raft" - "github.com/coreos/etcd/raft/raftpb" humanize "github.com/dustin/go-humanize" + "go.etcd.io/etcd/raft" + "go.etcd.io/etcd/raft/raftpb" ostats "go.opencensus.io/stats" "go.opencensus.io/tag" diff --git a/worker/draft_test.go b/worker/draft_test.go index 042ca08ce4e..59669961dc6 100644 --- a/worker/draft_test.go +++ b/worker/draft_test.go @@ -21,13 +21,13 @@ import ( "os" "testing" - "github.com/coreos/etcd/raft/raftpb" "github.com/dgraph-io/badger" "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/raftwal" "github.com/dgraph-io/dgraph/x" "github.com/stretchr/testify/require" + "go.etcd.io/etcd/raft/raftpb" ) func openBadger(dir string) (*badger.DB, error) { diff --git a/worker/snapshot.go b/worker/snapshot.go index 6eebdf92b9e..3cbbdd430f9 100644 --- a/worker/snapshot.go +++ b/worker/snapshot.go @@ -19,9 +19,9 @@ package worker import ( "sync/atomic" - "github.com/coreos/etcd/raft" bpb "github.com/dgraph-io/badger/pb" 
"github.com/golang/glog" + "go.etcd.io/etcd/raft" "github.com/dgraph-io/dgraph/conn" "github.com/dgraph-io/dgraph/posting" From 49ee4a695d21dd8f08f43908b60f08d3daf65d75 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Fri, 1 Mar 2019 17:15:40 -0800 Subject: [PATCH 02/11] revert change in assign.go --- contrib/blockade/main.go | 56 ++++++++++++++++++++------------------- dgraph/cmd/zero/assign.go | 6 ++--- 2 files changed, 32 insertions(+), 30 deletions(-) diff --git a/contrib/blockade/main.go b/contrib/blockade/main.go index 98bac2d0e82..1381e027a6e 100644 --- a/contrib/blockade/main.go +++ b/contrib/blockade/main.go @@ -145,35 +145,37 @@ func runTests() error { } // Setting flaky --all just does not converge. Too many network interruptions. - if err := testCommon("blockade flaky", "blockade fast --all", 3); err != nil { - fmt.Printf("Error testFlaky: %v\n", err) - return err - } - fmt.Println("===> Flaky TEST: OK") - - if err := testCommon("blockade slow", "blockade fast --all", 3); err != nil { - fmt.Printf("Error testSlow: %v\n", err) - return err - } - fmt.Println("===> Slow TEST: OK") - - if err := testCommon("blockade stop", "blockade start --all", 2); err != nil { - fmt.Printf("Error testRestart with stop: %v\n", err) - return err - } - fmt.Println("===> Restart TEST1: OK") + // if err := testCommon("blockade flaky", "blockade fast --all", 3); err != nil { + // fmt.Printf("Error testFlaky: %v\n", err) + // return err + // } + // fmt.Println("===> Flaky TEST: OK") + + // if err := testCommon("blockade slow", "blockade fast --all", 3); err != nil { + // fmt.Printf("Error testSlow: %v\n", err) + // return err + // } + // fmt.Println("===> Slow TEST: OK") + + // if err := testCommon("blockade stop", "blockade start --all", 2); err != nil { + // fmt.Printf("Error testRestart with stop: %v\n", err) + // return err + // } + // fmt.Println("===> Restart TEST1: OK") + + // if err := testCommon("blockade restart", "", 3); err != nil { + // fmt.Printf("Error testRestart with restart: %v\n", err) + // return err + // } + // fmt.Println("===> Restart TEST2: OK") - if err := testCommon("blockade restart", "", 3); err != nil { - fmt.Printf("Error testRestart with restart: %v\n", err) - return err - } - fmt.Println("===> Restart TEST2: OK") - - if err := testCommon("blockade partition", "blockade join", 2); err != nil { - fmt.Printf("Error testPartitions: %v\n", err) - return err + for { + if err := testCommon("blockade partition", "blockade join", 2); err != nil { + fmt.Printf("Error testPartitions: %v\n", err) + return err + } + fmt.Println("===> Partition TEST: OK") } - fmt.Println("===> Partition TEST: OK") return nil } diff --git a/dgraph/cmd/zero/assign.go b/dgraph/cmd/zero/assign.go index 03a85703d0f..7e0d66784d3 100644 --- a/dgraph/cmd/zero/assign.go +++ b/dgraph/cmd/zero/assign.go @@ -66,9 +66,9 @@ func (s *Server) lease(ctx context.Context, num *pb.Num, txn bool) (*pb.Assigned // TODO: Fix when we move to linearizable reads, need to check if we are the leader, might be // based on leader leases. If this node gets partitioned and unless checkquorum is enabled, this // node would still think that it's the leader. 
- // if !node.AmLeader() { - // return &emptyAssignedIds, x.Errorf("Assigning IDs is only allowed on leader.") - // } + if !node.AmLeader() { + return &emptyAssignedIds, x.Errorf("Assigning IDs is only allowed on leader.") + } if num.Val == 0 && !num.ReadOnly { return &emptyAssignedIds, x.Errorf("Nothing to be leased") From a7f269fee1b21129784c033c864b8bcb3174e0eb Mon Sep 17 00:00:00 2001 From: Lucas Wang Date: Fri, 1 Mar 2019 17:08:37 -0800 Subject: [PATCH 03/11] Stop if compilation fails (#3082) --- dgraph/run.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dgraph/run.sh b/dgraph/run.sh index 6059b9abadc..e997eee1b5c 100755 --- a/dgraph/run.sh +++ b/dgraph/run.sh @@ -82,6 +82,6 @@ if [ "${#SERVICES[@]}" -gt 0 ]; then fi fi -make install -docker-compose ${COMPOSE_FILES[@]} down +make install &&\ +docker-compose ${COMPOSE_FILES[@]} down &&\ DATA=$DATA docker-compose ${COMPOSE_FILES[@]} up --force-recreate --remove-orphans ${SERVICES[@]} From ff0ef92fe5ad05815f83fd6c3c843c66bf70c66c Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Fri, 1 Mar 2019 18:36:20 -0800 Subject: [PATCH 04/11] Remove network tests from blockade, so it can run faster. --- contrib/blockade/main.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/contrib/blockade/main.go b/contrib/blockade/main.go index 1381e027a6e..b4defe603fc 100644 --- a/contrib/blockade/main.go +++ b/contrib/blockade/main.go @@ -157,6 +157,7 @@ func runTests() error { // } // fmt.Println("===> Slow TEST: OK") +<<<<<<< HEAD // if err := testCommon("blockade stop", "blockade start --all", 2); err != nil { // fmt.Printf("Error testRestart with stop: %v\n", err) // return err @@ -168,6 +169,13 @@ func runTests() error { // return err // } // fmt.Println("===> Restart TEST2: OK") +======= + if err := testCommon("blockade stop", "blockade start --all", 2); err != nil { + fmt.Printf("Error testRestart with stop: %v\n", err) + return err + } + fmt.Println("===> Restart TEST1: OK") +>>>>>>> Remove network tests from blockade, so it can run faster. for { if err := testCommon("blockade partition", "blockade join", 2); err != nil { From ec3f8375ff6eb899f7ac41f7e4213530ae66b1d9 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Fri, 1 Mar 2019 18:37:35 -0800 Subject: [PATCH 05/11] Revert "Prevent alphas from asking zero to serve tablets during queries. (#3055)" This reverts commit 1bde8d55260df221ec1578865f8f217621501c88. 
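The TODO restored in assign.go above notes that a partitioned Zero can keep believing it is the leader unless checkquorum is enabled. For reference, a minimal sketch of turning that on when constructing a raft node; the tick values are illustrative, not dgraph's:

    package main

    import "go.etcd.io/etcd/raft"

    func main() {
        cfg := &raft.Config{
            ID:              0x01,
            ElectionTick:    20, // must be well above HeartbeatTick
            HeartbeatTick:   2,
            Storage:         raft.NewMemoryStorage(),
            MaxSizePerMsg:   1 << 20,
            MaxInflightMsgs: 256,
            CheckQuorum:     true, // leader steps down if it cannot reach a quorum
            PreVote:         true, // rejoining nodes do not depose a stable leader
        }
        n := raft.StartNode(cfg, []raft.Peer{{ID: 0x01}})
        defer n.Stop()
    }

CheckQuorum does not make AmLeader exact, but it bounds how long a deposed leader keeps answering as one to roughly an election timeout.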
--- query/query.go | 5 +---- query/query0_test.go | 2 +- query/recurse.go | 11 ----------- query/shortest.go | 4 ---- worker/groups.go | 15 --------------- worker/mutation.go | 4 ++-- worker/schema.go | 2 +- worker/sort.go | 6 +----- worker/task.go | 26 ++++++++------------------ 9 files changed, 14 insertions(+), 61 deletions(-) diff --git a/query/query.go b/query/query.go index 71a9c942b64..0f7361b6582 100644 --- a/query/query.go +++ b/query/query.go @@ -170,7 +170,6 @@ type Function struct { type SubGraph struct { ReadTs uint64 Attr string - UnknownAttr bool Params params counts []uint32 valueMatrix []*pb.ValueList @@ -2026,9 +2025,7 @@ func ProcessGraph(ctx context.Context, sg, parent *SubGraph, rch chan error) { return } result, err := worker.ProcessTaskOverNetwork(ctx, taskQuery) - if err != nil && strings.Contains(err.Error(), worker.ErrUnservedTabletMessage) { - sg.UnknownAttr = true - } else if err != nil { + if err != nil { rch <- err return } diff --git a/query/query0_test.go b/query/query0_test.go index a6bd3f1daaf..a42bc576e87 100644 --- a/query/query0_test.go +++ b/query/query0_test.go @@ -944,7 +944,7 @@ func TestQueryVarValOrderError(t *testing.T) { ` _, err := processQuery(t, context.Background(), query) require.Error(t, err) - require.Contains(t, err.Error(), "Cannot sort by unknown attribute n") + require.Contains(t, err.Error(), "Cannot sort attribute n of type object.") } func TestQueryVarValOrderDesc(t *testing.T) { diff --git a/query/recurse.go b/query/recurse.go index cd4861c3bd2..a06b6bb3557 100644 --- a/query/recurse.go +++ b/query/recurse.go @@ -51,10 +51,6 @@ func (start *SubGraph) expandRecurse(ctx context.Context, maxDepth uint64) error return ctx.Err() } - if start.UnknownAttr { - return nil - } - // Add children back and expand if necessary if exec, err = expandChildren(ctx, start, startChildren); err != nil { return err @@ -94,10 +90,6 @@ func (start *SubGraph) expandRecurse(ctx context.Context, maxDepth uint64) error } for _, sg := range exec { - if sg.UnknownAttr { - continue - } - if len(sg.Filters) > 0 { // We need to do this in case we had some filters. sg.updateUidMatrix() @@ -134,9 +126,6 @@ func (start *SubGraph) expandRecurse(ctx context.Context, maxDepth uint64) error var out []*SubGraph var exp []*SubGraph for _, sg := range exec { - if sg.UnknownAttr == true { - continue - } if len(sg.DestUIDs.Uids) == 0 { continue } diff --git a/query/shortest.go b/query/shortest.go index 7f5542759c2..a0a78300246 100644 --- a/query/shortest.go +++ b/query/shortest.go @@ -176,10 +176,6 @@ func (sg *SubGraph) expandOut(ctx context.Context, rch <- ctx.Err() return default: - if subgraph.UnknownAttr { - continue - } - // Send the destuids in res chan. for mIdx, fromUID := range subgraph.SrcUIDs.Uids { for lIdx, toUID := range subgraph.uidMatrix[mIdx].Uids { diff --git a/worker/groups.go b/worker/groups.go index d7e5c3aae56..982ea44af42 100644 --- a/worker/groups.go +++ b/worker/groups.go @@ -322,15 +322,6 @@ func (g *groupi) BelongsTo(key string) uint32 { return 0 } -// BelongsToReadOnly acts like BelongsTo except it does not ask zero to serve -// the tablet for key if no group is currently serving it. 
-func (g *groupi) BelongsToReadOnly(key string) uint32 { - g.RLock() - defer g.RUnlock() - tablet := g.tablets[key] - return tablet.GetGroupId() -} - func (g *groupi) ServesTablet(key string) bool { tablet := g.Tablet(key) if tablet != nil && tablet.GroupId == groups().groupId() { @@ -339,12 +330,6 @@ func (g *groupi) ServesTablet(key string) bool { return false } -// ServesTabletReadOnly acts like ServesTablet except it does not ask zero to -// serve the tablet for key if no group is currently serving it. -func (g *groupi) ServesTabletReadOnly(key string) bool { - return g.BelongsToReadOnly(key) == groups().groupId() -} - // Do not modify the returned Tablet // TODO: This should return error. func (g *groupi) Tablet(key string) *pb.Tablet { diff --git a/worker/mutation.go b/worker/mutation.go index b78135006b6..fc55cb75262 100644 --- a/worker/mutation.go +++ b/worker/mutation.go @@ -39,8 +39,8 @@ import ( ) var ( - ErrUnservedTabletMessage = "Tablet isn't being served by this instance" - errUnservedTablet = x.Errorf(ErrUnservedTabletMessage) + errUnservedTablet = x.Errorf("Tablet isn't being served by this instance.") + errPredicateMoving = x.Errorf("Predicate is being moved. Please retry later") ) func isStarAll(v []byte) bool { diff --git a/worker/schema.go b/worker/schema.go index bc0ce8c435b..9e6b06e9f6e 100644 --- a/worker/schema.go +++ b/worker/schema.go @@ -62,7 +62,7 @@ func getSchema(ctx context.Context, s *pb.SchemaRequest) (*pb.SchemaResult, erro for _, attr := range predicates { // This can happen after a predicate is moved. We don't delete predicate from schema state // immediately. So lets ignore this predicate. - if !groups().ServesTabletReadOnly(attr) { + if !groups().ServesTablet(attr) { continue } if schemaNode := populateSchema(attr, fields); schemaNode != nil { diff --git a/worker/sort.go b/worker/sort.go index cc58357b332..44b83d17a1a 100644 --- a/worker/sort.go +++ b/worker/sort.go @@ -45,11 +45,7 @@ type sortresult struct { // SortOverNetwork sends sort query over the network. func SortOverNetwork(ctx context.Context, q *pb.SortMessage) (*pb.SortResult, error) { - gid := groups().BelongsToReadOnly(q.Order[0].Attr) - if gid == 0 { - return nil, fmt.Errorf("Cannot sort by unknown attribute %s", q.Order[0].Attr) - } - + gid := groups().BelongsTo(q.Order[0].Attr) if span := otrace.FromContext(ctx); span != nil { span.Annotatef(nil, "worker.SortOverNetwork. Attr: %s. Group: %d", q.Order[0].Attr, gid) } diff --git a/worker/task.go b/worker/task.go index 821efc8d464..fca15071546 100644 --- a/worker/task.go +++ b/worker/task.go @@ -137,11 +137,10 @@ func processWithBackupRequest( // query. func ProcessTaskOverNetwork(ctx context.Context, q *pb.Query) (*pb.Result, error) { attr := q.Attr - gid := groups().BelongsToReadOnly(attr) + gid := groups().BelongsTo(attr) if gid == 0 { - return &emptyResult, errUnservedTablet + return &pb.Result{}, errUnservedTablet } - span := otrace.FromContext(ctx) if span != nil { span.Annotatef(nil, "ProcessTaskOverNetwork. 
attr: %v gid: %v, readTs: %d, node id: %d", @@ -158,9 +157,6 @@ func ProcessTaskOverNetwork(ctx context.Context, q *pb.Query) (*pb.Result, error return c.ServeTask(ctx, q) }) - if err == errUnservedTablet { - return &emptyResult, errUnservedTablet - } if err != nil { return nil, err } @@ -726,13 +722,11 @@ func processTask(ctx context.Context, q *pb.Query, gid uint32) (*pb.Result, erro q.Attr, q.ReadTs, maxAssigned) } - // If a group stops serving tablet and it gets partitioned away from group - // zero, then it wouldn't know that this group is no longer serving this - // predicate. There's no issue if a we are serving a particular tablet and - // we get partitioned away from group zero as long as it's not removed. - // ServesTabletReadOnly is called instead of ServesTablet to prevent this - // alpha from requesting to serve this tablet. - if !groups().ServesTabletReadOnly(q.Attr) { + // If a group stops serving tablet and it gets partitioned away from group zero, then it + // wouldn't know that this group is no longer serving this predicate. + // There's no issue if a we are serving a particular tablet and we get partitioned away from + // group zero as long as it's not removed. + if !groups().ServesTablet(q.Attr) { return &emptyResult, errUnservedTablet } qs := queryState{cache: posting.Oracle().CacheAt(q.ReadTs)} @@ -1612,11 +1606,7 @@ func (w *grpcWorker) ServeTask(ctx context.Context, q *pb.Query) (*pb.Result, er return &emptyResult, ctx.Err() } - gid := groups().BelongsToReadOnly(q.Attr) - if gid == 0 { - return &emptyResult, errUnservedTablet - } - + gid := groups().BelongsTo(q.Attr) var numUids int if q.UidList != nil { numUids = len(q.UidList.Uids) From cda188888e896b152993f7d88ce790d033eabde3 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Fri, 1 Mar 2019 19:18:48 -0800 Subject: [PATCH 06/11] fix up blockade --- contrib/blockade/main.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/contrib/blockade/main.go b/contrib/blockade/main.go index b4defe603fc..1381e027a6e 100644 --- a/contrib/blockade/main.go +++ b/contrib/blockade/main.go @@ -157,7 +157,6 @@ func runTests() error { // } // fmt.Println("===> Slow TEST: OK") -<<<<<<< HEAD // if err := testCommon("blockade stop", "blockade start --all", 2); err != nil { // fmt.Printf("Error testRestart with stop: %v\n", err) // return err @@ -169,13 +168,6 @@ func runTests() error { // return err // } // fmt.Println("===> Restart TEST2: OK") -======= - if err := testCommon("blockade stop", "blockade start --all", 2); err != nil { - fmt.Printf("Error testRestart with stop: %v\n", err) - return err - } - fmt.Println("===> Restart TEST1: OK") ->>>>>>> Remove network tests from blockade, so it can run faster. 
for { if err := testCommon("blockade partition", "blockade join", 2); err != nil { From 2c4f6bba00ab394756e1417fa5735fbb79d060e0 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Fri, 1 Mar 2019 20:08:47 -0800 Subject: [PATCH 07/11] Revert blockade changes --- contrib/blockade/main.go | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/contrib/blockade/main.go b/contrib/blockade/main.go index 1381e027a6e..bf1871a180d 100644 --- a/contrib/blockade/main.go +++ b/contrib/blockade/main.go @@ -157,25 +157,23 @@ func runTests() error { // } // fmt.Println("===> Slow TEST: OK") - // if err := testCommon("blockade stop", "blockade start --all", 2); err != nil { - // fmt.Printf("Error testRestart with stop: %v\n", err) - // return err - // } - // fmt.Println("===> Restart TEST1: OK") + if err := testCommon("blockade stop", "blockade start --all", 2); err != nil { + fmt.Printf("Error testRestart with stop: %v\n", err) + return err + } + fmt.Println("===> Restart TEST1: OK") - // if err := testCommon("blockade restart", "", 3); err != nil { - // fmt.Printf("Error testRestart with restart: %v\n", err) - // return err - // } - // fmt.Println("===> Restart TEST2: OK") + if err := testCommon("blockade restart", "", 3); err != nil { + fmt.Printf("Error testRestart with restart: %v\n", err) + return err + } + fmt.Println("===> Restart TEST2: OK") - for { - if err := testCommon("blockade partition", "blockade join", 2); err != nil { - fmt.Printf("Error testPartitions: %v\n", err) - return err - } - fmt.Println("===> Partition TEST: OK") + if err := testCommon("blockade partition", "blockade join", 2); err != nil { + fmt.Printf("Error testPartitions: %v\n", err) + return err } + fmt.Println("===> Partition TEST: OK") return nil } From 07cd36295a300851366e7e3c0e5d2d6d9b444327 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Sat, 2 Mar 2019 17:29:01 -0800 Subject: [PATCH 08/11] Small changes to test blockade --- conn/node.go | 2 +- contrib/blockade/blockade.yml | 12 ++++++------ contrib/blockade/main.go | 10 +++++----- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/conn/node.go b/conn/node.go index ca3b111646a..b18a3467ad8 100644 --- a/conn/node.go +++ b/conn/node.go @@ -366,7 +366,7 @@ func (n *Node) BatchAndSendMessages() { } func (n *Node) doSendMessage(to uint64, pool *Pool, data []byte) { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() client := pool.Get() diff --git a/contrib/blockade/blockade.yml b/contrib/blockade/blockade.yml index 90798f91b58..5e8e9cd365c 100644 --- a/contrib/blockade/blockade.yml +++ b/contrib/blockade/blockade.yml @@ -17,7 +17,7 @@ containers: expose: - 5080 - 6080 - command: /gobin/dgraph zero --my=zero1:5080 --replicas 3 --idx 1 --bindall --expose_trace --logtostderr -v=2 + command: /gobin/dgraph zero --my=zero1:5080 --replicas 3 --idx 1 --bindall --expose_trace --logtostderr -v=3 volumes: # Note: Any environment variables must use the ${} syntax. # ${GOPATH} works, $GOPATH does not. 
@@ -33,7 +33,7 @@ containers: expose: - 5082 - 6082 - command: /gobin/dgraph zero -o 2 --my=zero2:5082 --replicas 3 --peer=zero1:5080 --idx 2 --bindall --expose_trace --logtostderr -v=2 + command: /gobin/dgraph zero -o 2 --my=zero2:5082 --replicas 3 --peer=zero1:5080 --idx 2 --bindall --expose_trace --logtostderr -v=3 volumes: "${GOPATH}/bin": "/gobin" @@ -47,7 +47,7 @@ containers: expose: - 5083 - 6083 - command: /gobin/dgraph zero -o 3 --my=zero3:5083 --replicas 3 --peer=zero1:5080 --idx 3 --bindall --expose_trace --logtostderr -v=2 + command: /gobin/dgraph zero -o 3 --my=zero3:5083 --replicas 3 --peer=zero1:5080 --idx 3 --bindall --expose_trace --logtostderr -v=3 volumes: "${GOPATH}/bin": "/gobin" @@ -61,7 +61,7 @@ containers: expose: - 8180 - 9180 - command: /gobin/dgraph alpha --my=dg1:7180 --lru_mb=1024 --zero=zero1:5080 -o 100 --expose_trace --trace 1.0 --logtostderr -v=2 + command: /gobin/dgraph alpha --my=dg1:7180 --lru_mb=1024 --zero=zero1:5080 -o 100 --expose_trace --trace 1.0 --logtostderr -v=3 volumes: "${GOPATH}/bin": "/gobin" @@ -76,7 +76,7 @@ containers: - 8182 - 9182 start_delay: 8 - command: /gobin/dgraph alpha --my=dg2:7182 --lru_mb=1024 --zero=zero1:5080 -o 102 --expose_trace --trace 1.0 --logtostderr -v=2 + command: /gobin/dgraph alpha --my=dg2:7182 --lru_mb=1024 --zero=zero1:5080 -o 102 --expose_trace --trace 1.0 --logtostderr -v=3 volumes: "${GOPATH}/bin": "/gobin" @@ -91,7 +91,7 @@ containers: - 8183 - 9183 start_delay: 16 - command: /gobin/dgraph alpha --my=dg3:7183 --lru_mb=1024 --zero=zero1:5080 -o 103 --expose_trace --trace 1.0 --logtostderr -v=2 + command: /gobin/dgraph alpha --my=dg3:7183 --lru_mb=1024 --zero=zero1:5080 -o 103 --expose_trace --trace 1.0 --logtostderr -v=3 volumes: "${GOPATH}/bin": "/gobin" diff --git a/contrib/blockade/main.go b/contrib/blockade/main.go index bf1871a180d..55bb70d2e54 100644 --- a/contrib/blockade/main.go +++ b/contrib/blockade/main.go @@ -157,11 +157,11 @@ func runTests() error { // } // fmt.Println("===> Slow TEST: OK") - if err := testCommon("blockade stop", "blockade start --all", 2); err != nil { - fmt.Printf("Error testRestart with stop: %v\n", err) - return err - } - fmt.Println("===> Restart TEST1: OK") + // if err := testCommon("blockade stop", "blockade start --all", 2); err != nil { + // fmt.Printf("Error testRestart with stop: %v\n", err) + // return err + // } + // fmt.Println("===> Restart TEST1: OK") if err := testCommon("blockade restart", "", 3); err != nil { fmt.Printf("Error testRestart with restart: %v\n", err) return err } fmt.Println("===> Restart TEST2: OK") From 74589d3d13af58db10831527229aa92fe85f5f44 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Sat, 2 Mar 2019 20:41:07 -0800 Subject: [PATCH 09/11] Fix the issue where different Zeros would end up with different checksums for groups, because Go maps iterate over keys in random order. --- dgraph/cmd/zero/raft.go | 12 +++++++----- dgraph/docker-compose.yml | 12 ++++++------ posting/oracle.go | 13 +++++++++++++ worker/groups.go | 1 + worker/sort.go | 1 + worker/task.go | 7 ++++--- 6 files changed, 32 insertions(+), 14 deletions(-) diff --git a/dgraph/cmd/zero/raft.go b/dgraph/cmd/zero/raft.go index ef22d605dfb..cdd8f41a09d 100644 --- a/dgraph/cmd/zero/raft.go +++ b/dgraph/cmd/zero/raft.go @@ -17,11 +17,12 @@ package zero import ( - "bytes" "errors" "fmt" "log" "math" + "sort" + "strings" "sync" "time" @@ -245,11 +246,12 @@ func (n *node) handleTabletProposal(tablet *pb.Tablet) error { // served by the group.
If the tablets that a group is serving changes, and the Alpha does // not know about these changes, then the read request must fail. for _, g := range state.GetGroups() { - var buf bytes.Buffer - for name := range g.GetTablets() { - x.Check2(buf.WriteString(name)) + preds := make([]string, 0, len(g.GetTablets())) + for pred := range g.GetTablets() { + preds = append(preds, pred) } - g.Checksum = farm.Fingerprint64(buf.Bytes()) + sort.Strings(preds) + g.Checksum = farm.Fingerprint64([]byte(strings.Join(preds, ""))) } }() diff --git a/dgraph/docker-compose.yml b/dgraph/docker-compose.yml index a6510e6e104..47357252624 100644 --- a/dgraph/docker-compose.yml +++ b/dgraph/docker-compose.yml @@ -19,7 +19,7 @@ services: source: $GOPATH/bin target: /gobin read_only: true - command: /gobin/dgraph zero -o 0 --my=zero1:5080 --replicas 3 --idx 1 --logtostderr -v=2 --bindall --expose_trace --profile_mode block --block_rate 10 + command: /gobin/dgraph zero -o 0 --my=zero1:5080 --replicas 3 --idx 1 --logtostderr -v=3 --bindall --expose_trace --profile_mode block --block_rate 10 zero2: image: dgraph/dgraph:latest @@ -38,7 +38,7 @@ services: source: $GOPATH/bin target: /gobin read_only: true - command: /gobin/dgraph zero -o 2 --my=zero2:5082 --replicas 3 --idx 2 --logtostderr -v=2 --peer=zero1:5080 + command: /gobin/dgraph zero -o 2 --my=zero2:5082 --replicas 3 --idx 2 --logtostderr -v=3 --peer=zero1:5080 zero3: image: dgraph/dgraph:latest @@ -57,7 +57,7 @@ services: source: $GOPATH/bin target: /gobin read_only: true - command: /gobin/dgraph zero -o 3 --my=zero3:5083 --replicas 3 --idx 3 --logtostderr -v=2 --peer=zero1:5080 + command: /gobin/dgraph zero -o 3 --my=zero3:5083 --replicas 3 --idx 3 --logtostderr -v=3 --peer=zero1:5080 dg1: image: dgraph/dgraph:latest @@ -74,7 +74,7 @@ services: labels: cluster: test service: alpha - command: /gobin/dgraph alpha --my=dg1:7180 --lru_mb=1024 --zero=zero1:5080 -o 100 --expose_trace --trace 1.0 --profile_mode block --block_rate 10 --logtostderr -v=2 --whitelist 10.0.0.0/8,172.16.0.0/12,192.168.0.0/16 + command: /gobin/dgraph alpha --my=dg1:7180 --lru_mb=1024 --zero=zero1:5080 -o 100 --expose_trace --trace 1.0 --profile_mode block --block_rate 10 --logtostderr -v=3 --whitelist 10.0.0.0/8,172.16.0.0/12,192.168.0.0/16 dg2: image: dgraph/dgraph:latest @@ -93,7 +93,7 @@ services: labels: cluster: test service: alpha - command: /gobin/dgraph alpha --my=dg2:7182 --lru_mb=1024 --zero=zero1:5080 -o 102 --expose_trace --trace 1.0 --profile_mode block --block_rate 10 --logtostderr -v=2 --whitelist 10.0.0.0/8,172.16.0.0/12,192.168.0.0/16 + command: /gobin/dgraph alpha --my=dg2:7182 --lru_mb=1024 --zero=zero1:5080 -o 102 --expose_trace --trace 1.0 --profile_mode block --block_rate 10 --logtostderr -v=3 --whitelist 10.0.0.0/8,172.16.0.0/12,192.168.0.0/16 dg3: image: dgraph/dgraph:latest @@ -112,7 +112,7 @@ services: labels: cluster: test service: alpha - command: /gobin/dgraph alpha --my=dg3:7183 --lru_mb=1024 --zero=zero1:5080 -o 103 --expose_trace --trace 1.0 --profile_mode block --block_rate 10 --logtostderr -v=2 --whitelist 10.0.0.0/8,172.16.0.0/12,192.168.0.0/16 + command: /gobin/dgraph alpha --my=dg3:7183 --lru_mb=1024 --zero=zero1:5080 -o 103 --expose_trace --trace 1.0 --profile_mode block --block_rate 10 --logtostderr -v=3 --whitelist 10.0.0.0/8,172.16.0.0/12,192.168.0.0/16 dg4: image: dgraph/dgraph:latest container_name: bank-dg4 diff --git a/posting/oracle.go b/posting/oracle.go index aae91c71c2f..141d761af4b 100644 --- a/posting/oracle.go +++ b/posting/oracle.go @@ 
-25,6 +25,7 @@ import ( "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/x" + "github.com/golang/glog" ) var o *oracle @@ -186,6 +187,18 @@ func (o *oracle) WaitForTs(ctx context.Context, startTs uint64) error { } func (o *oracle) ProcessDelta(delta *pb.OracleDelta) { + if glog.V(3) { + glog.Infof("ProcessDelta: Max Assigned: %d", delta.MaxAssigned) + glog.Infof("ProcessDelta: Group checksum: %v", delta.GroupChecksums) + for _, txn := range delta.Txns { + if txn.CommitTs == 0 { + glog.Infof("ProcessDelta Aborted: %d", txn.StartTs) + } else { + glog.Infof("ProcessDelta Committed: %d -> %d", txn.StartTs, txn.CommitTs) + } + } + } + o.Lock() defer o.Unlock() for _, txn := range delta.Txns { diff --git a/worker/groups.go b/worker/groups.go index 982ea44af42..ecca4fb06cd 100644 --- a/worker/groups.go +++ b/worker/groups.go @@ -263,6 +263,7 @@ func (g *groupi) applyState(state *pb.MembershipState) { g.tablets[tablet.Predicate] = tablet } if gid == g.gid { + glog.V(3).Infof("group %d checksum: %d", g.gid, group.Checksum) atomic.StoreUint64(&g.membershipChecksum, group.Checksum) } } diff --git a/worker/sort.go b/worker/sort.go index 44b83d17a1a..3c076117464 100644 --- a/worker/sort.go +++ b/worker/sort.go @@ -380,6 +380,7 @@ func processSort(ctx context.Context, ts *pb.SortMessage) (*pb.SortResult, error if err := posting.Oracle().WaitForTs(ctx, ts.ReadTs); err != nil { return nil, err } + span.Annotatef(nil, "Waiting for checksum match") if err := groups().ChecksumsMatch(ctx); err != nil { return nil, err } diff --git a/worker/task.go b/worker/task.go index fca15071546..50b059997cd 100644 --- a/worker/task.go +++ b/worker/task.go @@ -713,14 +713,15 @@ func processTask(ctx context.Context, q *pb.Query, gid uint32) (*pb.Result, erro if err := posting.Oracle().WaitForTs(ctx, q.ReadTs); err != nil { return nil, err } - if err := groups().ChecksumsMatch(ctx); err != nil { - return nil, err - } if span != nil { maxAssigned := posting.Oracle().MaxAssigned() span.Annotatef(nil, "Done waiting for maxAssigned. Attr: %q ReadTs: %d Max: %d", q.Attr, q.ReadTs, maxAssigned) } + if err := groups().ChecksumsMatch(ctx); err != nil { + return nil, err + } + span.Annotatef(nil, "Done waiting for checksum match") // If a group stops serving tablet and it gets partitioned away from group zero, then it // wouldn't know that this group is no longer serving this predicate. 
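The zero/raft.go change above is the heart of this patch: the group checksum is now a farm fingerprint of the sorted, concatenated predicate names, so every Zero computes the same value no matter how Go randomizes map iteration. A standalone sketch of the fixed computation (the map contents are made up):

    package main

    import (
        "fmt"
        "sort"
        "strings"

        farm "github.com/dgryski/go-farm"
    )

    // checksum mirrors the fix: sort the predicate names before joining so the
    // fingerprint no longer depends on map iteration order.
    func checksum(tablets map[string]struct{}) uint64 {
        preds := make([]string, 0, len(tablets))
        for pred := range tablets {
            preds = append(preds, pred)
        }
        sort.Strings(preds)
        return farm.Fingerprint64([]byte(strings.Join(preds, "")))
    }

    func main() {
        m := map[string]struct{}{"name": {}, "age": {}, "friend": {}}
        fmt.Println(checksum(m) == checksum(m)) // now true on every Zero, every time
    }

One caveat: joining with an empty separator lets distinct predicate sets such as {"ab", "c"} and {"a", "bc"} fingerprint identically; a separator byte that cannot occur in predicate names would close that gap.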
From acae5ba746bf7430dc503b44ebfd32dc54daa702 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Sat, 2 Mar 2019 21:00:25 -0800 Subject: [PATCH 10/11] Remove debug changes --- contrib/blockade/main.go | 10 +++++----- dgraph/docker-compose.yml | 12 ++++++------ 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/contrib/blockade/main.go b/contrib/blockade/main.go index 55bb70d2e54..bf1871a180d 100644 --- a/contrib/blockade/main.go +++ b/contrib/blockade/main.go @@ -157,11 +157,11 @@ func runTests() error { // } // fmt.Println("===> Slow TEST: OK") - // if err := testCommon("blockade stop", "blockade start --all", 2); err != nil { - // fmt.Printf("Error testRestart with stop: %v\n", err) - // return err - // } - // fmt.Println("===> Restart TEST1: OK") + if err := testCommon("blockade stop", "blockade start --all", 2); err != nil { + fmt.Printf("Error testRestart with stop: %v\n", err) + return err + } + fmt.Println("===> Restart TEST1: OK") if err := testCommon("blockade restart", "", 3); err != nil { fmt.Printf("Error testRestart with restart: %v\n", err) diff --git a/dgraph/docker-compose.yml b/dgraph/docker-compose.yml index 47357252624..a6510e6e104 100644 --- a/dgraph/docker-compose.yml +++ b/dgraph/docker-compose.yml @@ -19,7 +19,7 @@ services: source: $GOPATH/bin target: /gobin read_only: true - command: /gobin/dgraph zero -o 0 --my=zero1:5080 --replicas 3 --idx 1 --logtostderr -v=3 --bindall --expose_trace --profile_mode block --block_rate 10 + command: /gobin/dgraph zero -o 0 --my=zero1:5080 --replicas 3 --idx 1 --logtostderr -v=2 --bindall --expose_trace --profile_mode block --block_rate 10 zero2: image: dgraph/dgraph:latest @@ -38,7 +38,7 @@ services: source: $GOPATH/bin target: /gobin read_only: true - command: /gobin/dgraph zero -o 2 --my=zero2:5082 --replicas 3 --idx 2 --logtostderr -v=3 --peer=zero1:5080 + command: /gobin/dgraph zero -o 2 --my=zero2:5082 --replicas 3 --idx 2 --logtostderr -v=2 --peer=zero1:5080 zero3: image: dgraph/dgraph:latest @@ -57,7 +57,7 @@ services: source: $GOPATH/bin target: /gobin read_only: true - command: /gobin/dgraph zero -o 3 --my=zero3:5083 --replicas 3 --idx 3 --logtostderr -v=3 --peer=zero1:5080 + command: /gobin/dgraph zero -o 3 --my=zero3:5083 --replicas 3 --idx 3 --logtostderr -v=2 --peer=zero1:5080 dg1: image: dgraph/dgraph:latest @@ -74,7 +74,7 @@ services: labels: cluster: test service: alpha - command: /gobin/dgraph alpha --my=dg1:7180 --lru_mb=1024 --zero=zero1:5080 -o 100 --expose_trace --trace 1.0 --profile_mode block --block_rate 10 --logtostderr -v=3 --whitelist 10.0.0.0/8,172.16.0.0/12,192.168.0.0/16 + command: /gobin/dgraph alpha --my=dg1:7180 --lru_mb=1024 --zero=zero1:5080 -o 100 --expose_trace --trace 1.0 --profile_mode block --block_rate 10 --logtostderr -v=2 --whitelist 10.0.0.0/8,172.16.0.0/12,192.168.0.0/16 dg2: image: dgraph/dgraph:latest @@ -93,7 +93,7 @@ services: labels: cluster: test service: alpha - command: /gobin/dgraph alpha --my=dg2:7182 --lru_mb=1024 --zero=zero1:5080 -o 102 --expose_trace --trace 1.0 --profile_mode block --block_rate 10 --logtostderr -v=3 --whitelist 10.0.0.0/8,172.16.0.0/12,192.168.0.0/16 + command: /gobin/dgraph alpha --my=dg2:7182 --lru_mb=1024 --zero=zero1:5080 -o 102 --expose_trace --trace 1.0 --profile_mode block --block_rate 10 --logtostderr -v=2 --whitelist 10.0.0.0/8,172.16.0.0/12,192.168.0.0/16 dg3: image: dgraph/dgraph:latest @@ -112,7 +112,7 @@ services: labels: cluster: test service: alpha - command: /gobin/dgraph alpha --my=dg3:7183 --lru_mb=1024 --zero=zero1:5080 -o 
103 --expose_trace --trace 1.0 --profile_mode block --block_rate 10 --logtostderr -v=3 --whitelist 10.0.0.0/8,172.16.0.0/12,192.168.0.0/16 + command: /gobin/dgraph alpha --my=dg3:7183 --lru_mb=1024 --zero=zero1:5080 -o 103 --expose_trace --trace 1.0 --profile_mode block --block_rate 10 --logtostderr -v=2 --whitelist 10.0.0.0/8,172.16.0.0/12,192.168.0.0/16 dg4: image: dgraph/dgraph:latest container_name: bank-dg4 From ffe6de89fd381819a315e5c2f1ef8529a8a40ede Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Sat, 2 Mar 2019 21:13:48 -0800 Subject: [PATCH 11/11] Remove unused var --- worker/mutation.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/worker/mutation.go b/worker/mutation.go index fc55cb75262..31dc29f1d92 100644 --- a/worker/mutation.go +++ b/worker/mutation.go @@ -39,8 +39,7 @@ import ( ) var ( - errUnservedTablet = x.Errorf("Tablet isn't being served by this instance.") - errPredicateMoving = x.Errorf("Predicate is being moved. Please retry later") + errUnservedTablet = x.Errorf("Tablet isn't being served by this instance.") ) func isStarAll(v []byte) bool {
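Stepping back to the library upgrade in the first patch: the commitReady rewrite and the appliedCursor bookkeeping matter because committed entries may now arrive paginated across several Ready batches. The loop a RawNode driver runs is unchanged; a condensed sketch, with persistence and transport elided (rn is an assumed, already bootstrapped node):

    package raftstats

    import "go.etcd.io/etcd/raft"

    // driveOnce runs a single iteration of the standard RawNode loop.
    func driveOnce(rn *raft.RawNode) {
        if !rn.HasReady() {
            return
        }
        rd := rn.Ready() // after this upgrade, also shrinks the uncommitted-size accounting
        // 1. Persist rd.Entries, rd.HardState and any snapshot (raftwal in dgraph).
        // 2. Send rd.Messages to the peers.
        // 3. Apply rd.CommittedEntries, possibly only one page of what is committed.
        rn.Advance(rd)
    }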