Skip to content

Commit

Permalink
kv: add read spans to ReadSummary structure
Browse files Browse the repository at this point in the history
Informs cockroachdb#61986.

This PR adds read spans to the ReadSummary message, which is a compressed
summary of all read requests served on a range. The reads are held in sorted order
and are non-overlapping, permitting a linear-time merge operation. With the
inclusion of read spans, the ReadSummary message can now be used to serialize
a leaseholder's timestamp cache.

The merge operation is implemented in a subsequent commit.

Release note: None
  • Loading branch information
nvanbenschoten committed Jan 23, 2024
1 parent e6a9c92 commit 0950a1e
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 24 deletions.
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ ALL_TESTS = [
"//pkg/kv/kvserver/rangefeed:rangefeed_test",
"//pkg/kv/kvserver/rangelog:rangelog_test",
"//pkg/kv/kvserver/rditer:rditer_test",
"//pkg/kv/kvserver/readsummary/rspb:rspb_test",
"//pkg/kv/kvserver/replicastats:replicastats_test",
"//pkg/kv/kvserver/reports:reports_test",
"//pkg/kv/kvserver/spanlatch:spanlatch_test",
Expand Down Expand Up @@ -1456,6 +1457,7 @@ GO_TARGETS = [
"//pkg/kv/kvserver/rditer:rditer",
"//pkg/kv/kvserver/rditer:rditer_test",
"//pkg/kv/kvserver/readsummary/rspb:rspb",
"//pkg/kv/kvserver/readsummary/rspb:rspb_test",
"//pkg/kv/kvserver/readsummary:readsummary",
"//pkg/kv/kvserver/replicastats:replicastats",
"//pkg/kv/kvserver/replicastats:replicastats_test",
Expand Down
15 changes: 13 additions & 2 deletions pkg/kv/kvserver/readsummary/rspb/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
load("@rules_proto//proto:defs.bzl", "proto_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library")

proto_library(
Expand All @@ -21,6 +21,7 @@ go_proto_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/util/hlc",
"//pkg/util/uuid", # keep
"@com_github_gogo_protobuf//gogoproto",
],
)
Expand All @@ -32,7 +33,17 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary/rspb",
visibility = ["//visibility:public"],
deps = [
"//pkg/util/buildutil",
"//pkg/util/hlc",
"//pkg/util/log",
],
)

go_test(
name = "rspb_test",
srcs = ["summary_test.go"],
embed = [":rspb"],
deps = [
"//pkg/util/hlc",
"@com_github_stretchr_testify//require",
],
)
65 changes: 44 additions & 21 deletions pkg/kv/kvserver/readsummary/rspb/summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@
package rspb

