Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

workload/tpcc: support replicated indexes and lease partitioning #36855

Merged
merged 3 commits into from
May 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/workload/tpcc/ddls.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func scatterRanges(db *gosql.DB) error {
g.Go(func() error {
sql := fmt.Sprintf(`ALTER TABLE %s SCATTER`, table)
if _, err := db.Exec(sql); err != nil {
return errors.Wrapf(err, "Couldn't exec %s", sql)
return errors.Wrapf(err, "Couldn't exec %q", sql)
}
return nil
})
Expand Down
257 changes: 185 additions & 72 deletions pkg/workload/tpcc/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,107 @@ import (
"bytes"
gosql "database/sql"
"fmt"
"strings"

"github.com/pkg/errors"
"golang.org/x/exp/rand"
)

// partitionStrategy selects how a warehouse partition is pinned to a zone
// (or rack): either by constraining every replica to the zone, or by only
// preferring that the range lease lives there. It implements pflag's Value
// interface via the String, Set, and Type methods below.
type partitionStrategy int

const (
// The partitionedReplication strategy constrains replication for a given
// partition to within a single zone. It does so by requiring that all
// replicas of each range in a partition are stored in the same zone.
//
// Example of 9 warehouses partitioned over 3 zones:
// partitions = [0,1,2], [3,4,5], [6,7,8]
// w = warehouse #
// L = leaseholder
//
// us-east1-b:
// n1 = [w0(L), w1, w2 ]
// n2 = [w0, w1(L), w2 ]
// n3 = [w0, w1, w2(L)]
//
// us-west1-b:
// n4 = [w3(L), w4, w5 ]
// n5 = [w3, w4, w5 ]
// n6 = [w3, w4(L), w5(L)]
//
// europe-west2-b:
// n7 = [w6, w7, w8(L)]
// n8 = [w6, w7(L), w8 ]
// n9 = [w6(L), w7, w8 ]
//
// NOTE: the lease for a range is randomly scattered within the zone
// that contains all replicas of the range.
//
partitionedReplication partitionStrategy = iota
// The partitionedLeases strategy collocates read leases for a given
// partition to within a single zone. It does so by configuring lease
// preferences on each range in a partition to prefer the same zone.
// Unlike the partitioned replication strategy, it does not prevent
// cross-zone replication.
//
// Example of 9 warehouses partitioned over 3 zones:
// partitions = [0,1,2], [3,4,5], [6,7,8]
// w = warehouse #
// L = leaseholder
//
// us-east1-b:
// n1 = [w0(L), w3, w6]
// n2 = [w1(L), w4, w7]
// n3 = [w2(L), w5, w8]
//
// us-west1-b:
// n4 = [w0, w1, w2 ]
// n5 = [w3(L), w4(L), w5(L)]
// n6 = [w6, w7, w8 ]
//
// europe-west2-b:
// n7 = [w2, w5, w8(L)]
// n8 = [w1, w4, w7(L)]
// n9 = [w0, w3, w6(L)]
//
// NOTE: a copy of each range is randomly scattered within each zone.
//
partitionedLeases
)

// String returns the flag-facing name of the strategy.
// Part of pflag's Value interface.
func (ps partitionStrategy) String() string {
	if ps == partitionedReplication {
		return "replication"
	}
	if ps == partitionedLeases {
		return "leases"
	}
	// An unknown value indicates a programming error, not user input.
	panic("unexpected")
}

// Set parses a flag value into the receiver, rejecting unknown names.
// Part of pflag's Value interface.
func (ps *partitionStrategy) Set(value string) error {
	byName := map[string]partitionStrategy{
		"replication": partitionedReplication,
		"leases":      partitionedLeases,
	}
	strategy, ok := byName[value]
	if !ok {
		return errors.Errorf("unknown partition strategy %q", value)
	}
	*ps = strategy
	return nil
}

// Type names the flag's value type for help output.
// Part of pflag's Value interface.
func (ps partitionStrategy) Type() string {
	const flagTypeName = "partitionStrategy"
	return flagTypeName
}

// zoneConfig bundles the zone-related partitioning options passed through
// the partitioning helpers: the target zones (an empty slice means rack
// numbers are used as constraints instead) and the pinning strategy.
type zoneConfig struct {
// zones lists one geographic zone per partition; empty means rack-based.
zones []string
// strategy chooses replica-constraint vs. lease-preference pinning.
strategy partitionStrategy
}

// partitioner encapsulates all logic related to partitioning discrete numbers
// of warehouses into disjoint sets of roughly equal sizes. Partitions are then
// evenly assigned "active" warehouses, which allows for an even split of live
Expand Down Expand Up @@ -153,31 +248,34 @@ func (p *partitioner) randActive(rng *rand.Rand) int {
return p.totalElems[rng.Intn(len(p.totalElems))]
}

// configureZone sets up zone configs for previously created partitions. By default it adds constraints
// in terms of racks, but if the zones flag is passed into tpcc, it will set the constraints based on the
// geographic zones provided.
func configureZone(db *gosql.DB, table, partition string, constraint int, zones []string) error {
var constraints string
if len(zones) > 0 {
constraints = fmt.Sprintf("[+zone=%s]", zones[constraint])
// configureZone sets up zone configs for previously created partitions. By
// default it adds constraints/preferences in terms of racks, but if the zones
// flag is passed into tpcc, it will set the constraints/preferences based on
// the geographic zones provided.
func configureZone(db *gosql.DB, cfg zoneConfig, table, partition string, partIdx int) error {
var kv string
if len(cfg.zones) > 0 {
kv = fmt.Sprintf("zone=%s", cfg.zones[partIdx])
} else {
constraints = fmt.Sprintf("[+rack=%d]", constraint)
kv = fmt.Sprintf("rack=%d", partIdx)
}

// We are removing the EXPERIMENTAL keyword in 2.1. For compatibility
// with 2.0 clusters we still need to try with it if the
// syntax without EXPERIMENTAL fails.
// TODO(knz): Remove this in 2.2.
sql := fmt.Sprintf(`ALTER PARTITION %s OF TABLE %s CONFIGURE ZONE USING constraints = '%s'`,
partition, table, constraints)
_, err := db.Exec(sql)
if err != nil && strings.Contains(err.Error(), "syntax error") {
sql = fmt.Sprintf(`ALTER PARTITION %s OF TABLE %s EXPERIMENTAL CONFIGURE ZONE 'constraints: %s'`,
partition, table, constraints)
_, err = db.Exec(sql)
var opts string
switch cfg.strategy {
case partitionedReplication:
// Place all replicas in the zone.
opts = fmt.Sprintf(`constraints = '[+%s]'`, kv)
case partitionedLeases:
// Place one replica in the zone and give that replica lease preference.
opts = fmt.Sprintf(`constraints = '{"+%s":1}', lease_preferences = '[[+%s]]'`, kv, kv)
default:
panic("unexpected")
}
if err != nil {
return errors.Wrapf(err, "Couldn't exec %s", sql)

sql := fmt.Sprintf(`ALTER PARTITION %s OF TABLE %s CONFIGURE ZONE USING %s`,
partition, table, opts)
if _, err := db.Exec(sql); err != nil {
return errors.Wrapf(err, "Couldn't exec %q", sql)
}
return nil
}
Expand All @@ -186,7 +284,7 @@ func configureZone(db *gosql.DB, table, partition string, constraint int, zones
// provided name, given the partitioning. Callers of the function must specify
// the associated table and the partition's number.
func partitionObject(
db *gosql.DB, p *partitioner, zones []string, obj, name, col, table string, idx int,
db *gosql.DB, cfg zoneConfig, p *partitioner, obj, name, col, table string, idx int,
) error {
var buf bytes.Buffer
fmt.Fprintf(&buf, "ALTER %s %s PARTITION BY RANGE (%s) (\n", obj, name, col)
Expand All @@ -200,25 +298,25 @@ func partitionObject(
}
buf.WriteString(")\n")
if _, err := db.Exec(buf.String()); err != nil {
return errors.Wrapf(err, "Couldn't exec %s", buf.String())
return errors.Wrapf(err, "Couldn't exec %q", buf.String())
}

for i := 0; i < p.parts; i++ {
if err := configureZone(db, table, fmt.Sprintf("p%d_%d", idx, i), i, zones); err != nil {
if err := configureZone(db, cfg, table, fmt.Sprintf("p%d_%d", idx, i), i); err != nil {
return err
}
}
return nil
}

func partitionTable(
db *gosql.DB, p *partitioner, zones []string, table, col string, idx int,
db *gosql.DB, cfg zoneConfig, p *partitioner, table, col string, idx int,
) error {
return partitionObject(db, p, zones, "TABLE", table, col, table, idx)
return partitionObject(db, cfg, p, "TABLE", table, col, table, idx)
}

func partitionIndex(
db *gosql.DB, p *partitioner, zones []string, table, index, col string, idx int,
db *gosql.DB, cfg zoneConfig, p *partitioner, table, index, col string, idx int,
) error {
if exists, err := indexExists(db, table, index); err != nil {
return err
Expand All @@ -229,97 +327,112 @@ func partitionIndex(
return nil
}
indexStr := fmt.Sprintf("%s@%s", table, index)
return partitionObject(db, p, zones, "INDEX", indexStr, col, table, idx)
return partitionObject(db, cfg, p, "INDEX", indexStr, col, table, idx)
}

func partitionWarehouse(db *gosql.DB, wPart *partitioner, zones []string) error {
return partitionTable(db, wPart, zones, "warehouse", "w_id", 0)
func partitionWarehouse(db *gosql.DB, cfg zoneConfig, wPart *partitioner) error {
return partitionTable(db, cfg, wPart, "warehouse", "w_id", 0)
}

func partitionDistrict(db *gosql.DB, wPart *partitioner, zones []string) error {
return partitionTable(db, wPart, zones, "district", "d_w_id", 0)
func partitionDistrict(db *gosql.DB, cfg zoneConfig, wPart *partitioner) error {
return partitionTable(db, cfg, wPart, "district", "d_w_id", 0)
}

func partitionNewOrder(db *gosql.DB, wPart *partitioner, zones []string) error {
return partitionTable(db, wPart, zones, "new_order", "no_w_id", 0)
func partitionNewOrder(db *gosql.DB, cfg zoneConfig, wPart *partitioner) error {
return partitionTable(db, cfg, wPart, "new_order", "no_w_id", 0)
}

func partitionOrder(db *gosql.DB, wPart *partitioner, zones []string) error {
if err := partitionTable(db, wPart, zones, `"order"`, "o_w_id", 0); err != nil {
func partitionOrder(db *gosql.DB, cfg zoneConfig, wPart *partitioner) error {
if err := partitionTable(db, cfg, wPart, `"order"`, "o_w_id", 0); err != nil {
return err
}
return partitionIndex(db, wPart, zones, `"order"`, "order_idx", "o_w_id", 1)
return partitionIndex(db, cfg, wPart, `"order"`, "order_idx", "o_w_id", 1)
}

func partitionOrderLine(db *gosql.DB, wPart *partitioner, zones []string) error {
if err := partitionTable(db, wPart, zones, "order_line", "ol_w_id", 0); err != nil {
func partitionOrderLine(db *gosql.DB, cfg zoneConfig, wPart *partitioner) error {
if err := partitionTable(db, cfg, wPart, "order_line", "ol_w_id", 0); err != nil {
return err
}
return partitionIndex(db, wPart, zones, "order_line", "order_line_stock_fk_idx", "ol_supply_w_id", 1)
return partitionIndex(db, cfg, wPart, "order_line", "order_line_stock_fk_idx", "ol_supply_w_id", 1)
}

func partitionStock(db *gosql.DB, wPart, iPart *partitioner, zones []string) error {
if err := partitionTable(db, wPart, zones, "stock", "s_w_id", 0); err != nil {
return err
}
return partitionIndex(db, iPart, zones, "stock", "stock_item_fk_idx", "s_i_id", 1)
func partitionStock(db *gosql.DB, cfg zoneConfig, wPart *partitioner) error {
// The stock_item_fk_idx can't be partitioned because it doesn't have a
// warehouse prefix. It's an all-around unfortunate index that we only
// need because of a restriction in SQL. See #36859 and #37255.
return partitionTable(db, cfg, wPart, "stock", "s_w_id", 0)
}

func partitionCustomer(db *gosql.DB, wPart *partitioner, zones []string) error {
if err := partitionTable(db, wPart, zones, "customer", "c_w_id", 0); err != nil {
func partitionCustomer(db *gosql.DB, cfg zoneConfig, wPart *partitioner) error {
if err := partitionTable(db, cfg, wPart, "customer", "c_w_id", 0); err != nil {
return err
}
return partitionIndex(db, wPart, zones, "customer", "customer_idx", "c_w_id", 1)
return partitionIndex(db, cfg, wPart, "customer", "customer_idx", "c_w_id", 1)
}

func partitionHistory(db *gosql.DB, wPart *partitioner, zones []string) error {
if err := partitionTable(db, wPart, zones, "history", "h_w_id", 0); err != nil {
func partitionHistory(db *gosql.DB, cfg zoneConfig, wPart *partitioner) error {
if err := partitionTable(db, cfg, wPart, "history", "h_w_id", 0); err != nil {
return err
}
if err := partitionIndex(db, wPart, zones, "history", "history_customer_fk_idx", "h_c_w_id", 1); err != nil {
if err := partitionIndex(db, cfg, wPart, "history", "history_customer_fk_idx", "h_c_w_id", 1); err != nil {
return err
}
return partitionIndex(db, wPart, zones, "history", "history_district_fk_idx", "h_w_id", 2)
return partitionIndex(db, cfg, wPart, "history", "history_district_fk_idx", "h_w_id", 2)
}

func partitionItem(db *gosql.DB, iPart *partitioner, zones []string) error {
return partitionTable(db, iPart, zones, "item", "i_id", 0)
}
// replicateItem creates a covering "replicated index" for the item table for
// each of the zones provided. The item table is immutable, so this comes at a
// negligible cost and allows all lookups into it to be local.
func replicateItem(db *gosql.DB, cfg zoneConfig) error {
for i, zone := range cfg.zones {
idxName := fmt.Sprintf("replicated_idx_%d", i)

create := fmt.Sprintf(`
CREATE UNIQUE INDEX %s
ON item (i_id)
STORING (i_im_id, i_name, i_price, i_data)`,
idxName)
if _, err := db.Exec(create); err != nil {
return errors.Wrapf(err, "Couldn't exec %q", create)
}

func partitionTables(db *gosql.DB, wPart *partitioner, zones []string) error {
// Create a separate partitioning for the fixed-size items table and its
// associated indexes.
const nItems = 100000
iPart, err := makePartitioner(nItems, nItems, wPart.parts)
if err != nil {
return errors.Wrap(err, "creating item partitioner")
configure := fmt.Sprintf(`
ALTER INDEX item@%s
CONFIGURE ZONE USING lease_preferences = '[[+zone=%s]]'`,
idxName, zone)
if _, err := db.Exec(configure); err != nil {
return errors.Wrapf(err, "Couldn't exec %q", configure)
}
}
return nil
}

if err := partitionWarehouse(db, wPart, zones); err != nil {
func partitionTables(db *gosql.DB, cfg zoneConfig, wPart *partitioner) error {
if err := partitionWarehouse(db, cfg, wPart); err != nil {
return err
}
if err := partitionDistrict(db, wPart, zones); err != nil {
if err := partitionDistrict(db, cfg, wPart); err != nil {
return err
}
if err := partitionNewOrder(db, wPart, zones); err != nil {
if err := partitionNewOrder(db, cfg, wPart); err != nil {
return err
}
if err := partitionOrder(db, wPart, zones); err != nil {
if err := partitionOrder(db, cfg, wPart); err != nil {
return err
}
if err := partitionOrderLine(db, wPart, zones); err != nil {
if err := partitionOrderLine(db, cfg, wPart); err != nil {
return err
}
if err := partitionStock(db, wPart, iPart, zones); err != nil {
if err := partitionStock(db, cfg, wPart); err != nil {
return err
}
if err := partitionCustomer(db, wPart, zones); err != nil {
if err := partitionCustomer(db, cfg, wPart); err != nil {
return err
}
if err := partitionHistory(db, wPart, zones); err != nil {
if err := partitionHistory(db, cfg, wPart); err != nil {
return err
}
return partitionItem(db, iPart, zones)
return replicateItem(db, cfg)
}

func partitionCount(db *gosql.DB) (int, error) {
Expand Down
Loading