Skip to content

Commit

Permalink
storage: increase snapshot operation timeout
Browse files Browse the repository at this point in the history
Bump the snapshot operation timeout from 1m to 5m. We've seen in
production that sending O(64MB) can take longer than 1m on a
cross-region link. (Need to investigate why this is happening, though).

Properly notice cancellation of snapshot streams on the
recipient. Failing to do this would cause a reception of a snapshot to
wedge permanently if the associated operation timed out.

See cockroachdb#13687
  • Loading branch information
petermattis committed Feb 24, 2017
1 parent 602e690 commit a456503
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 5 deletions.
11 changes: 7 additions & 4 deletions pkg/storage/raft_snapshot_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,13 @@ func newRaftSnapshotQueue(store *Store, g *gossip.Gossip, clock *hlc.Clock) *raf
// reason Raft snapshots cannot be performed by the replicateQueue.
needsLease: false,
acceptsUnsplitRanges: true,
successes: store.metrics.RaftSnapshotQueueSuccesses,
failures: store.metrics.RaftSnapshotQueueFailures,
pending: store.metrics.RaftSnapshotQueuePending,
processingNanos: store.metrics.RaftSnapshotQueueProcessingNanos,
// Allow plenty of time for sending snapshots (O(64MB)) over potentially
// slow links.
processTimeout: 5 * time.Minute,
successes: store.metrics.RaftSnapshotQueueSuccesses,
failures: store.metrics.RaftSnapshotQueueFailures,
pending: store.metrics.RaftSnapshotQueuePending,
processingNanos: store.metrics.RaftSnapshotQueueProcessingNanos,
},
)
return rq
Expand Down
5 changes: 4 additions & 1 deletion pkg/storage/raft_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,8 @@ func (t *RaftTransport) RaftMessageBatch(stream MultiRaft_RaftMessageBatchServer
// RaftSnapshot handles incoming streaming snapshot requests.
func (t *RaftTransport) RaftSnapshot(stream MultiRaft_RaftSnapshotServer) error {
errCh := make(chan error, 1)
if err := t.rpcContext.Stopper.RunAsyncTask(stream.Context(), func(ctx context.Context) {
ctx := stream.Context()
if err := t.rpcContext.Stopper.RunAsyncTask(ctx, func(ctx context.Context) {
errCh <- func() error {
req, err := stream.Recv()
if err != nil {
Expand All @@ -365,6 +366,8 @@ func (t *RaftTransport) RaftSnapshot(stream MultiRaft_RaftSnapshotServer) error
return err
}
select {
case <-ctx.Done():
return ctx.Err()
case <-t.rpcContext.Stopper.ShouldStop():
return nil
case err := <-errCh:
Expand Down

0 comments on commit a456503

Please sign in to comment.