From cb36c9aa08896740d315d67ebc2ebe523080ab53 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Mon, 21 Jun 2021 09:35:07 +0300 Subject: [PATCH] experimental: streamer to analyze textual columns Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/mysql/schema.go | 12 +++ go/vt/vttablet/onlineddl/vrepl.go | 1 + .../vreplication/replicator_plan.go | 1 + .../vreplication/table_plan_builder.go | 11 --- .../tabletmanager/vreplication/vcopier.go | 9 +++ .../tabletserver/vstreamer/planbuilder.go | 31 -------- .../tabletserver/vstreamer/rowstreamer.go | 79 +++++++++++++++---- 7 files changed, 85 insertions(+), 59 deletions(-) diff --git a/go/mysql/schema.go b/go/mysql/schema.go index 59f98f0dbf1..def90b17d5e 100644 --- a/go/mysql/schema.go +++ b/go/mysql/schema.go @@ -83,6 +83,18 @@ from information_schema.columns as ISC ISC.ordinal_position = c.ordinal_position where c.table_schema = database() AND ISC.table_schema is null` + // FetchTableColumns queries information about columns in a given table. + FetchTableColumns = `select + column_name as column_name, data_type as data_type, character_set_name as character_set_name, + (data_type rlike '.*int') as is_int, + (data_type rlike '.*text' or data_type rlike '.*char') as is_string, + (character_set_name rlike 'utf8.*') as is_utf8, + (data_type='enum') as is_enum +from information_schema.columns +where table_schema = database() and + table_name=%a +order by ordinal_position` + // DetectSchemaChange query detects if there is any schema change from previous copy. 
DetectSchemaChange = detectChangeColumns + " UNION " + detectNewColumns + " UNION " + detectRemoveColumns diff --git a/go/vt/vttablet/onlineddl/vrepl.go b/go/vt/vttablet/onlineddl/vrepl.go index e10443ae157..e5092711250 100644 --- a/go/vt/vttablet/onlineddl/vrepl.go +++ b/go/vt/vttablet/onlineddl/vrepl.go @@ -472,6 +472,7 @@ func (v *VRepl) generateFilterQuery(ctx context.Context) error { } } // We will always read strings as utf8mb4. + // sb.WriteString(escapeName(name)) sb.WriteString(fmt.Sprintf("convert(%s using utf8mb4)", escapeName(name))) default: sb.WriteString(escapeName(name)) diff --git a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go index 029c92fffe4..a719b75eb40 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go +++ b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go @@ -297,6 +297,7 @@ func (tp *TablePlan) isOutsidePKRange(bindvars map[string]*querypb.BindVariable, // - ...any other future possible values func (tp *TablePlan) bindFieldVal(field *querypb.Field, val *sqltypes.Value) (*querypb.BindVariable, error) { if conversion, ok := tp.ConvertCharset[field.Name]; ok && !val.IsNull() { + fmt.Printf("============== conversion for %v: from %v to %v\n", field.Name, conversion.FromCharset, conversion.ToCharset) // Non-null string value, for which we have a charset conversion instruction valString := val.ToString() fromEncoding, encodingOK := mysql.CharacterSetEncoding[conversion.FromCharset] diff --git a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go index 290041a28ad..b721b94f748 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go +++ b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go @@ -383,17 +383,6 @@ func (tpb *tablePlanBuilder) analyzeExpr(selExpr sqlparser.SelectExpr) (*colExpr colName: as, references: 
make(map[string]bool), } - if expr, ok := aliased.Expr.(*sqlparser.ConvertUsingExpr); ok { - selExpr := &sqlparser.ConvertUsingExpr{ - Type: "utf8mb4", - Expr: &sqlparser.ColName{Name: as}, - } - cexpr.expr = expr - cexpr.operation = opExpr - tpb.sendSelect.SelectExprs = append(tpb.sendSelect.SelectExprs, &sqlparser.AliasedExpr{Expr: selExpr, As: as}) - cexpr.references[as.Lowered()] = true - return cexpr, nil - } if expr, ok := aliased.Expr.(*sqlparser.FuncExpr); ok { if expr.Distinct { return nil, fmt.Errorf("unexpected: %v", sqlparser.String(expr)) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier.go index b7282579490..92abb13ed69 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier.go @@ -250,11 +250,20 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState ma fieldEvent := &binlogdatapb.FieldEvent{ TableName: initialPlan.SendRule.Match, } + + for _, f := range rows.Fields { + fmt.Printf("========= vcopier field: name=%v\n", f.Name) + fmt.Printf("========= vcopier field: %v\n", f) + } fieldEvent.Fields = append(fieldEvent.Fields, rows.Fields...) vc.tablePlan, err = plan.buildExecutionPlan(fieldEvent) if err != nil { return err } + for _, f := range rows.Pkfields { + fmt.Printf("========= vcopier pkfields: name=%v\n", f.Name) + fmt.Printf("========= vcopier pkfields: %v\n", f) + } pkfields = append(pkfields, rows.Pkfields...) 
buf := sqlparser.NewTrackedBuffer(nil) buf.Myprintf("update _vt.copy_state set lastpk=%a where vrepl_id=%s and table_name=%s", ":lastpk", strconv.Itoa(int(vc.vr.id)), encodeString(tableName)) diff --git a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go index 8eabce49642..33c5319cd49 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go +++ b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go @@ -48,8 +48,6 @@ type Plan struct { // in the stream. ColExprs []ColExpr - convertUsingUTF8Columns map[string]bool - // Filters is the list of filters to be applied to the columns // of the table. Filters []Filter @@ -442,24 +440,6 @@ func analyzeSelect(query string) (sel *sqlparser.Select, fromTable sqlparser.Tab return sel, fromTable, nil } -// isConvertColumnUsingUTF8 returns 'true' when given column needs to be converted as UTF8 -// while read from source table -func (plan *Plan) isConvertColumnUsingUTF8(columnName string) bool { - if plan.convertUsingUTF8Columns == nil { - return false - } - return plan.convertUsingUTF8Columns[columnName] -} - -// setConvertColumnUsingUTF8 marks given column as needs to be converted as UTF8 -// while read from source table -func (plan *Plan) setConvertColumnUsingUTF8(columnName string) { - if plan.convertUsingUTF8Columns == nil { - plan.convertUsingUTF8Columns = map[string]bool{} - } - plan.convertUsingUTF8Columns[columnName] = true -} - func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) error { if where == nil { return nil @@ -620,17 +600,6 @@ func (plan *Plan) analyzeExpr(vschema *localVSchema, selExpr sqlparser.SelectExp ColNum: -1, FixedValue: sqltypes.NewInt64(num), }, nil - case *sqlparser.ConvertUsingExpr: - colnum, err := findColumn(plan.Table, aliased.As) - if err != nil { - return ColExpr{}, err - } - field := plan.Table.Fields[colnum] - plan.setConvertColumnUsingUTF8(field.Name) - return ColExpr{ - ColNum: colnum, - Field: 
field, - }, nil default: log.Infof("Unsupported expression: %v", inner) return ColExpr{}, fmt.Errorf("unsupported: %v", sqlparser.String(aliased.Expr)) diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go index d1e5aaec9f6..1fc264df19c 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go @@ -19,8 +19,10 @@ package vstreamer import ( "context" "fmt" + "math" "time" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/log" @@ -62,26 +64,28 @@ type rowStreamer struct { send func(*binlogdatapb.VStreamRowsResponse) error vschema *localVSchema - plan *Plan - pkColumns []int - sendQuery string - vse *Engine - pktsize PacketSizer + plan *Plan + pkColumns []int + toUTF8Columns map[string]bool + sendQuery string + vse *Engine + pktsize PacketSizer } func newRowStreamer(ctx context.Context, cp dbconfigs.Connector, se *schema.Engine, query string, lastpk []sqltypes.Value, vschema *localVSchema, send func(*binlogdatapb.VStreamRowsResponse) error, vse *Engine) *rowStreamer { ctx, cancel := context.WithCancel(ctx) return &rowStreamer{ - ctx: ctx, - cancel: cancel, - cp: cp, - se: se, - query: query, - lastpk: lastpk, - send: send, - vschema: vschema, - vse: vse, - pktsize: DefaultPacketSizer(), + ctx: ctx, + cancel: cancel, + cp: cp, + se: se, + query: query, + lastpk: lastpk, + send: send, + vschema: vschema, + vse: vse, + pktsize: DefaultPacketSizer(), + toUTF8Columns: make(map[string]bool), } } @@ -134,6 +138,10 @@ func (rs *rowStreamer) buildPlan() error { log.Errorf("%s", err.Error()) return err } + err = rs.buildTextualColumns() + if err != nil { + return err + } rs.pkColumns, err = buildPKColumns(st) if err != nil { return err @@ -145,6 +153,38 @@ func (rs *rowStreamer) buildPlan() error { return err } +func (rs *rowStreamer) buildTextualColumns() error { + 
fmt.Printf("========== buildTextualColumns\n") + conn, err := snapshotConnect(rs.ctx, rs.cp) + if err != nil { + return err + } + defer conn.Close() + + query, err := sqlparser.ParseAndBind(mysql.FetchTableColumns, + sqltypes.StringBindVariable(rs.plan.Table.Name), + ) + if err != nil { + return err + } + fmt.Printf("========== query=%s\n", query) + r, err := conn.ExecuteFetch(query, math.MaxInt64, true) + if err != nil { + return err + } + + for _, row := range r.Named().Rows { + colName := row["column_name"].ToString() + isString := row.AsBool("is_string", false) + isUtf8 := row.AsBool("is_utf8", false) + if isString && !isUtf8 { + log.Infof("========== rs.toUTF8Columns[colName]:%s\n", colName) + rs.toUTF8Columns[colName] = true + } + } + return nil +} + func buildPKColumns(st *binlogdatapb.MinimalTable) ([]int, error) { var pkColumns = make([]int, 0) if len(st.PKColumns) == 0 { @@ -169,8 +209,8 @@ func (rs *rowStreamer) buildSelect() (string, error) { buf.Myprintf("select ") prefix := "" for _, col := range rs.plan.Table.Fields { - if rs.plan.isConvertColumnUsingUTF8(col.Name) { - buf.Myprintf("%sconvert(%v using utf8mb4)", prefix, sqlparser.NewColIdent(col.Name)) + if rs.toUTF8Columns[col.Name] { + buf.Myprintf("%sconvert(%v using utf8mb4) as %v", prefix, sqlparser.NewColIdent(col.Name), sqlparser.NewColIdent(col.Name)) } else { buf.Myprintf("%s%v", prefix, sqlparser.NewColIdent(col.Name)) } @@ -212,6 +252,7 @@ func (rs *rowStreamer) buildSelect() (string, error) { func (rs *rowStreamer) streamQuery(conn *snapshotConn, send func(*binlogdatapb.VStreamRowsResponse) error) error { log.Infof("Streaming query: %v\n", rs.sendQuery) + fmt.Printf("============== Streaming query: %v\n", rs.sendQuery) gtid, err := conn.streamWithSnapshot(rs.ctx, rs.plan.Table.Name, rs.sendQuery) if err != nil { return err @@ -230,6 +271,10 @@ func (rs *rowStreamer) streamQuery(conn *snapshotConn, send func(*binlogdatapb.V } } + for _, f := range rs.plan.fields() { + fmt.Printf("========= 
streamrows field: name=%v\n", f.Name) + fmt.Printf("========= streamrows field: %v\n", f) + } err = send(&binlogdatapb.VStreamRowsResponse{ Fields: rs.plan.fields(), Pkfields: pkfields,