Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Fix linearizable read timeouts and AssignUid recursion in Zero #3203

Merged
merged 18 commits into from
Mar 27, 2019
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 40 additions & 16 deletions conn/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,11 +388,8 @@ func (n *Node) streamMessages(to uint64, stream *Stream) {
}
lastLog = time.Now()
}
if i >= 1e3 {
if time.Now().After(deadline) {
return
}
i = 0
if i >= 1e3 || time.Now().After(deadline) {
return
}
}
}
Expand All @@ -406,12 +403,18 @@ func (n *Node) doSendMessage(to uint64, msgCh chan []byte) error {
if err != nil {
return err
}

c := pb.NewRaftClient(pool.Get())
mc, err := c.RaftMessage(context.Background())
ctx, span := otrace.StartSpan(context.Background(),
fmt.Sprintf("RaftMessage-%d-to-%d", n.Id, to))
defer span.End()

mc, err := c.RaftMessage(ctx)
if err != nil {
return err
}

var packets, lastPackets uint64
slurp := func(batch *pb.RaftBatch) {
for {
if len(batch.Payload.Data) > messageBatchSoftLimit {
Expand All @@ -420,21 +423,30 @@ func (n *Node) doSendMessage(to uint64, msgCh chan []byte) error {
select {
case data := <-msgCh:
batch.Payload.Data = append(batch.Payload.Data, data...)
packets++
default:
return
}
}
}
ctx := mc.Context()

ctx = mc.Context()
ticker := time.NewTicker(3 * time.Minute)
defer ticker.Stop()

for {
select {
case data := <-msgCh:
batch := &pb.RaftBatch{
Context: n.RaftContext,
Payload: &api.Payload{Data: data},
}
packets++
slurp(batch) // Pick up more entries from msgCh, if present.
span.Annotatef(nil, "[Packets: %d] Sending data of length: %d.",
packets, len(batch.Payload.Data))
if err := mc.Send(batch); err != nil {
span.Annotatef(nil, "Error while mc.Send: %v", err)
switch {
case strings.Contains(err.Error(), "TransientFailure"):
glog.Warningf("Reporting node: %d addr: %s as unreachable.", to, pool.Addr)
Expand All @@ -446,6 +458,13 @@ func (n *Node) doSendMessage(to uint64, msgCh chan []byte) error {
// RAFT would automatically retry.
return err
}
case <-ticker.C:
if lastPackets == packets {
span.Annotatef(nil,
"No activity for a while [Packets == %d]. Closing connection.", packets)
return mc.CloseSend()
}
lastPackets = packets
case <-ctx.Done():
return ctx.Err()
}
Expand Down Expand Up @@ -592,15 +611,13 @@ func (n *Node) WaitLinearizableRead(ctx context.Context) error {

func (n *Node) RunReadIndexLoop(closer *y.Closer, readStateCh <-chan raft.ReadState) {
defer closer.Done()
readIndex := func() (uint64, error) {
// Read Request can get rejected then we would wait idefinitely on the channel
readIndex := func(activeRctx []byte) (uint64, error) {
// Read Request can get rejected then we would wait indefinitely on the channel
// so have a timeout.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

var activeRctx [8]byte
x.Check2(n.Rand.Read(activeRctx[:]))
if err := n.Raft().ReadIndex(ctx, activeRctx[:]); err != nil {
if err := n.Raft().ReadIndex(ctx, activeRctx); err != nil {
glog.Errorf("Error while trying to call ReadIndex: %v\n", err)
return 0, err
}
Expand All @@ -610,8 +627,8 @@ func (n *Node) RunReadIndexLoop(closer *y.Closer, readStateCh <-chan raft.ReadSt
case <-closer.HasBeenClosed():
return 0, errors.New("Closer has been called")
case rs := <-readStateCh:
if !bytes.Equal(activeRctx[:], rs.RequestCtx) {
glog.V(1).Infof("Read state: %x != requested %x", rs.RequestCtx, activeRctx[:])
if !bytes.Equal(activeRctx, rs.RequestCtx) {
glog.V(3).Infof("Read state: %x != requested %x", rs.RequestCtx, activeRctx[:])
goto again
}
return rs.Index, nil
Expand Down Expand Up @@ -641,8 +658,15 @@ func (n *Node) RunReadIndexLoop(closer *y.Closer, readStateCh <-chan raft.ReadSt
break slurpLoop
}
}
// Create one activeRctx slice for the read index, even if we have to call readIndex
// repeatedly. That way, we can process the requests as soon as we encounter the first
// activeRctx. This is better than flooding readIndex with a new activeRctx on each
// call, causing more unique traffic and further delays in request processing.
activeRctx := make([]byte, 8)
x.Check2(n.Rand.Read(activeRctx))
glog.V(3).Infof("Request readctx: %#x", activeRctx)
for {
index, err := readIndex()
index, err := readIndex(activeRctx)
if err == errInternalRetry {
continue
}
Expand Down
57 changes: 38 additions & 19 deletions conn/raft_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/dgraph-io/dgraph/x"
"github.com/golang/glog"
"go.etcd.io/etcd/raft/raftpb"
otrace "go.opencensus.io/trace"
)

type sendmsg struct {
Expand Down Expand Up @@ -192,30 +193,19 @@ func (w *RaftServer) RaftMessage(server pb.Raft_RaftMessageServer) error {
if ctx.Err() != nil {
return ctx.Err()
}
span := otrace.FromContext(ctx)

n := w.GetNode()
if n == nil || n.Raft() == nil {
return ErrNoNode
}
span.Annotatef(nil, "Stream server is node %#x", n.Id)

var rc *pb.RaftContext
raft := w.GetNode().Raft()
for loop := 1; ; loop++ {
batch, err := server.Recv()
if err != nil {
return err
}
if loop%1e6 == 0 {
glog.V(2).Infof("%d messages received by %#x", loop, n.Id)
}
if loop == 1 {
rc := batch.GetContext()
if rc != nil {
n.Connect(rc.Id, rc.Addr)
}
}
if batch.Payload == nil {
continue
}
data := batch.Payload.Data
step := func(data []byte) error {
ctx, cancel := context.WithTimeout(ctx, time.Minute)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the cancel function is not used on all paths (possible context leak) (from govet)

defer cancel()

for idx := 0; idx < len(data); {
x.AssertTruef(len(data[idx:]) >= 4,
Expand All @@ -233,11 +223,40 @@ func (w *RaftServer) RaftMessage(server pb.Raft_RaftMessageServer) error {
x.Check(err)
}
// This should be done in order, and not via a goroutine.
// Step can block forever. See: https://github.com/etcd-io/etcd/issues/10585
// So, add a context with timeout to allow it to get out of the blockage.
if err := raft.Step(ctx, msg); err != nil {
return err
glog.Warningf("Error while raft.Step from %#x: %v. Closing RaftMessage stream.",
rc.GetId(), err)
return x.Errorf("Error while raft.Step from %#x: %v", rc.GetId(), err)
}
idx += sz
}
return nil
}

for loop := 1; ; loop++ {
batch, err := server.Recv()
if err != nil {
return err
}
if loop%1e6 == 0 {
glog.V(2).Infof("%d messages received by %#x from %#x", loop, n.Id, rc.GetId())
}
if loop == 1 {
rc = batch.GetContext()
span.Annotatef(nil, "Stream from %#x", rc.GetId())
if rc != nil {
n.Connect(rc.Id, rc.Addr)
}
}
if batch.Payload == nil {
continue
}
data := batch.Payload.Data
if err := step(data); err != nil {
return err
}
}
}

Expand Down
34 changes: 25 additions & 9 deletions dgraph/cmd/zero/assign.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package zero
import (
"errors"

otrace "go.opencensus.io/trace"
"golang.org/x/net/context"

"github.com/dgraph-io/dgraph/protos/pb"
Expand Down Expand Up @@ -159,30 +160,45 @@ func (s *Server) AssignUids(ctx context.Context, num *pb.Num) (*pb.AssignedIds,
if ctx.Err() != nil {
return &emptyAssignedIds, ctx.Err()
}
ctx, span := otrace.StartSpan(ctx, "Zero.AssignUids")
defer span.End()

reply := &emptyAssignedIds
c := make(chan error, 1)
go func() {
lease := func() error {
var err error
if s.Node.AmLeader() {
span.Annotatef(nil, "Zero leader leasing %d ids", num.GetVal())
reply, err = s.lease(ctx, num, false)
c <- err
return
return err
}
span.Annotate(nil, "Not Zero leader")
// I'm not the leader and this request was forwarded to me by a peer, who thought I'm the
// leader.
if num.Forward {
return x.Errorf("Invalid Zero received AssignUids request forward. Please retry")
}
// This is an original request. Forward it to the leader.
pl := s.Leader(0)
if pl == nil {
err = x.Errorf("No healthy connection found to Leader of group zero")
} else {
zc := pb.NewZeroClient(pl.Get())
reply, err = zc.AssignUids(ctx, num)
return x.Errorf("No healthy connection found to Leader of group zero")
}
c <- err
span.Annotatef(nil, "Sending request to %v", pl.Addr)
zc := pb.NewZeroClient(pl.Get())
num.Forward = true
reply, err = zc.AssignUids(ctx, num)
return err
}

c := make(chan error, 1)
go func() {
c <- lease()
}()

select {
case <-ctx.Done():
return reply, ctx.Err()
case err := <-c:
span.Annotatef(nil, "Error while leasing %+v: %v", num, err)
return reply, err
}
}
59 changes: 45 additions & 14 deletions dgraph/cmd/zero/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ func (n *node) checkQuorum(closer *y.Closer) {
defer ticker.Stop()

quorum := func() {
// Make this timeout 3x the timeout on RunReadIndexLoop.
// Make this timeout 1.5x the timeout on RunReadIndexLoop.
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

Expand Down Expand Up @@ -603,7 +603,7 @@ func (n *node) Run() {

// snapshot can cause select loop to block while deleting entries, so run
// it in goroutine
readStateCh := make(chan raft.ReadState, 10)
readStateCh := make(chan raft.ReadState, 100)
closer := y.NewCloser(4)
defer func() {
closer.SignalAndWait()
Expand All @@ -626,11 +626,38 @@ func (n *node) Run() {
case <-ticker.C:
n.Raft().Tick()
case rd := <-n.Raft().Ready():
start := time.Now()
_, span := otrace.StartSpan(n.ctx, "Zero.RunLoop",
otrace.WithSampler(otrace.ProbabilitySampler(0.001)))
for _, rs := range rd.ReadStates {
readStateCh <- rs
select {
case readStateCh <- rs:
default:
// Don't block here.
}
}
span.Annotatef(nil, "Pushed %d readstates", len(rd.ReadStates))

if rd.SoftState != nil {
if rd.RaftState == raft.StateLeader && !leader {
glog.Infoln("I've become the leader, updating leases.")
n.server.updateLeases()
}
leader = rd.RaftState == raft.StateLeader
// Oracle stream would close the stream once it steps down as leader
// predicate move would cancel any in progress move on stepping down.
n.triggerLeaderChange()
}
if leader {
// Leader can send messages in parallel with writing to disk.
for _, msg := range rd.Messages {
n.Send(msg)
}
}
n.SaveToStorage(rd.HardState, rd.Entries, rd.Snapshot)
span.Annotatef(nil, "Saved to storage")
diskDur := time.Since(start)

if !raft.IsEmptySnap(rd.Snapshot) {
var state pb.MembershipState
x.Check(state.Unmarshal(rd.Snapshot.Data))
Expand All @@ -655,22 +682,26 @@ func (n *node) Run() {
}
n.Applied.Done(entry.Index)
}
span.Annotatef(nil, "Applied %d CommittedEntries", len(rd.CommittedEntries))

if rd.SoftState != nil {
if rd.RaftState == raft.StateLeader && !leader {
glog.Infoln("I've become the leader, updating leases.")
n.server.updateLeases()
if !leader {
// Followers should send messages later.
for _, msg := range rd.Messages {
n.Send(msg)
}
leader = rd.RaftState == raft.StateLeader
// Oracle stream would close the stream once it steps down as leader
// predicate move would cancel any in progress move on stepping down.
n.triggerLeaderChange()
}
span.Annotate(nil, "Sent messages")

for _, msg := range rd.Messages {
n.Send(msg)
}
n.Raft().Advance()
span.Annotate(nil, "Advanced Raft")
span.End()
if time.Since(start) > 100*time.Millisecond {
glog.Warningf(
"Raft.Ready took too long to process: %v. Most likely due to slow disk: %v."+
" Num entries: %d. MustSync: %v",
time.Since(start).Round(time.Millisecond), diskDur.Round(time.Millisecond),
len(rd.Entries), rd.MustSync)
}
}
}
}
1 change: 1 addition & 0 deletions protos/pb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ service Worker {
message Num {
uint64 val = 1;
bool read_only = 2;
bool forward = 3; // True if this request was forwarded by a peer.
}

message AssignedIds {
Expand Down
Loading