Skip to content

Commit

Permalink
support weekly rotations, support YEARWEEK function, support 'mode' a…
Browse files Browse the repository at this point in the history
…rgument

Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach committed Jan 2, 2025
1 parent b1d4715 commit 087af5f
Show file tree
Hide file tree
Showing 2 changed files with 451 additions and 39 deletions.
142 changes: 117 additions & 25 deletions go/vt/schemadiff/partitioning_analysis.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package schemadiff
import (
"errors"
"fmt"
"math"
"strconv"
"strings"
"time"
Expand All @@ -28,6 +29,26 @@ import (
"vitess.io/vitess/go/vt/sqlparser"
)

const (
ModeUndefined = math.MinInt
)

// TemporalRangePartitioningAnalysis is the result of analyzing a table for temporal range partitioning.
type TemporalRangePartitioningAnalysis struct {
IsRangePartitioned bool // Is the table at all partitioned by RANGE?
IsTemporalRangePartitioned bool // Is the table range partitioned using temporal values?
IsRangeColumns bool // Is RANGE COLUMNS used?
MinimalInterval datetime.IntervalType // The minimal interval that the table is partitioned by (e.g. if partitioned by TO_DAYS, the minimal interval is 1 day)
Col *ColumnDefinitionEntity // The column used in the RANGE expression
FuncExpr *sqlparser.FuncExpr // The function used in the RANGE expression, if any
Mode int // The mode used in the WEEK function, if that's what's used
MaxvaluePartition *sqlparser.PartitionDefinition // The partition that has MAXVALUE, if any
HighestValueDateTime datetime.DateTime // The datetime value of the highest partition (excluding MAXVALUE)
HighestValueIntVal int64 // The integer value of the highest partition (excluding MAXVALUE)
Reason string // Why IsTemporalRangePartitioned is false
Error error // If there was an error during analysis
}

// IsRangePartitioned returns `true` when the given CREATE TABLE statement is partitioned by RANGE.
func IsRangePartitioned(createTable *sqlparser.CreateTable) bool {
if createTable.TableSpec.PartitionOption == nil {
Expand Down Expand Up @@ -83,31 +104,22 @@ func AlterTableRotatesRangePartition(createTable *sqlparser.CreateTable, alterTa
}
}

// TemporalRangePartitioningAnalysis is the result of analyzing a table for temporal range partitioning.
type TemporalRangePartitioningAnalysis struct {
IsRangePartitioned bool // Is the table at all partitioned by RANGE?
IsTemporalRangePartitioned bool // Is the table range partitioned using temporal values?
IsRangeColumns bool // Is RANGE COLUMNS used?
MinimalInterval datetime.IntervalType // The minimal interval that the table is partitioned by (e.g. if partitioned by TO_DAYS, the minimal interval is 1 day)
Col *ColumnDefinitionEntity // The column used in the RANGE expression
FuncExpr *sqlparser.FuncExpr // The function used in the RANGE expression, if any
MaxvaluePartition *sqlparser.PartitionDefinition // The partition that has MAXVALUE, if any
HighestValueDateTime datetime.DateTime // The datetime value of the highest partition (excluding MAXVALUE)
HighestValueIntVal int64 // The integer value of the highest partition (excluding MAXVALUE)
Reason string // Why IsTemporalRangePartitioned is false
Error error // If there was an error during analysis
}

// supportedPartitioningScheme checks whether the given expression is supported for temporal range partitioning.
// schemadiff only supports a subset of range partitioning expressions.
func supportedPartitioningScheme(expr sqlparser.Expr, createTableEntity *CreateTableEntity, colName sqlparser.IdentifierCI, is84 bool) (matchFound bool, hasFunction bool, err error) {
supportedVariations := []string{
"create table %s (id int) PARTITION BY RANGE (%s)",
"create table %s (id int) PARTITION BY RANGE (to_seconds(%s))",
"create table %s (id int) PARTITION BY RANGE (to_days(%s))",
// "create table %s (id int) PARTITION BY RANGE (yearweek(%s))",
// "create table %s (id int) PARTITION BY RANGE (yearweek(%s, 0))",
// "create table %s (id int) PARTITION BY RANGE (yearweek(%s, 1))",
"create table %s (id int) PARTITION BY RANGE (yearweek(%s))",
"create table %s (id int) PARTITION BY RANGE (yearweek(%s, 0))",
"create table %s (id int) PARTITION BY RANGE (yearweek(%s, 1))",
"create table %s (id int) PARTITION BY RANGE (yearweek(%s, 2))",
"create table %s (id int) PARTITION BY RANGE (yearweek(%s, 3))",
"create table %s (id int) PARTITION BY RANGE (yearweek(%s, 4))",
"create table %s (id int) PARTITION BY RANGE (yearweek(%s, 5))",
"create table %s (id int) PARTITION BY RANGE (yearweek(%s, 6))",
"create table %s (id int) PARTITION BY RANGE (yearweek(%s, 7))",
"create table %s (id int) PARTITION BY RANGE (year(%s))",
}
if is84 {
Expand All @@ -133,6 +145,32 @@ func supportedPartitioningScheme(expr sqlparser.Expr, createTableEntity *CreateT
return false, false, nil
}

// extractFuncMode extracts the mode argument from a WEEK/YEARWEEK function, if applicable.
// It returns a ModeUndefined when not applicable.
func extractFuncMode(funcExpr *sqlparser.FuncExpr) (int, error) {
switch funcExpr.Name.Lowered() {
case "week", "yearweek":
default:
return ModeUndefined, nil
}
if len(funcExpr.Exprs) <= 1 {
return 0, nil
}
// There is a `mode` argument in the YEARWEEK function.
literal, ok := funcExpr.Exprs[1].(*sqlparser.Literal)
if !ok {
return 0, fmt.Errorf("expected literal value in %v function", sqlparser.CanonicalString(funcExpr))
}
if literal.Type != sqlparser.IntVal {
return 0, fmt.Errorf("expected integer literal argument in %v function", sqlparser.CanonicalString(funcExpr))
}
intval, err := strconv.ParseInt(literal.Val, 0, 64)
if err != nil {
return 0, err
}
return int(intval), nil
}

// AnalyzeTemporalRangePartitioning analyzes a table for temporal range partitioning.
func AnalyzeTemporalRangePartitioning(createTableEntity *CreateTableEntity) (*TemporalRangePartitioningAnalysis, error) {
analysis := &TemporalRangePartitioningAnalysis{}
Expand Down Expand Up @@ -162,6 +200,7 @@ func AnalyzeTemporalRangePartitioning(createTableEntity *CreateTableEntity) (*Te
}

analysis.IsRangeColumns = len(partitionOption.ColList) > 0
analysis.Mode = ModeUndefined
switch len(partitionOption.ColList) {
case 0:
// This is a PARTITION BY RANGE(expr), where "expr" can be just column name, or a complex expression.
Expand All @@ -175,17 +214,25 @@ func AnalyzeTemporalRangePartitioning(createTableEntity *CreateTableEntity) (*Te
// we create dummy statements with all supported variations, and check for equality.
var col *ColumnDefinitionEntity
expr := sqlparser.CloneExpr(partitionOption.Expr)
_ = sqlparser.Walk(func(node sqlparser.SQLNode) (kontinue bool, err error) {
err = sqlparser.Walk(func(node sqlparser.SQLNode) (kontinue bool, err error) {
switch node := node.(type) {
case *sqlparser.ColName:
col = getColumn(node.Name.Lowered()) // known to be not-nil thanks to validate()
analysis.Col = col
case *sqlparser.FuncExpr:
analysis.FuncExpr = sqlparser.CloneRefOfFuncExpr(node)
node.Name = sqlparser.NewIdentifierCI(node.Name.Lowered())
mode, err := extractFuncMode(node)
if err != nil {
return false, err
}
analysis.Mode = mode
}
return true, nil
}, expr)
if err != nil {
return nil, err
}
matchFound, hasFunction, err := supportedPartitioningScheme(expr, createTableEntity, col.ColumnDefinition.Name, is84)
if err != nil {
return nil, err
Expand Down Expand Up @@ -245,6 +292,8 @@ func AnalyzeTemporalRangePartitioning(createTableEntity *CreateTableEntity) (*Te
analysis.MinimalInterval = datetime.IntervalSecond
case "to_days":
analysis.MinimalInterval = datetime.IntervalDay
case "yearweek":
analysis.MinimalInterval = datetime.IntervalWeek
case "year":
analysis.MinimalInterval = datetime.IntervalYear
}
Expand All @@ -263,7 +312,7 @@ func AnalyzeTemporalRangePartitioning(createTableEntity *CreateTableEntity) (*Te
if err != nil {
return analysis, err
}
highestValueDateTime, err = truncateDateTime(highestValueDateTime, analysis.MinimalInterval)
highestValueDateTime, err = truncateDateTime(highestValueDateTime, analysis.MinimalInterval, analysis.Mode)
if err != nil {
return analysis, err
}
Expand Down Expand Up @@ -332,7 +381,9 @@ func computeDateTime(expr sqlparser.Expr, colType string, funcExpr *sqlparser.Fu
}
hasFuncExpr = true
case *sqlparser.Literal:
literal = node
if literal == nil {
literal = node
}
}
return true, nil
}, expr)
Expand Down Expand Up @@ -373,6 +424,12 @@ func applyFuncExprToDateTime(dt datetime.DateTime, funcExpr *sqlparser.FuncExpr)
intval = dt.ToSeconds()
case "to_days":
intval = int64(datetime.MysqlDayNumber(dt.Date.Year(), dt.Date.Month(), dt.Date.Day()))
case "yearweek":
mode, err := extractFuncMode(funcExpr)
if err != nil {
return 0, err
}
intval = int64(dt.Date.YearWeek(mode))
case "year":
intval = int64(dt.Date.Year())
default:
Expand All @@ -389,6 +446,7 @@ func temporalPartitionName(dt datetime.DateTime, resolution datetime.IntervalTyp
switch resolution {
case datetime.IntervalYear,
datetime.IntervalMonth,
datetime.IntervalWeek,
datetime.IntervalDay:
return "p" + string(datetime.Date_YYYYMMDD.Format(dt, 0)), nil
case datetime.IntervalHour,
Expand All @@ -403,7 +461,8 @@ func temporalPartitionName(dt datetime.DateTime, resolution datetime.IntervalTyp
// e.g. if resolution is IntervalDay, the time part is removed.
// If resolution is IntervalMonth, the day part is set to 1.
// etc.
func truncateDateTime(dt datetime.DateTime, interval datetime.IntervalType) (datetime.DateTime, error) {
// `mode` is used for WEEK calculations, see https://dev.mysql.com/doc/refman/8.4/en/date-and-time-functions.html#function_week
func truncateDateTime(dt datetime.DateTime, interval datetime.IntervalType, mode int) (datetime.DateTime, error) {
if interval >= datetime.IntervalHour {
// Remove the minutes, seconds, subseconds parts
hourInterval := datetime.ParseIntervalInt64(int64(dt.Time.Hour()), datetime.IntervalHour, false)
Expand All @@ -418,6 +477,25 @@ func truncateDateTime(dt datetime.DateTime, interval datetime.IntervalType) (dat
// Remove the Time part:
dt = datetime.DateTime{Date: dt.Date}
}
if interval == datetime.IntervalWeek {
// IntervalWeek = IntervalDay | intervalMulti, which is larger than IntervalYear, so we interject here
// Get back to the first day of the week:
var startOfWeekInterval *datetime.Interval
switch mode {
case 0, 2, 4, 6:
startOfWeekInterval = datetime.ParseIntervalInt64(-int64(dt.Date.Weekday()), datetime.IntervalDay, false)
case 1, 3, 5, 7:
startOfWeekInterval = datetime.ParseIntervalInt64(-int64(dt.Date.Weekday()-1), datetime.IntervalDay, false)
default:
return dt, fmt.Errorf("invalid mode value %d for WEEK/YEARWEEK function", mode)
}
var ok bool
dt, _, ok = dt.AddInterval(startOfWeekInterval, 0, false)
if !ok {
return dt, fmt.Errorf("failed to add interval %v to reference time %v", startOfWeekInterval, dt.Format(0))
}
return dt, nil
}
if interval >= datetime.IntervalMonth {
// Get back to the first day of the month:
dayInterval := datetime.ParseIntervalInt64(int64(-(dt.Date.Day() - 1)), datetime.IntervalDay, false)
Expand Down Expand Up @@ -446,18 +524,32 @@ func truncateDateTime(dt datetime.DateTime, interval datetime.IntervalType) (dat
// e.g. "prepare 7 days ahead, starting from today".
// The function computes values of existing partitions to determine how many new partitions are actually
// required to satisfy the terms.
func TemporalRangePartitioningNextRotation(createTableEntity *CreateTableEntity, interval datetime.IntervalType, prepareAheadCount int, reference time.Time) (diffs []*AlterTableEntityDiff, err error) {
func TemporalRangePartitioningNextRotation(createTableEntity *CreateTableEntity, interval datetime.IntervalType, mode int, prepareAheadCount int, reference time.Time) (diffs []*AlterTableEntityDiff, err error) {
analysis, err := AnalyzeTemporalRangePartitioning(createTableEntity)
if err != nil {
return nil, err
}
if !analysis.IsTemporalRangePartitioned {
return nil, errors.New(analysis.Reason)
}
if interval < analysis.MinimalInterval {
intervalIsTooSmall := false
// IntervalWeek = IntervalDay | intervalMulti, which is larger all of the rest of normal intervals,
// so we need special handling for IntervalWeek comparisons
switch {
case analysis.MinimalInterval == datetime.IntervalWeek:
intervalIsTooSmall = (interval <= datetime.IntervalDay)
case interval == datetime.IntervalWeek:
intervalIsTooSmall = (analysis.MinimalInterval >= datetime.IntervalMonth)
default:
intervalIsTooSmall = (interval < analysis.MinimalInterval)
}
if intervalIsTooSmall {
return nil, fmt.Errorf("interval %s is less than the minimal interval %s for table %s", interval.ToString(), analysis.MinimalInterval.ToString(), createTableEntity.Name())
}
referenceDatetime, err := truncateDateTime(datetime.NewDateTimeFromStd(reference), interval)
if analysis.Mode != ModeUndefined && mode != analysis.Mode {
return nil, fmt.Errorf("mode %d is different from the mode %d used in table %s", mode, analysis.Mode, createTableEntity.Name())
}
referenceDatetime, err := truncateDateTime(datetime.NewDateTimeFromStd(reference), interval, mode)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 087af5f

Please sign in to comment.