Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: add ttl related jobs / execution #39298

Merged
merged 5 commits into from
Nov 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build/nogo_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,8 @@
"ddl/backfilling.go": "ddl/backfilling.go",
"ddl/column.go": "ddl/column.go",
"ddl/index.go": "ddl/index.go",
"ddl/ttl.go": "ddl/ttl.go",
"ddl/ttl_test.go": "ddl/ttl_test.go",
"ddl/ingest/": "ddl/ingest/",
"expression/builtin_cast.go": "expression/builtin_cast code",
"server/conn.go": "server/conn.go",
Expand Down
2 changes: 2 additions & 0 deletions ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ go_library(
"stat.go",
"table.go",
"table_lock.go",
"ttl.go",
],
importpath = "github.com/pingcap/tidb/ddl",
visibility = [
Expand Down Expand Up @@ -195,6 +196,7 @@ go_test(
"table_split_test.go",
"table_test.go",
"tiflash_replica_test.go",
"ttl_test.go",
],
embed = [":ddl"],
flaky = True,
Expand Down
18 changes: 18 additions & 0 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,9 @@ func checkDropColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (*model.TableInfo,
if err = checkDropColumnWithForeignKeyConstraintInOwner(d, t, job, tblInfo, colName.L); err != nil {
return nil, nil, nil, false, errors.Trace(err)
}
if err = checkDropColumnWithTTLConfig(tblInfo, colName.L); err != nil {
return nil, nil, nil, false, errors.Trace(err)
}
idxInfos := listIndicesWithColumn(colName.L, tblInfo.Indices)
return tblInfo, colInfo, idxInfos, false, nil
}
Expand Down Expand Up @@ -858,6 +861,9 @@ func adjustTableInfoAfterModifyColumnWithData(tblInfo *model.TableInfo, pos *ast
indexesToRemove := filterIndexesToRemove(changingIdxs, newName, tblInfo)
replaceOldIndexes(tblInfo, indexesToRemove)
}
if tblInfo.TTLInfo != nil {
updateTTLInfoWhenModifyColumn(tblInfo, oldCol.Name, changingCol.Name)
}
// Move the new column to a correct offset.
destOffset, err := LocateOffsetToMove(changingCol.Offset, pos, tblInfo)
if err != nil {
Expand Down Expand Up @@ -932,6 +938,17 @@ func updateFKInfoWhenModifyColumn(tblInfo *model.TableInfo, oldCol, newCol model
}
}

func updateTTLInfoWhenModifyColumn(tblInfo *model.TableInfo, oldCol, newCol model.CIStr) {
if oldCol.L == newCol.L {
lcwangchao marked this conversation as resolved.
Show resolved Hide resolved
return
}
if tblInfo.TTLInfo != nil {
if tblInfo.TTLInfo.ColumnName.L == oldCol.L {
tblInfo.TTLInfo.ColumnName = newCol
}
}
}

// filterIndexesToRemove filters out the indexes that can be removed.
func filterIndexesToRemove(changingIdxs []*model.IndexInfo, colName model.CIStr, tblInfo *model.TableInfo) []*model.IndexInfo {
indexesToRemove := make([]*model.IndexInfo, 0, len(changingIdxs))
Expand Down Expand Up @@ -1474,6 +1491,7 @@ func adjustTableInfoAfterModifyColumn(
tblInfo.MoveColumnInfo(oldCol.Offset, destOffset)
updateNewIdxColsNameOffset(tblInfo.Indices, oldCol.Name, newCol)
updateFKInfoWhenModifyColumn(tblInfo, oldCol.Name, newCol.Name)
updateTTLInfoWhenModifyColumn(tblInfo, oldCol.Name, newCol.Name)
return nil
}

Expand Down
150 changes: 150 additions & 0 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2115,6 +2115,11 @@ func checkTableInfoValidWithStmt(ctx sessionctx.Context, tbInfo *model.TableInfo
}
}
}
if tbInfo.TTLInfo != nil {
if err := checkTTLInfoValid(ctx, tbInfo); err != nil {
return errors.Trace(err)
}
}

