Skip to content

Commit

Permalink
Merge branch 'master' into placement_14
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkingrei authored Jan 7, 2022
2 parents ecc9ce7 + c27f8f6 commit 2b6d702
Show file tree
Hide file tree
Showing 303 changed files with 11,228 additions and 4,657 deletions.
7 changes: 4 additions & 3 deletions bindinfo/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (h *BindHandle) Update(fullLoad bool) (err error) {
}

exec := h.sctx.Context.(sqlexec.RestrictedSQLExecutor)
stmt, err := exec.ParseWithParamsInternal(context.TODO(), `SELECT original_sql, bind_sql, default_db, status, create_time, update_time, charset, collation, source
stmt, err := exec.ParseWithParams(context.TODO(), true, `SELECT original_sql, bind_sql, default_db, status, create_time, update_time, charset, collation, source
FROM mysql.bind_info WHERE update_time > %? ORDER BY update_time, create_time`, updateTime)
if err != nil {
return err
Expand Down Expand Up @@ -697,7 +697,7 @@ func (h *BindHandle) extractCaptureFilterFromStorage() (filter *captureFilter) {
tables: make(map[stmtctx.TableEntry]struct{}),
}
exec := h.sctx.Context.(sqlexec.RestrictedSQLExecutor)
stmt, err := exec.ParseWithParamsInternal(context.TODO(), `SELECT filter_type, filter_value FROM mysql.capture_plan_baselines_blacklist order by filter_type`)
stmt, err := exec.ParseWithParams(context.TODO(), true, `SELECT filter_type, filter_value FROM mysql.capture_plan_baselines_blacklist order by filter_type`)
if err != nil {
logutil.BgLogger().Warn("[sql-bind] failed to parse query for mysql.capture_plan_baselines_blacklist load", zap.Error(err))
return
Expand Down Expand Up @@ -923,8 +923,9 @@ func (h *BindHandle) SaveEvolveTasksToStore() {
}

func getEvolveParameters(ctx sessionctx.Context) (time.Duration, time.Time, time.Time, error) {
stmt, err := ctx.(sqlexec.RestrictedSQLExecutor).ParseWithParamsInternal(
stmt, err := ctx.(sqlexec.RestrictedSQLExecutor).ParseWithParams(
context.TODO(),
true,
"SELECT variable_name, variable_value FROM mysql.global_variables WHERE variable_name IN (%?, %?, %?)",
variable.TiDBEvolvePlanTaskMaxTime,
variable.TiDBEvolvePlanTaskStartTime,
Expand Down
14 changes: 12 additions & 2 deletions br/cmd/tidb-lightning/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"syscall"

"github.com/pingcap/tidb/br/pkg/lightning"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"go.uber.org/zap"
Expand Down Expand Up @@ -89,12 +90,21 @@ func main() {
return app.RunOnce(context.Background(), cfg, nil)
}()

finished := true
if common.IsContextCanceledError(err) {
err = nil
finished = false
}
if err != nil {
logger.Error("tidb lightning encountered error stack info", zap.Error(err))
fmt.Fprintln(os.Stderr, "tidb lightning encountered error: ", err)
} else {
logger.Info("tidb lightning exit")
fmt.Fprintln(os.Stdout, "tidb lightning exit")
logger.Info("tidb lightning exit", zap.Bool("finished", finished))
exitMsg := "tidb lightning exit successfully"
if !finished {
exitMsg = "tidb lightning canceled"
}
fmt.Fprintln(os.Stdout, exitMsg)
}

// call Sync() with log to stdout may return error in some case, so just skip it
Expand Down
11 changes: 9 additions & 2 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,10 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, store kv.Storage, lastB
if err != nil {
return errors.Trace(err)
}
metaWriter.Send(jobBytes, metautil.AppendDDL)
err = metaWriter.Send(jobBytes, metautil.AppendDDL)
if err != nil {
return errors.Trace(err)
}
count++
}
}
Expand Down Expand Up @@ -951,26 +954,29 @@ backupLoop:
zap.Int("retry time", retry))
return berrors.ErrFailedToConnect.Wrap(err).GenWithStack("failed to create backup stream to store %d", storeID)
}
defer bcli.CloseSend()

for {
resp, err := bcli.Recv()
if err != nil {
if errors.Cause(err) == io.EOF { // nolint:errorlint
logutil.CL(ctx).Info("backup streaming finish",
zap.Int("retry-time", retry))
_ = bcli.CloseSend()
break backupLoop
}
if isRetryableError(err) {
time.Sleep(3 * time.Second)
// current tikv is unavailable
client, errReset = resetFn()
if errReset != nil {
_ = bcli.CloseSend()
return errors.Annotatef(errReset, "failed to reset recv connection on store:%d "+
"please check the tikv status", storeID)
}
_ = bcli.CloseSend()
break
}
_ = bcli.CloseSend()
return berrors.ErrFailedToConnect.Wrap(err).GenWithStack("failed to connect to store: %d with retry times:%d", storeID, retry)
}

Expand All @@ -980,6 +986,7 @@ backupLoop:
logutil.Key("small-range-end-key", resp.GetEndKey()))
err = respFn(resp)
if err != nil {
_ = bcli.CloseSend()
return errors.Trace(err)
}
}
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (gs *tidbSession) CreateDatabase(ctx context.Context, schema *model.DBInfo)
if len(schema.Charset) == 0 {
schema.Charset = mysql.DefaultCharset
}
return d.CreateSchemaWithInfo(gs.se, schema, ddl.OnExistIgnore, true)
return d.CreateSchemaWithInfo(gs.se, schema, ddl.OnExistIgnore)
}

// CreateTable implements glue.Session.
Expand All @@ -143,7 +143,7 @@ func (gs *tidbSession) CreateTable(ctx context.Context, dbName model.CIStr, tabl
newPartition.Definitions = append([]model.PartitionDefinition{}, table.Partition.Definitions...)
table.Partition = &newPartition
}
return d.CreateTableWithInfo(gs.se, dbName, table, ddl.OnExistIgnore, true)
return d.CreateTableWithInfo(gs.se, dbName, table, ddl.OnExistIgnore)
}

// Close implements glue.Session.
Expand Down
9 changes: 1 addition & 8 deletions br/pkg/lightning/backend/kv/kv2sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package kv
import (
"fmt"

"github.com/pingcap/tidb/br/pkg/lightning/metric"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/table"
Expand All @@ -38,15 +37,10 @@ func (t *TableKVDecoder) Name() string {
return t.tableName
}

func (t *TableKVDecoder) DecodeHandleFromTable(key []byte) (kv.Handle, error) {
func (t *TableKVDecoder) DecodeHandleFromRowKey(key []byte) (kv.Handle, error) {
return tablecodec.DecodeRowKey(key)
}

func (t *TableKVDecoder) EncodeHandleKey(tableID int64, h kv.Handle) kv.Key {
// do not ever ever use tbl.Meta().ID, we need to deal with partitioned tables!
return tablecodec.EncodeRowKeyWithHandle(tableID, h)
}

func (t *TableKVDecoder) DecodeHandleFromIndex(indexInfo *model.IndexInfo, key []byte, value []byte) (kv.Handle, error) {
cols := tables.BuildRowcodecColInfoForIndexColumns(indexInfo, t.tbl.Meta())
return tablecodec.DecodeIndexHandle(key, value, len(cols))
Expand Down Expand Up @@ -111,7 +105,6 @@ func (t *TableKVDecoder) IterRawIndexKeys(h kv.Handle, rawRow []byte, fn func([]
}

func NewTableKVDecoder(tbl table.Table, tableName string, options *SessionOptions) (*TableKVDecoder, error) {
metric.KvEncoderCounter.WithLabelValues("open").Inc()
se := newSession(options)
cols := tbl.Cols()
// Set CommonAddRecordCtx to session to reuse the slices and BufStore in AddRecord
Expand Down
14 changes: 3 additions & 11 deletions br/pkg/lightning/backend/kv/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,12 @@ package kv
import (
"testing"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/parser/mysql"
"github.com/stretchr/testify/require"
)

type kvSuite struct{}

var _ = Suite(&kvSuite{})

func TestKV(t *testing.T) {
TestingT(t)
}

func (s *kvSuite) TestSession(c *C) {
func TestSession(t *testing.T) {
session := newSession(&SessionOptions{SQLMode: mysql.ModeNone, Timestamp: 1234567890})
_, err := session.Txn(true)
c.Assert(err, IsNil)
require.NoError(t, err)
}
Loading

0 comments on commit 2b6d702

Please sign in to comment.