Skip to content

Commit

Permalink
Merge #76942
Browse files Browse the repository at this point in the history
76942: spanconfigkvsubscriber: teach the KVSubscriber about system span configs  r=arulajmani a=arulajmani

See individual commits for details. 

Co-authored-by: arulajmani <[email protected]>
  • Loading branch information
craig[bot] and arulajmani committed Feb 25, 2022
2 parents 9aae74b + cc87693 commit 95b0430
Show file tree
Hide file tree
Showing 19 changed files with 436 additions and 104 deletions.
1 change: 1 addition & 0 deletions pkg/spanconfig/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ go_test(
"//pkg/keys",
"//pkg/roachpb",
"//pkg/testutils",
"//pkg/util/leaktest",
"@com_github_stretchr_testify//require",
],
)
4 changes: 2 additions & 2 deletions pkg/spanconfig/spanconfigkvsubscriber/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go_library(
name = "spanconfigkvsubscriber",
srcs = [
"kvsubscriber.go",
"span_config_decoder.go",
"spanconfigdecoder.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvsubscriber",
visibility = ["//visibility:public"],
Expand Down Expand Up @@ -37,7 +37,7 @@ go_test(
"datadriven_test.go",
"kvsubscriber_test.go",
"main_test.go",
"span_config_decoder_test.go",
"spanconfigdecoder_test.go",
],
data = glob(["testdata/**"]),
embed = [":spanconfigkvsubscriber"],
Expand Down
16 changes: 5 additions & 11 deletions pkg/spanconfig/spanconfigkvsubscriber/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedcache"
Expand Down Expand Up @@ -46,8 +45,9 @@ import (
// delete [c,e)
// upsert [c,d):C
// upsert [d,e):D
// upsert {entire-keyspace}:X
// delete {source=1,target=20}
// ----
// ok
//
// get
// span [a,b)
Expand Down Expand Up @@ -99,6 +99,7 @@ import (
// Text of the form [a,b) and [a,b):C correspond to spans and span config
// records; see spanconfigtestutils.Parse{Span,Config,SpanConfigRecord} for more
// details.
// TODO(arul): Add ability to express tenant spans to this datadriven test.
func TestDataDriven(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand Down Expand Up @@ -142,7 +143,7 @@ func TestDataDriven(t *testing.T) {
ts.RangeFeedFactory().(*rangefeed.Factory),
dummyTableID,
10<<20, /* 10 MB */
spanconfigtestutils.ParseConfig(t, "MISSING"),
spanconfigtestutils.ParseConfig(t, "FALLBACK"),
&spanconfig.TestingKnobs{
KVSubscriberRangeFeedKnobs: &rangefeedcache.TestingKnobs{
OnTimestampAdvance: func(ts hlc.Timestamp) {
Expand Down Expand Up @@ -241,14 +242,7 @@ func TestDataDriven(t *testing.T) {
if i != 0 && receivedUpdates[i].Equal(receivedUpdates[i-1]) {
continue // de-dup updates
}

var spanStr string
if update.Equal(keys.EverythingSpan) {
spanStr = update.String()
} else {
spanStr = spanconfigtestutils.PrintSpan(update)
}
output.WriteString(fmt.Sprintf("%s\n", spanStr))
output.WriteString(fmt.Sprintf("%s\n", spanconfigtestutils.PrintSpan(update)))
}

return output.String()
Expand Down
9 changes: 2 additions & 7 deletions pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,15 +248,10 @@ func (s *KVSubscriber) handlePartialUpdate(
handlers := s.mu.handlers
s.mu.Unlock()

for _, h := range handlers {
for _, handler := range handlers {
for _, ev := range events {
// TODO(arul): In the future, once we start reacting to system span
// configurations, we'll want to invoke handlers with the correct span
// here as well.
target := ev.(*bufferEvent).Update.Target
if target.IsSpanTarget() {
h.invoke(ctx, target.GetSpan())
}
handler.invoke(ctx, target.KeyspaceTargeted())
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,21 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvsubscriber"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/stretchr/testify/require"
)

// TestSpanConfigDecoder verifies that we can decode rows stored in the
// system.span_configurations table.
func TestSpanConfigDecoder(t *testing.T) {
// TestSpanConfigDecoder verifies that we can decode span target rows stored in
// the system.span_configurations table.
func TestDecodeSpanTargets(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
Expand All @@ -46,16 +48,6 @@ func TestSpanConfigDecoder(t *testing.T) {
`SELECT table_id FROM crdb_internal.tables WHERE name = '%s'`, dummyTableName),
).Scan(&dummyTableID)

getCount := func() int {
q := tdb.Query(t, fmt.Sprintf(`SELECT count(*) FROM %s`, dummyTableName))
q.Next()
var c int
require.Nil(t, q.Scan(&c))
require.Nil(t, q.Close())
return c
}
initialCount := getCount()

key := tc.ScratchRange(t)
rng := tc.GetFirstStoreFromServer(t, 0).LookupReplica(keys.MustAddr(key))
span := rng.Desc().RSpan().AsRawSpanWithNoLocals()
Expand All @@ -65,18 +57,17 @@ func TestSpanConfigDecoder(t *testing.T) {
require.NoError(t, err)
tdb.Exec(t, fmt.Sprintf(`UPSERT INTO %s (start_key, end_key, config) VALUES ($1, $2, $3)`,
dummyTableName), span.Key, span.EndKey, buf)
require.Equal(t, initialCount+1, getCount())

k := keys.SystemSQLCodec.IndexPrefix(dummyTableID, keys.SpanConfigurationsTablePrimaryKeyIndexID)
rows, err := tc.Server(0).DB().Scan(ctx, k, k.PrefixEnd(), 0 /* maxRows */)
require.NoError(t, err)
require.Len(t, rows, initialCount+1)
require.Len(t, rows, 1)

last := rows[len(rows)-1]
row := rows[0]
got, err := spanconfigkvsubscriber.TestingDecoderFn()(
roachpb.KeyValue{
Key: last.Key,
Value: *last.Value,
Key: row.Key,
Value: *row.Value,
},
)
require.NoError(t, err)
Expand All @@ -86,6 +77,84 @@ func TestSpanConfigDecoder(t *testing.T) {
"expected config=%s, got config=%s", conf, got.Config)
}

// TestSpanConfigDecoder verifies that we can decode system target rows stored
// in the system.span_configurations table.
func TestDecodeSystemTargets(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{})
defer tc.Stopper().Stop(ctx)
const dummyTableName = "dummy_span_configurations"
tdb := sqlutils.MakeSQLRunner(tc.ServerConn(0))

ts := func(nanos int) hlc.Timestamp {
return hlc.Timestamp{
WallTime: int64(nanos),
}
}

for i, systemTarget := range []spanconfig.SystemTarget{
// Tenant targeting its keyspace.
spanconfig.TestingMakeTenantKeyspaceTargetOrFatal(
t, roachpb.MakeTenantID(10), roachpb.MakeTenantID(10),
),
// System tenant targeting its keyspace.
spanconfig.TestingMakeTenantKeyspaceTargetOrFatal(
t, roachpb.SystemTenantID, roachpb.SystemTenantID,
),
// System tenant targeting a secondary tenant's keyspace.
spanconfig.TestingMakeTenantKeyspaceTargetOrFatal(
t, roachpb.SystemTenantID, roachpb.MakeTenantID(10),
),
// System tenant targeting the entire keyspace.
spanconfig.MakeEntireKeyspaceTarget(),
} {
// We start with a fresh table for each run.
tdb.Exec(t, fmt.Sprintf("DROP TABLE IF EXISTS %s", dummyTableName))
tdb.Exec(t, fmt.Sprintf("CREATE TABLE %s (LIKE system.span_configurations INCLUDING ALL)", dummyTableName))
var dummyTableID uint32
tdb.QueryRow(t, fmt.Sprintf(
`SELECT id FROM system.namespace WHERE name = '%s'`, dummyTableName),
).Scan(&dummyTableID)

// Write the record.
conf := roachpb.SpanConfig{
GCPolicy: roachpb.GCPolicy{
ProtectionPolicies: []roachpb.ProtectionPolicy{
{ProtectedTimestamp: ts(10)},
{ProtectedTimestamp: ts(i + 500)},
},
},
}
buf, err := protoutil.Marshal(&conf)
require.NoError(t, err)
encodedSp := spanconfig.MakeTargetFromSystemTarget(systemTarget).Encode()
tdb.Exec(t, fmt.Sprintf(`INSERT INTO %s (start_key, end_key, config) VALUES ($1, $2, $3)`,
dummyTableName), encodedSp.Key, encodedSp.EndKey, buf)

// Read the record.
k := keys.SystemSQLCodec.IndexPrefix(dummyTableID, keys.SpanConfigurationsTablePrimaryKeyIndexID)
rows, err := tc.Server(0).DB().Scan(ctx, k, k.PrefixEnd(), 0 /* maxRows */)
require.NoError(t, err)
require.Len(t, rows, 1)

// Decode and verify.
row := rows[0]
got, err := spanconfigkvsubscriber.TestingDecoderFn()(
roachpb.KeyValue{
Key: row.Key,
Value: *row.Value,
},
)
require.NoError(t, err)

require.Equal(t, conf, got.Config)
require.True(t, got.Target.IsSystemTarget())
require.Equal(t, systemTarget, got.Target.GetSystemTarget())
}
}

func BenchmarkSpanConfigDecoder(b *testing.B) {
defer log.Scope(b).Close(b)

Expand Down
6 changes: 3 additions & 3 deletions pkg/spanconfig/spanconfigkvsubscriber/testdata/basic
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ span [a,f)

updates
----
/M{in-ax}
[/Min,/Max)
[a,c)
[d,f)

Expand All @@ -40,9 +40,9 @@ delete [d,f)

updates
----
/M{in-ax}
[/Min,/Max)
[d,f)

store-reader key=d
----
conf=MISSING
conf=FALLBACK
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ upsert [d,f):D

updates
----
/M{in-ax}
[/Min,/Max)
[a,c)
[d,f)

Expand Down Expand Up @@ -59,23 +59,23 @@ start

updates
----
/M{in-ax}
[/Min,/Max)

store-reader key=a
----
conf=B

store-reader key=d
----
conf=MISSING
conf=FALLBACK

update
upsert [a,c):C
----

updates
----
/M{in-ax}
[/Min,/Max)
[a,c)

store-reader key=a
Expand Down
4 changes: 2 additions & 2 deletions pkg/spanconfig/spanconfigkvsubscriber/testdata/initial_state
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ start

updates
----
/M{in-ax}
[/Min,/Max)

store-reader key=a
----
conf=B

store-reader key=d
----
conf=MISSING
conf=FALLBACK
Loading

0 comments on commit 95b0430

Please sign in to comment.