Skip to content

Commit

Permalink
Merge pull request #8613 from bdarnell/raft-transport-sync
Browse files Browse the repository at this point in the history
storage: Send preemptive snapshots synchronously
  • Loading branch information
bdarnell authored Aug 18, 2016
2 parents f6cb86c + ee8bbc2 commit 0d11f31
Show file tree
Hide file tree
Showing 7 changed files with 208 additions and 78 deletions.
42 changes: 33 additions & 9 deletions storage/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1700,15 +1700,12 @@ func TestStoreRangeRebalance(t *testing.T) {
if generated == 0 {
t.Fatalf("expected at least 1 snapshot, but found 0")
}
// TODO(peter): We're sometimes generating normal snapshots immediately after
// the preemptive ones. Need to figure out why and fix.
if false {
if normalApplied != 0 {
t.Fatalf("expected 0 normal snapshots, but found %d", normalApplied)
}
if generated != preemptiveApplied {
t.Fatalf("expected %d preemptive snapshots, but found %d", generated, preemptiveApplied)
}

if normalApplied != 0 {
t.Fatalf("expected 0 normal snapshots, but found %d", normalApplied)
}
if generated != preemptiveApplied {
t.Fatalf("expected %d preemptive snapshots, but found %d", generated, preemptiveApplied)
}
}

Expand Down Expand Up @@ -2303,3 +2300,30 @@ func TestTransferRaftLeadership(t *testing.T) {
return nil
})
}

