Skip to content

Commit

Permalink
Use Stream instead of Exec to not have maxrows
Browse files Browse the repository at this point in the history
Signed-off-by: Andres Taylor <[email protected]>
  • Loading branch information
systay committed May 10, 2021
1 parent 1939293 commit 2116b85
Showing 1 changed file with 20 additions and 12 deletions.
32 changes: 20 additions & 12 deletions go/vt/vttablet/tabletserver/health_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"sync"
"time"

"vitess.io/vitess/go/sqltypes"

"vitess.io/vitess/go/vt/sqlparser"

"vitess.io/vitess/go/vt/dbconfigs"
Expand Down Expand Up @@ -326,26 +328,32 @@ func (hs *healthStreamer) reload() error {
}
}

maxrows := 10000
qr, err := conn.Exec(ctx, mysql.DetectSchemaChange, maxrows, false)
var tables []string
var tableNames []string

callback := func(qr *sqltypes.Result) error {
for _, row := range qr.Rows {
table := row[0].ToString()
tables = append(tables, table)

escapedTblName := sqlparser.String(sqlparser.NewStrLiteral(table))
tableNames = append(tableNames, escapedTblName)
}

return nil
}
alloc := func() *sqltypes.Result { return &sqltypes.Result{} }
bufferSize := 1000
err = conn.Stream(ctx, mysql.DetectSchemaChange, callback, alloc, bufferSize, 0)
if err != nil {
return err
}

// If no change detected, then return
if len(qr.Rows) == 0 {
if len(tables) == 0 {
return nil
}

var tables []string
var tableNames []string
for _, row := range qr.Rows {
table := row[0].ToString()
tables = append(tables, table)

escapedTblName := sqlparser.String(sqlparser.NewStrLiteral(table))
tableNames = append(tableNames, escapedTblName)
}
tableNamePredicate := fmt.Sprintf("table_name IN (%s)", strings.Join(tableNames, ", "))
del := fmt.Sprintf("%s WHERE %s", mysql.ClearSchemaCopy, tableNamePredicate)
upd := fmt.Sprintf("%s AND %s", mysql.InsertIntoSchemaCopy, tableNamePredicate)
Expand Down

0 comments on commit 2116b85

Please sign in to comment.