Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
78980: kvserver: check GC threshold after acquiring a storage snapshot r=aayushshah15 a=aayushshah15

Previously, we would check if a given read could proceed with execution
_before_ we acquired a snapshot of the storage engine state. In particular, we
would check the replica's in-memory GC threshold before the state of the engine
had been pinned.

This meant that the following scenario was possible:
1. Request comes in, checks the replica's in-memory GC threshold and determines
that it is safe to proceed.
2. A pair of GC requests bump the GC threshold and garbage-collect the expired data.
3. The read acquires a snapshot of the storage engine state.
4. The read continues with its execution and sees an incorrect result.

This commit makes it such that we now check the GC threshold after we acquire a
snapshot of the storage engine state.

NB: Note that this commit only fixes our current model of issuing `GCRequest`s
-- which is that we first issue a GCRequest that simply bumps the GC threshold
and then issue another GCRequest that actually performs the garbage collection.
If a single GCRequest were to do both of these things, we'd still have an issue
with reads potentially seeing incorrect results since, currently, the in-memory
GC threshold is bumped as a "post-apply" side effect that takes effect after
the expired data has already been garbage collected. This will be handled in a
future patch.

Release note: none

Relates to #55293.

83409: tracing: add ChildrenMetadata to String and JSON traces r=andreimatei a=adityamaru

This change teaches the Recording stringer about the
ChildrenMetadata map, that contains per operation metadata
about the spans' children.

This change also adds ChildrenMetadata to the JSON view of
a recording so that it shows up in the `trace.json` file that
is included in a statement bundle.

Informs: #80391

Release note: a trace recording will now output an additional field
per tracing span that corresponds to metadata bucketed by operation
name of the spans' children.

83669: build: allow overriding TESTS for roachtest CI job r=rickystewart a=tbg

This allows us to invoke the roachtest CI jobs with an overridden TESTS
variable.

Release note: None


Co-authored-by: Aayush Shah <[email protected]>
Co-authored-by: Aditya Maru <[email protected]>
Co-authored-by: Tobias Grieger <[email protected]>
  • Loading branch information
