Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Vreplication streamer to convert textual columns to UTF8 #8356

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions go/mysql/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,18 @@ from information_schema.columns as ISC
ISC.ordinal_position = c.ordinal_position
where c.table_schema = database() AND ISC.table_schema is null`

// FetchTableColumn queries information about columns in a given table
FetchTableColumns = `select
column_name as column_name, data_type as data_type, character_set_name as character_set_name,
(data_type rlike '.*int') as is_int,
(data_type rlike '.*text' or data_type rlike '.*char') as is_string,
(character_set_name rlike 'utf8.*') as is_utf8,
(data_type='enum') as is_enum
from information_schema.columns
where table_schema = database() and
table_name=%a
order by ordinal_position`

// DetectSchemaChange query detects if there is any schema change from previous copy.
DetectSchemaChange = detectChangeColumns + " UNION " + detectNewColumns + " UNION " + detectRemoveColumns

Expand Down
1 change: 1 addition & 0 deletions go/vt/vttablet/onlineddl/vrepl.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,7 @@ func (v *VRepl) generateFilterQuery(ctx context.Context) error {
}
}
// We will always read strings as utf8mb4.
// sb.WriteString(escapeName(name))
sb.WriteString(fmt.Sprintf("convert(%s using utf8mb4)", escapeName(name)))
default:
sb.WriteString(escapeName(name))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ func (tp *TablePlan) isOutsidePKRange(bindvars map[string]*querypb.BindVariable,
// - ...any other future possible values
func (tp *TablePlan) bindFieldVal(field *querypb.Field, val *sqltypes.Value) (*querypb.BindVariable, error) {
if conversion, ok := tp.ConvertCharset[field.Name]; ok && !val.IsNull() {
fmt.Printf("============== conversion for %v: from %v to %v\n", field.Name, conversion.FromCharset, conversion.ToCharset)
// Non-null string value, for which we have a charset conversion instruction
valString := val.ToString()
fromEncoding, encodingOK := mysql.CharacterSetEncoding[conversion.FromCharset]
Expand Down
11 changes: 0 additions & 11 deletions go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,17 +383,6 @@ func (tpb *tablePlanBuilder) analyzeExpr(selExpr sqlparser.SelectExpr) (*colExpr
colName: as,
references: make(map[string]bool),
}
if expr, ok := aliased.Expr.(*sqlparser.ConvertUsingExpr); ok {
selExpr := &sqlparser.ConvertUsingExpr{
Type: "utf8mb4",
Expr: &sqlparser.ColName{Name: as},
}
cexpr.expr = expr
cexpr.operation = opExpr
tpb.sendSelect.SelectExprs = append(tpb.sendSelect.SelectExprs, &sqlparser.AliasedExpr{Expr: selExpr, As: as})
cexpr.references[as.Lowered()] = true
return cexpr, nil
}
if expr, ok := aliased.Expr.(*sqlparser.FuncExpr); ok {
if expr.Distinct {
return nil, fmt.Errorf("unexpected: %v", sqlparser.String(expr))
Expand Down
9 changes: 9 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,11 +250,20 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState ma
fieldEvent := &binlogdatapb.FieldEvent{
TableName: initialPlan.SendRule.Match,
}

for _, f := range rows.Fields {
fmt.Printf("========= vcopier field: name=%v\n", f.Name)
fmt.Printf("========= vcopier field: %v\n", f)
}
fieldEvent.Fields = append(fieldEvent.Fields, rows.Fields...)
vc.tablePlan, err = plan.buildExecutionPlan(fieldEvent)
if err != nil {
return err
}
for _, f := range rows.Pkfields {
fmt.Printf("========= vcopier pkfields: name=%v\n", f.Name)
fmt.Printf("========= vcopier pkfields: %v\n", f)
}
pkfields = append(pkfields, rows.Pkfields...)
buf := sqlparser.NewTrackedBuffer(nil)
buf.Myprintf("update _vt.copy_state set lastpk=%a where vrepl_id=%s and table_name=%s", ":lastpk", strconv.Itoa(int(vc.vr.id)), encodeString(tableName))
Expand Down
31 changes: 0 additions & 31 deletions go/vt/vttablet/tabletserver/vstreamer/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ type Plan struct {
// in the stream.
ColExprs []ColExpr

convertUsingUTF8Columns map[string]bool

// Filters is the list of filters to be applied to the columns
// of the table.
Filters []Filter
Expand Down Expand Up @@ -442,24 +440,6 @@ func analyzeSelect(query string) (sel *sqlparser.Select, fromTable sqlparser.Tab
return sel, fromTable, nil
}

// isConvertColumnUsingUTF8 returns 'true' when given column needs to be converted as UTF8
// while read from source table
func (plan *Plan) isConvertColumnUsingUTF8(columnName string) bool {
if plan.convertUsingUTF8Columns == nil {
return false
}
return plan.convertUsingUTF8Columns[columnName]
}

// setConvertColumnUsingUTF8 marks given column as needs to be converted as UTF8
// while read from source table
func (plan *Plan) setConvertColumnUsingUTF8(columnName string) {
if plan.convertUsingUTF8Columns == nil {
plan.convertUsingUTF8Columns = map[string]bool{}
}
plan.convertUsingUTF8Columns[columnName] = true
}

func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) error {
if where == nil {
return nil
Expand Down Expand Up @@ -620,17 +600,6 @@ func (plan *Plan) analyzeExpr(vschema *localVSchema, selExpr sqlparser.SelectExp
ColNum: -1,
FixedValue: sqltypes.NewInt64(num),
}, nil
case *sqlparser.ConvertUsingExpr:
colnum, err := findColumn(plan.Table, aliased.As)
if err != nil {
return ColExpr{}, err
}
field := plan.Table.Fields[colnum]
plan.setConvertColumnUsingUTF8(field.Name)
return ColExpr{
ColNum: colnum,
Field: field,
}, nil
default:
log.Infof("Unsupported expression: %v", inner)
return ColExpr{}, fmt.Errorf("unsupported: %v", sqlparser.String(aliased.Expr))
Expand Down
79 changes: 62 additions & 17 deletions go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ package vstreamer
import (
"context"
"fmt"
"math"
"time"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/log"
Expand Down Expand Up @@ -62,26 +64,28 @@ type rowStreamer struct {
send func(*binlogdatapb.VStreamRowsResponse) error
vschema *localVSchema

plan *Plan
pkColumns []int
sendQuery string
vse *Engine
pktsize PacketSizer
plan *Plan
pkColumns []int
toUTF8Columns map[string]bool
sendQuery string
vse *Engine
pktsize PacketSizer
}

func newRowStreamer(ctx context.Context, cp dbconfigs.Connector, se *schema.Engine, query string, lastpk []sqltypes.Value, vschema *localVSchema, send func(*binlogdatapb.VStreamRowsResponse) error, vse *Engine) *rowStreamer {
ctx, cancel := context.WithCancel(ctx)
return &rowStreamer{
ctx: ctx,
cancel: cancel,
cp: cp,
se: se,
query: query,
lastpk: lastpk,
send: send,
vschema: vschema,
vse: vse,
pktsize: DefaultPacketSizer(),
ctx: ctx,
cancel: cancel,
cp: cp,
se: se,
query: query,
lastpk: lastpk,
send: send,
vschema: vschema,
vse: vse,
pktsize: DefaultPacketSizer(),
toUTF8Columns: make(map[string]bool),
}
}

Expand Down Expand Up @@ -134,6 +138,10 @@ func (rs *rowStreamer) buildPlan() error {
log.Errorf("%s", err.Error())
return err
}
err = rs.buildTextualColumns()
if err != nil {
return err
}
rs.pkColumns, err = buildPKColumns(st)
if err != nil {
return err
Expand All @@ -145,6 +153,38 @@ func (rs *rowStreamer) buildPlan() error {
return err
}

func (rs *rowStreamer) buildTextualColumns() error {
fmt.Printf("========== buildTextualColumns\n")
conn, err := snapshotConnect(rs.ctx, rs.cp)
if err != nil {
return err
}
defer conn.Close()

query, err := sqlparser.ParseAndBind(mysql.FetchTableColumns,
sqltypes.StringBindVariable(rs.plan.Table.Name),
)
if err != nil {
return err
}
fmt.Printf("========== query=%s\n", query)
r, err := conn.ExecuteFetch(query, math.MaxInt64, true)
if err != nil {
return err
}

for _, row := range r.Named().Rows {
colName := row["column_name"].ToString()
isString := row.AsBool("is_string", false)
isUtf8 := row.AsBool("is_utf8", false)
if isString && !isUtf8 {
log.Infof("========== rs.toUTF8Columns[colName]:%s\n", colName)
rs.toUTF8Columns[colName] = true
}
}
return nil
}

func buildPKColumns(st *binlogdatapb.MinimalTable) ([]int, error) {
var pkColumns = make([]int, 0)
if len(st.PKColumns) == 0 {
Expand All @@ -169,8 +209,8 @@ func (rs *rowStreamer) buildSelect() (string, error) {
buf.Myprintf("select ")
prefix := ""
for _, col := range rs.plan.Table.Fields {
if rs.plan.isConvertColumnUsingUTF8(col.Name) {
buf.Myprintf("%sconvert(%v using utf8mb4)", prefix, sqlparser.NewColIdent(col.Name))
if rs.toUTF8Columns[col.Name] {
buf.Myprintf("%sconvert(%v using utf8mb4) as %v", prefix, sqlparser.NewColIdent(col.Name), sqlparser.NewColIdent(col.Name))
} else {
buf.Myprintf("%s%v", prefix, sqlparser.NewColIdent(col.Name))
}
Expand Down Expand Up @@ -212,6 +252,7 @@ func (rs *rowStreamer) buildSelect() (string, error) {

func (rs *rowStreamer) streamQuery(conn *snapshotConn, send func(*binlogdatapb.VStreamRowsResponse) error) error {
log.Infof("Streaming query: %v\n", rs.sendQuery)
fmt.Printf("============== Streaming query: %v\n", rs.sendQuery)
gtid, err := conn.streamWithSnapshot(rs.ctx, rs.plan.Table.Name, rs.sendQuery)
if err != nil {
return err
Expand All @@ -230,6 +271,10 @@ func (rs *rowStreamer) streamQuery(conn *snapshotConn, send func(*binlogdatapb.V
}
}

for _, f := range rs.plan.fields() {
fmt.Printf("========= streamrows field: name=%v\n", f.Name)
fmt.Printf("========= streamrows field: %v\n", f)
}
err = send(&binlogdatapb.VStreamRowsResponse{
Fields: rs.plan.fields(),
Pkfields: pkfields,
Expand Down