Skip to content

Commit

Permalink
GetSchema: limit concurrent operations (#13617)
Browse files Browse the repository at this point in the history
Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach authored Jul 27, 2023
1 parent ca6bc6b commit dc60c50
Showing 1 changed file with 30 additions and 23 deletions.
53 changes: 30 additions & 23 deletions go/vt/mysqlctl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,29 @@ import (
"regexp"
"sort"
"strings"
"sync"

"golang.org/x/sync/errgroup"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqlescape"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/evalengine"

"vitess.io/vitess/go/sqlescape"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl/tmutils"

querypb "vitess.io/vitess/go/vt/proto/query"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/evalengine"
)

// getSchemaConcurrency is the maximum number of goroutines GetSchema runs at
// the same time, where each goroutine is mapped to a MySQL connection.
//
// Why 20: in a local environment without latency, even unbounded concurrency
// has been observed to result in fewer than 20 concurrent MySQL connections,
// so this cap is unlikely to slow down optimal environments.
//
// Why cap at all: in high latency environments, unbounded concurrency can
// result in a very high number of concurrent MySQL connections, which is
// undesirable. Pushing back on GetSchema and letting it run for longer is
// preferred instead.
const getSchemaConcurrency = 20

// autoIncr matches an ` AUTO_INCREMENT=<digits>` fragment, including the single
// leading space. It is compiled once at package scope so it is not recompiled per use.
var autoIncr = regexp.MustCompile(` AUTO_INCREMENT=\d+`)
Expand Down Expand Up @@ -102,18 +110,18 @@ func (mysqld *Mysqld) GetSchema(ctx context.Context, dbName string, request *tab
ctx, cancel := context.WithCancel(ctx)
defer cancel()

var wg sync.WaitGroup
allErrors := &concurrency.AllErrorRecorder{}

eg, ctx := errgroup.WithContext(ctx)
eg.SetLimit(getSchemaConcurrency)

// Get per-table schema concurrently.
tableNames := make([]string, 0, len(tds))
for _, td := range tds {
tableNames = append(tableNames, td.Name)
td := td

wg.Add(1)
go func(td *tabletmanagerdatapb.TableDefinition) {
defer wg.Done()

eg.Go(func() error {
fields, columns, schema, err := mysqld.collectSchema(ctx, dbName, td.Name, td.Type, request.TableSchemaOnly)
if err != nil {
// There's a possible race condition: it could happen that a table was dropped in between reading
Expand All @@ -122,40 +130,39 @@ func (mysqld *Mysqld) GetSchema(ctx context.Context, dbName string, request *tab
// This is fine. We identify the situation and keep the table without any fields/columns/key information
sqlErr, isSQLErr := mysql.NewSQLErrorFromError(err).(*mysql.SQLError)
if isSQLErr && sqlErr != nil && sqlErr.Number() == mysql.ERNoSuchTable {
return
return nil
}

allErrors.RecordError(err)
cancel()
return
return err
}

td.Fields = fields
td.Columns = columns
td.Schema = schema
}(td)
return nil
})
}

colMap := map[string][]string{}
// Get primary columns concurrently.
// The below runs a single query on `INFORMATION_SCHEMA` and does not interact with the actual tables.
// It is therefore safe to run even if some tables are dropped in the interim.
colMap := map[string][]string{}
if len(tableNames) > 0 {
wg.Add(1)
go func() {
defer wg.Done()

if len(tableNames) > 0 && !request.TableSchemaOnly {
eg.Go(func() error {
var err error
colMap, err = mysqld.getPrimaryKeyColumns(ctx, dbName, tableNames...)
if err != nil {
allErrors.RecordError(err)
cancel()
return
return err
}
}()
return nil
})
}

wg.Wait()
eg.Wait()
if err := allErrors.AggrError(vterrors.Aggregate); err != nil {
return nil, err
}
Expand Down

0 comments on commit dc60c50

Please sign in to comment.