Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into ce-trace-part3
Browse files Browse the repository at this point in the history
  • Loading branch information
time-and-fate committed Dec 9, 2021
2 parents 93a10a0 + c89c473 commit 1ee14eb
Show file tree
Hide file tree
Showing 28 changed files with 1,016 additions and 736 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ gotest_in_verify_ci_part_2: failpoint-enable tools/bin/gotestsum tools/bin/gocov

race: failpoint-enable
@export log_level=debug; \
$(GOTEST) -timeout 20m -race $(PACKAGES) || { $(FAILPOINT_DISABLE); exit 1; }
$(GOTEST) -timeout 25m -race $(PACKAGES) || { $(FAILPOINT_DISABLE); exit 1; }
@$(FAILPOINT_DISABLE)

leak: failpoint-enable
Expand Down
11 changes: 10 additions & 1 deletion br/pkg/restore/split_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,11 @@ func (c *pdClient) getMaxReplica(ctx context.Context) (int, error) {
if err != nil {
return 0, errors.Trace(err)
}
defer func() {
if err = res.Body.Close(); err != nil {
log.Error("Response fail to close", zap.Error(err))
}
}()
var conf config.Config
if err := json.NewDecoder(res.Body).Decode(&conf); err != nil {
return 0, errors.Trace(err)
Expand Down Expand Up @@ -482,11 +487,15 @@ func (c *pdClient) GetPlacementRule(ctx context.Context, groupID, ruleID string)
if err != nil {
return rule, errors.Trace(err)
}
defer func() {
if err = res.Body.Close(); err != nil {
log.Error("Response fail to close", zap.Error(err))
}
}()
b, err := io.ReadAll(res.Body)
if err != nil {
return rule, errors.Trace(err)
}
res.Body.Close()
err = json.Unmarshal(b, &rule)
if err != nil {
return rule, errors.Trace(err)
Expand Down
2 changes: 0 additions & 2 deletions br/tests/lightning_error_summary/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

set -eux

# skip for temporary due to checksum for table a,c succeed, but expect to fail.
exit 0
# Check that error summary are written at the bottom of import.
run_sql 'DROP DATABASE IF EXISTS tidb_lightning_checkpoint_error_summary;'

Expand Down
49 changes: 41 additions & 8 deletions dumpling/export/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,25 +454,33 @@ func adjustTableCollation(tctx *tcontext.Context, parser *parser.Parser, originS
return originSQL, nil
}
var charset string
var collation string
for _, createOption := range createStmt.Options {
// already have 'Collation'
if createOption.Tp == ast.TableOptionCollate {
return originSQL, nil
collation = createOption.StrValue
break
}
if createOption.Tp == ast.TableOptionCharset {
charset = createOption.StrValue
}
}

// get db collation
collation, ok := charsetAndDefaultCollationMap[strings.ToLower(charset)]
if !ok {
tctx.L().Warn("not found table charset default collation.", zap.String("originSQL", originSQL), zap.String("charset", strings.ToLower(charset)))
return originSQL, nil
if collation == "" && charset != "" {
// get db collation
collation, ok := charsetAndDefaultCollationMap[strings.ToLower(charset)]
if !ok {
tctx.L().Warn("not found table charset default collation.", zap.String("originSQL", originSQL), zap.String("charset", strings.ToLower(charset)))
return originSQL, nil
}

// add collation
createStmt.Options = append(createStmt.Options, &ast.TableOption{Tp: ast.TableOptionCollate, StrValue: collation})
}

// add collation
createStmt.Options = append(createStmt.Options, &ast.TableOption{Tp: ast.TableOptionCollate, StrValue: collation})
// adjust columns collation
adjustColumnsCollation(tctx, createStmt, charsetAndDefaultCollationMap)

// rewrite sql
var b []byte
bf := bytes.NewBuffer(b)
Expand All @@ -486,6 +494,31 @@ func adjustTableCollation(tctx *tcontext.Context, parser *parser.Parser, originS
return bf.String(), nil
}

// adjustColumnsCollation adds column's collation.
func adjustColumnsCollation(tctx *tcontext.Context, createStmt *ast.CreateTableStmt, charsetAndDefaultCollationMap map[string]string) {
for _, col := range createStmt.Cols {
for _, options := range col.Options {
// already have 'Collation'
if options.Tp == ast.ColumnOptionCollate {
continue
}
}
fieldType := col.Tp
if fieldType.Collate != "" {
continue
}
if fieldType.Charset != "" {
// just have charset
collation, ok := charsetAndDefaultCollationMap[strings.ToLower(fieldType.Charset)]
if !ok {
tctx.L().Warn("not found charset default collation for column.", zap.String("table", createStmt.Table.Name.String()), zap.String("column", col.Name.String()), zap.String("charset", strings.ToLower(fieldType.Charset)))
continue
}
fieldType.Collate = collation
}
}
}

func (d *Dumper) dumpTableData(tctx *tcontext.Context, conn *sql.Conn, meta TableMeta, taskChan chan<- Task) error {
conf := d.conf
if conf.NoData {
Expand Down
18 changes: 17 additions & 1 deletion dumpling/export/dump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,27 @@ func TestAdjustTableCollation(t *testing.T) {
originSQLs := []string{
"create table `test`.`t1` (id int) CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci",
"create table `test`.`t1` (id int) CHARSET=utf8mb4",
"create table `test`.`t1` (id int, name varchar(20) CHARACTER SET utf8mb4, work varchar(20)) CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ",
"create table `test`.`t1` (id int, name varchar(20), work varchar(20)) CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci",
"create table `test`.`t1` (id int, name varchar(20) COLLATE utf8mb4_general_ci, work varchar(20)) CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci",
"create table `test`.`t1` (id int, name varchar(20) COLLATE utf8mb4_general_ci, work varchar(20) CHARACTER SET utf8mb4) CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci",
"create table `test`.`t1` (id int, name varchar(20) CHARACTER SET utf8mb4, work varchar(20)) CHARSET=utf8mb4 ",
"create table `test`.`t1` (id int, name varchar(20), work varchar(20)) CHARSET=utf8mb4",
"create table `test`.`t1` (id int, name varchar(20) COLLATE utf8mb4_general_ci, work varchar(20)) CHARSET=utf8mb4",
"create table `test`.`t1` (id int, name varchar(20) COLLATE utf8mb4_general_ci, work varchar(20) CHARACTER SET utf8mb4) CHARSET=utf8mb4",
}

expectedSQLs := []string{
"create table `test`.`t1` (id int) CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci",
"CREATE TABLE `test`.`t1` (`id` INT) DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_GENERAL_CI",
"CREATE TABLE `test`.`t1` (`id` INT) DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_GENERAL_CI",
"CREATE TABLE `test`.`t1` (`id` INT,`name` VARCHAR(20) CHARACTER SET UTF8MB4 COLLATE utf8mb4_general_ci,`work` VARCHAR(20)) DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_GENERAL_CI",
"CREATE TABLE `test`.`t1` (`id` INT,`name` VARCHAR(20),`work` VARCHAR(20)) DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_GENERAL_CI",
"CREATE TABLE `test`.`t1` (`id` INT,`name` VARCHAR(20) COLLATE utf8mb4_general_ci,`work` VARCHAR(20)) DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_GENERAL_CI",
"CREATE TABLE `test`.`t1` (`id` INT,`name` VARCHAR(20) COLLATE utf8mb4_general_ci,`work` VARCHAR(20) CHARACTER SET UTF8MB4 COLLATE utf8mb4_general_ci) DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_GENERAL_CI",
"CREATE TABLE `test`.`t1` (`id` INT,`name` VARCHAR(20) CHARACTER SET UTF8MB4 COLLATE utf8mb4_general_ci,`work` VARCHAR(20)) DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_GENERAL_CI",
"CREATE TABLE `test`.`t1` (`id` INT,`name` VARCHAR(20),`work` VARCHAR(20)) DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_GENERAL_CI",
"CREATE TABLE `test`.`t1` (`id` INT,`name` VARCHAR(20) COLLATE utf8mb4_general_ci,`work` VARCHAR(20)) DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_GENERAL_CI",
"CREATE TABLE `test`.`t1` (`id` INT,`name` VARCHAR(20) COLLATE utf8mb4_general_ci,`work` VARCHAR(20) CHARACTER SET UTF8MB4 COLLATE utf8mb4_general_ci) DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_GENERAL_CI",
}

charsetAndDefaultCollationMap := map[string]string{"utf8mb4": "utf8mb4_general_ci"}
Expand Down
14 changes: 14 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9487,3 +9487,17 @@ func (s *testSerialSuite) TestIssue28650(c *C) {
}()
}
}

func (s *testSerialSuite) TestIssue30289(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
fpName := "github.com/pingcap/tidb/executor/issue30289"
c.Assert(failpoint.Enable(fpName, `return(true)`), IsNil)
defer func() {
c.Assert(failpoint.Disable(fpName), IsNil)
}()
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int)")
err := tk.QueryToErr("select /*+ hash_join(t1) */ * from t t1 join t t2 on t1.a=t2.a")
c.Assert(err.Error(), Matches, "issue30289 build return error")
}
13 changes: 12 additions & 1 deletion executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,13 @@ func (e *HashJoinExec) fetchProbeSideChunks(ctx context.Context) {
return
}
if !hasWaitedForBuild {
failpoint.Inject("issue30289", func(val failpoint.Value) {
if val.(bool) {
probeSideResult.Reset()
}
})
if probeSideResult.NumRows() == 0 && !e.useOuterToBuild {
e.finished.Store(true)
return
}
emptyBuild, buildErr := e.wait4BuildSide()
if buildErr != nil {
Expand Down Expand Up @@ -258,6 +262,13 @@ func (e *HashJoinExec) wait4BuildSide() (emptyBuild bool, err error) {
func (e *HashJoinExec) fetchBuildSideRows(ctx context.Context, chkCh chan<- *chunk.Chunk, doneCh <-chan struct{}) {
defer close(chkCh)
var err error
failpoint.Inject("issue30289", func(val failpoint.Value) {
if val.(bool) {
err = errors.Errorf("issue30289 build return error")
e.buildFinished <- errors.Trace(err)
return
}
})
for {
if e.finished.Load().(bool) {
return
Expand Down
28 changes: 21 additions & 7 deletions executor/shuffle.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,25 +142,39 @@ func (e *ShuffleExec) Close() error {
if !e.prepared {
for _, w := range e.workers {
for _, r := range w.receivers {
close(r.inputHolderCh)
close(r.inputCh)
if r.inputHolderCh != nil {
close(r.inputHolderCh)
}
if r.inputCh != nil {
close(r.inputCh)
}
}
close(w.outputHolderCh)
if w.outputHolderCh != nil {
close(w.outputHolderCh)
}
}
if e.outputCh != nil {
close(e.outputCh)
}
close(e.outputCh)
}
close(e.finishCh)
if e.finishCh != nil {
close(e.finishCh)
}
for _, w := range e.workers {
for _, r := range w.receivers {
for range r.inputCh {
if r.inputCh != nil {
for range r.inputCh {
}
}
}
// close child executor of each worker
if err := w.childExec.Close(); err != nil && firstErr == nil {
firstErr = err
}
}
for range e.outputCh { // workers exit before `e.outputCh` is closed.
if e.outputCh != nil {
for range e.outputCh { // workers exit before `e.outputCh` is closed.
}
}
e.executed = false

Expand Down
2 changes: 1 addition & 1 deletion expression/builtin_time.go
Original file line number Diff line number Diff line change
Expand Up @@ -4856,7 +4856,7 @@ func (b *builtinUnixTimestampIntSig) evalIntWithCtx(ctx sessionctx.Context, row
}

tz := ctx.GetSessionVars().Location()
t, err := val.GoTime(tz)
t, err := val.AdjustedGoTime(tz)
if err != nil {
return 0, false, nil
}
Expand Down
2 changes: 1 addition & 1 deletion expression/builtin_time_vec.go
Original file line number Diff line number Diff line change
Expand Up @@ -2390,7 +2390,7 @@ func (b *builtinUnixTimestampIntSig) vecEvalInt(input *chunk.Chunk, result *chun
continue
}

t, err := buf.GetTime(i).GoTime(getTimeZone(b.ctx))
t, err := buf.GetTime(i).AdjustedGoTime(getTimeZone(b.ctx))
if err != nil {
i64s[i] = 0
continue
Expand Down
5 changes: 3 additions & 2 deletions expression/distsql_builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/collate"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tipb/go-tipb"
)
Expand All @@ -40,7 +41,7 @@ func PbTypeToFieldType(tp *tipb.FieldType) *types.FieldType {
Flen: int(tp.Flen),
Decimal: int(tp.Decimal),
Charset: tp.Charset,
Collate: protoToCollation(tp.Collate),
Collate: collate.ProtoToCollation(tp.Collate),
Elems: tp.Elems,
}
}
Expand Down Expand Up @@ -1216,7 +1217,7 @@ func convertUint(val []byte) (*Constant, error) {

func convertString(val []byte, tp *tipb.FieldType) (*Constant, error) {
var d types.Datum
d.SetBytesAsString(val, protoToCollation(tp.Collate), uint32(tp.Flen))
d.SetBytesAsString(val, collate.ProtoToCollation(tp.Collate), uint32(tp.Flen))
return &Constant{Value: d, RetType: types.NewFieldType(mysql.TypeVarString)}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion expression/distsql_builtin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -868,7 +868,7 @@ func toPBFieldType(ft *types.FieldType) *tipb.FieldType {
Flen: int32(ft.Flen),
Decimal: int32(ft.Decimal),
Charset: ft.Charset,
Collate: collationToProto(ft.Collate),
Collate: collate.CollationToProto(ft.Collate),
Elems: ft.Elems,
}
}
Expand Down
33 changes: 2 additions & 31 deletions expression/expr_to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/charset"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -157,7 +156,7 @@ func ToPBFieldType(ft *types.FieldType) *tipb.FieldType {
Flen: int32(ft.Flen),
Decimal: int32(ft.Decimal),
Charset: ft.Charset,
Collate: collationToProto(ft.Collate),
Collate: collate.CollationToProto(ft.Collate),
Elems: ft.Elems,
}
}
Expand All @@ -170,39 +169,11 @@ func FieldTypeFromPB(ft *tipb.FieldType) *types.FieldType {
Flen: int(ft.Flen),
Decimal: int(ft.Decimal),
Charset: ft.Charset,
Collate: protoToCollation(ft.Collate),
Collate: collate.ProtoToCollation(ft.Collate),
Elems: ft.Elems,
}
}

func collationToProto(c string) int32 {
if coll, err := charset.GetCollationByName(c); err == nil {
return collate.RewriteNewCollationIDIfNeeded(int32(coll.ID))
}
v := collate.RewriteNewCollationIDIfNeeded(int32(mysql.DefaultCollationID))
logutil.BgLogger().Warn(
"Unable to get collation ID by name, use ID of the default collation instead",
zap.String("name", c),
zap.Int32("default collation ID", v),
zap.String("default collation", mysql.DefaultCollationName),
)
return v
}

func protoToCollation(c int32) string {
coll, err := charset.GetCollationByID(int(collate.RestoreCollationIDIfNeeded(c)))
if err == nil {
return coll.Name
}
logutil.BgLogger().Warn(
"Unable to get collation name from ID, use name of the default collation instead",
zap.Int32("id", c),
zap.Int("default collation ID", mysql.DefaultCollationID),
zap.String("default collation", mysql.DefaultCollationName),
)
return mysql.DefaultCollationName
}

func (pc PbConverter) columnToPBExpr(column *Column) *tipb.Expr {
if !pc.client.IsRequestTypeSupported(kv.ReqTypeSelect, int64(tipb.ExprType_ColumnRef)) {
return nil
Expand Down
21 changes: 21 additions & 0 deletions expression/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3978,6 +3978,8 @@ func (s *testIntegrationSuite) TestCompareBuiltin(c *C) {
result.Check(testkit.Rows("1 1 1"))
result = tk.MustQuery(`select INTERVAL(100, NULL, NULL, NULL, NULL, NULL, 100);`)
result.Check(testkit.Rows("6"))
result = tk.MustQuery(`SELECT INTERVAL(0,(1*5)/2) + INTERVAL(5,4,3);`)
result.Check(testkit.Rows("2"))

// for greatest
result = tk.MustQuery(`select greatest(1, 2, 3), greatest("a", "b", "c"), greatest(1.1, 1.2, 1.3), greatest("123a", 1, 2)`)
Expand Down Expand Up @@ -9365,6 +9367,25 @@ func (s *testIntegrationSuite) TestIssue30101(c *C) {
tk.MustQuery("select greatest(c1, c2) from t1;").Sort().Check(testkit.Rows("9223372036854775809"))
}

func (s *testIntegrationSuite) TestIssue28739(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec(`USE test`)
tk.MustExec("SET time_zone = 'Europe/Vilnius'")
tk.MustQuery("SELECT UNIX_TIMESTAMP('2020-03-29 03:45:00')").Check(testkit.Rows("1585443600"))
tk.MustQuery("SELECT FROM_UNIXTIME(UNIX_TIMESTAMP('2020-03-29 03:45:00'))").Check(testkit.Rows("2020-03-29 04:00:00"))
tk.MustExec(`DROP TABLE IF EXISTS t`)
tk.MustExec(`CREATE TABLE t (dt DATETIME NULL)`)
defer tk.MustExec(`DROP TABLE t`)
// Test the vector implememtation
tk.MustExec(`INSERT INTO t VALUES ('2021-10-31 02:30:00'), ('2021-03-28 02:30:00'), ('2020-10-04 02:15:00'), ('2020-03-29 03:45:00'), (NULL)`)
tk.MustQuery(`SELECT dt, UNIX_TIMESTAMP(dt) FROM t`).Sort().Check(testkit.Rows(
"2020-03-29 03:45:00 1585443600",
"2020-10-04 02:15:00 1601766900",
"2021-03-28 02:30:00 1616891400",
"2021-10-31 02:30:00 1635636600",
"<nil> <nil>"))
}

func (s *testIntegrationSuite) TestIssue30326(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
Loading

0 comments on commit 1ee14eb

Please sign in to comment.