From 2116b85c8b9cf1126d649c3a32143bf20ce1e646 Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Mon, 10 May 2021 09:32:26 +0200 Subject: [PATCH] Use Stream instead of Exec to not have maxrows Signed-off-by: Andres Taylor --- .../vttablet/tabletserver/health_streamer.go | 32 ++++++++++++------- 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/go/vt/vttablet/tabletserver/health_streamer.go b/go/vt/vttablet/tabletserver/health_streamer.go index 0f054f86343..b2800fbf254 100644 --- a/go/vt/vttablet/tabletserver/health_streamer.go +++ b/go/vt/vttablet/tabletserver/health_streamer.go @@ -24,6 +24,8 @@ import ( "sync" "time" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/dbconfigs" @@ -326,26 +328,32 @@ func (hs *healthStreamer) reload() error { } } - maxrows := 10000 - qr, err := conn.Exec(ctx, mysql.DetectSchemaChange, maxrows, false) + var tables []string + var tableNames []string + + callback := func(qr *sqltypes.Result) error { + for _, row := range qr.Rows { + table := row[0].ToString() + tables = append(tables, table) + + escapedTblName := sqlparser.String(sqlparser.NewStrLiteral(table)) + tableNames = append(tableNames, escapedTblName) + } + + return nil + } + alloc := func() *sqltypes.Result { return &sqltypes.Result{} } + bufferSize := 1000 + err = conn.Stream(ctx, mysql.DetectSchemaChange, callback, alloc, bufferSize, 0) if err != nil { return err } // If no change detected, then return - if len(qr.Rows) == 0 { + if len(tables) == 0 { return nil } - var tables []string - var tableNames []string - for _, row := range qr.Rows { - table := row[0].ToString() - tables = append(tables, table) - - escapedTblName := sqlparser.String(sqlparser.NewStrLiteral(table)) - tableNames = append(tableNames, escapedTblName) - } tableNamePredicate := fmt.Sprintf("table_name IN (%s)", strings.Join(tableNames, ", ")) del := fmt.Sprintf("%s WHERE %s", mysql.ClearSchemaCopy, tableNamePredicate) upd := fmt.Sprintf("%s AND %s", mysql.InsertIntoSchemaCopy, tableNamePredicate)