Skip to content

Commit

Permalink
log-backup: add the switch for log backup (#36115)
Browse files Browse the repository at this point in the history
ref #29501
  • Loading branch information
3pointer authored Jul 15, 2022
1 parent ce4edc9 commit f47978c
Show file tree
Hide file tree
Showing 12 changed files with 208 additions and 151 deletions.
2 changes: 2 additions & 0 deletions br/pkg/glue/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
pd "github.com/tikv/pd/client"
)

Expand Down Expand Up @@ -39,6 +40,7 @@ type Session interface {
CreatePlacementPolicy(ctx context.Context, policy *model.PolicyInfo) error
Close()
GetGlobalVariable(name string) (string, error)
GetSessionCtx() sessionctx.Context
}

// BatchCreateTableSession is an interface to batch create tables in parallel
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ func (g Glue) GetVersion() string {
return g.tikvGlue.GetVersion()
}

// GetSessionCtx implements glue.Session.
// It returns the value held in gs.se as a sessionctx.Context.
func (gs *tidbSession) GetSessionCtx() sessionctx.Context {
return gs.se
}

// Execute implements glue.Session.
func (gs *tidbSession) Execute(ctx context.Context, sql string) error {
return gs.ExecuteInternal(ctx, sql)
Expand Down
36 changes: 7 additions & 29 deletions br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package task
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"strings"
Expand Down Expand Up @@ -45,6 +44,7 @@ import (
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/spf13/pflag"
"github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/oracle"
Expand Down Expand Up @@ -402,33 +402,6 @@ func (s *streamMgr) buildObserveRanges(ctx context.Context) ([]kv.KeyRange, erro
return rs, nil
}

// checkRequirements checks the prerequisites before a stream starts.
//
// It asks s.mgr.GetConfigFromTiKV to hand every config response to a callback
// and reports true only when the callback ran at least once and every decoded
// config had `log-backup.enable` set to true. Any error from fetching or
// decoding a config is returned (traced) together with false.
func (s *streamMgr) checkRequirements(ctx context.Context) (bool, error) {
	// tikvConfig mirrors only the part of the config payload that is inspected.
	type tikvConfig struct {
		LogBackup struct {
			Enable bool `json:"enable"`
		} `json:"log-backup"`
	}

	var (
		sawResponse bool
		allEnabled  = true
	)
	inspect := func(resp *http.Response) error {
		// Mark the response as seen before decoding, so a response counts even
		// when its body turns out to be malformed.
		sawResponse = true
		var cfg tikvConfig
		if decodeErr := json.NewDecoder(resp.Body).Decode(&cfg); decodeErr != nil {
			return decodeErr
		}
		if !cfg.LogBackup.Enable {
			allEnabled = false
		}
		return nil
	}

	if err := s.mgr.GetConfigFromTiKV(ctx, s.httpCli, inspect); err != nil {
		return false, errors.Trace(err)
	}
	return sawResponse && allEnabled, nil
}

func (s *streamMgr) backupFullSchemas(ctx context.Context, g glue.Glue) error {
metaWriter := metautil.NewMetaWriter(s.bc.GetStorage(), metautil.MetaFileSize, false, metautil.MetaFile, nil)
metaWriter.Update(func(m *backuppb.BackupMeta) {
Expand Down Expand Up @@ -504,7 +477,12 @@ func RunStreamStart(
}
defer streamMgr.close()

supportStream, err := streamMgr.checkRequirements(ctx)
se, err := g.CreateSession(streamMgr.mgr.GetStorage())
if err != nil {
return errors.Trace(err)
}
execCtx := se.GetSessionCtx().(sqlexec.RestrictedSQLExecutor)
supportStream, err := utils.IsLogBackupEnabled(execCtx)
if err != nil {
return errors.Trace(err)
}
Expand Down
117 changes: 0 additions & 117 deletions br/pkg/task/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,134 +17,17 @@ package task
import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"path/filepath"
"strings"
"testing"

"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/br/pkg/conn"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/stream"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
)

// newMockStreamMgr builds a streamMgr for tests: its conn.Mgr wraps a
// PdController that uses pdCli, and its HTTP client is httpCli.
func newMockStreamMgr(pdCli pd.Client, httpCli *http.Client) *streamMgr {
	controller := new(pdutil.PdController)
	controller.SetPDClient(pdCli)
	return &streamMgr{
		mgr:     &conn.Mgr{PdController: controller},
		httpCli: httpCli,
	}
}

// TestStreamStartChecks verifies checkRequirements against a mock HTTP server
// that serves one canned `/config` payload per store, in order.
func TestStreamStartChecks(t *testing.T) {
	// upStore builds a store in the Up state carrying a single "engine" label.
	upStore := func(id uint64, engine string) *metapb.Store {
		return &metapb.Store{
			Id:     id,
			State:  metapb.StoreState_Up,
			Labels: []*metapb.StoreLabel{{Key: "engine", Value: engine}},
		}
	}

	const (
		enabledCfg  = "{\"log-level\": \"debug\", \"log-backup\": {\"enable\": true}}"
		disabledCfg = "{\"log-level\": \"debug\", \"log-backup\": {\"enable\": false}}"
	)

	type testCase struct {
		stores        []*metapb.Store
		content       []string
		supportStream bool
	}
	cases := []testCase{
		{
			// only a tiflash store: no tikv is detected, so streaming is unsupported.
			stores:        []*metapb.Store{upStore(1, "tiflash")},
			content:       []string{""},
			supportStream: false,
		},
		{
			// a single tikv store whose `log-backup.enable` is true.
			stores:        []*metapb.Store{upStore(1, "tikv")},
			content:       []string{enabledCfg},
			supportStream: true,
		},
		{
			// two tikv stores, one of which has `log-backup.enable` set to false.
			stores:        []*metapb.Store{upStore(1, "tikv"), upStore(2, "tikv")},
			content:       []string{enabledCfg, disabledCfg},
			supportStream: false,
		},
	}

	ctx := context.Background()
	for _, tc := range cases {
		pdCli := utils.FakePDClient{Stores: tc.stores}
		require.Equal(t, len(tc.content), len(tc.stores))

		// served counts handled requests; it selects which payload to return.
		served := 0
		handler := func(w http.ResponseWriter, r *http.Request) {
			if strings.TrimSpace(r.URL.Path) == "/config" {
				_, _ = fmt.Fprint(w, tc.content[served])
			} else {
				http.NotFoundHandler().ServeHTTP(w, r)
			}
			served++
		}
		srv := httptest.NewServer(http.HandlerFunc(handler))

		// Point every store at the mock server so config requests land there.
		for _, store := range tc.stores {
			store.StatusAddress = srv.URL
		}

		mgr := newMockStreamMgr(pdCli, srv.Client())
		got, err := mgr.checkRequirements(ctx)
		require.NoError(t, err)
		require.Equal(t, tc.supportStream, got)
		srv.Close()
	}
}

func TestShiftTS(t *testing.T) {
var startTS uint64 = 433155751280640000
shiftTS := ShiftTS(startTS)
Expand Down
59 changes: 59 additions & 0 deletions br/pkg/utils/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ package utils
import (
"context"
"database/sql"
"strings"

"github.com/pingcap/log"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/sqlexec"
"go.uber.org/zap"
)

var (
Expand All @@ -30,3 +37,55 @@ type DBExecutor interface {
StmtExecutor
BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error)
}

// CheckLogBackupEnabled reports whether log backup is enabled in the cluster.
// It is mainly used in three places:
//  1. GC worker resolve locks, to scan more locks after the safepoint (every minute).
//  2. Add index, to skip using lightning (every add index DDL).
//  3. Telemetry of log backup feature usage (every 6 hours).
//
// NOTE: callers shouldn't cache the result, because it may change at any time
// within one cluster.
func CheckLogBackupEnabled(ctx sessionctx.Context) bool {
	restricted, isExecutor := ctx.(sqlexec.RestrictedSQLExecutor)
	if !isExecutor {
		// shouldn't happen
		log.Error("[backup] unable to translate executor from sessionctx")
		return false
	}

	enabled, checkErr := IsLogBackupEnabled(restricted)
	if checkErr == nil {
		return enabled
	}

	// Whatever the failure was, answer true for this round:
	//   - the GC worker will scan more locks in one tick;
	//   - add index will skip using lightning this time;
	//   - telemetry will record a false positive usage count.
	log.Warn("[backup] check log backup config failed, ignore it", zap.Error(checkErr))
	return true
}

// IsLogBackupEnabled is used by br to check whether TiKV has log backup enabled.
// It takes a `sqlexec.RestrictedSQLExecutor` as parameter because that is easy
// to mock.
//
// It runs `show config where name = 'log-backup.enable'` through ctx and returns:
//   - (false, err) when the query fails or a row's value cannot be converted
//     to a string;
//   - (false, nil) when the query yields no rows, or when any row's value is
//     "false" in any letter case;
//   - (true, nil) otherwise.
func IsLogBackupEnabled(ctx sqlexec.RestrictedSQLExecutor) (bool, error) {
	const query = "show config where name = 'log-backup.enable'"

	internalCtx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBR)
	rows, fields, err := ctx.ExecRestrictedSQL(internalCtx, nil, query)
	if err != nil {
		return false, err
	}
	if len(rows) == 0 {
		// no rows mean log backup is not supported.
		return false, nil
	}

	// Column index 3 is read as the config value.
	// NOTE(review): presumably the `Value` column of `show config` — confirm.
	valueType := &fields[3].Column.FieldType
	for _, row := range rows {
		d := row.GetDatum(3, valueType)
		value, err := d.ToString()
		if err != nil {
			return false, err
		}
		// A single row reporting "false" is enough to treat the feature as disabled.
		if strings.EqualFold(value, "false") {
			return false, nil
		}
	}
	return true, nil
}
Loading

0 comments on commit f47978c

Please sign in to comment.