Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

spanconfigkvsubscriber: teach the KVSubscriber about system span configs #76942

Merged
merged 4 commits into from
Feb 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/spanconfig/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ go_test(
"//pkg/keys",
"//pkg/roachpb",
"//pkg/testutils",
"//pkg/util/leaktest",
"@com_github_stretchr_testify//require",
],
)
4 changes: 2 additions & 2 deletions pkg/spanconfig/spanconfigkvsubscriber/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go_library(
name = "spanconfigkvsubscriber",
srcs = [
"kvsubscriber.go",
"span_config_decoder.go",
"spanconfigdecoder.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvsubscriber",
visibility = ["//visibility:public"],
Expand Down Expand Up @@ -37,7 +37,7 @@ go_test(
"datadriven_test.go",
"kvsubscriber_test.go",
"main_test.go",
"span_config_decoder_test.go",
"spanconfigdecoder_test.go",
],
data = glob(["testdata/**"]),
embed = [":spanconfigkvsubscriber"],
Expand Down
16 changes: 5 additions & 11 deletions pkg/spanconfig/spanconfigkvsubscriber/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedcache"
Expand Down Expand Up @@ -46,8 +45,9 @@ import (
// delete [c,e)
// upsert [c,d):C
// upsert [d,e):D
// upsert {entire-keyspace}:X
// delete {source=1,target=20}
// ----
// ok
//
// get
// span [a,b)
Expand Down Expand Up @@ -99,6 +99,7 @@ import (
// Text of the form [a,b) and [a,b):C correspond to spans and span config
// records; see spanconfigtestutils.Parse{Span,Config,SpanConfigRecord} for more
// details.
// TODO(arul): Add ability to express tenant spans to this datadriven test.
func TestDataDriven(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand Down Expand Up @@ -142,7 +143,7 @@ func TestDataDriven(t *testing.T) {
ts.RangeFeedFactory().(*rangefeed.Factory),
dummyTableID,
10<<20, /* 10 MB */
spanconfigtestutils.ParseConfig(t, "MISSING"),
spanconfigtestutils.ParseConfig(t, "FALLBACK"),
&spanconfig.TestingKnobs{
KVSubscriberRangeFeedKnobs: &rangefeedcache.TestingKnobs{
OnTimestampAdvance: func(ts hlc.Timestamp) {
Expand Down Expand Up @@ -241,14 +242,7 @@ func TestDataDriven(t *testing.T) {
if i != 0 && receivedUpdates[i].Equal(receivedUpdates[i-1]) {
continue // de-dup updates
}

var spanStr string
if update.Equal(keys.EverythingSpan) {
spanStr = update.String()
} else {
spanStr = spanconfigtestutils.PrintSpan(update)
}
output.WriteString(fmt.Sprintf("%s\n", spanStr))
output.WriteString(fmt.Sprintf("%s\n", spanconfigtestutils.PrintSpan(update)))
}

return output.String()
Expand Down
9 changes: 2 additions & 7 deletions pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,15 +248,10 @@ func (s *KVSubscriber) handlePartialUpdate(
handlers := s.mu.handlers
s.mu.Unlock()

for _, h := range handlers {
for _, handler := range handlers {
for _, ev := range events {
// TODO(arul): In the future, once we start reacting to system span
// configurations, we'll want to invoke handlers with the correct span
// here as well.
target := ev.(*bufferEvent).Update.Target
if target.IsSpanTarget() {
h.invoke(ctx, target.GetSpan())
}
handler.invoke(ctx, target.KeyspaceTargeted())
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,21 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvsubscriber"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/stretchr/testify/require"
)

// TestSpanConfigDecoder verifies that we can decode rows stored in the
// system.span_configurations table.
func TestSpanConfigDecoder(t *testing.T) {
// TestSpanConfigDecoder verifies that we can decode span target rows stored in
// the system.span_configurations table.
func TestDecodeSpanTargets(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
Expand All @@ -46,16 +48,6 @@ func TestSpanConfigDecoder(t *testing.T) {
`SELECT table_id FROM crdb_internal.tables WHERE name = '%s'`, dummyTableName),
).Scan(&dummyTableID)

getCount := func() int {
q := tdb.Query(t, fmt.Sprintf(`SELECT count(*) FROM %s`, dummyTableName))
q.Next()
var c int
require.Nil(t, q.Scan(&c))
require.Nil(t, q.Close())
return c
}
initialCount := getCount()

key := tc.ScratchRange(t)
rng := tc.GetFirstStoreFromServer(t, 0).LookupReplica(keys.MustAddr(key))
span := rng.Desc().RSpan().AsRawSpanWithNoLocals()
Expand All @@ -65,18 +57,17 @@ func TestSpanConfigDecoder(t *testing.T) {
require.NoError(t, err)
tdb.Exec(t, fmt.Sprintf(`UPSERT INTO %s (start_key, end_key, config) VALUES ($1, $2, $3)`,
dummyTableName), span.Key, span.EndKey, buf)
require.Equal(t, initialCount+1, getCount())

k := keys.SystemSQLCodec.IndexPrefix(dummyTableID, keys.SpanConfigurationsTablePrimaryKeyIndexID)
rows, err := tc.Server(0).DB().Scan(ctx, k, k.PrefixEnd(), 0 /* maxRows */)
require.NoError(t, err)
require.Len(t, rows, initialCount+1)
require.Len(t, rows, 1)

last := rows[len(rows)-1]
row := rows[0]
got, err := spanconfigkvsubscriber.TestingDecoderFn()(
roachpb.KeyValue{
Key: last.Key,
Value: *last.Value,
Key: row.Key,
Value: *row.Value,
},
)
require.NoError(t, err)
Expand All @@ -86,6 +77,84 @@ func TestSpanConfigDecoder(t *testing.T) {
"expected config=%s, got config=%s", conf, got.Config)
}

// TestSpanConfigDecoder verifies that we can decode system target rows stored
// in the system.span_configurations table.
func TestDecodeSystemTargets(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{})
defer tc.Stopper().Stop(ctx)
const dummyTableName = "dummy_span_configurations"
tdb := sqlutils.MakeSQLRunner(tc.ServerConn(0))

ts := func(nanos int) hlc.Timestamp {
return hlc.Timestamp{
WallTime: int64(nanos),
}
}

for i, systemTarget := range []spanconfig.SystemTarget{
// Tenant targeting its keyspace.
spanconfig.TestingMakeTenantKeyspaceTargetOrFatal(
t, roachpb.MakeTenantID(10), roachpb.MakeTenantID(10),
),
// System tenant targeting its keyspace.
spanconfig.TestingMakeTenantKeyspaceTargetOrFatal(
t, roachpb.SystemTenantID, roachpb.SystemTenantID,
),
// System tenant targeting a secondary tenant's keyspace.
spanconfig.TestingMakeTenantKeyspaceTargetOrFatal(
t, roachpb.SystemTenantID, roachpb.MakeTenantID(10),
),
// System tenant targeting the entire keyspace.
spanconfig.MakeEntireKeyspaceTarget(),
} {
// We start with a fresh table for each run.
tdb.Exec(t, fmt.Sprintf("DROP TABLE IF EXISTS %s", dummyTableName))
tdb.Exec(t, fmt.Sprintf("CREATE TABLE %s (LIKE system.span_configurations INCLUDING ALL)", dummyTableName))
var dummyTableID uint32
tdb.QueryRow(t, fmt.Sprintf(
`SELECT id FROM system.namespace WHERE name = '%s'`, dummyTableName),
).Scan(&dummyTableID)

// Write the record.
conf := roachpb.SpanConfig{
GCPolicy: roachpb.GCPolicy{
ProtectionPolicies: []roachpb.ProtectionPolicy{
{ProtectedTimestamp: ts(10)},
{ProtectedTimestamp: ts(i + 500)},
},
},
}
buf, err := protoutil.Marshal(&conf)
require.NoError(t, err)
encodedSp := spanconfig.MakeTargetFromSystemTarget(systemTarget).Encode()
tdb.Exec(t, fmt.Sprintf(`INSERT INTO %s (start_key, end_key, config) VALUES ($1, $2, $3)`,
dummyTableName), encodedSp.Key, encodedSp.EndKey, buf)

// Read the record.
k := keys.SystemSQLCodec.IndexPrefix(dummyTableID, keys.SpanConfigurationsTablePrimaryKeyIndexID)
rows, err := tc.Server(0).DB().Scan(ctx, k, k.PrefixEnd(), 0 /* maxRows */)
require.NoError(t, err)
require.Len(t, rows, 1)

// Decode and verify.
row := rows[0]
got, err := spanconfigkvsubscriber.TestingDecoderFn()(
roachpb.KeyValue{
Key: row.Key,
Value: *row.Value,
},
)
require.NoError(t, err)

require.Equal(t, conf, got.Config)
require.True(t, got.Target.IsSystemTarget())
require.Equal(t, systemTarget, got.Target.GetSystemTarget())
}
}

func BenchmarkSpanConfigDecoder(b *testing.B) {
defer log.Scope(b).Close(b)

Expand Down
6 changes: 3 additions & 3 deletions pkg/spanconfig/spanconfigkvsubscriber/testdata/basic
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ span [a,f)

updates
----
/M{in-ax}
[/Min,/Max)
[a,c)
[d,f)

Expand All @@ -40,9 +40,9 @@ delete [d,f)

updates
----
/M{in-ax}
[/Min,/Max)
[d,f)

store-reader key=d
----
conf=MISSING
conf=FALLBACK
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ upsert [d,f):D

updates
----
/M{in-ax}
[/Min,/Max)
[a,c)
[d,f)

Expand Down Expand Up @@ -59,23 +59,23 @@ start

updates
----
/M{in-ax}
[/Min,/Max)

store-reader key=a
----
conf=B

store-reader key=d
----
conf=MISSING
conf=FALLBACK

update
upsert [a,c):C
----

updates
----
/M{in-ax}
[/Min,/Max)
[a,c)

store-reader key=a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ start

updates
----
/M{in-ax}
[/Min,/Max)

store-reader key=a
----
conf=B

store-reader key=d
----
conf=MISSING
conf=FALLBACK
Loading