Skip to content

Commit

Permalink
ddl:fix create partitioned table with bigint column fail
Browse files Browse the repository at this point in the history
  • Loading branch information
ciscoxll committed Aug 30, 2018
1 parent 67d7544 commit 2c1fb5b
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 20 deletions.
16 changes: 16 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1735,6 +1735,22 @@ func (s *testDBSuite) TestCreateTableWithPartition(c *C) {
partition p1 values less than (to_seconds('2005-01-01')));`)
s.tk.MustQuery("show create table t26").Check(
testkit.Rows("t26 CREATE TABLE `t26` (\n `a` date DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin\nPARTITION BY RANGE ( to_seconds(`a`) ) (\n PARTITION p0 VALUES LESS THAN (63240134400),\n PARTITION p1 VALUES LESS THAN (63271756800)\n)"))
s.tk.MustExec(`create table t27 (a bigint unsigned not null)
partition by range(a) (
partition p0 values less than (10),
partition p1 values less than (100),
partition p2 values less than (1000),
partition p3 values less than (18446744073709551000),
partition p4 values less than (18446744073709551614)
);`)
s.tk.MustExec(`create table t28 (a bigint unsigned not null)
partition by range(a) (
partition p0 values less than (10),
partition p1 values less than (100),
partition p2 values less than (1000),
partition p3 values less than (18446744073709551000 + 1),
partition p4 values less than (18446744073709551000 + 10)
);`)
}

func (s *testDBSuite) TestTableDDLWithFloatType(c *C) {
Expand Down
2 changes: 1 addition & 1 deletion ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -904,7 +904,7 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e
return errors.Trace(err)
}

if err = checkCreatePartitionValue(ctx, tbInfo, pi); err != nil {
if err = checkCreatePartitionValue(ctx, tbInfo, pi, cols); err != nil {
return errors.Trace(err)
}

Expand Down
68 changes: 49 additions & 19 deletions ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func checkPartitionFuncType(ctx sessionctx.Context, s *ast.CreateTableStmt, cols

// checkCreatePartitionValue checks whether `less than value` is strictly increasing for each partition.
// Side effect: it may simplify the partition range definition from a constant expression to an integer.
func checkCreatePartitionValue(ctx sessionctx.Context, tblInfo *model.TableInfo, pi *model.PartitionInfo) error {
func checkCreatePartitionValue(ctx sessionctx.Context, tblInfo *model.TableInfo, pi *model.PartitionInfo, cols []*table.Column) error {
defs := pi.Definitions
if len(defs) <= 1 {
return nil
Expand All @@ -187,13 +187,14 @@ func checkCreatePartitionValue(ctx sessionctx.Context, tblInfo *model.TableInfo,
if strings.EqualFold(defs[len(defs)-1].LessThan[0], partitionMaxValue) {
defs = defs[:len(defs)-1]
}
var prevRangeValue int64
isUnsignedBigint := FindRangePartitionColTp(cols, pi)
var prevRangeValue interface{}
for i := 0; i < len(defs); i++ {
if strings.EqualFold(defs[i].LessThan[0], partitionMaxValue) {
return errors.Trace(ErrPartitionMaxvalue)
}

currentRangeValue, fromExpr, err := getRangeValue(ctx, tblInfo, defs[i].LessThan[0])
currentRangeValue, fromExpr, err := getRangeValue(ctx, tblInfo, defs[i].LessThan[0], isUnsignedBigint)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -207,8 +208,14 @@ func checkCreatePartitionValue(ctx sessionctx.Context, tblInfo *model.TableInfo,
continue
}

if currentRangeValue <= prevRangeValue {
return errors.Trace(ErrRangeNotIncreasing)
if isUnsignedBigint {
if currentRangeValue.(uint64) <= prevRangeValue.(uint64) {
return errors.Trace(ErrRangeNotIncreasing)
}
} else {
if currentRangeValue.(int64) <= prevRangeValue.(int64) {
return errors.Trace(ErrRangeNotIncreasing)
}
}
prevRangeValue = currentRangeValue
}
Expand All @@ -217,23 +224,34 @@ func checkCreatePartitionValue(ctx sessionctx.Context, tblInfo *model.TableInfo,

// getRangeValue gets an integer from the range value string.
// The returned boolean value indicates whether the input string is a constant expression.
func getRangeValue(ctx sessionctx.Context, tblInfo *model.TableInfo, str string) (int64, bool, error) {

if value, err := strconv.ParseInt(str, 10, 64); err == nil {
return value, false, nil
}
func getRangeValue(ctx sessionctx.Context, tblInfo *model.TableInfo, str string, ifUnsignedBigint bool) (interface{}, bool, error) {
// Unsigned bigint was converted to uint64 handle.
if ifUnsignedBigint {
if value, err := strconv.ParseUint(str, 10, 64); err == nil {
return value, false, nil
}

// The range value maybe not an integer, it could be a constant expression.
// For example, the following two cases are the same:
// PARTITION p0 VALUES LESS THAN (TO_SECONDS('2004-01-01'))
// PARTITION p0 VALUES LESS THAN (63340531200)
if e, err1 := expression.ParseSimpleExprWithTableInfo(ctx, str, tblInfo); err1 == nil {
res, isNull, err2 := e.EvalInt(ctx, chunk.Row{})
if err2 == nil && isNull == false {
return res, true, nil
if e, err1 := expression.ParseSimpleExprWithTableInfo(ctx, str, tblInfo); err1 == nil {
res, isNull, err2 := e.EvalInt(ctx, chunk.Row{})
if err2 == nil && isNull == false {
return uint64(res), true, nil
}
}
} else {
if value, err := strconv.ParseInt(str, 10, 64); err == nil {
return value, false, nil
}
// The range value maybe not an integer, it could be a constant expression.
// For example, the following two cases are the same:
// PARTITION p0 VALUES LESS THAN (TO_SECONDS('2004-01-01'))
// PARTITION p0 VALUES LESS THAN (63340531200)
if e, err1 := expression.ParseSimpleExprWithTableInfo(ctx, str, tblInfo); err1 == nil {
res, isNull, err2 := e.EvalInt(ctx, chunk.Row{})
if err2 == nil && isNull == false {
return res, true, nil
}
}
}

return 0, false, ErrNotAllowedTypeInPartition.GenByArgs(str)
}

Expand Down Expand Up @@ -388,3 +406,15 @@ func checkConstraintIncludePartKey(partkeys []string, constraints map[string]str
}
return true
}

// FindRangePartitionColTp finds the type of the partitioning key column type.
// The returned boolean value indicates whether the partitioning key column type is unsigned bigint type.
func FindRangePartitionColTp(cols []*table.Column, pi *model.PartitionInfo) bool {
for _, col := range cols {
isUnsigned := col.Tp == mysql.TypeLonglong && mysql.HasUnsignedFlag(col.Flag)
if isUnsigned && strings.Contains(strings.ToLower(pi.Expr), col.Name.L) {
return true
}
}
return false
}

0 comments on commit 2c1fb5b

Please sign in to comment.