Skip to content

Commit

Permalink
spanconfigreconciler{ccl}: apply system span config diffs to the store
Browse files Browse the repository at this point in the history
This change teaches the reconciler about system span configs. Concretely,
we make the following changes:

- A full reconciliation when checking for existing span configurations now
asks for SpanConfigs corresponding to the SystemTargets relevant to the tenant.

For the host tenant this includes the SystemTarget for the `entire-keyspace` as
well as the SystemTarget for span configs installed by the host tenant on its
tenant keyspace, and on other secondary tenant keyspaces.

For secondary tenants this only includes the SystemTarget for span configs installed
by it on its own tenant keyspace.

- During incremental reconciliation, before applying our updates to the Store,
we now also check for "missing protected timestamp system targets". These correspond
to protected timestamp records that target a `Cluster` or a `Tenant` but no longer
exist in the system.protected_ts_records table as they have been released by the client.
For every such unique missing system target we apply a spanconfig.Deletion to the Store.

In order to make the above possible, this change moves the ptsStateReader from the
`spanconfigsqltranslator` package, to the top level `spanconfig` package.

Informs: #73727

Release note: None
  • Loading branch information
adityamaru committed Feb 23, 2022
1 parent ad0cdd0 commit 8b7d2e5
Show file tree
Hide file tree
Showing 18 changed files with 596 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func TestDataDriven(t *testing.T) {
// configuration changes
testutils.SucceedsSoon(t, func() error {
for _, tenant := range spanConfigTestCluster.Tenants() {
lastCheckpoint, lastExec := tenant.LastCheckpoint(), tenant.TimestampAfterLastExec()
lastCheckpoint, lastExec := tenant.LastCheckpoint(), tenant.TimestampAfterLastSQLChange()
if lastCheckpoint.IsEmpty() {
continue // reconciler wasn't started
}
Expand Down Expand Up @@ -191,7 +191,7 @@ func TestDataDriven(t *testing.T) {
case "exec-sql":
// Run under an explicit transaction -- we rely on having a
// single timestamp for the statements (see
// tenant.TimestampAfterLastExec) for ordering guarantees.
// tenant.TimestampAfterLastSQLChange) for ordering guarantees.
tenant.Exec(fmt.Sprintf("BEGIN; %s; COMMIT;", d.Input))

case "query-sql":
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/spanconfigccl/spanconfigreconcilerccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_test(
"//pkg/ccl/partitionccl",
"//pkg/ccl/utilccl",
"//pkg/jobs",
"//pkg/kv/kvserver/protectedts",
"//pkg/roachpb",
"//pkg/security",
"//pkg/security/securitytest",
Expand Down
35 changes: 29 additions & 6 deletions pkg/ccl/spanconfigccl/spanconfigreconcilerccl/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
_ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/partitionccl"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigtestutils"
Expand Down Expand Up @@ -85,11 +86,13 @@ func TestDataDriven(t *testing.T) {
// Checkpoint noops frequently; speeds this test up.
SQLWatcherCheckpointNoopsEveryDurationOverride: 100 * time.Millisecond,
}
ptsKnobs := &protectedts.TestingKnobs{EnableProtectedTimestampForMultiTenant: true}
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), // speeds up test
SpanConfig: scKnobs,
ProtectedTS: ptsKnobs,
},
},
})
Expand All @@ -101,7 +104,7 @@ func TestDataDriven(t *testing.T) {
tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`)
}

spanConfigTestCluster := spanconfigtestcluster.NewHandle(t, tc, scKnobs, nil /* ptsKnobs */)
spanConfigTestCluster := spanconfigtestcluster.NewHandle(t, tc, scKnobs, ptsKnobs)
defer spanConfigTestCluster.Cleanup()

systemTenant := spanConfigTestCluster.InitializeTenant(ctx, roachpb.SystemTenantID)
Expand Down Expand Up @@ -131,7 +134,7 @@ func TestDataDriven(t *testing.T) {
case "exec-sql":
// Run under an explicit transaction -- we rely on having a
// single timestamp for the statements (see
// tenant.TimestampAfterLastExec) for ordering guarantees.
// tenant.TimestampAfterLastSQLChange) for ordering guarantees.
tenant.Exec(fmt.Sprintf("BEGIN; %s; COMMIT;", d.Input))

case "query-sql":
Expand Down Expand Up @@ -159,7 +162,7 @@ func TestDataDriven(t *testing.T) {

case "mutations":
testutils.SucceedsSoon(t, func() error {
lastCheckpoint, lastExec := tenant.LastCheckpoint(), tenant.TimestampAfterLastExec()
lastCheckpoint, lastExec := tenant.LastCheckpoint(), tenant.TimestampAfterLastSQLChange()
if lastCheckpoint.Less(lastExec) {
return errors.Newf("last checkpoint timestamp (%s) lagging last sql execution (%s)",
lastCheckpoint.GoTime(), lastExec.GoTime())
Expand All @@ -179,7 +182,7 @@ func TestDataDriven(t *testing.T) {
// tenant checkpoints to cross their last execution
// timestamp.
for _, tenant := range spanConfigTestCluster.Tenants() {
lastCheckpoint, lastExec := tenant.LastCheckpoint(), tenant.TimestampAfterLastExec()
lastCheckpoint, lastExec := tenant.LastCheckpoint(), tenant.TimestampAfterLastSQLChange()
if lastCheckpoint.IsEmpty() {
continue // reconciler wasn't started
}
Expand All @@ -200,11 +203,31 @@ func TestDataDriven(t *testing.T) {

lines := make([]string, len(records))
for i, record := range records {
lines[i] = fmt.Sprintf("%-42s %s", record.Target.GetSpan().String(),
spanconfigtestutils.PrintSpanConfigDiffedAgainstDefaults(record.Config))
switch {
case record.Target.IsSpanTarget():
lines[i] = fmt.Sprintf("%-42s %s", record.Target.GetSpan(),
spanconfigtestutils.PrintSpanConfigDiffedAgainstDefaults(record.Config))
case record.Target.IsSystemTarget():
lines[i] = fmt.Sprintf("%-42s %s", record.Target.GetSystemTarget(),
spanconfigtestutils.PrintSystemSpanConfigDiffedAgainstDefault(record.Config))
default:
panic("unsupported target type")
}
}
return spanconfigtestutils.MaybeLimitAndOffset(t, d, "...", lines)

case "protect":
var recordID string
var protectTS int
d.ScanArgs(t, "record-id", &recordID)
d.ScanArgs(t, "ts", &protectTS)
target := spanconfigtestutils.ParseProtectionTarget(t, d.Input)
tenant.MakeProtectedTimestampRecordAndProtect(ctx, recordID, protectTS, target)

case "release":
var recordID string
d.ScanArgs(t, "record-id", &recordID)
tenant.ReleaseProtectedTimestampRecord(ctx, recordID)
default:
t.Fatalf("unknown command: %s", d.Cmd)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
# Test span configs in the presence of multiple secondary tenants, each running
# their own reconciliation loop.

reconcile
----

mutations discard
----

initialize tenant=10
----

# Ensure that the host tenant observes no mutations.
mutations
----

# We should observe placeholder entries for both tenants (installed when
# creating tenant records).
state offset=47
----
...
/Table/5{0-1} database system (host)
/Tenant/10{-"\x00"} database system (tenant)

# Write a protected timestamp record on the system tenant cluster.
protect record-id=1 ts=1
cluster
----

# Write a protected timestamp record on the system tenant keyspace, and on the
# secondary tenant keyspace.
protect record-id=2 ts=2
tenants 1,10
----

# Start the reconciliation loop for the secondary tenant.
reconcile tenant=10
----

# We should see protected timestamp record mutations as the host tenant.
mutations
----
upsert {entire-keyspace} pts=[1]
upsert {source=1,target=1} pts=[2]
upsert {source=1,target=10} pts=[2]

# We shouldn't see any protected timestamp record mutations as a secondary
# tenant.
mutations tenant=10
----
delete /Tenant/10{-"\x00"}
upsert /Tenant/10{-/Table/4} database system (tenant)
upsert /Tenant/10/Table/{4-5} database system (tenant)
upsert /Tenant/10/Table/{5-6} database system (tenant)
upsert /Tenant/10/Table/{6-7} database system (tenant)
upsert /Tenant/10/Table/{7-8} database system (tenant)
upsert /Tenant/10/Table/1{1-2} database system (tenant)
upsert /Tenant/10/Table/1{2-3} database system (tenant)
upsert /Tenant/10/Table/1{3-4} database system (tenant)
upsert /Tenant/10/Table/1{4-5} database system (tenant)
upsert /Tenant/10/Table/1{5-6} database system (tenant)
upsert /Tenant/10/Table/{19-20} database system (tenant)
upsert /Tenant/10/Table/2{0-1} database system (tenant)
upsert /Tenant/10/Table/2{1-2} database system (tenant)
upsert /Tenant/10/Table/2{3-4} database system (tenant)
upsert /Tenant/10/Table/2{4-5} database system (tenant)
upsert /Tenant/10/Table/2{5-6} database system (tenant)
upsert /Tenant/10/Table/2{6-7} database system (tenant)
upsert /Tenant/10/Table/2{7-8} database system (tenant)
upsert /Tenant/10/Table/2{8-9} database system (tenant)
upsert /Tenant/10/NamespaceTable/{30-Max} database system (tenant)
upsert /Tenant/10/{NamespaceTable/Max-Table/32} database system (tenant)
upsert /Tenant/10/Table/3{2-3} database system (tenant)
upsert /Tenant/10/Table/3{3-4} database system (tenant)
upsert /Tenant/10/Table/3{4-5} database system (tenant)
upsert /Tenant/10/Table/3{5-6} database system (tenant)
upsert /Tenant/10/Table/3{6-7} database system (tenant)
upsert /Tenant/10/Table/3{7-8} database system (tenant)
upsert /Tenant/10/Table/{39-40} database system (tenant)
upsert /Tenant/10/Table/4{0-1} database system (tenant)
upsert /Tenant/10/Table/4{1-2} database system (tenant)
upsert /Tenant/10/Table/4{2-3} database system (tenant)
upsert /Tenant/10/Table/4{3-4} database system (tenant)
upsert /Tenant/10/Table/4{4-5} database system (tenant)
upsert /Tenant/10/Table/4{6-7} database system (tenant)

exec-sql tenant=10
CREATE DATABASE db;
CREATE TABLE db.t1();
CREATE TABLE db.t2();
----

# Write a protected timestamp record on the cluster as a secondary tenant.
protect record-id=3 ts=3 cluster tenant=10
cluster
----

# We expect to see no mutations on the host tenant.
mutations
----

mutations tenant=10
----
upsert {source=10,target=10} pts=[3]
upsert /Tenant/10/Table/10{6-7} range default
upsert /Tenant/10/Table/10{7-8} range default

state limit=4
----
{entire-keyspace} pts=[1]
{source=1,target=1} pts=[2]
{source=1,target=10} pts=[2]
{source=10,target=10} pts=[3]
...

# Release all the protected timestamp records from the host tenant.
release record-id=1
----

release record-id=2
----

# We expect to see no mutations on the secondary tenant.
mutations tenant=10
----

mutations
----
delete {entire-keyspace}
delete {source=1,target=1}
delete {source=1,target=10}

# Release all the protected timestamp records from the secondary tenant.
release record-id=3 tenant=10
----

mutations tenant=10
----
delete {source=10,target=10}

# All system span config targets should have been removed at this point.
state limit=4
----
/{Min-System/NodeLiveness} ttl_seconds=3600 num_replicas=5
/System/NodeLiveness{-Max} ttl_seconds=600 num_replicas=5
/System/{NodeLivenessMax-tsd} range system
/System{/tsd-tse} range default
...
Loading

0 comments on commit 8b7d2e5

Please sign in to comment.