workload/tpcc: support multiple partitioning strategies
This commit adds support for two different partitioning strategies:
replication partitioning and lease partitioning. The former is the
only strategy we supported before and constrains all replicas for a
given partition to a single zone. The latter is new and collocates
the read leases for a given partition within a single zone.

Release note: None
nvanbenschoten committed Apr 16, 2019
1 parent 76c85ea commit 3ebed10
Showing 2 changed files with 167 additions and 56 deletions.
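
To make the two strategies concrete, here is a minimal standalone sketch (not part of the commit) that mirrors the option-building logic in configureZone below and prints the ALTER PARTITION statement each strategy produces for one partition; the table, partition, and zone names are illustrative placeholders.

package main

import "fmt"

func main() {
	const table, partition, kv = "warehouse", "p0_0", "zone=us-east1-b"

	// partitionedReplication: pin every replica of the partition to the zone.
	replicationOpts := fmt.Sprintf(`constraints = '[+%s]'`, kv)
	// partitionedLeases: require one replica in the zone and prefer its leases there.
	leaseOpts := fmt.Sprintf(`constraints = '{"+%s":1}', lease_preferences = '[[+%s]]'`, kv, kv)

	for _, opts := range []string{replicationOpts, leaseOpts} {
		fmt.Printf("ALTER PARTITION %s OF TABLE %s CONFIGURE ZONE USING %s\n",
			partition, table, opts)
	}
}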
207 changes: 158 additions & 49 deletions pkg/workload/tpcc/partition.go
@@ -24,6 +24,102 @@ import (
"golang.org/x/exp/rand"
)

type partitionStrategy int

const (
// The partitionedReplication strategy constrains replication for a given
// partition to within a single zone. It does so by requiring that all
// replicas of each range in a partition are stored in the same zone.
//
// Example of 9 warehouses partitioned over 3 zones:
// partitions = [0,1,2], [3,4,5], [6,7,8]
// w = warehouse #
// L = leaseholder
//
// us-east1-b:
// n1 = [w0(L), w1, w2 ]
// n2 = [w0, w1(L), w2 ]
// n3 = [w0, w1, w2(L)]
//
// us-west1-b:
// n4 = [w3(L), w4, w5 ]
// n5 = [w3, w4, w5 ]
// n6 = [w3, w4(L), w5(L)]
//
// europe-west2-b:
// n7 = [w6, w7, w8(L)]
// n8 = [w6, w7(L), w8 ]
// n9 = [w6(L), w7, w8 ]
//
// NOTE: the lease for a range is randomly scattered within the zone
// that contains all replicas of the range.
//
partitionedReplication partitionStrategy = iota
// The partitionedLeases strategy collocates read leases for a given
// partition to within a single zone. It does so by configuring lease
// preferences on each range in a partition to prefer the same zone.
// Unlike the partitioned replication strategy, it does not prevent
// cross-zone replication.
//
// Example of 9 warehouses partitioned over 3 zones:
// partitions = [0,1,2], [3,4,5], [6,7,8]
// w = warehouse #
// L = leaseholder
//
// us-east1-b:
// n1 = [w0(L), w3, w6]
// n2 = [w1(L), w4, w7]
// n3 = [w2(L), w5, w8]
//
// us-west1-b:
// n4 = [w0, w1, w2 ]
// n5 = [w3(L), w4(L), w5(L)]
// n6 = [w6, w7, w8 ]
//
// europe-west2-b:
// n7 = [w2, w5, w8(L)]
// n8 = [w1, w4, w7(L)]
// n9 = [w0, w3, w6(L)]
//
// NOTE: a copy of each range is randomly scattered within each zone.
//
partitionedLeases
)

// Part of pflag's Value interface.
func (ps partitionStrategy) String() string {
switch ps {
case partitionedReplication:
return "replication"
case partitionedLeases:
return "leases"
}
panic("unexpected")
}

// Part of pflag's Value interface.
func (ps *partitionStrategy) Set(value string) error {
switch value {
case "replication":
*ps = partitionedReplication
return nil
case "leases":
*ps = partitionedLeases
return nil
}
return errors.Errorf("unknown partition strategy %q", value)
}

// Part of pflag's Value interface.
func (ps partitionStrategy) Type() string {
return "partitionStrategy"
}

type zoneConfig struct {
zones []string
strat partitionStrategy
}

// partitioner encapsulates all logic related to partitioning discrete numbers
// of warehouses into disjoint sets of roughly equal sizes. Partitions are then
// evenly assigned "active" warehouses, which allows for an even split of live
@@ -152,19 +248,32 @@ func (p *partitioner) randActive(rng *rand.Rand) int {
return p.totalElems[rng.Intn(len(p.totalElems))]
}

// configureZone sets up zone configs for previously created partitions. By default it adds constraints
// in terms of racks, but if the zones flag is passed into tpcc, it will set the constraints based on the
// geographic zones provided.
func configureZone(db *gosql.DB, table, partition string, constraint int, zones []string) error {
var constraints string
if len(zones) > 0 {
constraints = fmt.Sprintf("[+zone=%s]", zones[constraint])
// configureZone sets up zone configs for previously created partitions. By
// default it adds constraints/preferences in terms of racks, but if the zones
// flag is passed into tpcc, it will set the constraints/preferences based on
// the geographic zones provided.
func configureZone(db *gosql.DB, cfg zoneConfig, table, partition string, partIdx int) error {
var kv string
if len(cfg.zones) > 0 {
kv = fmt.Sprintf("zone=%s", cfg.zones[partIdx])
} else {
constraints = fmt.Sprintf("[+rack=%d]", constraint)
kv = fmt.Sprintf("rack=%d", partIdx)
}

var opts string
switch cfg.strat {
case partitionedReplication:
// Place all replicas in the zone.
opts = fmt.Sprintf(`constraints = '[+%s]'`, kv)
case partitionedLeases:
// Place one replica in the zone and give that replica lease preference.
opts = fmt.Sprintf(`constraints = '{"+%s":1}', lease_preferences = '[[+%s]]'`, kv, kv)
default:
panic("unexpected")
}

sql := fmt.Sprintf(`ALTER PARTITION %s OF TABLE %s CONFIGURE ZONE USING constraints = '%s'`,
partition, table, constraints)
sql := fmt.Sprintf(`ALTER PARTITION %s OF TABLE %s CONFIGURE ZONE USING %s`,
partition, table, opts)
if _, err := db.Exec(sql); err != nil {
return errors.Wrapf(err, "Couldn't exec %q", sql)
}
@@ -175,7 +284,7 @@ func configureZone(db *gosql.DB, table, partition string, constraint int, zones
// provided name, given the partitioning. Callers of the function must specify
// the associated table and the partition's number.
func partitionObject(
db *gosql.DB, p *partitioner, zones []string, obj, name, col, table string, idx int,
db *gosql.DB, cfg zoneConfig, p *partitioner, obj, name, col, table string, idx int,
) error {
var buf bytes.Buffer
fmt.Fprintf(&buf, "ALTER %s %s PARTITION BY RANGE (%s) (\n", obj, name, col)
Expand All @@ -193,21 +302,21 @@ func partitionObject(
}

for i := 0; i < p.parts; i++ {
if err := configureZone(db, table, fmt.Sprintf("p%d_%d", idx, i), i, zones); err != nil {
if err := configureZone(db, cfg, table, fmt.Sprintf("p%d_%d", idx, i), i); err != nil {
return err
}
}
return nil
}

func partitionTable(
db *gosql.DB, p *partitioner, zones []string, table, col string, idx int,
db *gosql.DB, cfg zoneConfig, p *partitioner, table, col string, idx int,
) error {
return partitionObject(db, p, zones, "TABLE", table, col, table, idx)
return partitionObject(db, cfg, p, "TABLE", table, col, table, idx)
}

func partitionIndex(
db *gosql.DB, p *partitioner, zones []string, table, index, col string, idx int,
db *gosql.DB, cfg zoneConfig, p *partitioner, table, index, col string, idx int,
) error {
if exists, err := indexExists(db, table, index); err != nil {
return err
@@ -218,64 +327,64 @@ func partitionIndex(
return nil
}
indexStr := fmt.Sprintf("%s@%s", table, index)
return partitionObject(db, p, zones, "INDEX", indexStr, col, table, idx)
return partitionObject(db, cfg, p, "INDEX", indexStr, col, table, idx)
}

func partitionWarehouse(db *gosql.DB, wPart *partitioner, zones []string) error {
return partitionTable(db, wPart, zones, "warehouse", "w_id", 0)
func partitionWarehouse(db *gosql.DB, cfg zoneConfig, wPart *partitioner) error {
return partitionTable(db, cfg, wPart, "warehouse", "w_id", 0)
}

func partitionDistrict(db *gosql.DB, wPart *partitioner, zones []string) error {
return partitionTable(db, wPart, zones, "district", "d_w_id", 0)
func partitionDistrict(db *gosql.DB, cfg zoneConfig, wPart *partitioner) error {
return partitionTable(db, cfg, wPart, "district", "d_w_id", 0)
}

func partitionNewOrder(db *gosql.DB, wPart *partitioner, zones []string) error {
return partitionTable(db, wPart, zones, "new_order", "no_w_id", 0)
func partitionNewOrder(db *gosql.DB, cfg zoneConfig, wPart *partitioner) error {
return partitionTable(db, cfg, wPart, "new_order", "no_w_id", 0)
}

func partitionOrder(db *gosql.DB, wPart *partitioner, zones []string) error {
if err := partitionTable(db, wPart, zones, `"order"`, "o_w_id", 0); err != nil {
func partitionOrder(db *gosql.DB, cfg zoneConfig, wPart *partitioner) error {
if err := partitionTable(db, cfg, wPart, `"order"`, "o_w_id", 0); err != nil {
return err
}
return partitionIndex(db, wPart, zones, `"order"`, "order_idx", "o_w_id", 1)
return partitionIndex(db, cfg, wPart, `"order"`, "order_idx", "o_w_id", 1)
}

func partitionOrderLine(db *gosql.DB, wPart *partitioner, zones []string) error {
if err := partitionTable(db, wPart, zones, "order_line", "ol_w_id", 0); err != nil {
func partitionOrderLine(db *gosql.DB, cfg zoneConfig, wPart *partitioner) error {
if err := partitionTable(db, cfg, wPart, "order_line", "ol_w_id", 0); err != nil {
return err
}
return partitionIndex(db, wPart, zones, "order_line", "order_line_stock_fk_idx", "ol_supply_w_id", 1)
return partitionIndex(db, cfg, wPart, "order_line", "order_line_stock_fk_idx", "ol_supply_w_id", 1)
}

func partitionStock(db *gosql.DB, wPart *partitioner, zones []string) error {
func partitionStock(db *gosql.DB, cfg zoneConfig, wPart *partitioner) error {
// The stock_item_fk_idx can't be partitioned because it doesn't have a
// warehouse prefix. It's an all-around unfortunate index that we only
// need because of a restriction in SQL.
return partitionTable(db, wPart, zones, "stock", "s_w_id", 0)
return partitionTable(db, cfg, wPart, "stock", "s_w_id", 0)
}

func partitionCustomer(db *gosql.DB, wPart *partitioner, zones []string) error {
if err := partitionTable(db, wPart, zones, "customer", "c_w_id", 0); err != nil {
func partitionCustomer(db *gosql.DB, cfg zoneConfig, wPart *partitioner) error {
if err := partitionTable(db, cfg, wPart, "customer", "c_w_id", 0); err != nil {
return err
}
return partitionIndex(db, wPart, zones, "customer", "customer_idx", "c_w_id", 1)
return partitionIndex(db, cfg, wPart, "customer", "customer_idx", "c_w_id", 1)
}

func partitionHistory(db *gosql.DB, wPart *partitioner, zones []string) error {
if err := partitionTable(db, wPart, zones, "history", "h_w_id", 0); err != nil {
func partitionHistory(db *gosql.DB, cfg zoneConfig, wPart *partitioner) error {
if err := partitionTable(db, cfg, wPart, "history", "h_w_id", 0); err != nil {
return err
}
if err := partitionIndex(db, wPart, zones, "history", "history_customer_fk_idx", "h_c_w_id", 1); err != nil {
if err := partitionIndex(db, cfg, wPart, "history", "history_customer_fk_idx", "h_c_w_id", 1); err != nil {
return err
}
return partitionIndex(db, wPart, zones, "history", "history_district_fk_idx", "h_w_id", 2)
return partitionIndex(db, cfg, wPart, "history", "history_district_fk_idx", "h_w_id", 2)
}

// replicateItem creates a covering "replicated index" for the item table for
// each of the zones provided. The item table is immutable, so this comes at a
// negligible cost and allows all lookups into it to be local.
func replicateItem(db *gosql.DB, zones []string) error {
for i, zone := range zones {
func replicateItem(db *gosql.DB, cfg zoneConfig) error {
for i, zone := range cfg.zones {
idxName := fmt.Sprintf("replicated_idx_%d", i)

create := fmt.Sprintf(`
@@ -298,32 +407,32 @@ func replicateItem(db *gosql.DB, zones []string) error {
return nil
}

func partitionTables(db *gosql.DB, wPart *partitioner, zones []string) error {
if err := partitionWarehouse(db, wPart, zones); err != nil {
func partitionTables(db *gosql.DB, cfg zoneConfig, wPart *partitioner) error {
if err := partitionWarehouse(db, cfg, wPart); err != nil {
return err
}
if err := partitionDistrict(db, wPart, zones); err != nil {
if err := partitionDistrict(db, cfg, wPart); err != nil {
return err
}
if err := partitionNewOrder(db, wPart, zones); err != nil {
if err := partitionNewOrder(db, cfg, wPart); err != nil {
return err
}
if err := partitionOrder(db, wPart, zones); err != nil {
if err := partitionOrder(db, cfg, wPart); err != nil {
return err
}
if err := partitionOrderLine(db, wPart, zones); err != nil {
if err := partitionOrderLine(db, cfg, wPart); err != nil {
return err
}
if err := partitionStock(db, wPart, zones); err != nil {
if err := partitionStock(db, cfg, wPart); err != nil {
return err
}
if err := partitionCustomer(db, wPart, zones); err != nil {
if err := partitionCustomer(db, cfg, wPart); err != nil {
return err
}
if err := partitionHistory(db, wPart, zones); err != nil {
if err := partitionHistory(db, cfg, wPart); err != nil {
return err
}
return replicateItem(db, zones)
return replicateItem(db, cfg)
}

func partitionCount(db *gosql.DB) (int, error) {
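As a usage illustration only (not part of this commit), a caller in the same package could wire the new zoneConfig through partitionTables as sketched below; the zone names are placeholders, and the *partitioner is taken as a parameter because its construction lies outside this diff.

// partitionForLeases applies the lease-based strategy across three
// hypothetical zones. It assumes zoneConfig, partitionedLeases, and
// partitionTables from partition.go are in scope and that db and wPart
// were built elsewhere.
func partitionForLeases(db *gosql.DB, wPart *partitioner) error {
	cfg := zoneConfig{
		zones: []string{"us-east1-b", "us-west1-b", "europe-west2-b"},
		strat: partitionedLeases,
	}
	return partitionTables(db, cfg, wPart)
}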
16 changes: 9 additions & 7 deletions pkg/workload/tpcc/tpcc.go
@@ -66,8 +66,8 @@ type tpcc struct {

partitions int
affinityPartition int
zones []string
wPart *partitioner
zoneCfg zoneConfig

usePostgres bool
serializable bool
@@ -106,14 +106,15 @@ var tpccMeta = workload.Meta{
`mix`: {RuntimeOnly: true},
`partitions`: {RuntimeOnly: true},
`partition-affinity`: {RuntimeOnly: true},
`partition-strategy`: {RuntimeOnly: true},
`zones`: {RuntimeOnly: true},
`active-warehouses`: {RuntimeOnly: true},
`scatter`: {RuntimeOnly: true},
`serializable`: {RuntimeOnly: true},
`split`: {RuntimeOnly: true},
`wait`: {RuntimeOnly: true},
`workers`: {RuntimeOnly: true},
`conns`: {RuntimeOnly: true},
`zones`: {RuntimeOnly: true},
`active-warehouses`: {RuntimeOnly: true},
`expensive-checks`: {RuntimeOnly: true, CheckConsistencyOnly: true},
}

@@ -135,13 +136,14 @@ var tpccMeta = workload.Meta{
`Number of connections. Defaults to --warehouses * %d (except in nowait mode, where it defaults to --workers`,
numConnsPerWarehouse,
))
g.flags.IntVar(&g.partitions, `partitions`, 1, `Partition tables (requires split)`)
g.flags.IntVar(&g.partitions, `partitions`, 1, `Partition tables`)
g.flags.IntVar(&g.affinityPartition, `partition-affinity`, -1, `Run load generator against specific partition (requires partitions)`)
g.flags.Var(&g.zoneCfg.strat, `partition-strategy`, `Partition tables according to which strategy [replication, leases]`)
g.flags.StringSliceVar(&g.zoneCfg.zones, "zones", []string{}, "Zones for partitioning, the number of zones should match the number of partitions and the zones used to start cockroach.")
g.flags.IntVar(&g.activeWarehouses, `active-warehouses`, 0, `Run the load generator against a specific number of warehouses. Defaults to --warehouses'`)
g.flags.BoolVar(&g.scatter, `scatter`, false, `Scatter ranges`)
g.flags.BoolVar(&g.serializable, `serializable`, false, `Force serializable mode`)
g.flags.BoolVar(&g.split, `split`, false, `Split tables`)
g.flags.StringSliceVar(&g.zones, "zones", []string{}, "Zones for partitioning, the number of zones should match the number of partitions and the zones used to start cockroach.")
g.flags.BoolVar(&g.expensiveChecks, `expensive-checks`, false, `Run expensive checks`)
g.connFlags = workload.NewConnFlags(&g.flags)

@@ -182,7 +184,7 @@ func (w *tpcc) Hooks() workload.Hooks {
return errors.Errorf(`--partition-affinity out of bounds of --partitions`)
}

if len(w.zones) > 0 && (len(w.zones) != w.partitions) {
if len(w.zoneCfg.zones) > 0 && (len(w.zoneCfg.zones) != w.partitions) {
return errors.Errorf(`--zones should have the same length as --partitions.`)
}

@@ -587,7 +589,7 @@ func (w *tpcc) partitionAndScatterWithDB(db *gosql.DB) error {
if parts, err := partitionCount(db); err != nil {
return errors.Wrapf(err, "could not determine if tables are partitioned")
} else if parts == 0 {
if err := partitionTables(db, w.wPart, w.zones); err != nil {
if err := partitionTables(db, w.zoneCfg, w.wPart); err != nil {
return errors.Wrapf(err, "could not partition tables")
}
} else if parts != w.partitions {
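Because partitionStrategy implements pflag's Value interface (the String, Set, and Type methods in partition.go), the new flag parses like any other. Below is a minimal sketch, assuming it sits in the same package so the types are in scope and that the flag set comes from github.com/spf13/pflag; the flag-set name and sample arguments are illustrative.

// exampleParseFlags parses --partition-strategy and --zones into a
// zoneConfig. Requires importing "github.com/spf13/pflag".
func exampleParseFlags() (zoneConfig, error) {
	var cfg zoneConfig // zero value: partitionedReplication, no zones
	fs := pflag.NewFlagSet("tpcc", pflag.ContinueOnError)
	fs.Var(&cfg.strat, "partition-strategy",
		"Partition tables according to which strategy [replication, leases]")
	fs.StringSliceVar(&cfg.zones, "zones", nil, "Zones for partitioning")
	err := fs.Parse([]string{
		"--partition-strategy=leases",
		"--zones=us-east1-b,us-west1-b,europe-west2-b",
	})
	return cfg, err // cfg.strat.String() == "leases" on success
}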