import (
"context"
"bytes"
"fmt"

"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

// assertEnabled gates the span ordering and overlap assertions performed by
// AddReadSpan. It takes its value from buildutil.CrdbTestBuild, so the checks
// (and their panics) are only active under test builds.
const assertEnabled = buildutil.CrdbTestBuild

// FromTimestamp constructs a read summary from the provided timestamp, treating
// the argument as the low water mark of each segment in the summary.
func FromTimestamp(ts hlc.Timestamp) ReadSummary {
Expand All @@ -28,10 +31,11 @@ func FromTimestamp(ts hlc.Timestamp) ReadSummary {
}

// Clone performs a deep-copy of the receiver.
func (c ReadSummary) Clone() *ReadSummary {
// NOTE: When ReadSummary is updated to include pointers to non-contiguous
// memory, this will need to be updated.
return &c
// Clone returns a newly allocated ReadSummary whose Local and Global segments
// are clones of the receiver's segments, so the result does not share segment
// state with the receiver.
func (c *ReadSummary) Clone() *ReadSummary {
	return &ReadSummary{
		Local:  c.Local.Clone(),
		Global: c.Global.Clone(),
	}
}

// Merge combines two read summaries, resulting in a single summary that
Expand All @@ -42,23 +46,42 @@ func (c *ReadSummary) Merge(o ReadSummary) {
c.Global.merge(o.Global)
}

// merge folds the segment o into the receiver by forwarding the receiver's
// LowWater with o's LowWater. Only the low water mark is touched; the
// ReadSpans of neither segment are consulted.
// NOTE(review): Forward presumably ratchets the timestamp up to the larger of
// the two values — confirm against hlc.Timestamp.
func (c *Segment) merge(o Segment) {
c.LowWater.Forward(o.LowWater)
}

// AssertNoRegression asserts that all reads in the parameter's summary are
// reflected in the receiver's summary with at least as high of a timestamp.
func (c *ReadSummary) AssertNoRegression(ctx context.Context, o ReadSummary) {
c.Local.assertNoRegression(ctx, o.Local, "local")
c.Global.assertNoRegression(ctx, o.Global, "global")
// AddReadSpan appends the read span s to the segment.
//
// Callers must supply spans in ascending key order: s has to sort strictly
// after the last recorded span and must not overlap it. A span with an empty
// EndKey is accepted as-is; when EndKey is set it must sort strictly after
// Key. These invariants are only verified (by panicking) when assertEnabled is
// true, and they are verified before the low water check below.
//
// A span whose timestamp is at or below the segment's LowWater is silently
// dropped and not recorded.
func (c *Segment) AddReadSpan(s ReadSpan) {
	if assertEnabled {
		if hasEnd := len(s.EndKey) != 0; hasEnd && bytes.Compare(s.Key, s.EndKey) >= 0 {
			panic(fmt.Sprintf("inverted span: %v", s))
		}
		if n := len(c.ReadSpans); n > 0 {
			// Only the most recently added span needs checking, since all
			// earlier spans were validated against it when it was added.
			prev := c.ReadSpans[n-1]
			switch {
			case bytes.Compare(prev.Key, s.Key) >= 0:
				panic(fmt.Sprintf("out of order spans: %v %v", prev, s))
			case len(prev.EndKey) != 0 && bytes.Compare(prev.EndKey, s.Key) > 0:
				panic(fmt.Sprintf("overlapping spans: %v %v", prev, s))
			}
		}
	}
	if !s.Timestamp.LessEq(c.LowWater) {
		c.ReadSpans = append(c.ReadSpans, s)
	}
}

func (c *Segment) assertNoRegression(ctx context.Context, o Segment, name string) {
if c.LowWater.Less(o.LowWater) {
log.Fatalf(ctx, "read summary regression in %s segment, was %s, now %s",
name, o.LowWater, c.LowWater)
// Clone returns a copy of the receiver whose ReadSpans slice is backed by its
// own storage, so appending to or overwriting elements of either segment's
// ReadSpans never affects the other.
//
// The copy is element-wise: the Key and EndKey byte slices inside each
// ReadSpan are still shared between the original and the clone, so callers
// must not mutate key bytes in place.
func (c *Segment) Clone() Segment {
	res := *c
	// Copy through a zero-capacity slice rather than only when len > 0. An
	// empty slice that still has spare capacity would otherwise keep pointing
	// at the receiver's backing array, and subsequent appends on the two
	// segments would overwrite each other. This form also keeps a nil slice nil
	// and an empty non-nil slice non-nil.
	res.ReadSpans = append(c.ReadSpans[:0:0], c.ReadSpans...)
	return res
}

// Ignore unused warning.
var _ = (*ReadSummary).AssertNoRegression
// merge folds the segment o into the receiver by forwarding the receiver's
// LowWater with o's LowWater. Only the low water mark is touched; the
// ReadSpans of neither segment are consulted.
// NOTE(review): Forward presumably ratchets the timestamp up to the larger of
// the two values — confirm against hlc.Timestamp.
func (c *Segment) merge(o Segment) {
c.LowWater.Forward(o.LowWater)
}
19 changes: 18 additions & 1 deletion pkg/kv/kvserver/readsummary/rspb/summary.proto
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,22 @@ message Segment {
option (gogoproto.equal) = true;

util.hlc.Timestamp low_water = 1 [(gogoproto.nullable) = false];
// TODO(nvanbenschoten): add higher resolution portion.
// The spans are sorted by key and are non-overlapping.
repeated ReadSpan read_spans = 2 [(gogoproto.nullable) = false];
}

// ReadSpan is a key span that has been read at a timestamp. The span carries an
// optional transaction ID that signifies which transaction performed the read.
// This is used to exclude a given read from being considered as a source of a
// read-write conflict if a transaction returns to write to a key that it had
// previously read from.
message ReadSpan {
option (gogoproto.equal) = true;

// key is the start of the span.
bytes key = 1;
// end_key is the end of the span and may be left empty. When set, it must sort
// strictly after key. A span starting exactly at the previous span's end_key
// is treated as non-overlapping, i.e. the end is exclusive.
// NOTE(review): an empty end_key presumably denotes a read of the single key
// `key` — confirm.
bytes end_key = 2;
// timestamp is the timestamp at which the span was read.
util.hlc.Timestamp timestamp = 3 [(gogoproto.nullable) = false];
// txn_id identifies the transaction that performed the read. It is stored as a
// non-nullable uuid.UUID.
// NOTE(review): presumably the zero UUID means "no transaction" — confirm.
bytes txn_id = 4 [(gogoproto.customname) = "TxnID",
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID",
(gogoproto.nullable) = false];
}
104 changes: 104 additions & 0 deletions pkg/kv/kvserver/readsummary/rspb/summary_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package rspb

import (
"testing"

"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/stretchr/testify/require"
)

// Shared fixtures for the tests in this file: single-byte keys in ascending
// order ("a" through "e") and timestamps with increasing wall times (10
// through 40). The key slices are shared across test cases.
var (
keyA = []byte("a")
keyB = []byte("b")
keyC = []byte("c")
keyD = []byte("d")
keyE = []byte("e")
ts10 = hlc.Timestamp{WallTime: 10}
ts20 = hlc.Timestamp{WallTime: 20}
ts30 = hlc.Timestamp{WallTime: 30}
ts40 = hlc.Timestamp{WallTime: 40}
)

// TestSegmentAddReadSpan verifies that AddReadSpan drops spans at or below the
// segment's low water mark, records spans above it, and, when assertions are
// enabled, panics on inverted, out-of-order, or overlapping spans.
func TestSegmentAddReadSpan(t *testing.T) {
	seg := Segment{LowWater: ts20}

	// addAndExpect adds sp and checks the resulting number of recorded spans.
	addAndExpect := func(sp ReadSpan, wantLen int) {
		seg.AddReadSpan(sp)
		require.Len(t, seg.ReadSpans, wantLen)
	}

	// Below the low water mark: ignored.
	addAndExpect(ReadSpan{Key: keyB, EndKey: keyD, Timestamp: ts10}, 0)
	// Equal to the low water mark: ignored.
	addAndExpect(ReadSpan{Key: keyB, EndKey: keyD, Timestamp: ts20}, 0)
	// Above the low water mark: recorded.
	addAndExpect(ReadSpan{Key: keyB, EndKey: keyD, Timestamp: ts30}, 1)

	// The remaining checks exercise assertions that only exist in test builds.
	if !assertEnabled {
		return
	}

	// Every span below must be rejected given that ["b", "d") is recorded.
	rejected := []ReadSpan{
		{Key: keyE, EndKey: keyD, Timestamp: ts30}, // ["e", "d") is inverted
		{Key: keyE, EndKey: keyE, Timestamp: ts30}, // ["e", "e") is inverted
		{Key: keyA, Timestamp: ts30},               // ["a"] sorts before ["b", "d")
		{Key: keyB, Timestamp: ts30},               // ["b"] overlaps ["b", "d")
		{Key: keyC, Timestamp: ts30},               // ["c"] overlaps ["b", "d")
		{Key: keyC, EndKey: keyE, Timestamp: ts30}, // ["c", "e") overlaps ["b", "d")
	}
	for _, sp := range rejected {
		require.Panics(t, func() {
			seg.AddReadSpan(sp)
		})
	}

	// ["d", "e") starts exactly where ["b", "d") ends, so it is accepted.
	require.NotPanics(t, func() {
		seg.AddReadSpan(ReadSpan{Key: keyD, EndKey: keyE, Timestamp: ts30})
	})
}

func TestSegmentClone(t *testing.T) {
var orig Segment
orig.LowWater = ts10
orig.AddReadSpan(ReadSpan{Key: keyB, Timestamp: ts30})

requireOrig := func(c Segment) {
require.Equal(t, ts10, c.LowWater)
require.Len(t, c.ReadSpans, 1)
require.Equal(t, ReadSpan{Key: keyB, Timestamp: ts30}, c.ReadSpans[0])
}
requireOrig(orig)

// Clone and assert equality.
clone := orig.Clone()
require.Equal(t, orig, clone)
requireOrig(clone)

// Mutate the original.
orig.LowWater = ts20
orig.AddReadSpan(ReadSpan{Key: keyE, Timestamp: ts30})

// Assert clone has not changed.
require.NotEqual(t, orig, clone)
requireOrig(clone)
}

0 comments on commit 0950a1e

Please sign in to comment.