Skip to content

Commit

Permalink
Merge #55935
Browse files Browse the repository at this point in the history
55935: sql/tenant: synch clear range data on tenant drop request r=spaskob a=spaskob

Informs #47904.

This is a stop gap solution that will perform the operation
synchronously inside the transaction that marks the tenant row
as DROP and finally deletes the row.

This PR is intended to be backported to 20.2 release branch.
The long-term solution that implements #48775 is done in #55756
and will be our long term approach for release 21.0 and beyond.

Release note: none.

Co-authored-by: Spas Bojanov <[email protected]>
  • Loading branch information
craig[bot] and Spas Bojanov committed Oct 29, 2020
2 parents f945cae + 3145bc4 commit aba7e3a
Show file tree
Hide file tree
Showing 10 changed files with 178 additions and 23 deletions.
4 changes: 4 additions & 0 deletions pkg/base/test_server_args.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,4 +233,8 @@ type TestTenantArgs struct {
// TenantIDCodecOverride overrides the tenant ID used to construct the SQL
// server's codec, but nothing else (e.g. its certs). Used for testing.
TenantIDCodecOverride roachpb.TenantID

// Stopper, if not nil, is used to stop the tenant manually otherwise the
// TestServer stopper will be used.
Stopper *stop.Stopper
}
40 changes: 36 additions & 4 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5773,6 +5773,7 @@ func TestProtectedTimestampsFailDueToLimits(t *testing.T) {
// Ensure that backing up and restoring tenants succeeds.
func TestBackupRestoreTenant(t *testing.T) {
defer leaktest.AfterTest(t)()
defer jobs.TestingSetAdoptAndCancelIntervals(100*time.Millisecond, 100*time.Millisecond)()

const numAccounts = 1
ctx, tc, systemDB, dir, cleanupFn := BackupRestoreTestSetup(t, singleNode, numAccounts, InitNone)
Expand Down Expand Up @@ -5817,9 +5818,6 @@ func TestBackupRestoreTenant(t *testing.T) {
systemDB.Exec(t, `BACKUP TENANT 11 TO 'nodelocal://1/t11'`)
systemDB.Exec(t, `BACKUP TENANT 20 TO 'nodelocal://1/t20'`)

// TODO(dt): test destroying a tenant and RESTORE'ing over a destroyed tenant
// once tenant destruction actually clears their key-space. See #48775.

t.Run("non-existent", func(t *testing.T) {
systemDB.ExpectErr(t, "tenant 123 does not exist", `BACKUP TENANT 123 TO 'nodelocal://1/t1'`)
systemDB.ExpectErr(t, "tenant 21 does not exist", `BACKUP TENANT 21 TO 'nodelocal://1/t20'`)
Expand Down Expand Up @@ -5847,11 +5845,45 @@ func TestBackupRestoreTenant(t *testing.T) {
[][]string{{`10`, `true`, `{"id": "10", "state": "ACTIVE"}`}},
)

ten10Stopper := stop.NewStopper()
restoreConn10 := serverutils.StartTenant(
t, restoreTC.Server(0), base.TestTenantArgs{
TenantID: roachpb.MakeTenantID(10), Existing: true, Stopper: ten10Stopper,
},
)
restoreTenant10 := sqlutils.MakeSQLRunner(restoreConn10)
restoreTenant10.CheckQueryResults(t, `select * from foo.bar`, tenant10.QueryStr(t, `select * from foo.bar`))
restoreTenant10.CheckQueryResults(t, `select * from foo.bar2`, tenant10.QueryStr(t, `select * from foo.bar2`))

// Stop the tenant process before destroying the tenant.
restoreConn10.Close()
ten10Stopper.Stop(ctx)
restoreConn10 = nil

restoreDB.Exec(t, `SELECT crdb_internal.destroy_tenant(10)`)
restoreDB.CheckQueryResults(t,
`select id, active, crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info) from system.tenants`,
[][]string{{`10`, `false`, `{"id": "10", "state": "DROP"}`}},
)

restoreDB.Exec(t, `SELECT crdb_internal.gc_tenant(10)`)
ten10Prefix := keys.MakeTenantPrefix(roachpb.MakeTenantID(10))
ten10PrefixEnd := ten10Prefix.PrefixEnd()
rows, err := restoreTC.Server(0).DB().Scan(ctx, ten10Prefix, ten10PrefixEnd, 0 /* maxRows */)
require.NoError(t, err)
require.Equal(t, []kv.KeyValue{}, rows)

restoreDB.Exec(t, `RESTORE TENANT 10 FROM 'nodelocal://1/t10'`)
restoreDB.CheckQueryResults(t,
`select id, active, crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info) from system.tenants`,
[][]string{{`10`, `true`, `{"id": "10", "state": "ACTIVE"}`}},
)

restoreConn10 = serverutils.StartTenant(
t, restoreTC.Server(0), base.TestTenantArgs{TenantID: roachpb.MakeTenantID(10), Existing: true},
)
defer restoreConn10.Close()
restoreTenant10 := sqlutils.MakeSQLRunner(restoreConn10)
restoreTenant10 = sqlutils.MakeSQLRunner(restoreConn10)

restoreTenant10.CheckQueryResults(t, `select * from foo.bar`, tenant10.QueryStr(t, `select * from foo.bar`))
restoreTenant10.CheckQueryResults(t, `select * from foo.bar2`, tenant10.QueryStr(t, `select * from foo.bar2`))
Expand Down
5 changes: 2 additions & 3 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -1491,9 +1491,8 @@ func (r *restoreResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}
return descs.Txn(ctx, execCfg.Settings, execCfg.LeaseManager, execCfg.InternalExecutor,
execCfg.DB, func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection) error {
for _, tenant := range details.Tenants {
// TODO(dt): this is a noop since the tenant is already active=false but
// that should be fixed in DestroyTenant.
if err := sql.DestroyTenant(ctx, execCfg, txn, tenant.ID); err != nil {
tenant.State = descpb.TenantInfo_DROP
if err := sql.GCTenant(ctx, execCfg, &tenant); err != nil {
return err
}
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/server/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,9 +598,13 @@ func (ts *TestServer) StartTenant(
ClusterSettingsUpdater: st.MakeUpdater(),
}
}
stopper := params.Stopper
if stopper == nil {
stopper = ts.Stopper()
}
return StartTenant(
ctx,
ts.Stopper(),
stopper,
ts.Cfg.ClusterName,
baseCfg,
sqlCfg,
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/faketreeeval/evalctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,3 +212,8 @@ func (c *DummyTenantOperator) CreateTenant(_ context.Context, _ uint64) error {
func (c *DummyTenantOperator) DestroyTenant(_ context.Context, _ uint64) error {
return errors.WithStack(errEvalTenant)
}

// GCTenant is part of the tree.TenantOperator interface.
func (c *DummyTenantOperator) GCTenant(_ context.Context, _ uint64) error {
return errors.WithStack(errEvalTenant)
}
16 changes: 16 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/tenant
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ id active crdb_internal.pb_to_json

# Destroy a tenant.

query error tenant 5 is not in state DROP
SELECT crdb_internal.gc_tenant(5)

query I
SELECT crdb_internal.destroy_tenant(5)
----
Expand All @@ -46,6 +49,19 @@ id active crdb_internal.pb_to_json
query error pgcode 42710 tenant "5" already exists
SELECT crdb_internal.create_tenant(5)

query I
SELECT crdb_internal.gc_tenant(5)
----
5

query error pgcode 42704 tenant "5" does not exist
SELECT crdb_internal.gc_tenant(5)

query I
SELECT crdb_internal.create_tenant(5)
----
5

query error pgcode 42710 tenant "10" already exists
SELECT crdb_internal.create_tenant(10)

Expand Down
25 changes: 25 additions & 0 deletions pkg/sql/sem/builtins/builtins.go
Original file line number Diff line number Diff line change
Expand Up @@ -4293,6 +4293,31 @@ may increase either contention or retry errors, or both.`,
},
),

"crdb_internal.gc_tenant": makeBuiltin(
tree.FunctionProperties{
Category: categoryMultiTenancy,
Undocumented: true,
},
tree.Overload{
Types: tree.ArgTypes{
{"id", types.Int},
},
ReturnType: tree.FixedReturnType(types.Int),
Fn: func(ctx *tree.EvalContext, args tree.Datums) (tree.Datum, error) {
sTenID := int64(tree.MustBeDInt(args[0]))
if sTenID <= 0 {
return nil, pgerror.New(pgcode.InvalidParameterValue, "tenant ID must be positive")
}
if err := ctx.Tenant.GCTenant(ctx.Context, uint64(sTenID)); err != nil {
return nil, err
}
return args[0], nil
},
Info: "Garbage collects a tenant with the provided ID. Must be run by the System tenant.",
Volatility: tree.VolatilityVolatile,
},
),

"num_nulls": makeBuiltin(
tree.FunctionProperties{
Category: categoryComparison,
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/sem/tree/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -3120,6 +3120,11 @@ type TenantOperator interface {
// DestroyTenant attempts to uninstall an existing tenant from the system.
// It returns an error if the tenant does not exist.
DestroyTenant(ctx context.Context, tenantID uint64) error

// GCTenant attempts to garbage collect a DROP tenant from the system. Upon
// success it also removes the tenant record.
// It returns an error if the tenant does not exist.
GCTenant(ctx context.Context, tenantID uint64) error
}

// EvalContextTestingKnobs contains test knobs.
Expand Down
89 changes: 75 additions & 14 deletions pkg/sql/tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,38 +211,99 @@ func ActivateTenant(ctx context.Context, execCfg *ExecutorConfig, txn *kv.Txn, t
return nil
}

// clearTenant deletes the tenant's data.
func clearTenant(ctx context.Context, execCfg *ExecutorConfig, info *descpb.TenantInfo) error {
// Confirm tenant is ready to be cleared.
if info.State != descpb.TenantInfo_DROP {
return errors.Errorf("tenant %d is not in state DROP", info.ID)
}

log.Infof(ctx, "clearing data for tenant %d", info.ID)

prefix := keys.MakeTenantPrefix(roachpb.MakeTenantID(info.ID))
prefixEnd := prefix.PrefixEnd()

log.VEventf(ctx, 2, "ClearRange %s - %s", prefix, prefixEnd)
// ClearRange cannot be run in a transaction, so create a non-transactional
// batch to send the request.
b := &kv.Batch{}
b.AddRawRequest(&roachpb.ClearRangeRequest{
RequestHeader: roachpb.RequestHeader{Key: prefix, EndKey: prefixEnd},
})

return errors.Wrapf(execCfg.DB.Run(ctx, b), "clearing tenant %d data", info.ID)
}

// DestroyTenant implements the tree.TenantOperator interface.
func DestroyTenant(ctx context.Context, execCfg *ExecutorConfig, txn *kv.Txn, tenID uint64) error {
func (p *planner) DestroyTenant(ctx context.Context, tenID uint64) error {
const op = "destroy"
if err := rejectIfCantCoordinateMultiTenancy(execCfg.Codec, op); err != nil {
if err := rejectIfCantCoordinateMultiTenancy(p.execCfg.Codec, op); err != nil {
return err
}
if err := rejectIfSystemTenant(tenID, op); err != nil {
return err
}

// Retrieve the tenant's info.
info, err := getTenantRecord(ctx, execCfg, txn, tenID)
info, err := getTenantRecord(ctx, p.execCfg, p.txn, tenID)
if err != nil {
return errors.Wrap(err, "destroying tenant")
}

if info.State == descpb.TenantInfo_DROP {
return errors.Errorf("tenant %d is already in state DROP", tenID)
}

// Mark the tenant as dropping.
info.State = descpb.TenantInfo_DROP
if err := updateTenantRecord(ctx, execCfg, txn, info); err != nil {
return errors.Wrap(err, "destroying tenant")
return errors.Wrap(updateTenantRecord(ctx, p.execCfg, p.txn, info), "destroying tenant")
}

// GCTenant clears the tenant's data and removes its record.
func GCTenant(ctx context.Context, execCfg *ExecutorConfig, info *descpb.TenantInfo) error {
const op = "gc"
if err := rejectIfCantCoordinateMultiTenancy(execCfg.Codec, op); err != nil {
return err
}
if err := rejectIfSystemTenant(info.ID, op); err != nil {
return err
}

// TODO(nvanbenschoten): actually clear tenant keyspace. We don't want to do
// this synchronously in the same transaction, because we could be deleting
// a very large amount of data. Instead, we should kick off a job that picks
// up the DROP state of the tenant, clears the tenant keyspace, and deletes
// the row in the system.tenants table. Tracked in #48775.
if err := clearTenant(ctx, execCfg, info); err != nil {
return errors.Wrap(err, "clear tenant")
}

return nil
err := execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if num, err := execCfg.InternalExecutor.ExecEx(
ctx, "delete-tenant", txn, sessiondata.NodeUserSessionDataOverride,
`DELETE FROM system.tenants WHERE id = $1`, info.ID,
); err != nil {
return errors.Wrapf(err, "deleting tenant %d", info.ID)
} else if num != 1 {
log.Fatalf(ctx, "unexpected number of rows affected: %d", num)
}
return nil
})
return errors.Wrapf(err, "deleting tenant %d record", info.ID)
}

// DestroyTenant implements the tree.TenantOperator interface.
func (p *planner) DestroyTenant(ctx context.Context, tenID uint64) error {
return DestroyTenant(ctx, p.ExecCfg(), p.Txn(), tenID)
// GCTenant implements the tree.TenantOperator interface.
func (p *planner) GCTenant(ctx context.Context, tenID uint64) error {
if !p.ExtendedEvalContext().TxnImplicit {
return errors.Errorf("gc_tenant cannot be used inside a transaction")
}

var info *descpb.TenantInfo
var err error
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
info, err = getTenantRecord(ctx, p.execCfg, p.txn, tenID)
if err != nil {
return errors.Wrapf(err, "retrieving tenant %d", tenID)
}
return nil
}); err != nil {
return err
}

return GCTenant(ctx, p.ExecCfg(), info)
}
6 changes: 5 additions & 1 deletion pkg/testutils/serverutils/test_server_shim.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,11 @@ func StartTenant(t testing.TB, ts TestServerInterface, params base.TestTenantArg
if err != nil {
t.Fatal(err)
}
ts.Stopper().AddCloser(stop.CloserFn(func() {
stopper := params.Stopper
if stopper == nil {
stopper = ts.Stopper()
}
stopper.AddCloser(stop.CloserFn(func() {
cleanupGoDB()
}))
return db
Expand Down

0 comments on commit aba7e3a

Please sign in to comment.