diff --git a/pkg/storage/raft_snapshot_queue.go b/pkg/storage/raft_snapshot_queue.go index 316bf71f70a3..13232574717e 100644 --- a/pkg/storage/raft_snapshot_queue.go +++ b/pkg/storage/raft_snapshot_queue.go @@ -57,10 +57,13 @@ func newRaftSnapshotQueue(store *Store, g *gossip.Gossip, clock *hlc.Clock) *raf // reason Raft snapshots cannot be performed by the replicateQueue. needsLease: false, acceptsUnsplitRanges: true, - successes: store.metrics.RaftSnapshotQueueSuccesses, - failures: store.metrics.RaftSnapshotQueueFailures, - pending: store.metrics.RaftSnapshotQueuePending, - processingNanos: store.metrics.RaftSnapshotQueueProcessingNanos, + // Allow plenty of time for sending snapshots (O(64MB)) over potentially + // slow links. + processTimeout: 5 * time.Minute, + successes: store.metrics.RaftSnapshotQueueSuccesses, + failures: store.metrics.RaftSnapshotQueueFailures, + pending: store.metrics.RaftSnapshotQueuePending, + processingNanos: store.metrics.RaftSnapshotQueueProcessingNanos, }, ) return rq diff --git a/pkg/storage/raft_transport.go b/pkg/storage/raft_transport.go index e228fb0a006b..68f8b14c5c08 100644 --- a/pkg/storage/raft_transport.go +++ b/pkg/storage/raft_transport.go @@ -338,7 +338,8 @@ func (t *RaftTransport) RaftMessageBatch(stream MultiRaft_RaftMessageBatchServer // RaftSnapshot handles incoming streaming snapshot requests. func (t *RaftTransport) RaftSnapshot(stream MultiRaft_RaftSnapshotServer) error { errCh := make(chan error, 1) - if err := t.rpcContext.Stopper.RunAsyncTask(stream.Context(), func(ctx context.Context) { + ctx := stream.Context() + if err := t.rpcContext.Stopper.RunAsyncTask(ctx, func(ctx context.Context) { errCh <- func() error { req, err := stream.Recv() if err != nil { @@ -365,6 +366,8 @@ func (t *RaftTransport) RaftSnapshot(stream MultiRaft_RaftSnapshotServer) error return err } select { + case <-ctx.Done(): + return ctx.Err() case <-t.rpcContext.Stopper.ShouldStop(): return nil case err := <-errCh: