diff --git a/pkg/cmd/roachtest/schemachange.go b/pkg/cmd/roachtest/schemachange.go index 305e06025f2e..8f1b6f361e14 100644 --- a/pkg/cmd/roachtest/schemachange.go +++ b/pkg/cmd/roachtest/schemachange.go @@ -423,7 +423,11 @@ func makeIndexAddRollbackTpccTest(numNodes, warehouses int, length time.Duration backoff := 30 * time.Second retryOpts = retry.Options{InitialBackoff: backoff, MaxBackoff: backoff, Multiplier: 1, MaxRetries: int(length / backoff)} - return jobutils.WaitForStatus(ctx, conn, rollbackID, jobs.StatusSucceeded, retryOpts) + if err := jobutils.WaitForStatus(ctx, conn, rollbackID, jobs.StatusSucceeded, retryOpts); err != nil { + return err + } + c.l.Printf("%s: rollback %d complete\n", prefix, rollbackID) + return nil }, Duration: length, }) diff --git a/pkg/workload/querybench/query_bench.go b/pkg/workload/querybench/query_bench.go index f783d830678a..adfb5156050e 100644 --- a/pkg/workload/querybench/query_bench.go +++ b/pkg/workload/querybench/query_bench.go @@ -35,6 +35,7 @@ type queryBench struct { connFlags *workload.ConnFlags queryFile string useOpt bool + verbose bool queries []string } @@ -56,6 +57,7 @@ var queryBenchMeta = workload.Meta{ } g.flags.StringVar(&g.queryFile, `query-file`, ``, `File of newline separated queries to run`) g.flags.BoolVar(&g.useOpt, `use-opt`, true, `Use cost-based optimizer`) + g.flags.BoolVar(&g.verbose, `verbose`, true, `Prints out the queries being run as well as histograms`) g.connFlags = workload.NewConnFlags(&g.flags) return g }, @@ -131,9 +133,10 @@ func (g *queryBench) Ops(urls []string, reg *histogram.Registry) (workload.Query ql := workload.QueryLoad{SQLDatabase: sqlDatabase} for i := 0; i < g.connFlags.Concurrency; i++ { op := queryBenchWorker{ - hists: reg.GetHandle(), - db: db, - stmts: stmts, + hists: reg.GetHandle(), + db: db, + stmts: stmts, + verbose: g.verbose, } ql.WorkerFns = append(ql.WorkerFns, op.run) } @@ -150,6 +153,8 @@ func getQueries(path string) ([]string, error) { defer file.Close() scanner := bufio.NewScanner(file) + // Read lines up to 1 MB in size. + scanner.Buffer(make([]byte, 64*1024), 1024*1024) var lines []string for scanner.Scan() { line := scanner.Text() @@ -174,6 +179,7 @@ type queryBenchWorker struct { stmts []namedStmt stmtIdx int + verbose bool } func (o *queryBenchWorker) run(ctx context.Context) error { @@ -193,6 +199,10 @@ func (o *queryBenchWorker) run(ctx context.Context) error { return err } elapsed := timeutil.Since(start) - o.hists.Get(stmt.name).Record(elapsed) + if o.verbose { + o.hists.Get(stmt.name).Record(elapsed) + } else { + o.hists.Get("").Record(elapsed) + } return nil } diff --git a/pkg/workload/querylog/querylog.go b/pkg/workload/querylog/querylog.go index a9638c339f41..bffc97856054 100644 --- a/pkg/workload/querylog/querylog.go +++ b/pkg/workload/querylog/querylog.go @@ -34,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/cockroach/pkg/workload" "github.com/cockroachdb/cockroach/pkg/workload/histogram" workloadrand "github.com/cockroachdb/cockroach/pkg/workload/rand" @@ -58,7 +59,13 @@ type querylog struct { omitDeleteQueries bool omitWriteQueries bool probOfUsingSamples float64 - seed int64 + // querybenchPath (if set) tells the querylog where to write the generated + // queries to be later executed by querybench in round-robin fashion, i.e. + // querylog itself won't execute the queries against the database and only + // will prepare the file. + querybenchPath string + count uint + seed int64 // TODO(yuzefovich): this is a great variable to move to the main generator. stmtTimeoutSeconds int64 verbose bool @@ -69,6 +76,7 @@ type querylogState struct { totalQueryCount int queryCountPerTable []int seenQueriesByTableName []map[string]int + tableUsed map[string]bool columnsByTableName map[string][]columnInfo } @@ -97,6 +105,7 @@ var querylogMeta = workload.Meta{ `verbose`: {RuntimeOnly: true}, `zip`: {RuntimeOnly: true}, } + g.flags.UintVar(&g.count, `count`, 100, `Number of queries to be written for querybench (used only if --querybench-path is specified).`) g.flags.StringVar(&g.dirPath, `dir`, ``, `Directory of the querylog files.`) g.flags.IntVar(&g.filesToParse, `files-to-parse`, 5, `Maximum number of files in the query log to process.`) g.flags.Float64Var(&g.minSamplingProb, `min-sampling-prob`, 0.01, `Minimum sampling probability defines the minimum chance `+ @@ -109,6 +118,10 @@ var querylogMeta = workload.Meta{ g.flags.BoolVar(&g.omitWriteQueries, `omit-write-queries`, true, `Indicates whether write queries (INSERTs and UPSERTs) should be omitted.`) g.flags.Float64Var(&g.probOfUsingSamples, `prob-of-using-samples`, 0.9, `Probability of using samples to generate values for `+ `the placeholders. Say it is 0.9, then with 0.1 probability the values will be generated randomly.`) + // TODO(yuzefovich): improve termination of querylog when used for + // querybench file generation. + g.flags.StringVar(&g.querybenchPath, `querybench-path`, ``, `Path to write the generated queries to for querybench tool. `+ + `NOTE: at the moment --max-ops=1 is the best way to terminate the generator to produce the desired count of queries.`) g.flags.Int64Var(&g.seed, `seed`, 1, `Random number generator seed.`) g.flags.Int64Var(&g.stmtTimeoutSeconds, `statement-timeout`, 0, `Sets session's statement_timeout setting (in seconds).'`) g.flags.BoolVar(&g.verbose, `verbose`, false, `Indicates whether error messages should be printed out.`) @@ -204,19 +217,21 @@ func (w *querylog) Ops(urls []string, reg *histogram.Registry) (workload.QueryLo return workload.QueryLoad{}, err } ql := workload.QueryLoad{SQLDatabase: sqlDatabase} - for i := 0; i < w.connFlags.Concurrency; i++ { + if w.querybenchPath != `` { conn, err := pgx.Connect(connCfg) if err != nil { return workload.QueryLoad{}, err } - worker := &worker{ - config: w, - hists: reg.GetHandle(), - conn: conn, - id: i, - rng: rand.New(rand.NewSource(w.seed + int64(i))), - reWriteQuery: regexp.MustCompile(regexWriteQueryPattern), + worker := newQuerybenchWorker(w, reg, conn, 0 /* id */) + ql.WorkerFns = append(ql.WorkerFns, worker.querybenchRun) + return ql, nil + } + for i := 0; i < w.connFlags.Concurrency; i++ { + conn, err := pgx.Connect(connCfg) + if err != nil { + return workload.QueryLoad{}, err } + worker := newWorker(w, reg, conn, i) ql.WorkerFns = append(ql.WorkerFns, worker.run) } return ql, nil @@ -232,6 +247,25 @@ type worker struct { rng *rand.Rand reWriteQuery *regexp.Regexp + + querybenchPath string +} + +func newWorker(q *querylog, reg *histogram.Registry, conn *pgx.Conn, id int) *worker { + return &worker{ + config: q, + hists: reg.GetHandle(), + conn: conn, + id: id, + rng: rand.New(rand.NewSource(q.seed + int64(id))), + reWriteQuery: regexp.MustCompile(regexWriteQueryPattern), + } +} + +func newQuerybenchWorker(q *querylog, reg *histogram.Registry, conn *pgx.Conn, id int) *worker { + w := newWorker(q, reg, conn, id) + w.querybenchPath = q.querybenchPath + return w } // run is the main function of the worker in which it chooses a query, attempts @@ -391,39 +425,63 @@ func (w *worker) generatePlaceholders( for j := 0; j < numRepeats; j++ { for i, column := range pholdersColumnNames { columnMatched := false - for _, c := range w.config.state.columnsByTableName[tableName] { - if c.name == column { - if w.rng.Float64() < w.config.probOfUsingSamples && c.samples != nil && !isWriteQuery { - // For non-write queries when samples are present, we're using - // the samples with w.config.probOfUsingSamples probability. - sampleIdx := w.rng.Intn(len(c.samples)) - placeholders = append(placeholders, c.samples[sampleIdx]) - } else { - // In all other cases, we generate random values for the - // placeholders. - nullPct := 0 - if c.isNullable && w.config.nullPct > 0 { - nullPct = 100 / w.config.nullPct - } - d := sqlbase.RandDatumWithNullChance(w.rng, c.dataType, nullPct) - if i, ok := d.(*tree.DInt); ok && c.intRange > 0 { - j := int64(*i) % int64(c.intRange/2) - d = tree.NewDInt(tree.DInt(j)) - } - p, err := workloadrand.DatumToGoSQL(d) - if err != nil { - return nil, err - } - placeholders = append(placeholders, p) + actualTableName := true + if strings.Contains(column, ".") { + actualTableName = false + column = strings.Split(column, ".")[1] + } + possibleTableNames := make([]string, 0, 1) + possibleTableNames = append(possibleTableNames, tableName) + if !actualTableName { + // column comes from a table that was aliased. In order to not parse + // the query to figure out actual table name, we simply compare to + // columns of all used tables giving priority to the table that the + // query is assigned to. + for _, n := range w.config.state.tableNames { + if n == tableName || !w.config.state.tableUsed[n] { + continue } - columnMatched = true + possibleTableNames = append(possibleTableNames, n) + } + } + for _, tableName := range possibleTableNames { + if columnMatched { break } + for _, c := range w.config.state.columnsByTableName[tableName] { + if c.name == column { + if w.rng.Float64() < w.config.probOfUsingSamples && c.samples != nil && !isWriteQuery { + // For non-write queries when samples are present, we're using + // the samples with w.config.probOfUsingSamples probability. + sampleIdx := w.rng.Intn(len(c.samples)) + placeholders = append(placeholders, c.samples[sampleIdx]) + } else { + // In all other cases, we generate random values for the + // placeholders. + nullPct := 0 + if c.isNullable && w.config.nullPct > 0 { + nullPct = 100 / w.config.nullPct + } + d := sqlbase.RandDatumWithNullChance(w.rng, c.dataType, nullPct) + if i, ok := d.(*tree.DInt); ok && c.intRange > 0 { + j := int64(*i) % int64(c.intRange/2) + d = tree.NewDInt(tree.DInt(j)) + } + p, err := workloadrand.DatumToGoSQL(d) + if err != nil { + return nil, err + } + placeholders = append(placeholders, p) + } + columnMatched = true + break + } + } } if !columnMatched { d := w.rng.Int31n(10) + 1 if w.config.verbose { - log.Infof(ctx, "Couldn't deduce the corresponding to $%d column, so generated %d (a small int)", i+1, d) + log.Infof(ctx, "Couldn't deduce the corresponding to $%d, so generated %d (a small int)", i+1, d) printQueryShortened(ctx, query) } p, err := workloadrand.DatumToGoSQL(tree.NewDInt(tree.DInt(d))) @@ -582,19 +640,23 @@ func (w *querylog) parseFile(ctx context.Context, fileInfo os.FileInfo, re *rege (w.omitWriteQueries && isWriteQuery) || (w.omitDeleteQueries && strings.HasPrefix(query, "DELETE")) if !skipQuery { - tableFound := false + tableAssigned := false for i, tableName := range w.state.tableNames { // TODO(yuzefovich): this simplistic matching doesn't work in all // cases. if strings.Contains(query, tableName) { - w.state.seenQueriesByTableName[i][query]++ - w.state.queryCountPerTable[i]++ - w.state.totalQueryCount++ - tableFound = true - break + if !tableAssigned { + // The query is assigned to one table, namely the one that + // showed up earlier in `SHOW TABLES`. + w.state.seenQueriesByTableName[i][query]++ + w.state.queryCountPerTable[i]++ + w.state.totalQueryCount++ + tableAssigned = true + } + w.state.tableUsed[tableName] = true } } - if !tableFound { + if !tableAssigned { return errors.Errorf("No table matched query %s while processing %s", query, fileInfo.Name()) } } @@ -627,6 +689,7 @@ func (w *querylog) processQueryLog(ctx context.Context) error { for i := range w.state.tableNames { w.state.seenQueriesByTableName[i] = make(map[string]int) } + w.state.tableUsed = make(map[string]bool) re := regexp.MustCompile(regexQueryLogFormat) log.Infof(ctx, "Starting to parse the query log") @@ -663,9 +726,9 @@ func (w *querylog) processQueryLog(ctx context.Context) error { // that at least one query was issued against. func (w *querylog) getColumnsInfo(db *gosql.DB) error { w.state.columnsByTableName = make(map[string][]columnInfo) - for i, tableName := range w.state.tableNames { - if w.state.queryCountPerTable[i] == 0 { - // There were no queries operating on ith table, so no query will be + for _, tableName := range w.state.tableNames { + if !w.state.tableUsed[tableName] { + // There were no queries operating on this table, so no query will be // generated against this table as well, and we don't need the // information about the columns. continue @@ -817,6 +880,124 @@ func (w *querylog) populateSamples(ctx context.Context, db *gosql.DB) error { return nil } +// querybenchRun is the main function of a querybench worker. It is run only +// when querylog is used in querybench mode (i.e. querybench-path argument was +// specified). It chooses a query, attempts to generate values for the +// placeholders, and - instead of executing the query - writes it into the +// requested file in format that querybench understands. +func (w *worker) querybenchRun(ctx context.Context) error { + file, err := os.Create(w.querybenchPath) + if err != nil { + return err + } + defer file.Close() + + // We will skip all queries for which the placeholder values contain `$1` + // and alike to make it easier to replace actual placeholders. + reToAvoid := regexp.MustCompile(`\$[0-9]+`) + writer := bufio.NewWriter(file) + queryCount := uint(0) + for { + chosenQuery, tableName := w.chooseQuery() + pholdersColumnNames, numRepeats, err := w.deduceColumnNamesForPlaceholders(ctx, chosenQuery) + if err != nil { + if w.config.verbose { + log.Infof(ctx, "Encountered an error %s while deducing column names corresponding to the placeholders", err.Error()) + printQueryShortened(ctx, chosenQuery) + } + continue + } + + placeholders, err := w.generatePlaceholders(ctx, chosenQuery, pholdersColumnNames, numRepeats, tableName) + if err != nil { + if w.config.verbose { + log.Infof(ctx, "Encountered an error %s while generating values for the placeholders", err.Error()) + printQueryShortened(ctx, chosenQuery) + } + continue + } + + query := chosenQuery + skipQuery := false + // We're iterating over placeholders in reverse order so that `$1` does not + // replace the first two characters of `$10`. + for i := len(placeholders) - 1; i >= 0; i-- { + pholderString := fmt.Sprintf("$%d", i+1) + if !strings.Contains(chosenQuery, pholderString) { + skipQuery = true + break + } + pholderValue := printPlaceholder(placeholders[i]) + if reToAvoid.MatchString(pholderValue) { + skipQuery = true + break + } + query = strings.Replace(query, pholderString, pholderValue, -1) + } + if skipQuery { + if w.config.verbose { + log.Infof(ctx, "Could not replace placeholders with values on query") + printQueryShortened(ctx, chosenQuery) + } + continue + } + if _, err = writer.WriteString(query + "\n\n"); err != nil { + return err + } + + queryCount++ + if queryCount%250 == 0 { + log.Infof(ctx, "%d queries have been written", queryCount) + } + if queryCount == w.config.count { + writer.Flush() + return nil + } + } +} + +func printPlaceholder(i interface{}) string { + if ptr, ok := i.(*interface{}); ok { + return printPlaceholder(*ptr) + } + switch p := i.(type) { + case bool: + return fmt.Sprintf("%v", p) + case int64: + return fmt.Sprintf("%d", p) + case float64: + return fmt.Sprintf("%f", p) + case []uint8: + u, err := uuid.FromString(string(p)) + if err != nil { + panic(err) + } + return fmt.Sprintf("'%s'", u.String()) + case uuid.UUID: + return fmt.Sprintf("'%s'", p.String()) + case string: + s := strings.Replace(p, "'", "''", -1) + // querybench assumes that each query is on a single line, so we remove + // line breaks. + s = strings.Replace(s, "\n", "", -1) + s = strings.Replace(s, "\r", "", -1) + return fmt.Sprintf("'%s'", s) + case time.Time: + timestamp := p.String() + // timestamp can be of the format '1970-01-09 00:14:01.000812453 +0000 UTC' + // or '2019-02-26 20:52:01.65434 +0000 +0000', and the parser complains + // that it could not parse it, so we remove all stuff that comes after + // first +0000 to make the parser happy. + idx := strings.Index(timestamp, `+0000`) + timestamp = timestamp[:idx+5] + return fmt.Sprintf("'%s':::TIMESTAMP", timestamp) + case nil: + return fmt.Sprintf("NULL") + default: + panic(fmt.Sprintf("unsupported type: %T", i)) + } +} + // TODO(yuzefovich): columnInfo is copied from workload/rand package and // extended. Should we export workload/rand.col? type columnInfo struct { @@ -832,8 +1013,8 @@ type columnInfo struct { } func printQueryShortened(ctx context.Context, query string) { - if len(query) > 200 { - log.Infof(ctx, "%s...%s", query[:100], query[len(query)-100:]) + if len(query) > 1000 { + log.Infof(ctx, "%s...%s", query[:500], query[len(query)-500:]) } else { log.Infof(ctx, "%s", query) }