diff --git a/br/pkg/glue/glue.go b/br/pkg/glue/glue.go index 24b0852d31290..4c3f18714f9a3 100644 --- a/br/pkg/glue/glue.go +++ b/br/pkg/glue/glue.go @@ -8,6 +8,7 @@ import ( "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/sessionctx" pd "github.com/tikv/pd/client" ) @@ -39,6 +40,7 @@ type Session interface { CreatePlacementPolicy(ctx context.Context, policy *model.PolicyInfo) error Close() GetGlobalVariable(name string) (string, error) + GetSessionCtx() sessionctx.Context } // BatchCreateTableSession is an interface to batch create table parallelly diff --git a/br/pkg/gluetidb/glue.go b/br/pkg/gluetidb/glue.go index 48d5b05b56a63..d07594f1f842d 100644 --- a/br/pkg/gluetidb/glue.go +++ b/br/pkg/gluetidb/glue.go @@ -114,6 +114,11 @@ func (g Glue) GetVersion() string { return g.tikvGlue.GetVersion() } +// GetSessionCtx implements glue.Glue +func (gs *tidbSession) GetSessionCtx() sessionctx.Context { + return gs.se +} + // Execute implements glue.Session. func (gs *tidbSession) Execute(ctx context.Context, sql string) error { return gs.ExecuteInternal(ctx, sql) diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index c78351e9e0459..04b52adec2154 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -17,7 +17,6 @@ package task import ( "bytes" "context" - "encoding/json" "fmt" "net/http" "strings" @@ -45,6 +44,7 @@ import ( "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/util/mathutil" + "github.com/pingcap/tidb/util/sqlexec" "github.com/spf13/pflag" "github.com/tikv/client-go/v2/config" "github.com/tikv/client-go/v2/oracle" @@ -402,33 +402,6 @@ func (s *streamMgr) buildObserveRanges(ctx context.Context) ([]kv.KeyRange, erro return rs, nil } -// checkRequirements will check some requirements before stream starts. -func (s *streamMgr) checkRequirements(ctx context.Context) (bool, error) { - type backupStream struct { - EnableStreaming bool `json:"enable"` - } - type config struct { - BackupStream backupStream `json:"log-backup"` - } - - supportBackupStream := true - hasTiKV := false - err := s.mgr.GetConfigFromTiKV(ctx, s.httpCli, func(resp *http.Response) error { - hasTiKV = true - c := &config{} - e := json.NewDecoder(resp.Body).Decode(c) - if e != nil { - return e - } - supportBackupStream = supportBackupStream && c.BackupStream.EnableStreaming - return nil - }) - if err != nil { - return false, errors.Trace(err) - } - return hasTiKV && supportBackupStream, err -} - func (s *streamMgr) backupFullSchemas(ctx context.Context, g glue.Glue) error { metaWriter := metautil.NewMetaWriter(s.bc.GetStorage(), metautil.MetaFileSize, false, metautil.MetaFile, nil) metaWriter.Update(func(m *backuppb.BackupMeta) { @@ -504,7 +477,12 @@ func RunStreamStart( } defer streamMgr.close() - supportStream, err := streamMgr.checkRequirements(ctx) + se, err := g.CreateSession(streamMgr.mgr.GetStorage()) + if err != nil { + return errors.Trace(err) + } + execCtx := se.GetSessionCtx().(sqlexec.RestrictedSQLExecutor) + supportStream, err := utils.IsLogBackupEnabled(execCtx) if err != nil { return errors.Trace(err) } diff --git a/br/pkg/task/stream_test.go b/br/pkg/task/stream_test.go index e82f2eccb8cc3..a6e7e6fcbe854 100644 --- a/br/pkg/task/stream_test.go +++ b/br/pkg/task/stream_test.go @@ -17,134 +17,17 @@ package task import ( "context" "fmt" - "net/http" - "net/http/httptest" "path/filepath" - "strings" "testing" "github.com/pingcap/errors" backuppb "github.com/pingcap/kvproto/pkg/brpb" - "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/tidb/br/pkg/conn" - "github.com/pingcap/tidb/br/pkg/pdutil" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/br/pkg/stream" - "github.com/pingcap/tidb/br/pkg/utils" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/oracle" - pd "github.com/tikv/pd/client" ) -func newMockStreamMgr(pdCli pd.Client, httpCli *http.Client) *streamMgr { - pdController := &pdutil.PdController{} - pdController.SetPDClient(pdCli) - mgr := &conn.Mgr{PdController: pdController} - return &streamMgr{mgr: mgr, httpCli: httpCli} -} - -func TestStreamStartChecks(t *testing.T) { - cases := []struct { - stores []*metapb.Store - content []string - supportStream bool - }{ - { - stores: []*metapb.Store{ - { - Id: 1, - State: metapb.StoreState_Up, - Labels: []*metapb.StoreLabel{ - { - Key: "engine", - Value: "tiflash", - }, - }, - }, - }, - content: []string{""}, - // no tikv detected in this case, so support is false. - supportStream: false, - }, - { - stores: []*metapb.Store{ - { - Id: 1, - State: metapb.StoreState_Up, - Labels: []*metapb.StoreLabel{ - { - Key: "engine", - Value: "tikv", - }, - }, - }, - }, - content: []string{ - "{\"log-level\": \"debug\", \"log-backup\": {\"enable\": true}}", - }, - // one tikv detected in this case and `enable-streaming` is true. - supportStream: true, - }, - { - stores: []*metapb.Store{ - { - Id: 1, - State: metapb.StoreState_Up, - Labels: []*metapb.StoreLabel{ - { - Key: "engine", - Value: "tikv", - }, - }, - }, - { - Id: 2, - State: metapb.StoreState_Up, - Labels: []*metapb.StoreLabel{ - { - Key: "engine", - Value: "tikv", - }, - }, - }, - }, - content: []string{ - "{\"log-level\": \"debug\", \"log-backup\": {\"enable\": true}}", - "{\"log-level\": \"debug\", \"log-backup\": {\"enable\": false}}", - }, - // two tikv detected in this case and one of them's `enable-streaming` is false. - supportStream: false, - }, - } - - ctx := context.Background() - for _, ca := range cases { - pdCli := utils.FakePDClient{Stores: ca.stores} - require.Equal(t, len(ca.content), len(ca.stores)) - count := 0 - mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - switch strings.TrimSpace(r.URL.Path) { - case "/config": - _, _ = fmt.Fprint(w, ca.content[count]) - default: - http.NotFoundHandler().ServeHTTP(w, r) - } - count++ - })) - - for _, s := range ca.stores { - s.StatusAddress = mockServer.URL - } - - httpCli := mockServer.Client() - sMgr := newMockStreamMgr(pdCli, httpCli) - support, err := sMgr.checkRequirements(ctx) - require.NoError(t, err) - require.Equal(t, ca.supportStream, support) - mockServer.Close() - } -} - func TestShiftTS(t *testing.T) { var startTS uint64 = 433155751280640000 shiftTS := ShiftTS(startTS) diff --git a/br/pkg/utils/db.go b/br/pkg/utils/db.go index 346aca6157dbb..d689d7101ed64 100644 --- a/br/pkg/utils/db.go +++ b/br/pkg/utils/db.go @@ -5,6 +5,13 @@ package utils import ( "context" "database/sql" + "strings" + + "github.com/pingcap/log" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/util/sqlexec" + "go.uber.org/zap" ) var ( @@ -30,3 +37,55 @@ type DBExecutor interface { StmtExecutor BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error) } + +// CheckLogBackupEnabled checks if LogBackup is enabled in cluster. +// this mainly used in three places. +// 1. GC worker resolve locks to scan more locks after safepoint. (every minute) +// 2. Add index skipping use lightning.(every add index ddl) +// 3. Telemetry of log backup feature usage (every 6 hours). +// NOTE: this result shouldn't be cached by caller. because it may change every time in one cluster. +func CheckLogBackupEnabled(ctx sessionctx.Context) bool { + executor, ok := ctx.(sqlexec.RestrictedSQLExecutor) + if !ok { + // shouldn't happen + log.Error("[backup] unable to translate executor from sessionctx") + return false + } + enabled, err := IsLogBackupEnabled(executor) + if err != nil { + // if it failed by any reason. we can simply return true this time. + // for GC worker it will scan more locks in one tick. + // for Add index it will skip using lightning this time. + // for Telemetry it will get a false positive usage count. + log.Warn("[backup] check log backup config failed, ignore it", zap.Error(err)) + return true + } + return enabled +} + +// IsLogBackupEnabled is used for br to check whether tikv has enabled log backup. +// we use `sqlexec.RestrictedSQLExecutor` as parameter because it's easy to mock. +// it should return error. +func IsLogBackupEnabled(ctx sqlexec.RestrictedSQLExecutor) (bool, error) { + valStr := "show config where name = 'log-backup.enable'" + internalCtx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBR) + rows, fields, errSQL := ctx.ExecRestrictedSQL(internalCtx, nil, valStr) + if errSQL != nil { + return false, errSQL + } + if len(rows) == 0 { + // no rows mean not support log backup. + return false, nil + } + for _, row := range rows { + d := row.GetDatum(3, &fields[3].Column.FieldType) + value, errField := d.ToString() + if errField != nil { + return false, errField + } + if strings.ToLower(value) == "false" { + return false, nil + } + } + return true, nil +} diff --git a/br/pkg/utils/db_test.go b/br/pkg/utils/db_test.go new file mode 100644 index 0000000000000..a28799932f02a --- /dev/null +++ b/br/pkg/utils/db_test.go @@ -0,0 +1,109 @@ +// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0. + +package utils_test + +import ( + "context" + "testing" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/br/pkg/utils" + "github.com/pingcap/tidb/parser/ast" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/sqlexec" + "github.com/stretchr/testify/require" +) + +type mockRestrictedSQLExecutor struct { + rows []chunk.Row + fields []*ast.ResultField + errHappen bool +} + +func (m *mockRestrictedSQLExecutor) ParseWithParams(ctx context.Context, sql string, args ...interface{}) (ast.StmtNode, error) { + return nil, nil +} + +func (m *mockRestrictedSQLExecutor) ExecRestrictedStmt(ctx context.Context, stmt ast.StmtNode, opts ...sqlexec.OptionFuncAlias) ([]chunk.Row, []*ast.ResultField, error) { + return nil, nil, nil +} + +func (m *mockRestrictedSQLExecutor) ExecRestrictedSQL(ctx context.Context, opts []sqlexec.OptionFuncAlias, sql string, args ...interface{}) ([]chunk.Row, []*ast.ResultField, error) { + if m.errHappen { + return nil, nil, errors.New("injected error") + } + return m.rows, m.fields, nil +} + +func TestIsLogBackupEnabled(t *testing.T) { + // config format: + // MySQL [(none)]> show config where name="log-backup.enable"; + // +------+-----------------+-------------------+-------+ + // | Type | Instance | Name | Value | + // +------+-----------------+-------------------+-------+ + // | tikv | 127.0.0.1:20161 | log-backup.enable | false | + // | tikv | 127.0.0.1:20162 | log-backup.enable | false | + // | tikv | 127.0.0.1:20160 | log-backup.enable | false | + // +------+-----------------+-------------------+-------+ + fields := make([]*ast.ResultField, 4) + tps := []*types.FieldType{ + types.NewFieldType(mysql.TypeString), + types.NewFieldType(mysql.TypeString), + types.NewFieldType(mysql.TypeString), + types.NewFieldType(mysql.TypeString), + } + for i := 0; i < len(tps); i++ { + rf := new(ast.ResultField) + rf.Column = new(model.ColumnInfo) + rf.Column.FieldType = *tps[i] + fields[i] = rf + } + rows := make([]chunk.Row, 0, 1) + + // case 1: non of tikvs enabled log-backup expected false + // tikv | 127.0.0.1:20161 | log-backup.enable | false | + row := chunk.MutRowFromValues("tikv", " 127.0.0.1:20161", "log-backup.enable", "false").ToRow() + rows = append(rows, row) + s := &mockRestrictedSQLExecutor{rows: rows, fields: fields} + enabled, err := utils.IsLogBackupEnabled(s) + require.NoError(t, err) + require.False(t, enabled) + + // case 2: one of tikvs enabled log-backup expected false + // tikv | 127.0.0.1:20161 | log-backup.enable | false | + // tikv | 127.0.0.1:20162 | log-backup.enable | true | + rows = nil + row = chunk.MutRowFromValues("tikv", " 127.0.0.1:20161", "log-backup.enable", "false").ToRow() + rows = append(rows, row) + row = chunk.MutRowFromValues("tikv", " 127.0.0.1:20162", "log-backup.enable", "true").ToRow() + rows = append(rows, row) + s = &mockRestrictedSQLExecutor{rows: rows, fields: fields} + enabled, err = utils.IsLogBackupEnabled(s) + require.NoError(t, err) + require.False(t, enabled) + + // case 3: all of tikvs enabled log-backup expected true + // tikv | 127.0.0.1:20161 | log-backup.enable | true | + // tikv | 127.0.0.1:20162 | log-backup.enable | true | + // tikv | 127.0.0.1:20163 | log-backup.enable | true | + rows = nil + row = chunk.MutRowFromValues("tikv", " 127.0.0.1:20161", "log-backup.enable", "true").ToRow() + rows = append(rows, row) + row = chunk.MutRowFromValues("tikv", " 127.0.0.1:20162", "log-backup.enable", "true").ToRow() + rows = append(rows, row) + row = chunk.MutRowFromValues("tikv", " 127.0.0.1:20163", "log-backup.enable", "true").ToRow() + rows = append(rows, row) + s = &mockRestrictedSQLExecutor{rows: rows, fields: fields} + enabled, err = utils.IsLogBackupEnabled(s) + require.NoError(t, err) + require.True(t, enabled) + + // case 4: met error and expected false. + s = &mockRestrictedSQLExecutor{errHappen: true} + enabled, err = utils.IsLogBackupEnabled(s) + require.Error(t, err) + require.False(t, enabled) +} diff --git a/ddl/util/main_test.go b/ddl/util/main_test.go index a28cdcb4b5bfc..fe13e958c6f7c 100644 --- a/ddl/util/main_test.go +++ b/ddl/util/main_test.go @@ -25,6 +25,7 @@ func TestMain(m *testing.M) { testsetup.SetupForCommonTest() opts := []goleak.Option{ goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"), + goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"), } goleak.VerifyTestMain(m, opts...) diff --git a/executor/brie.go b/executor/brie.go index 6dd1d9053a4ac..690497da83b54 100644 --- a/executor/brie.go +++ b/executor/brie.go @@ -448,7 +448,12 @@ type tidbGlueSession struct { info *brieTaskInfo } -// BootstrapSession implements glue.Glue +// GetSessionCtx implements glue.Glue +func (gs *tidbGlueSession) GetSessionCtx() sessionctx.Context { + return gs.se +} + +// GetDomain implements glue.Glue func (gs *tidbGlueSession) GetDomain(store kv.Storage) (*domain.Domain, error) { return domain.GetDomain(gs.se), nil } diff --git a/owner/main_test.go b/owner/main_test.go index 501cae73e4c5b..57bc7021c5ca9 100644 --- a/owner/main_test.go +++ b/owner/main_test.go @@ -25,6 +25,7 @@ func TestMain(m *testing.M) { testsetup.SetupForCommonTest() opts := []goleak.Option{ goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"), + goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"), } goleak.VerifyTestMain(m, opts...) diff --git a/store/gcworker/gc_worker.go b/store/gcworker/gc_worker.go index e07536e599795..4bd1d491d6671 100644 --- a/store/gcworker/gc_worker.go +++ b/store/gcworker/gc_worker.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/kvproto/pkg/errorpb" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/ddl/label" "github.com/pingcap/tidb/ddl/placement" @@ -78,6 +79,7 @@ type GCWorker struct { batchResolveLocks func(locks []*txnlock.Lock, regionID tikv.RegionVerID, safepoint uint64) (ok bool, err error) resolveLocks func(locks []*txnlock.Lock, lowResolutionTS uint64) (int64, error) } + logBackupEnabled bool } // NewGCWorker creates a GCWorker instance. @@ -1027,9 +1029,8 @@ func (w *GCWorker) resolveLocks(ctx context.Context, safePoint uint64, concurren if tryResolveLocksTS < safePoint { tryResolveLocksTS = safePoint - } else { - // to do: add a switch for tryResolveLocksTS. - // if the config log-backup.enable is false in PiTR, set safePoint to tryResolveLocksTS directly. + } else if !w.logBackupEnabled { + tryResolveLocksTS = safePoint } if !usePhysical { @@ -1778,6 +1779,7 @@ func (w *GCWorker) checkLeader(ctx context.Context) (bool, error) { se := createSession(w.store) defer se.Close() + w.logBackupEnabled = utils.CheckLogBackupEnabled(se) _, err := se.ExecuteInternal(ctx, "BEGIN") if err != nil { return false, errors.Trace(err) diff --git a/store/gcworker/gc_worker_test.go b/store/gcworker/gc_worker_test.go index 362c304efafe5..886ad9d7cf886 100644 --- a/store/gcworker/gc_worker_test.go +++ b/store/gcworker/gc_worker_test.go @@ -1820,6 +1820,8 @@ func TestGCLabelRules(t *testing.T) { func TestGCWithPendingTxn(t *testing.T) { s, clean := createGCWorkerSuite(t) defer clean() + // set to false gc worker won't resolve locks after safepoint. + s.gcWorker.logBackupEnabled = false ctx := gcContext() gcSafePointCacheInterval = 0 @@ -1866,11 +1868,13 @@ func TestGCWithPendingTxn(t *testing.T) { require.NoError(t, err) err = txn.Commit(ctx) - require.Error(t, err) + require.NoError(t, err) } func TestGCWithPendingTxn2(t *testing.T) { s, clean := createGCWorkerSuite(t) + // only when log backup enabled will scan locks after safepoint. + s.gcWorker.logBackupEnabled = true defer clean() ctx := gcContext() diff --git a/telemetry/data_feature_usage.go b/telemetry/data_feature_usage.go index bc73153a84e15..9020a7d10651b 100644 --- a/telemetry/data_feature_usage.go +++ b/telemetry/data_feature_usage.go @@ -18,6 +18,7 @@ import ( "context" "errors" + "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/infoschema" m "github.com/pingcap/tidb/metrics" @@ -47,6 +48,7 @@ type featureUsage struct { NonTransactionalUsage *m.NonTransactionalStmtCounter `json:"nonTransactional"` GlobalKill bool `json:"globalKill"` MultiSchemaChange *m.MultiSchemaChangeUsageCounter `json:"multiSchemaChange"` + LogBackup bool `json:"logBackup"` } type placementPolicyUsage struct { @@ -81,6 +83,8 @@ func getFeatureUsage(ctx context.Context, sctx sessionctx.Context) (*featureUsag usage.GlobalKill = getGlobalKillUsageInfo() + usage.LogBackup = getLogBackupUsageInfo(sctx) + return &usage, nil } @@ -279,3 +283,7 @@ func postReportNonTransactionalCounter() { func getGlobalKillUsageInfo() bool { return config.GetGlobalConfig().EnableGlobalKill } + +func getLogBackupUsageInfo(ctx sessionctx.Context) bool { + return utils.CheckLogBackupEnabled(ctx) +}