return nil
}
Expand Down Expand Up @@ -2193,6 +2198,10 @@ func BuildTableInfoWithLike(ctx sessionctx.Context, ident ast.Ident, referTblInf
copy(pi.Definitions, referTblInfo.Partition.Definitions)
tblInfo.Partition = &pi
}

if referTblInfo.TTLInfo != nil {
tblInfo.TTLInfo = referTblInfo.TTLInfo.Clone()
}
return &tblInfo, nil
}

Expand Down Expand Up @@ -3000,6 +3009,8 @@ func SetDirectPlacementOpt(placementSettings *model.PlacementSettings, placement

// handleTableOptions updates tableInfo according to table options.
func handleTableOptions(options []*ast.TableOption, tbInfo *model.TableInfo) error {
var handledTTLOrTTLEnable bool

for _, op := range options {
switch op.Tp {
case ast.TableOptionAutoIncrement:
Expand Down Expand Up @@ -3036,6 +3047,23 @@ func handleTableOptions(options []*ast.TableOption, tbInfo *model.TableInfo) err
tbInfo.PlacementPolicyRef = &model.PolicyRefInfo{
Name: model.NewCIStr(op.StrValue),
}
case ast.TableOptionTTL, ast.TableOptionTTLEnable:
if handledTTLOrTTLEnable {
continue
}

ttlInfo, ttlEnable, err := getTTLInfoInOptions(options)
if err != nil {
return err
}
// It's impossible that `ttlInfo` and `ttlEnable` are all nil, because we have met this option.
// After exclude the situation `ttlInfo == nil && ttlEnable != nil`, we could say `ttlInfo != nil`
if ttlInfo == nil && ttlEnable != nil {
return errors.Trace(dbterror.ErrSetTTLEnableForNonTTLTable)
}

tbInfo.TTLInfo = ttlInfo
handledTTLOrTTLEnable = true
}
}
shardingBits := shardingBits(tbInfo)
Expand Down Expand Up @@ -3227,6 +3255,7 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, stmt *ast
}
for _, spec := range validSpecs {
var handledCharsetOrCollate bool
var handledTTLOrTTLEnable bool
switch spec.Tp {
case ast.AlterTableAddColumns:
err = d.AddColumn(sctx, ident, spec)
Expand Down Expand Up @@ -3363,6 +3392,20 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, stmt *ast
Name: model.NewCIStr(opt.StrValue),
}
case ast.TableOptionEngine:
case ast.TableOptionTTL, ast.TableOptionTTLEnable:
var ttlInfo *model.TTLInfo
var ttlEnable *bool

if handledTTLOrTTLEnable {
continue
}
ttlInfo, ttlEnable, err = getTTLInfoInOptions(spec.Options)
if err != nil {
return err
}
err = d.AlterTableTTLInfoOrEnable(sctx, ident, ttlInfo, ttlEnable)

handledTTLOrTTLEnable = true
default:
err = dbterror.ErrUnsupportedAlterTableOption
}
Expand Down Expand Up @@ -3406,6 +3449,9 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, stmt *ast
case ast.AlterTableDisableKeys, ast.AlterTableEnableKeys:
// Nothing to do now, see https://github.com/pingcap/tidb/issues/1051
// MyISAM specific
case ast.AlterTableRemoveTTL:
// the parser makes sure we have only one `ast.AlterTableRemoveTTL` in an alter statement
err = d.AlterTableRemoveTTL(sctx, ident)
default:
err = errors.Trace(dbterror.ErrUnsupportedAlterTableSpec)
}
Expand Down Expand Up @@ -4238,6 +4284,11 @@ func checkIsDroppableColumn(ctx sessionctx.Context, is infoschema.InfoSchema, sc
if err != nil {
return false, errors.Trace(err)
}
// Check the column with TTL config
err = checkDropColumnWithTTLConfig(tblInfo, colName.L)
YangKeao marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return false, errors.Trace(err)
}
// We don't support dropping column with PK handle covered now.
if col.IsPKHandleColumn(tblInfo) {
return false, dbterror.ErrUnsupportedPKHandle
Expand Down Expand Up @@ -4724,6 +4775,13 @@ func GetModifiableColumnJob(
return nil, errors.Trace(err)
}

if t.Meta().TTLInfo != nil {
// the column referenced by TTL should be a time type
if t.Meta().TTLInfo.ColumnName.L == originalColName.L && !types.IsTypeTime(newCol.ColumnInfo.FieldType.GetType()) {
return nil, errors.Trace(dbterror.ErrUnsupportedColumnInTTLConfig.GenWithStackByArgs(newCol.ColumnInfo.Name.O))
}
}

var newAutoRandBits uint64
if newAutoRandBits, err = checkAutoRandom(t.Meta(), col, specNewColumn); err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -5262,6 +5320,98 @@ func (d *ddl) AlterTableSetTiFlashReplica(ctx sessionctx.Context, ident ast.Iden
return errors.Trace(err)
}

// AlterTableTTLInfoOrEnable submit ddl job to change table info according to the ttlInfo, or ttlEnable
// at least one of the `ttlInfo` or `ttlEnable` should be not nil.
// When `ttlInfo` is nil, and `ttlEnable` is not, it will use the original `.TTLInfo` in the table info and modify the
// `.Enable`. If the `.TTLInfo` in the table info is empty, this function will return an error.
// When `ttlInfo` is not nil, it simply submits the job with the `ttlInfo` and ignore the `ttlEnable`.
func (d *ddl) AlterTableTTLInfoOrEnable(ctx sessionctx.Context, ident ast.Ident, ttlInfo *model.TTLInfo, ttlEnable *bool) error {
is := d.infoCache.GetLatest()
schema, ok := is.SchemaByName(ident.Schema)
if !ok {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ident.Schema)
}

tb, err := is.TableByName(ident.Schema, ident.Name)
if err != nil {
return errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs(ident.Schema, ident.Name))
}

