diff --git a/pkg/kv/kvserver/kvserverpb/raft.proto b/pkg/kv/kvserver/kvserverpb/raft.proto index f4db78b6a616..1b02b41d473b 100644 --- a/pkg/kv/kvserver/kvserverpb/raft.proto +++ b/pkg/kv/kvserver/kvserverpb/raft.proto @@ -214,6 +214,9 @@ message SnapshotResponse { Status status = 1; string message = 2; reserved 3; + + // Traces from snapshot processing, returned on status APPLIED or ERROR. + repeated util.tracing.tracingpb.RecordedSpan collected_spans = 4 [(gogoproto.nullable) = false]; } // DelegateSnapshotRequest is the request used to delegate send snapshot requests. diff --git a/pkg/kv/kvserver/storage_services.proto b/pkg/kv/kvserver/storage_services.proto index 0cdfbdd0002c..a876b768dbeb 100644 --- a/pkg/kv/kvserver/storage_services.proto +++ b/pkg/kv/kvserver/storage_services.proto @@ -17,6 +17,14 @@ import "kv/kvserver/api.proto"; service MultiRaft { rpc RaftMessageBatch (stream cockroach.kv.kvserver.kvserverpb.RaftMessageRequestBatch) returns (stream cockroach.kv.kvserver.kvserverpb.RaftMessageResponse) {} + // RaftSnapshot asks the server to accept and apply a range snapshot. + // The client is expected to initially send a message consisting solely of + // a Header, upon which the server will respond with a message with status + // ACCEPTED, or ERROR if it cannot accept the snapshot. Once accepted, the + // client will send multiple messages with KVBatch data followed by a + // terminal message with the final flag set to true. Once finalized, + // the server will ultimately send a message back with status APPLIED, or + // ERROR, including any collected traces from processing. rpc RaftSnapshot (stream cockroach.kv.kvserver.kvserverpb.SnapshotRequest) returns (stream cockroach.kv.kvserver.kvserverpb.SnapshotResponse) {} // DelegateRaftSnapshot asks the server to send a range snapshot to a target // (so the client delegates the sending of the snapshot to the server). 
diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index 7803070a5f62..5f9082ae3e39 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -34,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" @@ -906,6 +907,8 @@ func (s *Store) checkSnapshotOverlapLocked( func (s *Store) receiveSnapshot( ctx context.Context, header *kvserverpb.SnapshotRequest_Header, stream incomingSnapshotStream, ) error { + sp := tracing.SpanFromContext(ctx) + // Draining nodes will generally not be rebalanced to (see the filtering that // happens in getStoreListFromIDsLocked()), but in case they are, they should // reject the incoming rebalancing snapshots. @@ -1028,29 +1031,43 @@ func (s *Store) receiveSnapshot( s.metrics.RangeSnapshotUnknownRcvdBytes.Inc(inc) } } - ctx, sp := tracing.EnsureChildSpan(ctx, s.cfg.Tracer(), "receive snapshot data") + ctx, rSp := tracing.EnsureChildSpan(ctx, s.cfg.Tracer(), "receive snapshot data") + defer rSp.Finish() // Ensure that the tracing span is closed, even if ss.Receive errors inSnap, err := ss.Receive(ctx, stream, *header, recordBytesReceived) - sp.Finish() // Ensure that the tracing span is closed, even if ss.Receive errors if err != nil { return err } inSnap.placeholder = placeholder + rec := sp.GetConfiguredRecording() + // Use a background context for applying the snapshot, as handleRaftReady is // not prepared to deal with arbitrary context cancellation. Also, we've // already received the entire snapshot here, so there's no point in // abandoning application half-way through if the caller goes away. 
applyCtx := s.AnnotateCtx(context.Background()) if err := s.processRaftSnapshotRequest(applyCtx, header, inSnap); err != nil { - return sendSnapshotError(stream, errors.Wrap(err.GoError(), "failed to apply snapshot")) + return sendSnapshotErrorWithTrace(stream, + errors.Wrap(err.GoError(), "failed to apply snapshot"), rec, + ) } - return stream.Send(&kvserverpb.SnapshotResponse{Status: kvserverpb.SnapshotResponse_APPLIED}) + return stream.Send(&kvserverpb.SnapshotResponse{ + Status: kvserverpb.SnapshotResponse_APPLIED, + CollectedSpans: rec, + }) } func sendSnapshotError(stream incomingSnapshotStream, err error) error { + return sendSnapshotErrorWithTrace(stream, err, nil /* trace */) +} + +func sendSnapshotErrorWithTrace( + stream incomingSnapshotStream, err error, trace tracingpb.Recording, +) error { return stream.Send(&kvserverpb.SnapshotResponse{ - Status: kvserverpb.SnapshotResponse_ERROR, - Message: err.Error(), + Status: kvserverpb.SnapshotResponse_ERROR, + Message: err.Error(), + CollectedSpans: trace, }) } @@ -1449,6 +1466,7 @@ func sendSnapshot( } switch resp.Status { case kvserverpb.SnapshotResponse_ERROR: + sp.ImportRemoteRecording(resp.CollectedSpans) storePool.Throttle(storepool.ThrottleFailed, resp.Message, to.StoreID) return errors.Errorf("%s: remote couldn't accept %s with error: %s", to, snap, resp.Message) @@ -1526,6 +1544,7 @@ func sendSnapshot( if err != nil { return errors.Wrapf(err, "%s: remote failed to apply snapshot", to) } + sp.ImportRemoteRecording(resp.CollectedSpans) // NB: wait for EOF which ensures that all processing on the server side has // completed (such as defers that might be run after the previous message was // received). @@ -1601,17 +1620,9 @@ func delegateSnapshot( unexpectedResp, ) } - // Import the remotely collected spans, if any. 
- if len(resp.CollectedSpans) != 0 { - span := tracing.SpanFromContext(ctx) - if span == nil { - log.Warningf( - ctx, - "trying to ingest remote spans but there is no recording span set up", - ) - } else { - span.ImportRemoteRecording(resp.CollectedSpans) - } + sp := tracing.SpanFromContext(ctx) + if sp != nil { + sp.ImportRemoteRecording(resp.CollectedSpans) } switch resp.SnapResponse.Status { case kvserverpb.SnapshotResponse_ERROR: