spanconfigreconciler{ccl}: apply system span config diffs to the store
This change teaches the reconciler about system span configs. Concretely,
we make the following changes:

- A full reconciliation, when checking for existing span configurations, now
also asks for the SpanConfigs corresponding to the SystemTargets relevant to the tenant.

For the host tenant, this includes the `entire-keyspace` SystemTarget as well as
the SystemTargets for span configs the host tenant has installed on its own
keyspace and on the keyspaces of secondary tenants.

For a secondary tenant, this only includes the SystemTarget for span configs it
has installed on its own tenant keyspace.

- During incremental reconciliation, before applying our updates to the Store,
we now also check for "missing protected timestamp system targets". These correspond
to protected timestamp records that target a `Cluster` or a `Tenant` but that no longer
exist in the `system.protected_ts_records` table, having been released by the client.
For every such unique missing system target we apply a `spanconfig.Deletion` to the
Store, as sketched below.
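
Conceptually, this check reduces to a set difference between the system targets
the Store knows about and the targets still backed by live protected timestamp
records. A minimal Go sketch of that idea; the types and helper here are invented
stand-ins, not the real `spanconfig` API:

```go
package main

import "fmt"

// systemTarget is a hypothetical stand-in for spanconfig.SystemTarget: either
// the entire keyspace, or a (source tenant, target tenant) pair.
type systemTarget struct {
	entireKeyspace     bool
	sourceID, targetID uint64
}

// missingTargetDeletions returns the system targets the store currently
// carries configs for but that no longer have a backing protected timestamp
// record -- i.e. the record was released, so a deletion must be applied.
func missingTargetDeletions(storeTargets, recordTargets []systemTarget) []systemTarget {
	live := make(map[systemTarget]struct{}, len(recordTargets))
	for _, t := range recordTargets {
		live[t] = struct{}{}
	}
	var deletions []systemTarget
	for _, t := range storeTargets {
		if _, ok := live[t]; !ok {
			deletions = append(deletions, t) // becomes a spanconfig.Deletion
		}
	}
	return deletions
}

func main() {
	store := []systemTarget{{entireKeyspace: true}, {sourceID: 1, targetID: 10}}
	records := []systemTarget{{sourceID: 1, targetID: 10}} // cluster record released
	fmt.Println(missingTargetDeletions(store, records))    // [{true 0 0}]
}
```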

To make the above possible, this change moves the `ptsStateReader` from the
`spanconfigsqltranslator` package to the top-level `spanconfig` package.
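
Roughly, such a reader turns the flat protected timestamp state into protection
policies grouped by what they target. A self-contained sketch, with invented
names standing in for the real types:

```go
package main

import "fmt"

// protectionPolicy and ptsRecord are simplified, hypothetical stand-ins for
// the types a pts state reader works with.
type protectionPolicy struct{ protectedTS int64 }

type ptsRecord struct {
	ts      int64
	cluster bool     // protects the entire keyspace
	tenants []uint64 // or: protects these tenants' keyspaces
}

// groupByTarget buckets protection policies the way the reconciler consumes
// them: one bucket for the cluster-wide target, one per protected tenant.
func groupByTarget(records []ptsRecord) ([]protectionPolicy, map[uint64][]protectionPolicy) {
	var cluster []protectionPolicy
	byTenant := make(map[uint64][]protectionPolicy)
	for _, r := range records {
		p := protectionPolicy{protectedTS: r.ts}
		if r.cluster {
			cluster = append(cluster, p)
			continue
		}
		for _, id := range r.tenants {
			byTenant[id] = append(byTenant[id], p)
		}
	}
	return cluster, byTenant
}

func main() {
	cluster, byTenant := groupByTarget([]ptsRecord{
		{ts: 1, cluster: true},
		{ts: 2, tenants: []uint64{1, 10}},
	})
	fmt.Println(cluster, byTenant) // [{1}] map[1:[{2}] 10:[{2}]]
}
```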

Informs: #73727

Release note: None
adityamaru committed Feb 25, 2022
1 parent 715c1eb commit c300ec2
Showing 21 changed files with 596 additions and 137 deletions.
@@ -143,7 +143,7 @@ func TestDataDriven(t *testing.T) {
// configuration changes
testutils.SucceedsSoon(t, func() error {
for _, tenant := range spanConfigTestCluster.Tenants() {
-lastCheckpoint, lastExec := tenant.LastCheckpoint(), tenant.TimestampAfterLastExec()
+lastCheckpoint, lastExec := tenant.LastCheckpoint(), tenant.TimestampAfterLastSQLChange()
if lastCheckpoint.IsEmpty() {
continue // reconciler wasn't started
}
@@ -191,7 +191,7 @@ func TestDataDriven(t *testing.T) {
case "exec-sql":
// Run under an explicit transaction -- we rely on having a
// single timestamp for the statements (see
-// tenant.TimestampAfterLastExec) for ordering guarantees.
+// tenant.TimestampAfterLastSQLChange) for ordering guarantees.
tenant.Exec(fmt.Sprintf("BEGIN; %s; COMMIT;", d.Input))

case "query-sql":
1 change: 1 addition & 0 deletions pkg/ccl/spanconfigccl/spanconfigreconcilerccl/BUILD.bazel
@@ -13,6 +13,7 @@ go_test(
"//pkg/ccl/partitionccl",
"//pkg/ccl/utilccl",
"//pkg/jobs",
"//pkg/kv/kvserver/protectedts",
"//pkg/roachpb",
"//pkg/security",
"//pkg/security/securitytest",
45 changes: 39 additions & 6 deletions pkg/ccl/spanconfigccl/spanconfigreconcilerccl/datadriven_test.go
@@ -19,6 +19,7 @@ import (
_ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/partitionccl"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigtestutils"
@@ -60,6 +61,16 @@
// Print out the contents of KVAccessor directly, skipping 'offset' entries,
// returning up to the specified limit if any.
//
// - "protect" [record-id=<int>] [ts=<int>]
//   cluster OR
//   tenants id1,id2... OR
//   descs id1,id2...
// Creates and writes a protected timestamp record with the given id and ts,
// and an appropriate ptpb.Target.
//
// - "release" [record-id=<int>]
// Releases the protected timestamp record with id.
//
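// Example usage (mirroring the testdata file below):
//
//   protect record-id=2 ts=2
//   tenants 1,10
//   ----
//
//   release record-id=2
//   ----
//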
// TODO(irfansharif): Provide a way to stop reconcilers and/or start them back
// up again. It would let us simulate suspended tenants, and the behavior of
// the reconciler with existing kvaccessor state (populated by an earlier
@@ -85,11 +96,13 @@ func TestDataDriven(t *testing.T) {
// Checkpoint noops frequently; speeds this test up.
SQLWatcherCheckpointNoopsEveryDurationOverride: 100 * time.Millisecond,
}
ptsKnobs := &protectedts.TestingKnobs{EnableProtectedTimestampForMultiTenant: true}
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), // speeds up test
SpanConfig: scKnobs,
ProtectedTS: ptsKnobs,
},
},
})
@@ -101,7 +114,7 @@ func TestDataDriven(t *testing.T) {
tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`)
}

-spanConfigTestCluster := spanconfigtestcluster.NewHandle(t, tc, scKnobs, nil /* ptsKnobs */)
+spanConfigTestCluster := spanconfigtestcluster.NewHandle(t, tc, scKnobs, ptsKnobs)
defer spanConfigTestCluster.Cleanup()

systemTenant := spanConfigTestCluster.InitializeTenant(ctx, roachpb.SystemTenantID)
@@ -131,7 +144,7 @@ func TestDataDriven(t *testing.T) {
case "exec-sql":
// Run under an explicit transaction -- we rely on having a
// single timestamp for the statements (see
-// tenant.TimestampAfterLastExec) for ordering guarantees.
+// tenant.TimestampAfterLastSQLChange) for ordering guarantees.
tenant.Exec(fmt.Sprintf("BEGIN; %s; COMMIT;", d.Input))

case "query-sql":
@@ -159,7 +172,7 @@ func TestDataDriven(t *testing.T) {

case "mutations":
testutils.SucceedsSoon(t, func() error {
-lastCheckpoint, lastExec := tenant.LastCheckpoint(), tenant.TimestampAfterLastExec()
+lastCheckpoint, lastExec := tenant.LastCheckpoint(), tenant.TimestampAfterLastSQLChange()
if lastCheckpoint.Less(lastExec) {
return errors.Newf("last checkpoint timestamp (%s) lagging last sql execution (%s)",
lastCheckpoint.GoTime(), lastExec.GoTime())
@@ -179,7 +192,7 @@ func TestDataDriven(t *testing.T) {
// tenant checkpoints to cross their last execution
// timestamp.
for _, tenant := range spanConfigTestCluster.Tenants() {
-lastCheckpoint, lastExec := tenant.LastCheckpoint(), tenant.TimestampAfterLastExec()
+lastCheckpoint, lastExec := tenant.LastCheckpoint(), tenant.TimestampAfterLastSQLChange()
if lastCheckpoint.IsEmpty() {
continue // reconciler wasn't started
}
@@ -200,11 +213,31 @@ func TestDataDriven(t *testing.T) {

lines := make([]string, len(records))
for i, record := range records {
-lines[i] = fmt.Sprintf("%-42s %s", record.Target.GetSpan().String(),
-	spanconfigtestutils.PrintSpanConfigDiffedAgainstDefaults(record.Config))
+switch {
+case record.Target.IsSpanTarget():
+	lines[i] = fmt.Sprintf("%-42s %s", record.Target.GetSpan(),
+		spanconfigtestutils.PrintSpanConfigDiffedAgainstDefaults(record.Config))
+case record.Target.IsSystemTarget():
+	lines[i] = fmt.Sprintf("%-42s %s", record.Target.GetSystemTarget(),
+		spanconfigtestutils.PrintSystemSpanConfigDiffedAgainstDefault(record.Config))
+default:
+	panic("unsupported target type")
+}
}
return spanconfigtestutils.MaybeLimitAndOffset(t, d, "...", lines)

case "protect":
var recordID string
var protectTS int
d.ScanArgs(t, "record-id", &recordID)
d.ScanArgs(t, "ts", &protectTS)
target := spanconfigtestutils.ParseProtectionTarget(t, d.Input)
tenant.MakeProtectedTimestampRecordAndProtect(ctx, recordID, protectTS, target)

case "release":
var recordID string
d.ScanArgs(t, "record-id", &recordID)
tenant.ReleaseProtectedTimestampRecord(ctx, recordID)
default:
t.Fatalf("unknown command: %s", d.Cmd)
}
@@ -0,0 +1,151 @@
# Test system span configs in the presence of a secondary tenant, with both the
# host and the secondary tenant running their own reconciliation loops.

reconcile
----

mutations discard
----

initialize tenant=10
----

# Ensure that the host tenant observes no mutations.
mutations
----

# We should observe placeholder entries for both tenants (installed when
# creating tenant records).
state offset=47
----
...
/Table/5{0-1} database system (host)
/Tenant/10{-"\x00"} database system (tenant)

# Write a cluster-wide protected timestamp record as the system tenant.
protect record-id=1 ts=1
cluster
----

# Write a protected timestamp record on the system tenant keyspace, and on the
# secondary tenant keyspace.
protect record-id=2 ts=2
tenants 1,10
----

# Start the reconciliation loop for the secondary tenant.
reconcile tenant=10
----

# We should see protected timestamp record mutations as the host tenant.
mutations
----
upsert {entire-keyspace} protection_policies=[{ts: 1}]
upsert {source=1,target=1} protection_policies=[{ts: 2}]
upsert {source=1,target=10} protection_policies=[{ts: 2}]
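
# (Notation: {entire-keyspace} is the host tenant's system span config target
# over the entire cluster, while {source=N,target=M} is a target installed by
# tenant N over tenant M's keyspace.)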

# We shouldn't see any protected timestamp mutations as the secondary tenant;
# what follows is just its initial reconciliation state.
mutations tenant=10
----
delete /Tenant/10{-"\x00"}
upsert /Tenant/10{-/Table/4} database system (tenant)
upsert /Tenant/10/Table/{4-5} database system (tenant)
upsert /Tenant/10/Table/{5-6} database system (tenant)
upsert /Tenant/10/Table/{6-7} database system (tenant)
upsert /Tenant/10/Table/{7-8} database system (tenant)
upsert /Tenant/10/Table/1{1-2} database system (tenant)
upsert /Tenant/10/Table/1{2-3} database system (tenant)
upsert /Tenant/10/Table/1{3-4} database system (tenant)
upsert /Tenant/10/Table/1{4-5} database system (tenant)
upsert /Tenant/10/Table/1{5-6} database system (tenant)
upsert /Tenant/10/Table/{19-20} database system (tenant)
upsert /Tenant/10/Table/2{0-1} database system (tenant)
upsert /Tenant/10/Table/2{1-2} database system (tenant)
upsert /Tenant/10/Table/2{3-4} database system (tenant)
upsert /Tenant/10/Table/2{4-5} database system (tenant)
upsert /Tenant/10/Table/2{5-6} database system (tenant)
upsert /Tenant/10/Table/2{6-7} database system (tenant)
upsert /Tenant/10/Table/2{7-8} database system (tenant)
upsert /Tenant/10/Table/2{8-9} database system (tenant)
upsert /Tenant/10/NamespaceTable/{30-Max} database system (tenant)
upsert /Tenant/10/{NamespaceTable/Max-Table/32} database system (tenant)
upsert /Tenant/10/Table/3{2-3} database system (tenant)
upsert /Tenant/10/Table/3{3-4} database system (tenant)
upsert /Tenant/10/Table/3{4-5} database system (tenant)
upsert /Tenant/10/Table/3{5-6} database system (tenant)
upsert /Tenant/10/Table/3{6-7} database system (tenant)
upsert /Tenant/10/Table/3{7-8} database system (tenant)
upsert /Tenant/10/Table/{39-40} database system (tenant)
upsert /Tenant/10/Table/4{0-1} database system (tenant)
upsert /Tenant/10/Table/4{1-2} database system (tenant)
upsert /Tenant/10/Table/4{2-3} database system (tenant)
upsert /Tenant/10/Table/4{3-4} database system (tenant)
upsert /Tenant/10/Table/4{4-5} database system (tenant)
upsert /Tenant/10/Table/4{6-7} database system (tenant)

exec-sql tenant=10
CREATE DATABASE db;
CREATE TABLE db.t1();
CREATE TABLE db.t2();
----

mutations tenant=10
----
upsert /Tenant/10/Table/10{6-7} range default
upsert /Tenant/10/Table/10{7-8} range default

# Write a protected timestamp record on the cluster as a secondary tenant.
protect record-id=3 ts=3 tenant=10
cluster
----

# We expect to see no mutations on the host tenant.
mutations
----

mutations tenant=10
----
upsert {source=10,target=10} protection_policies=[{ts: 3}]

state limit=4
----
{entire-keyspace} protection_policies=[{ts: 1}]
{source=1,target=1} protection_policies=[{ts: 2}]
{source=1,target=10} protection_policies=[{ts: 2}]
{source=10,target=10} protection_policies=[{ts: 3}]
...

# Release all the protected timestamp records from the host tenant.
release record-id=1
----

release record-id=2
----

# We expect to see no mutations on the secondary tenant.
mutations tenant=10
----

mutations
----
delete {entire-keyspace}
delete {source=1,target=1}
delete {source=1,target=10}

# Release all the protected timestamp records from the secondary tenant.
release record-id=3 tenant=10
----

mutations tenant=10
----
delete {source=10,target=10}

# All system span config targets should have been removed at this point.
state limit=4
----
/{Min-System/NodeLiveness} ttl_seconds=3600 num_replicas=5
/System/NodeLiveness{-Max} ttl_seconds=600 num_replicas=5
/System/{NodeLivenessMax-tsd} range system
/System{/tsd-tse} range default
...