tblInfo := tb.Meta().Clone()
tableID := tblInfo.ID
tableName := tblInfo.Name.L

var job *model.Job
if ttlInfo != nil {
tblInfo.TTLInfo = ttlInfo
err = checkTTLInfoValid(ctx, tblInfo)
if err != nil {
return err
}
job = &model.Job{
SchemaID: schema.ID,
TableID: tableID,
SchemaName: schema.Name.L,
TableName: tableName,
Type: model.ActionAlterTTLInfo,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{ttlInfo, ttlEnable},
}
} else {
if tblInfo.TTLInfo == nil {
return errors.Trace(dbterror.ErrSetTTLEnableForNonTTLTable)
}

job = &model.Job{
SchemaID: schema.ID,
TableID: tableID,
SchemaName: schema.Name.L,
TableName: tableName,
Type: model.ActionAlterTTLInfo,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{ttlInfo, ttlEnable},
}
}

err = d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)
return errors.Trace(err)
}

func (d *ddl) AlterTableRemoveTTL(ctx sessionctx.Context, ident ast.Ident) error {
is := d.infoCache.GetLatest()

schema, ok := is.SchemaByName(ident.Schema)
if !ok {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ident.Schema)
}

tb, err := is.TableByName(ident.Schema, ident.Name)
if err != nil {
return errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs(ident.Schema, ident.Name))
}

tblInfo := tb.Meta().Clone()
tableID := tblInfo.ID
tableName := tblInfo.Name.L

if tblInfo.TTLInfo != nil {
job := &model.Job{
SchemaID: schema.ID,
TableID: tableID,
SchemaName: schema.Name.L,
TableName: tableName,
Type: model.ActionAlterTTLRemove,
BinlogInfo: &model.HistoryInfo{},
}
err = d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)
return errors.Trace(err)
}

return nil
}