// TestFailedPreemptiveSnapshot verifies that ChangeReplicas is
// aborted if we are unable to send a preemptive snapshot.
func TestFailedPreemptiveSnapshot(t *testing.T) {
	defer leaktest.AfterTest(t)()

	mtc := startMultiTestContext(t, 2)
	defer mtc.Stop()

	// Replicate a range onto the two stores. This replication is
	// important because if there were only one node to begin with, the
	// ChangeReplicas would fail because it was unable to achieve quorum
	// even if the preemptive snapshot failure were ignored.
	mtc.replicateRange(1, 1)

	// Now try to add a third. It should fail because we cannot send a
	// preemptive snapshot to it.
	rep, err := mtc.stores[0].GetReplica(1)
	if err != nil {
		t.Fatal(err)
	}

	const expErr = "aborted due to failed preemptive snapshot: unable to get connection for node 3"
	err = rep.ChangeReplicas(context.Background(), roachpb.ADD_REPLICA,
		roachpb.ReplicaDescriptor{NodeID: 3, StoreID: 3},
		rep.Desc())
	if !testutils.IsError(err, expErr) {
		// Use %v rather than %s: err is nil when ChangeReplicas unexpectedly
		// succeeds, and %s would render that as "%!s(<nil>)".
		t.Fatalf("expected error matching %q, got %v", expErr, err)
	}
}
101 changes: 68 additions & 33 deletions storage/raft.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions storage/raft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,5 @@ message ConfChangeContext {

// MultiRaft is the RPC service over which RaftMessageRequest messages are
// exchanged and answered with RaftMessageResponse messages.
service MultiRaft {
// RaftMessage is a bidirectional streaming RPC: the client sends a stream of
// requests and the server replies with a stream of responses.
rpc RaftMessage (stream RaftMessageRequest) returns (stream RaftMessageResponse) {}
// RaftMessageSync is a unary RPC: a single request yields a single response,
// so the caller can wait for the outcome of that one message.
rpc RaftMessageSync (RaftMessageRequest) returns (RaftMessageResponse) {}
}
91 changes: 68 additions & 23 deletions storage/raft_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,39 @@ func NewRaftTransport(resolver NodeAddressResolver, grpcServer *grpc.Server, rpc
return t
}

// RaftMessage proxies the incoming request to the listening server interface.
// handleRaftRequest proxies a request to the listening server interface.
//
// The handler is looked up by the destination store ID
// (req.ToReplica.StoreID). If no handler is registered for that store an
// error is returned; otherwise the result of the handler's
// HandleRaftRequest is returned unchanged.
func (t *RaftTransport) handleRaftRequest(
	ctx context.Context, req *RaftMessageRequest,
) *roachpb.Error {
	// Only hold the lock for the map lookup; the handler itself is invoked
	// after the lock has been released.
	t.mu.Lock()
	handler, ok := t.mu.handlers[req.ToReplica.StoreID]
	t.mu.Unlock()

	if !ok {
		// The message comes *from* FromReplica and is addressed *to*
		// ToReplica, whose store has no registered handler.
		return roachpb.NewErrorf(
			"unable to accept Raft message from %+v: no store registered for %+v",
			req.FromReplica, req.ToReplica)
	}

	return handler.HandleRaftRequest(ctx, req)
}

// newRaftMessageResponse builds the RaftMessageResponse that answers req.
// The response carries the request's RangeID and addresses the reply back to
// the sender. When pErr is non-nil it is stored in the response's Union;
// otherwise the Union is left unset.
func newRaftMessageResponse(req *RaftMessageRequest, pErr *roachpb.Error) *RaftMessageResponse {
	reply := new(RaftMessageResponse)
	reply.RangeID = req.RangeID
	// The reply travels in the opposite direction, so the sender of the
	// request becomes the recipient of the response and vice versa.
	reply.ToReplica, reply.FromReplica = req.FromReplica, req.ToReplica

	if pErr == nil {
		return reply
	}
	reply.Union.SetValue(pErr)
	return reply
}

// RaftMessage proxies the incoming requests to the listening server interface.
func (t *RaftTransport) RaftMessage(stream MultiRaft_RaftMessageServer) (err error) {
errCh := make(chan error, 1)

Expand All @@ -144,28 +176,8 @@ func (t *RaftTransport) RaftMessage(stream MultiRaft_RaftMessageServer) (err err
return err
}

t.mu.Lock()
handler, ok := t.mu.handlers[req.ToReplica.StoreID]
t.mu.Unlock()

if !ok {
return errors.Errorf(
"unable to accept Raft message from %+v: no store registered for %+v",
req.ToReplica, req.FromReplica)
}

if pErr := handler.HandleRaftRequest(stream.Context(), req); pErr != nil {
resp := &RaftMessageResponse{
RangeID: req.RangeID,
// From and To are reversed in the response.
ToReplica: req.FromReplica,
FromReplica: req.ToReplica,

Union: RaftMessageResponseUnion{
Error: pErr,
},
}
if err := stream.Send(resp); err != nil {
if pErr := t.handleRaftRequest(stream.Context(), req); pErr != nil {
if err := stream.Send(newRaftMessageResponse(req, pErr)); err != nil {
return err
}
}
Expand All @@ -184,6 +196,18 @@ func (t *RaftTransport) RaftMessage(stream MultiRaft_RaftMessageServer) (err err
}
}

// RaftMessageSync handles a single incoming request by proxying it to the
// listening server interface and returning the outcome as a response.
//
// The request is processed inside a task run on the rpc context's Stopper.
// If RunTask returns an error, that error is returned and no response is
// built. Otherwise the handler's result (nil or an error) is wrapped in a
// RaftMessageResponse.
func (t *RaftTransport) RaftMessageSync(
	ctx context.Context, req *RaftMessageRequest,
) (*RaftMessageResponse, error) {
	var handlerErr *roachpb.Error
	process := func() {
		handlerErr = t.handleRaftRequest(ctx, req)
	}

	if taskErr := t.rpcContext.Stopper.RunTask(process); taskErr != nil {
		return nil, taskErr
	}
	return newRaftMessageResponse(req, handlerErr), nil
}

// Listen registers a raftMessageHandler to receive proxied messages.
func (t *RaftTransport) Listen(storeID roachpb.StoreID, handler RaftMessageHandler) {
t.mu.Lock()
Expand Down Expand Up @@ -386,3 +410,24 @@ func (t *RaftTransport) SendAsync(req *RaftMessageRequest) bool {
return false
}
}

// SendSync sends a raft message and waits for an acknowledgement.
//
// It returns an error if no connection to the destination node is available,
// if the RaftMessageSync RPC itself fails, or if the response carries an
// error in its Union. A response with an empty Union means success.
func (t *RaftTransport) SendSync(ctx context.Context, req *RaftMessageRequest) error {
	nodeID := req.ToReplica.NodeID
	conn := t.getNodeConn(nodeID)
	if conn == nil {
		return errors.Errorf("unable to get connection for node %s", nodeID)
	}

	resp, err := NewMultiRaftClient(conn).RaftMessageSync(ctx, req)
	if err != nil {
		return err
	}

	switch payload := resp.Union.GetValue().(type) {
	case nil:
		// Nothing was set in the union: the remote side reported no error.
		return nil
	case *roachpb.Error:
		return payload.GoError()
	default:
		return errors.Errorf("unexpected response value %T %s", payload, payload)
	}
}
5 changes: 5 additions & 0 deletions storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,11 @@ func (r *Replica) setReplicaID(replicaID roachpb.ReplicaID) error {

// setReplicaIDLocked requires that the replica lock is held.
func (r *Replica) setReplicaIDLocked(replicaID roachpb.ReplicaID) error {
if replicaID == 0 {
// If the incoming message didn't give us a new replica ID,
// there's nothing to do (this is only expected for preemptive snapshots).
return nil
}
if r.mu.replicaID == replicaID {
return nil
} else if r.mu.replicaID > replicaID {
Expand Down
7 changes: 5 additions & 2 deletions storage/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -3016,7 +3016,7 @@ func (r *Replica) ChangeReplicas(
repDesc.ReplicaID,
)
}
r.store.ctx.Transport.SendAsync(&RaftMessageRequest{
req := &RaftMessageRequest{
RangeID: r.RangeID,
FromReplica: fromRepDesc,
ToReplica: repDesc,
Expand All @@ -3027,7 +3027,10 @@ func (r *Replica) ChangeReplicas(
Term: snap.Metadata.Term,
Snapshot: snap,
},
})
}
if err := r.store.ctx.Transport.SendSync(ctx, req); err != nil {
return errors.Wrapf(err, "change replicas of range %d aborted due to failed preemptive snapshot", rangeID)
}

repDesc.ReplicaID = updatedDesc.NextReplicaID
updatedDesc.NextReplicaID++
Expand Down
Loading

0 comments on commit 0d11f31

Please sign in to comment.