Skip to content

Commit

Permalink
workload: fix --splits regression introduced in cockroachdb#35349
Browse files Browse the repository at this point in the history
workload's Table schemas are SQL schemas, but cockroachdb#35349 switched the
initial data to be returned as a coldata.Batch, which has a more limited
set of types. (Or, in the case of simple workloads that return a
[]interface{}, it's roundtripped through coldata.Batch by the `Tuples`
helper.) Notably, this means a SQL STRING column is represented the same
as a BYTES column (ditto UUID, etc).

This caused a regression in splits, which received some []byte data for
a column tried to hand it to SPLIT as a SQL BYTES datum. This didn't
work for the UUID column in tpcc's history table nor the VARCHAR in
ycsb's usertable. Happily, a STRING works for both of these. It also
seems to work for BYTES columns, so it seems like the ambiguity is fine
in this case. When/if someone wants to add a workload that splits a
BYTES primary key column containing non-utf8 data, we'll may need to
revisit.

A more principled fix would be to get the fidelity back by parsing the
SQL schema, which in fact we do in `importccl.makeDatumFromColOffset`.
However, at the moment, this hack works and avoids the complexity and
the undesirable pkg/sql/parser dep.

Closes cockroachdb#37383
Closes cockroachdb#37382
Closes cockroachdb#37381
Closes cockroachdb#37380
Closes cockroachdb#37379
Closes cockroachdb#37378
Closes cockroachdb#37377
Closes cockroachdb#37393

Release note: None
  • Loading branch information
