From 0950a1e09e74e4bed0893fa10c86d68de064f8d2 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Tue, 2 Jan 2024 18:25:25 -0500 Subject: [PATCH] kv: add read spans to ReadSummary structure Informs #61986. This PR adds read spans to the ReadSummary message, which is a compressed summary all read requests served on a range. The reads are held in sorted order and are non-overlapping, permitting a linear-time merge operation. With the inclusion of read spans, the ReadSummary message can now be used to serialize a leaseholder's timestamp cache. The merge operation is implemented in a subsequent commit. Release note: None --- pkg/BUILD.bazel | 2 + pkg/kv/kvserver/readsummary/rspb/BUILD.bazel | 15 ++- pkg/kv/kvserver/readsummary/rspb/summary.go | 65 +++++++---- .../kvserver/readsummary/rspb/summary.proto | 19 +++- .../kvserver/readsummary/rspb/summary_test.go | 104 ++++++++++++++++++ 5 files changed, 181 insertions(+), 24 deletions(-) create mode 100644 pkg/kv/kvserver/readsummary/rspb/summary_test.go diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 44c0c7ee9f9c..dde583f7af40 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -258,6 +258,7 @@ ALL_TESTS = [ "//pkg/kv/kvserver/rangefeed:rangefeed_test", "//pkg/kv/kvserver/rangelog:rangelog_test", "//pkg/kv/kvserver/rditer:rditer_test", + "//pkg/kv/kvserver/readsummary/rspb:rspb_test", "//pkg/kv/kvserver/replicastats:replicastats_test", "//pkg/kv/kvserver/reports:reports_test", "//pkg/kv/kvserver/spanlatch:spanlatch_test", @@ -1456,6 +1457,7 @@ GO_TARGETS = [ "//pkg/kv/kvserver/rditer:rditer", "//pkg/kv/kvserver/rditer:rditer_test", "//pkg/kv/kvserver/readsummary/rspb:rspb", + "//pkg/kv/kvserver/readsummary/rspb:rspb_test", "//pkg/kv/kvserver/readsummary:readsummary", "//pkg/kv/kvserver/replicastats:replicastats", "//pkg/kv/kvserver/replicastats:replicastats_test", diff --git a/pkg/kv/kvserver/readsummary/rspb/BUILD.bazel b/pkg/kv/kvserver/readsummary/rspb/BUILD.bazel index acc363d5d27e..3757f412d9ad 100644 --- a/pkg/kv/kvserver/readsummary/rspb/BUILD.bazel +++ b/pkg/kv/kvserver/readsummary/rspb/BUILD.bazel @@ -1,5 +1,5 @@ load("@rules_proto//proto:defs.bzl", "proto_library") -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library") proto_library( @@ -21,6 +21,7 @@ go_proto_library( visibility = ["//visibility:public"], deps = [ "//pkg/util/hlc", + "//pkg/util/uuid", # keep "@com_github_gogo_protobuf//gogoproto", ], ) @@ -32,7 +33,17 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary/rspb", visibility = ["//visibility:public"], deps = [ + "//pkg/util/buildutil", "//pkg/util/hlc", - "//pkg/util/log", + ], +) + +go_test( + name = "rspb_test", + srcs = ["summary_test.go"], + embed = [":rspb"], + deps = [ + "//pkg/util/hlc", + "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/kv/kvserver/readsummary/rspb/summary.go b/pkg/kv/kvserver/readsummary/rspb/summary.go index 45cbd0b4cd8d..9f9f1d266a8b 100644 --- a/pkg/kv/kvserver/readsummary/rspb/summary.go +++ b/pkg/kv/kvserver/readsummary/rspb/summary.go @@ -11,12 +11,15 @@ package rspb import ( - "context" + "bytes" + "fmt" + "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" - "github.com/cockroachdb/cockroach/pkg/util/log" ) +const assertEnabled = buildutil.CrdbTestBuild + // FromTimestamp constructs a read summary from the provided timestamp, treating // the argument as the low water mark of each segment in the summary. func FromTimestamp(ts hlc.Timestamp) ReadSummary { @@ -28,10 +31,11 @@ func FromTimestamp(ts hlc.Timestamp) ReadSummary { } // Clone performs a deep-copy of the receiver. -func (c ReadSummary) Clone() *ReadSummary { - // NOTE: When ReadSummary is updated to include pointers to non-contiguous - // memory, this will need to be updated. - return &c +func (c *ReadSummary) Clone() *ReadSummary { + res := new(ReadSummary) + res.Local = c.Local.Clone() + res.Global = c.Global.Clone() + return res } // Merge combines two read summaries, resulting in a single summary that @@ -42,23 +46,42 @@ func (c *ReadSummary) Merge(o ReadSummary) { c.Global.merge(o.Global) } -func (c *Segment) merge(o Segment) { - c.LowWater.Forward(o.LowWater) -} - -// AssertNoRegression asserts that all reads in the parameter's summary are -// reflected in the receiver's summary with at least as high of a timestamp. -func (c *ReadSummary) AssertNoRegression(ctx context.Context, o ReadSummary) { - c.Local.assertNoRegression(ctx, o.Local, "local") - c.Global.assertNoRegression(ctx, o.Global, "global") +// AddReadSpan adds a read span to the segment. The span must be sorted after +// all existing spans in the segment and must not overlap with any existing +// spans. +func (c *Segment) AddReadSpan(s ReadSpan) { + if assertEnabled { + if len(s.EndKey) != 0 { + if bytes.Compare(s.Key, s.EndKey) >= 0 { + panic(fmt.Sprintf("inverted span: %v", s)) + } + } + if len(c.ReadSpans) > 0 { + last := c.ReadSpans[len(c.ReadSpans)-1] + if bytes.Compare(last.Key, s.Key) >= 0 { + panic(fmt.Sprintf("out of order spans: %v %v", last, s)) + } + if len(last.EndKey) != 0 && bytes.Compare(last.EndKey, s.Key) > 0 { + panic(fmt.Sprintf("overlapping spans: %v %v", last, s)) + } + } + } + if s.Timestamp.LessEq(c.LowWater) { + return // ignore + } + c.ReadSpans = append(c.ReadSpans, s) } -func (c *Segment) assertNoRegression(ctx context.Context, o Segment, name string) { - if c.LowWater.Less(o.LowWater) { - log.Fatalf(ctx, "read summary regression in %s segment, was %s, now %s", - name, o.LowWater, c.LowWater) +// Clone performs a deep-copy of the receiver. +func (c *Segment) Clone() Segment { + res := *c + if len(c.ReadSpans) != 0 { + res.ReadSpans = make([]ReadSpan, len(c.ReadSpans)) + copy(res.ReadSpans, c.ReadSpans) } + return res } -// Ignore unused warning. -var _ = (*ReadSummary).AssertNoRegression +func (c *Segment) merge(o Segment) { + c.LowWater.Forward(o.LowWater) +} diff --git a/pkg/kv/kvserver/readsummary/rspb/summary.proto b/pkg/kv/kvserver/readsummary/rspb/summary.proto index b2745769d931..9833e53b4135 100644 --- a/pkg/kv/kvserver/readsummary/rspb/summary.proto +++ b/pkg/kv/kvserver/readsummary/rspb/summary.proto @@ -58,5 +58,22 @@ message Segment { option (gogoproto.equal) = true; util.hlc.Timestamp low_water = 1 [(gogoproto.nullable) = false]; - // TODO(nvanbenschoten): add higher resolution portion. + // The spans are sorted by key and are non-overlapping. + repeated ReadSpan read_spans = 2 [(gogoproto.nullable) = false]; +} + +// ReadSpan is a key span that has been read at a timestamp. The span carries an +// optional transaction ID that signifies which transaction performed the read. +// This is used to exclude a given read from being considered as a source of a +// read-write conflict if a transaction returns to write to a key that it had +// previously read from. +message ReadSpan { + option (gogoproto.equal) = true; + + bytes key = 1; + bytes end_key = 2; + util.hlc.Timestamp timestamp = 3 [(gogoproto.nullable) = false]; + bytes txn_id = 4 [(gogoproto.customname) = "TxnID", + (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID", + (gogoproto.nullable) = false]; } diff --git a/pkg/kv/kvserver/readsummary/rspb/summary_test.go b/pkg/kv/kvserver/readsummary/rspb/summary_test.go new file mode 100644 index 000000000000..7332e0beeae7 --- /dev/null +++ b/pkg/kv/kvserver/readsummary/rspb/summary_test.go @@ -0,0 +1,104 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rspb + +import ( + "testing" + + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/stretchr/testify/require" +) + +var ( + keyA = []byte("a") + keyB = []byte("b") + keyC = []byte("c") + keyD = []byte("d") + keyE = []byte("e") + ts10 = hlc.Timestamp{WallTime: 10} + ts20 = hlc.Timestamp{WallTime: 20} + ts30 = hlc.Timestamp{WallTime: 30} + ts40 = hlc.Timestamp{WallTime: 40} +) + +func TestSegmentAddReadSpan(t *testing.T) { + seg := Segment{LowWater: ts20} + + // Span at timestamp below low water mark is ignored. + seg.AddReadSpan(ReadSpan{Key: keyB, EndKey: keyD, Timestamp: ts10}) + require.Len(t, seg.ReadSpans, 0) + + // Span at timestamp equal to low water mark is ignored. + seg.AddReadSpan(ReadSpan{Key: keyB, EndKey: keyD, Timestamp: ts20}) + require.Len(t, seg.ReadSpans, 0) + + // Spans at timestamps above low water mark are included. + seg.AddReadSpan(ReadSpan{Key: keyB, EndKey: keyD, Timestamp: ts30}) + require.Len(t, seg.ReadSpans, 1) + + // Under test builds, assertions are enabled. + if assertEnabled { + // ["e", "d") is inverted. + require.Panics(t, func() { + seg.AddReadSpan(ReadSpan{Key: keyE, EndKey: keyD, Timestamp: ts30}) + }) + // ["e", "e") is inverted. + require.Panics(t, func() { + seg.AddReadSpan(ReadSpan{Key: keyE, EndKey: keyE, Timestamp: ts30}) + }) + // ["a"] before ["b", "d"). + require.Panics(t, func() { + seg.AddReadSpan(ReadSpan{Key: keyA, Timestamp: ts30}) + }) + // ["b"] overlaps ["b", "d"). + require.Panics(t, func() { + seg.AddReadSpan(ReadSpan{Key: keyB, Timestamp: ts30}) + }) + // ["c"] overlaps ["b", "d"). + require.Panics(t, func() { + seg.AddReadSpan(ReadSpan{Key: keyC, Timestamp: ts30}) + }) + // ["c", "e") overlaps ["b", "d"). + require.Panics(t, func() { + seg.AddReadSpan(ReadSpan{Key: keyC, EndKey: keyE, Timestamp: ts30}) + }) + // ["d", "e") does not overlap ["b", "d"). + require.NotPanics(t, func() { + seg.AddReadSpan(ReadSpan{Key: keyD, EndKey: keyE, Timestamp: ts30}) + }) + } +} + +func TestSegmentClone(t *testing.T) { + var orig Segment + orig.LowWater = ts10 + orig.AddReadSpan(ReadSpan{Key: keyB, Timestamp: ts30}) + + requireOrig := func(c Segment) { + require.Equal(t, ts10, c.LowWater) + require.Len(t, c.ReadSpans, 1) + require.Equal(t, ReadSpan{Key: keyB, Timestamp: ts30}, c.ReadSpans[0]) + } + requireOrig(orig) + + // Clone and assert equality. + clone := orig.Clone() + require.Equal(t, orig, clone) + requireOrig(clone) + + // Mutate the original. + orig.LowWater = ts20 + orig.AddReadSpan(ReadSpan{Key: keyE, Timestamp: ts30}) + + // Assert clone has not changed. + require.NotEqual(t, orig, clone) + requireOrig(clone) +}