4 people committed Jun 30, 2022
4 parents 05ca68a + efa68b9 + 500b57a + 240870a commit b630c08
Show file tree
Hide file tree
Showing 18 changed files with 156 additions and 71 deletions.
2 changes: 1 addition & 1 deletion build/teamcity/util/roachtest_util.sh
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ trap upload_stats EXIT
# Set up the parameters for the roachtest invocation.
PARALLELISM=16
CPUQUOTA=1024
TESTS=""
TESTS="${TESTS-}"
case "${CLOUD}" in
gce)
;;
Expand Down
68 changes: 52 additions & 16 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -1345,10 +1345,12 @@ func (r *Replica) assertStateRaftMuLockedReplicaMuRLocked(

// TODO(nvanbenschoten): move the following 5 methods to replica_send.go.

// checkExecutionCanProceed returns an error if a batch request cannot be
// executed by the Replica. An error indicates that the Replica is not live and
// able to serve traffic or that the request is not compatible with the state of
// the Range due to the range's key bounds, the range's lease, the range's GC
// checkExecutionCanProceedBeforeStorageSnapshot returns an error if a batch
// request cannot be executed by the Replica. For read-only requests, the method
// is called before the state of the storage engine is pinned (via an iterator
// or a snapshot). An error indicates that the Replica is not live and able to
// serve traffic or that the request is not compatible with the state of the
// Range due to the range's key bounds, the range's lease, the range's GC
// threshold, or due to a pending merge. On success, returns nil and either a
// zero LeaseStatus (indicating that the request was permitted to skip the lease
// checks) or a LeaseStatus in LeaseState_VALID (indicating that the Replica is
Expand All @@ -1359,7 +1361,7 @@ func (r *Replica) assertStateRaftMuLockedReplicaMuRLocked(
// will not wait for a pending merge to conclude before proceeding. Callers
// might be ok with this if they know that they will end up checking for a
// pending merge at some later time.
func (r *Replica) checkExecutionCanProceed(
func (r *Replica) checkExecutionCanProceedBeforeStorageSnapshot(
ctx context.Context, ba *roachpb.BatchRequest, g *concurrency.Guard,
) (kvserverpb.LeaseStatus, error) {
rSpan, err := keys.Range(ba.Requests)
Expand Down Expand Up @@ -1398,7 +1400,7 @@ func (r *Replica) checkExecutionCanProceed(
return kvserverpb.LeaseStatus{}, err
}

st, shouldExtend, err := r.checkGCThresholdAndLeaseRLocked(ctx, ba)
st, shouldExtend, err := r.checkLeaseRLocked(ctx, ba)
if err != nil {
return kvserverpb.LeaseStatus{}, err
}
Expand Down Expand Up @@ -1432,13 +1434,54 @@ func (r *Replica) checkExecutionCanProceed(
return st, nil
}

// checkGCThresholdAndLeaseRLocked checks the provided batch against the GC
// checkExecutionCanProceedAfterStorageSnapshot returns an error if a batch
// request cannot be executed by the Replica. For read-only requests, this
// method is called after the state of the storage engine is pinned via an
// iterator. An error indicates that the request's timestamp is below the
// Replica's GC threshold.
func (r *Replica) checkExecutionCanProceedAfterStorageSnapshot(
ba *roachpb.BatchRequest, st kvserverpb.LeaseStatus,
) error {
r.mu.RLock()
defer r.mu.RUnlock()
// NB: For read-only requests, the GC threshold check is performed after the
// state of the storage engine has been pinned by the iterator. This is
// because GC requests don't acquire latches at the timestamp they are garbage
// collecting, so read traffic at / around the GC threshold will not be
// serialized with GC requests. Thus, we must check the in-memory GC threshold
// after we pin the state of the storage engine [1].
//
// [1]: This relies on the invariant that the in-memory GC threshold is bumped
// _before_ the actual garbage collection happens.
//
// TODO(aayush): The above description intentionally omits some details, as
// they are going to be changed as part of
// https://github.com/cockroachdb/cockroach/issues/55293.
return r.checkTSAboveGCThresholdRLocked(ba.EarliestActiveTimestamp(), st, ba.IsAdmin())
}

// checkExecutionCanProceedRWOrAdmin returns an error if a batch request going
// through the RW or admin paths cannot be executed by the Replica.
func (r *Replica) checkExecutionCanProceedRWOrAdmin(
ctx context.Context, ba *roachpb.BatchRequest, g *concurrency.Guard,
) (kvserverpb.LeaseStatus, error) {
st, err := r.checkExecutionCanProceedBeforeStorageSnapshot(ctx, ba, g)
if err != nil {
return kvserverpb.LeaseStatus{}, err
}
if err := r.checkExecutionCanProceedAfterStorageSnapshot(ba, st); err != nil {
return kvserverpb.LeaseStatus{}, err
}
return st, nil
}

// checkLeaseRLocked checks the provided batch against the GC
// threshold and lease. A nil error indicates to go ahead with the batch, and
// is accompanied either by a valid or zero lease status, the latter case
// indicating that the request was permitted to bypass the lease check. The
// returned bool indicates whether the lease should be extended (only on nil
// error).
func (r *Replica) checkGCThresholdAndLeaseRLocked(
func (r *Replica) checkLeaseRLocked(
ctx context.Context, ba *roachpb.BatchRequest,
) (kvserverpb.LeaseStatus, bool, error) {
now := r.Clock().NowAsClockTimestamp()
Expand Down Expand Up @@ -1474,21 +1517,14 @@ func (r *Replica) checkGCThresholdAndLeaseRLocked(
}
// Otherwise, suppress the error. Also, remember that we're not serving
// this under the lease by zeroing out the status. We also intentionally
// do not pass the original status to checkTSAboveGCThresholdRLocked as
// do not pass the original status to checkTSAboveGCThreshold as
// this method assumes that a valid status indicates that this replica
// holds the lease (see #73123). `shouldExtend` is already false in this
// branch, but for completeness we zero it out as well.
st, shouldExtend, err = kvserverpb.LeaseStatus{}, false, nil
}
}

// Check if request is below the GC threshold and if so, error out. Note that
// this uses the lease status no matter whether it's valid or not, and the
// method is set up to handle that.
if err := r.checkTSAboveGCThresholdRLocked(ba.EarliestActiveTimestamp(), st, ba.IsAdmin()); err != nil {
return kvserverpb.LeaseStatus{}, false, err
}

return st, shouldExtend, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_follower_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func TestCheckExecutionCanProceedAllowsFollowerReadWithInvalidLease(t *testing.T
}
ba.Add(&gArgs)

ls, err := r.checkExecutionCanProceed(ctx, ba, nil /* g */)
ls, err := r.checkExecutionCanProceedBeforeStorageSnapshot(ctx, ba, nil /* g */)
require.NoError(t, err)
require.Empty(t, ls)

Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ func (r *Replica) evalAndPropose(
proposal.command.ProposerLeaseSequence = seq
} else if !st.Lease.OwnedBy(r.store.StoreID()) {
// Perform a sanity check that the lease is owned by this replica. This must
// have been ascertained by the callers in checkExecutionCanProceed.
// have been ascertained by the callers in
// checkExecutionCanProceedBeforeStorageSnapshot.
log.Fatalf(ctx, "cannot propose %s on follower with remotely owned lease %s", ba, st.Lease)
} else {
proposal.command.ProposerLeaseSequence = st.Lease.Sequence
Expand Down
5 changes: 4 additions & 1 deletion pkg/kv/kvserver/replica_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (r *Replica) executeReadOnlyBatch(
defer r.readOnlyCmdMu.RUnlock()

// Verify that the batch can be executed.
st, err := r.checkExecutionCanProceed(ctx, ba, g)
st, err := r.checkExecutionCanProceedBeforeStorageSnapshot(ctx, ba, g)
if err != nil {
return nil, g, roachpb.NewError(err)
}
Expand Down Expand Up @@ -75,6 +75,9 @@ func (r *Replica) executeReadOnlyBatch(
}
defer rw.Close()

if err := r.checkExecutionCanProceedAfterStorageSnapshot(ba, st); err != nil {
return nil, g, roachpb.NewError(err)
}
// TODO(nvanbenschoten): once all replicated intents are pulled into the
// concurrency manager's lock-table, we can be sure that if we reached this
// point, we will not conflict with any of them during evaluation. This in
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -892,7 +892,7 @@ func (r *Replica) executeAdminBatch(
return nil, roachpb.NewError(err)
}

_, err := r.checkExecutionCanProceed(ctx, ba, nil /* g */)
_, err := r.checkExecutionCanProceedRWOrAdmin(ctx, ba, nil /* g */)
if err == nil {
err = r.signallerForBatch(ba).Err()
}
Expand Down
1 change: 0 additions & 1 deletion pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8692,7 +8692,6 @@ func TestRefreshFromBelowGCThreshold(t *testing.T) {
func TestGCThresholdRacesWithRead(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.WithIssue(t, 55293)

testutils.RunTrueAndFalse(t, "followerRead", func(t *testing.T, followerRead bool) {
testutils.RunTrueAndFalse(t, "thresholdFirst", func(t *testing.T, thresholdFirst bool) {
Expand Down
7 changes: 4 additions & 3 deletions pkg/kv/kvserver/replica_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,14 @@ func (r *Replica) executeWriteBatch(
}()

// Verify that the batch can be executed.
st, err := r.checkExecutionCanProceed(ctx, ba, g)
st, err := r.checkExecutionCanProceedRWOrAdmin(ctx, ba, g)
if err != nil {
return nil, g, roachpb.NewError(err)
}

// Check the breaker. Note that we do this after checkExecutionCanProceed,
// so that NotLeaseholderError has precedence.
// Check the breaker. Note that we do this after
// checkExecutionCanProceedBeforeStorageSnapshot, so that NotLeaseholderError
// has precedence.
if err := r.signallerForBatch(ba).Err(); err != nil {
return nil, g, roachpb.NewError(err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,7 @@ func TestStoreRemoveReplicaDestroy(t *testing.T) {
t.Fatal("replica was not marked as destroyed")
}

if _, err = repl1.checkExecutionCanProceed(ctx, &roachpb.BatchRequest{}, nil /* g */); !errors.Is(err, expErr) {
if _, err = repl1.checkExecutionCanProceedBeforeStorageSnapshot(ctx, &roachpb.BatchRequest{}, nil /* g */); !errors.Is(err, expErr) {
t.Fatalf("expected error %s, but got %v", expErr, err)
}
}
Expand Down
21 changes: 0 additions & 21 deletions pkg/server/node_tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,27 +43,6 @@ func redactRecordingForTenant(tenID roachpb.TenantID, rec tracingpb.Recording) e
"recording has non-redactable span with the Message field set: %s", sp)
}
record.Message = record.Message.Redact()

// For compatibility with old versions, also redact DeprecatedFields.
for k := range record.DeprecatedFields {
field := &record.DeprecatedFields[k]
if field.Key != tracingpb.LogMessageField {
// We don't have any of these fields, but let's not take any
// chances (our dependencies might slip them in).
field.Value = TraceRedactedMarker
continue
}
if !sp.RedactableLogs {
// If we're handling a span that originated from an (early patch
// release) 22.1 node, all the containing information will be
// stripped. Note that this is not the common path here, as most
// information in the trace will be from the local node, which
// always creates redactable logs.
field.Value = TraceRedactedMarker
continue
}
field.Value = field.Value.Redact()
}
}
}
return nil
Expand Down
4 changes: 0 additions & 4 deletions pkg/util/tracing/crdbspan.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,10 +678,6 @@ func (s *crdbSpan) record(msg redact.RedactableString) {
logRecord := &tracingpb.LogRecord{
Time: now,
Message: msg,
// Compatibility with 21.2.
DeprecatedFields: []tracingpb.LogRecord_Field{
{Key: tracingpb.LogMessageField, Value: msg},
},
}

s.recordInternal(logRecord, &s.mu.recording.logs)
Expand Down
8 changes: 7 additions & 1 deletion pkg/util/tracing/span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ func TestRecordingString(t *testing.T) {

require.NoError(t, CheckRecording(rec, `
=== operation:root _verbose:1
[remote child]
[local child]
event:root 1
=== operation:remote child _verbose:1
event:remote child 1
Expand All @@ -124,7 +126,7 @@ func TestRecordingString(t *testing.T) {
timeSincePrev: "0.000ms",
text: "=== operation:root _verbose:1",
}, l)
l, err = parseLine(lines[1])
l, err = parseLine(lines[3])
require.Equal(t, traceLine{
timeSinceTraceStart: "1.000ms",
timeSincePrev: "1.000ms",
Expand Down Expand Up @@ -183,6 +185,7 @@ func TestRecordingInRecording(t *testing.T) {

require.NoError(t, CheckRecording(childRec, `
=== operation:child _verbose:1
[grandchild]
=== operation:grandchild _verbose:1
`))
}
Expand All @@ -208,6 +211,7 @@ func TestImportRemoteRecording(t *testing.T) {
if verbose {
require.NoError(t, CheckRecording(sp.FinishAndGetRecording(tracingpb.RecordingVerbose), `
=== operation:root _verbose:1
[child]
=== operation:child _verbose:1
event:&Int32Value{Value:4,XXX_unrecognized:[],}
event:foo
Expand All @@ -216,6 +220,7 @@ func TestImportRemoteRecording(t *testing.T) {
} else {
require.NoError(t, CheckRecording(sp.FinishAndGetRecording(tracingpb.RecordingStructured), `
=== operation:root
[child]
structured:{"@type":"type.googleapis.com/google.protobuf.Int32Value","value":4}
`))
}
Expand Down Expand Up @@ -844,6 +849,7 @@ func TestOpenChildIncludedRecording(t *testing.T) {
rec := parent.FinishAndGetRecording(tracingpb.RecordingVerbose)
require.NoError(t, CheckRecording(rec, `
=== operation:parent _verbose:1
[child]
=== operation:child _unfinished:1 _verbose:1
`))
child.Finish()
Expand Down
41 changes: 37 additions & 4 deletions pkg/util/tracing/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"regexp"
"sort"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -165,6 +166,7 @@ func CheckRecordedSpans(rec tracingpb.Recording, expected string) error {
//
// if err := CheckRecording(sp.GetRecording(), `
// === operation:root
// [childrenMetadata]
// event:root 1
// === operation:remote child
// event:remote child 1
Expand All @@ -186,19 +188,50 @@ func CheckRecording(rec tracingpb.Recording, expected string) error {
// After | "event:root 1"
re := regexp.MustCompile(`.*s.*s\s{4}`)
rec = string(re.ReplaceAll([]byte(rec), nil))
// 4. Change all tabs to four spaces.
// 4. Strip out all the metadata from each ChildrenMetadata entry.
//
// Before | [operation: {Count:<count>, Duration:<duration>}]
// After | [operation]
re = regexp.MustCompile(`:.*]`)
rec = string(re.ReplaceAll([]byte(rec), []byte("]")))
// 5. Change all tabs to four spaces.
rec = strings.ReplaceAll(rec, "\t", " ")
// 5. Compute the outermost indentation.
// 6. Compute the outermost indentation.
indent := strings.Repeat(" ", len(rec)-len(strings.TrimLeft(rec, " ")))
// 6. Outdent each line by that amount.
// 7. Outdent each line by that amount.
var lines []string
for _, line := range strings.Split(rec, "\n") {
lines = append(lines, strings.TrimPrefix(line, indent))
}
// 6. Stitch everything together.
// 8. Stitch everything together.
return strings.Join(lines, "\n")
}

sortChildrenMetadataByName := func(m map[string]tracingpb.OperationMetadata) {
// Sort the OperationMetadata of s' children alphabetically.
childrenMetadata := make([]tracingpb.OperationAndMetadata, 0, len(m))
for operation, metadata := range m {
childrenMetadata = append(childrenMetadata,
tracingpb.OperationAndMetadata{Operation: operation, Metadata: metadata})
}
sort.Slice(childrenMetadata, func(i, j int) bool {
return childrenMetadata[i].Operation < childrenMetadata[j].Operation
})

for i, cm := range childrenMetadata {
metadata := m[cm.Operation]
metadata.Duration = time.Duration(float64(i) * time.Second.Seconds())
m[cm.Operation] = metadata
}
}

// ChildrenMetadata are sorted in descending order of duration when returned.
// To ensure a stable sort in tests, we set the durations to sort in an
// alphabetical descending order.
for i := range rec {
sortChildrenMetadataByName(rec[i].ChildrenMetadata)
}

exp := normalize(expected)
got := normalize(rec.String())
if got != exp {
Expand Down
1 change: 1 addition & 0 deletions pkg/util/tracing/tracingpb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb",
visibility = ["//visibility:public"],
deps = [
"//pkg/util/humanizeutil",
"//pkg/util/protoutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
Expand Down
Loading

0 comments on commit b630c08

Please sign in to comment.