Skip to content

Commit

Permalink
Merge #83757
Browse files Browse the repository at this point in the history
83757: kv/bulk: skip exact matches in SSTBatcher when ingestAll is true r=erikgrinaker a=stevendanna

This changes the semantics of ingestAll to skip ingesting exact
matches. We expect to see such exact matches when processing duplicate
events from a tenant replication stream.

Previously, such exact matches didn't cause a problem other than
wasted work.

However, recently a number of streaming tests are recently failing
under race with

    stream_ingestion_processor_test.go:464: unexpected meta error
    flushing: addsstable
    [/Tenant/30/Table/3/1/52/2/1,/Tenant/30/Table/53/1/96/0/NULL): SST
    stats are incorrect: diff(given, actual) = [KeyBytes: 5924 != 5036
    ValBytes: 3523 != 2989 ValCount: 395 != 321]

When under race, AddSST re-calculates any MVCCStats sent to
it. However, it does so using a different iterator than we do in
SSTBatcher. The new iterators used during the check treats duplicates
in the SST differently -- leading to a difference in the stats
computation.

The difference in iterator behaviour is arguable a bug (#83766); but
since the duplicates serve no purpose, removing them during batching
seems prudent.

Fixes #83740
Fixes #83741
Fixes #83742

Release note: None

Co-authored-by: Steven Danna <[email protected]>
  • Loading branch information
craig[bot] and stevendanna committed Jul 14, 2022
2 parents a47c057 + 23e7c91 commit 840d146
Show file tree
Hide file tree
Showing 7 changed files with 247 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/distsqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand All @@ -42,8 +41,6 @@ type partitionToEvent map[string][]streamingccl.Event
func TestStreamIngestionFrontierProcessor(t *testing.T) {
defer leaktest.AfterTest(t)()

skip.UnderRaceWithIssue(t, 83867)

ctx := context.Background()

tc := testcluster.StartTestCluster(t, 3 /* nodes */, base.TestClusterArgs{})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/distsqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -159,8 +158,6 @@ func TestStreamIngestionProcessor(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderRaceWithIssue(t, 83867)

ctx := context.Background()

tc := testcluster.StartTestCluster(t, 3 /* nodes */, base.TestClusterArgs{})
Expand Down Expand Up @@ -473,8 +470,6 @@ func TestRandomClientGeneration(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderRaceWithIssue(t, 83867)

ctx := context.Background()

tc := testcluster.StartTestCluster(t, 3 /* nodes */, base.TestClusterArgs{})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,6 @@ func TestTenantStreamingSuccessfulIngestion(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderRaceWithIssue(t, 83867)

dataSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" {
if _, err := w.Write([]byte("42,42\n43,43\n")); err != nil {
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/bulk/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,15 @@ go_test(
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/storage",
"//pkg/testutils/serverutils",
"//pkg/testutils/storageutils",
"//pkg/testutils/testcluster",
"//pkg/util/encoding",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/limit",
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/randutil",
"//pkg/util/tracing",
Expand Down
70 changes: 55 additions & 15 deletions pkg/kv/bulk/sst_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,10 @@ type SSTBatcher struct {
// will not raise a DuplicateKeyError.
skipDuplicates bool
// ingestAll can only be set when disallowShadowingBelow is empty and
// skipDuplicates is false. It will never return a duplicateKey error and
// continue ingesting all data provided to it.
// skipDuplicates is false.
//
// It will only ever return a duplicateKey error if the key and timestamp are
// the same as en existing key but the value differs.
ingestAll bool

// batchTS is the timestamp that will be set on batch requests used to send
Expand All @@ -137,13 +139,14 @@ type SSTBatcher struct {

// The rest of the fields are per-batch and are reset via Reset() before each
// batch is started.
sstWriter storage.SSTWriter
sstFile *storage.MemFile
batchStartKey []byte
batchEndKey []byte
batchEndValue []byte
flushKeyChecked bool
flushKey roachpb.Key
sstWriter storage.SSTWriter
sstFile *storage.MemFile
batchStartKey []byte
batchEndKey []byte
batchEndValue []byte
batchEndTimestamp hlc.Timestamp
flushKeyChecked bool
flushKey roachpb.Key
// lastRange is the span and remaining capacity of the last range added to,
// for checking if the next addition would overfill it.
lastRange struct {
Expand Down Expand Up @@ -207,6 +210,29 @@ func MakeStreamSSTBatcher(
return b, err
}

// MakeTestingSSTBatcher creates a batcher for testing, allowing setting options
// that are typically only set when constructing a batcher in BufferingAdder.
func MakeTestingSSTBatcher(
ctx context.Context,
db *kv.DB,
settings *cluster.Settings,
skipDuplicates bool,
ingestAll bool,
mem mon.BoundAccount,
sendLimiter limit.ConcurrentRequestLimiter,
) (*SSTBatcher, error) {
b := &SSTBatcher{
db: db,
settings: settings,
skipDuplicates: skipDuplicates,
ingestAll: ingestAll,
mem: mem,
limiter: sendLimiter,
}
err := b.Reset(ctx)
return b, err
}

func (b *SSTBatcher) updateMVCCStats(key storage.MVCCKey, value []byte) {
metaKeySize := int64(len(key.Key)) + 1
metaValSize := int64(0)
Expand All @@ -228,15 +254,26 @@ func (b *SSTBatcher) updateMVCCStats(key storage.MVCCKey, value []byte) {
// keys -- like RESTORE where we want the restored data to look like the backup.
// Keys must be added in order.
func (b *SSTBatcher) AddMVCCKey(ctx context.Context, key storage.MVCCKey, value []byte) error {
if len(b.batchEndKey) > 0 && bytes.Equal(b.batchEndKey, key.Key) && !b.ingestAll {
if b.skipDuplicates && bytes.Equal(b.batchEndValue, value) {
if len(b.batchEndKey) > 0 && bytes.Equal(b.batchEndKey, key.Key) {
if b.ingestAll && key.Timestamp.Equal(b.batchEndTimestamp) {
if bytes.Equal(b.batchEndValue, value) {
// If ingestAll is set, we allow and skip (key, timestamp, value)
// matches. We expect this from callers who may need to deal with
// duplicates caused by retransmission.
return nil
}
// Despite ingestAll, we raise an error in the case of a because a new value
// at the exact key and timestamp is unexpected and one of the two values
// would be completely lost.
return kvserverbase.NewDuplicateKeyError(key.Key, value)
} else if b.skipDuplicates && bytes.Equal(b.batchEndValue, value) {
// If skipDuplicates is set, we allow and skip (key, value) matches.
return nil
}

err := &kvserverbase.DuplicateKeyError{}
err.Key = append(err.Key, key.Key...)
err.Value = append(err.Value, value...)
return err
if !b.ingestAll {
return kvserverbase.NewDuplicateKeyError(key.Key, value)
}
}
// Check if we need to flush current batch *before* adding the next k/v --
// the batcher may want to flush the keys it already has, either because it
Expand All @@ -256,6 +293,8 @@ func (b *SSTBatcher) AddMVCCKey(ctx context.Context, key storage.MVCCKey, value
if len(b.batchStartKey) == 0 {
b.batchStartKey = append(b.batchStartKey[:0], key.Key...)
}

b.batchEndTimestamp = key.Timestamp
b.batchEndKey = append(b.batchEndKey[:0], key.Key...)
b.batchEndValue = append(b.batchEndValue[:0], value...)

Expand Down Expand Up @@ -286,6 +325,7 @@ func (b *SSTBatcher) Reset(ctx context.Context) error {
b.batchStartKey = b.batchStartKey[:0]
b.batchEndKey = b.batchEndKey[:0]
b.batchEndValue = b.batchEndValue[:0]
b.batchEndTimestamp = hlc.Timestamp{}
b.flushKey = nil
b.flushKeyChecked = false
b.ms.Reset()
Expand Down
Loading

0 comments on commit 840d146

Please sign in to comment.