Skip to content

Commit

Permalink
ttl: Add a http api to trigger TTL job manually for test (#41599)
Browse files Browse the repository at this point in the history
close #41597
  • Loading branch information
lcwangchao authored Feb 21, 2023
1 parent df530ad commit c1e4702
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 0 deletions.
1 change: 1 addition & 0 deletions server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ go_library(
"//table",
"//table/tables",
"//tablecodec",
"//ttl/client",
"//types",
"//util",
"//util/arena",
Expand Down
34 changes: 34 additions & 0 deletions server/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import (
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
ttlcient "github.com/pingcap/tidb/ttl/client"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/codec"
Expand Down Expand Up @@ -2219,3 +2220,36 @@ func (h labelHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {

writeData(w, config.GetGlobalConfig().Labels)
}

// ttlJobTriggerHandler is used to trigger a TTL job manually
type ttlJobTriggerHandler struct {
store kv.Storage
}

// ServeHTTP handles request of triger a ttl job
func (h ttlJobTriggerHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodPost {
writeError(w, errors.Errorf("This api only support POST method"))
return
}

params := mux.Vars(req)
dbName := strings.ToLower(params["db"])
tableName := strings.ToLower(params["table"])

ctx := req.Context()
dom, err := session.GetDomain(h.store)
if err != nil {
log.Error("failed to get session domain", zap.Error(err))
writeError(w, err)
}

cli := dom.TTLJobManager().GetCommandCli()
resp, err := ttlcient.TriggerNewTTLJob(ctx, cli, dbName, tableName)
if err != nil {
log.Error("failed to trigger new TTL job", zap.Error(err))
writeError(w, err)
}
writeData(w, resp)
logutil.Logger(ctx).Info("trigger TTL job manually successfully", zap.String("dbName", dbName), zap.String("tableName", tableName), zap.Any("response", resp))
}
85 changes: 85 additions & 0 deletions server/http_handler_serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,3 +581,88 @@ func TestGetSchemaStorage(t *testing.T) {
tables[0].DataFree,
})
}

func TestTTL(t *testing.T) {
ts := createBasicHTTPHandlerTestSuite()
ts.startServer(t)
defer ts.stopServer(t)

db, err := sql.Open("mysql", ts.getDSN())
require.NoError(t, err)
defer func() {
err := db.Close()
require.NoError(t, err)
}()
dbt := testkit.NewDBTestKit(t, db)
dbt.MustExec("create database test_ttl")
dbt.MustExec("use test_ttl")
dbt.MustExec("create table t1(t timestamp) TTL=`t` + interval 1 day")

getJobCnt := func(status string) int {
selectSQL := "select count(1) from mysql.tidb_ttl_job_history"
if status != "" {
selectSQL += " where status = '" + status + "'"
}

rs, err := db.Query(selectSQL)
require.NoError(t, err)
defer func() {
require.NoError(t, rs.Close())
}()

cnt := -1
rowNum := 0
for rs.Next() {
rowNum++
require.Equal(t, 1, rowNum)
require.NoError(t, rs.Scan(&cnt))
}
require.NoError(t, rs.Err())
return cnt
}

waitAllJobsFinish := func() {
start := time.Now()
for time.Since(start) < time.Minute {
cnt := getJobCnt("running")
if cnt == 0 {
return
}
}
require.Fail(t, "timeout for waiting job finished")
}

doTrigger := func() (map[string]interface{}, error) {
resp, err := ts.postStatus("/test/ttl/trigger/test_ttl/t1", "application/json", nil)
if err != nil {
return nil, err
}

defer func() {
require.NoError(t, resp.Body.Close())
}()

body, err := io.ReadAll(resp.Body)
require.NoError(t, err)

var obj map[string]interface{}
require.NoError(t, json.Unmarshal(body, &obj))
return obj, nil
}

expectedJobCnt := 1
obj, err := doTrigger()
require.NoError(t, err)
if err != nil {
// if error returns, may be a job is running, we should skip it and have a next try when it stopped
require.Equal(t, expectedJobCnt, getJobCnt(""))
waitAllJobsFinish()
obj, err = doTrigger()
require.NoError(t, err)
expectedJobCnt++
}

_, ok := obj["table_result"]
require.True(t, ok)
require.Equal(t, expectedJobCnt, getJobCnt(""))
}
3 changes: 3 additions & 0 deletions server/http_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,9 @@ func (s *Server) startHTTPServer() {
// ddlHook is enabled only for tests so we can substitute the callback in the DDL.
router.Handle("/test/ddl/hook", &ddlHookHandler{tikvHandlerTool.Store.(kv.Storage)})

// ttlJobTriggerHandler is enabled only for tests, so we can accelerate the schedule of TTL job
router.Handle("/test/ttl/trigger/{db}/{table}", &ttlJobTriggerHandler{tikvHandlerTool.Store.(kv.Storage)})

var (
httpRouterPage bytes.Buffer
pathTemplate string
Expand Down

0 comments on commit c1e4702

Please sign in to comment.