func isTableTiFlashSupported(schema *model.DBInfo, tb table.Table) error {
// Memory tables and system tables are not supported by TiFlash
if util.IsMemOrSysDB(schema.Name.L) {
Expand Down
4 changes: 4 additions & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1287,6 +1287,10 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
ver, err = w.onFlashbackCluster(d, t, job)
case model.ActionMultiSchemaChange:
ver, err = onMultiSchemaChange(w, d, t, job)
case model.ActionAlterTTLInfo:
ver, err = onTTLInfoChange(d, t, job)
case model.ActionAlterTTLRemove:
ver, err = onTTLInfoRemove(d, t, job)
default:
// Invalid job, cancel it.
job.State = model.JobStateCancelled
Expand Down
81 changes: 81 additions & 0 deletions ddl/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/table"
Expand Down Expand Up @@ -371,3 +373,82 @@ func TestCreateTables(t *testing.T) {
testGetTable(t, domain, genIDs[1])
testGetTable(t, domain, genIDs[2])
}

func TestAlterTTL(t *testing.T) {
store, domain := testkit.CreateMockStoreAndDomainWithSchemaLease(t, testLease)

d := domain.DDL()

dbInfo, err := testSchemaInfo(store, "test_table")
require.NoError(t, err)
testCreateSchema(t, testkit.NewTestKit(t, store).Session(), d, dbInfo)

ctx := testkit.NewTestKit(t, store).Session()

// initialize a table with ttlInfo
tableName := "t"
tblInfo, err := testTableInfo(store, tableName, 2)
require.NoError(t, err)
tblInfo.Columns[0].FieldType = *types.NewFieldType(mysql.TypeDatetime)
tblInfo.Columns[1].FieldType = *types.NewFieldType(mysql.TypeDatetime)
tblInfo.TTLInfo = &model.TTLInfo{
ColumnName: tblInfo.Columns[0].Name,
IntervalExprStr: "5",
IntervalTimeUnit: int(ast.TimeUnitDay),
}

// create table
job := testCreateTable(t, ctx, d, dbInfo, tblInfo)
testCheckTableState(t, store, dbInfo, tblInfo, model.StatePublic)
testCheckJobDone(t, store, job.ID, true)

// submit ddl job to modify ttlInfo
tableInfoAfterAlterTTLInfo := tblInfo.Clone()
require.NoError(t, err)
tableInfoAfterAlterTTLInfo.TTLInfo = &model.TTLInfo{
ColumnName: tblInfo.Columns[1].Name,
IntervalExprStr: "1",
IntervalTimeUnit: int(ast.TimeUnitYear),
}

job = &model.Job{
SchemaID: dbInfo.ID,
TableID: tblInfo.ID,
Type: model.ActionAlterTTLInfo,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{&model.TTLInfo{
ColumnName: tblInfo.Columns[1].Name,
IntervalExprStr: "1",
IntervalTimeUnit: int(ast.TimeUnitYear),
}},
}
ctx.SetValue(sessionctx.QueryString, "skip")
require.NoError(t, d.DoDDLJob(ctx, job))

v := getSchemaVer(t, ctx)
checkHistoryJobArgs(t, ctx, job.ID, &historyJobArgs{ver: v, tbl: nil})

// assert the ddlInfo as expected
historyJob, err := ddl.GetHistoryJobByID(testkit.NewTestKit(t, store).Session(), job.ID)
require.NoError(t, err)
require.Equal(t, tableInfoAfterAlterTTLInfo.TTLInfo, historyJob.BinlogInfo.TableInfo.TTLInfo)

// submit a ddl job to modify ttlEnabled
job = &model.Job{
SchemaID: dbInfo.ID,
TableID: tblInfo.ID,
Type: model.ActionAlterTTLRemove,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{true},
}
ctx.SetValue(sessionctx.QueryString, "skip")
require.NoError(t, d.DoDDLJob(ctx, job))

v = getSchemaVer(t, ctx)
checkHistoryJobArgs(t, ctx, job.ID, &historyJobArgs{ver: v, tbl: nil})

// assert the ddlInfo as expected
historyJob, err = ddl.GetHistoryJobByID(testkit.NewTestKit(t, store).Session(), job.ID)
require.NoError(t, err)
require.Empty(t, historyJob.BinlogInfo.TableInfo.TTLInfo)
}
Loading