danhhz committed May 8, 2019
1 parent d0ff97b commit b0607c0
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 19 deletions.
1 change: 1 addition & 0 deletions pkg/workload/csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func colDatumToCSVString(col coldata.Vec, rowIdx int) string {
case types.Float64:
return strconv.FormatFloat(col.Float64()[rowIdx], 'f', -1, 64)
case types.Bytes:
// See the HACK comment in ColBatchToRows.
bytes := col.Bytes()[rowIdx]
return *(*string)(unsafe.Pointer(&bytes))
}
Expand Down
28 changes: 27 additions & 1 deletion pkg/workload/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,19 @@ func ColBatchToRows(cb coldata.Batch) [][]interface{} {
}
}
case types.Bytes:
// HACK: workload's Table schemas are SQL schemas, but the initial data is
// returned as a coldata.Batch, which has a more limited set of types.
// (Or, in the case of simple workloads that return a []interface{}, it's
// roundtripped through coldata.Batch by the `Tuples` helper.)
//
// Notably, this means a SQL STRING column is represented the same as a
// BYTES column (ditto UUID, etc). We could get the fidelity back by
// parsing the SQL schema, which in fact we do in
// `importccl.makeDatumFromColOffset`. At the moment, the set of types
// used in workloads is limited enough that the users of initial
// data/splits are okay with the fidelity loss. So, to avoid the
// complexity and the undesirable pkg/sql/parser dep, we simply treat them
// all as bytes and let the caller deal with the ambiguity.
for rowIdx, datum := range col.Bytes() {
if !nulls.NullAt64(uint64(rowIdx)) {
datums[rowIdx*numCols+colIdx] = datum
Expand Down Expand Up @@ -581,6 +594,9 @@ func Split(ctx context.Context, db *gosql.DB, table Table, concurrency int) erro

buf.Reset()
fmt.Fprintf(&buf, `ALTER TABLE %s SPLIT AT VALUES (%s)`, table.Name, split)
// If you're investigating an error coming out of this Exec, see the
// HACK comment in ColBatchToRows for some context that may (or may
// not) help you.
if _, err := db.Exec(buf.String()); err != nil {
return errors.Wrap(err, buf.String())
}
Expand Down Expand Up @@ -656,7 +672,8 @@ func StringTuple(datums []interface{}) []string {
case float64:
s[i] = fmt.Sprintf(`%f`, x)
case []byte:
s[i] = fmt.Sprintf(`X'%x'`, x)
// See the HACK comment in ColBatchToRows.
s[i] = lex.EscapeSQLString(string(x))
default:
panic(fmt.Sprintf("unsupported type %T: %v", x, x))
}
Expand Down Expand Up @@ -690,6 +707,13 @@ func (s sliceSliceInterface) Less(i, j int) bool {
return false
}
continue
case float64:
if y := s[j][offset].(float64); x < y {
return true
} else if x > y {
return false
}
continue
case uint64:
if y := s[j][offset].(uint64); x < y {
return true
Expand All @@ -699,6 +723,8 @@ func (s sliceSliceInterface) Less(i, j int) bool {
continue
case string:
cmp = strings.Compare(x, s[j][offset].(string))
case []byte:
cmp = bytes.Compare(x, s[j][offset].([]byte))
default:
panic(fmt.Sprintf("unsupported type %T: %v", x, x))
}
Expand Down
79 changes: 61 additions & 18 deletions pkg/workload/workload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ package workload_test
import (
"context"
"fmt"
"strconv"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/cockroach/pkg/workload"
"github.com/cockroachdb/cockroach/pkg/workload/bank"
)
Expand Down Expand Up @@ -86,33 +88,74 @@ func TestSetup(t *testing.T) {
func TestSplits(t *testing.T) {
defer leaktest.AfterTest(t)()

const rows, payloadBytes, concurrency = 10, 0, 10
tests := []int{1, 2, 3, 4, 10}

ctx := context.Background()
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{UseDatabase: `test`})
defer s.Stopper().Stop(ctx)
sqlutils.MakeSQLRunner(db).Exec(t, `CREATE DATABASE test`)

for _, ranges := range tests {
for _, ranges := range []int{1, 2, 3, 4, 10} {

tables := []workload.Table{
{
Name: `ints`,
Schema: `(a INT PRIMARY KEY)`,
Splits: workload.Tuples(ranges-1, func(i int) []interface{} {
return []interface{}{i}
}),
},
{
Name: `floats`,
Schema: `(a FLOAT PRIMARY KEY)`,
Splits: workload.Tuples(ranges-1, func(i int) []interface{} {
return []interface{}{float64(i)}
}),
},
{
Name: `strings`,
Schema: `(a STRING PRIMARY KEY)`,
Splits: workload.Tuples(ranges-1, func(i int) []interface{} {
return []interface{}{strconv.Itoa(i)}
}),
},
{
Name: `bytes`,
Schema: `(a BYTES PRIMARY KEY)`,
Splits: workload.Tuples(ranges-1, func(i int) []interface{} {
return []interface{}{strconv.Itoa(i)}
}),
},
{
Name: `uuids`,
Schema: `(a UUID PRIMARY KEY)`,
Splits: workload.Tuples(ranges-1, func(i int) []interface{} {
u, err := uuid.NewV4()
if err != nil {
panic(err)
}
return []interface{}{u.String()}
}),
},
}

t.Run(fmt.Sprintf("ranges=%d", ranges), func(t *testing.T) {
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `DROP TABLE IF EXISTS bank`)

gen := bank.FromConfig(rows, payloadBytes, ranges)
table := gen.Tables()[0]
sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE %s %s`, table.Name, table.Schema))
for _, table := range tables {
sqlDB.Exec(t, fmt.Sprintf(`DROP TABLE IF EXISTS %s`, table.Name))
sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE %s %s`, table.Name, table.Schema))

if err := workload.Split(ctx, db, table, concurrency); err != nil {
t.Fatalf("%+v", err)
}
const concurrency = 10
if err := workload.Split(ctx, db, table, concurrency); err != nil {
t.Fatalf("%+v", err)
}

var actual int
sqlDB.QueryRow(
t, `SELECT count(*) FROM [SHOW EXPERIMENTAL_RANGES FROM TABLE test.bank]`,
).Scan(&actual)
if ranges != actual {
t.Errorf(`expected %d got %d`, ranges, actual)
countRangesQ := fmt.Sprintf(
`SELECT count(*) FROM [SHOW EXPERIMENTAL_RANGES FROM TABLE test.%s]`, table.Name,
)
var actual int
sqlDB.QueryRow(t, countRangesQ).Scan(&actual)
if ranges != actual {
t.Errorf(`expected %d got %d`, ranges, actual)
}
}
})
}
Expand Down

0 comments on commit b0607c0

Please sign in to comment.