Skip to content

Commit

Permalink
Merge pull request #3123 from michael-berlin/hotrow_concurrency
Browse files Browse the repository at this point in the history
vttablet: Hot Row Protection: Make the transaction concurrency for a hot row configurable.
  • Loading branch information
michael-berlin authored Sep 5, 2017
2 parents c3523e8 + c2c7a4b commit 7084913
Show file tree
Hide file tree
Showing 5 changed files with 386 additions and 78 deletions.
4 changes: 3 additions & 1 deletion go/vt/vttablet/tabletserver/query_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,9 @@ func NewQueryEngine(checker connpool.MySQLChecker, se *schema.Engine, config tab

qe.consolidator = sync2.NewConsolidator()
qe.txSerializer = txserializer.New(config.EnableHotRowProtectionDryRun,
config.HotRowProtectionMaxQueueSize, config.HotRowProtectionMaxGlobalQueueSize)
config.HotRowProtectionMaxQueueSize,
config.HotRowProtectionMaxGlobalQueueSize,
config.HotRowProtectionConcurrentTransactions)
qe.streamQList = NewQueryList()

qe.autoCommit.Set(config.EnableAutoCommit)
Expand Down
16 changes: 12 additions & 4 deletions go/vt/vttablet/tabletserver/tabletenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func init() {
flag.BoolVar(&Config.EnableHotRowProtectionDryRun, "enable_hot_row_protection_dry_run", DefaultQsConfig.EnableHotRowProtectionDryRun, "If true, hot row protection is not enforced but logs if transactions would have been queued.")
flag.IntVar(&Config.HotRowProtectionMaxQueueSize, "hot_row_protection_max_queue_size", DefaultQsConfig.HotRowProtectionMaxQueueSize, "Maximum number of BeginExecute RPCs which will be queued for the same row (range).")
flag.IntVar(&Config.HotRowProtectionMaxGlobalQueueSize, "hot_row_protection_max_global_queue_size", DefaultQsConfig.HotRowProtectionMaxGlobalQueueSize, "Global queue limit across all row (ranges). Useful to prevent that the queue can grow unbounded.")
flag.IntVar(&Config.HotRowProtectionConcurrentTransactions, "hot_row_protection_concurrent_transactions", DefaultQsConfig.HotRowProtectionConcurrentTransactions, "Number of concurrent transactions let through to the txpool/MySQL for the same hot row. Should be > 1 to have enough 'ready' transactions in MySQL and benefit from a pipelining effect.")

flag.BoolVar(&Config.HeartbeatEnable, "heartbeat_enable", DefaultQsConfig.HeartbeatEnable, "If true, vttablet records (if master) or checks (if replica) the current time of a replication heartbeat in the table _vt.heartbeat. The result is used to inform the serving state of the vttablet via healthchecks.")
flag.DurationVar(&Config.HeartbeatInterval, "heartbeat_interval", DefaultQsConfig.HeartbeatInterval, "How frequently to read and write replication heartbeat.")
Expand Down Expand Up @@ -125,10 +126,11 @@ type TabletConfig struct {
TxThrottlerConfig string
TxThrottlerHealthCheckCells []string

EnableHotRowProtection bool
EnableHotRowProtectionDryRun bool
HotRowProtectionMaxQueueSize int
HotRowProtectionMaxGlobalQueueSize int
EnableHotRowProtection bool
EnableHotRowProtectionDryRun bool
HotRowProtectionMaxQueueSize int
HotRowProtectionMaxGlobalQueueSize int
HotRowProtectionConcurrentTransactions int

HeartbeatEnable bool
HeartbeatInterval time.Duration
Expand Down Expand Up @@ -180,6 +182,9 @@ var DefaultQsConfig = TabletConfig{
// Default value is the same as TransactionCap.
HotRowProtectionMaxQueueSize: 20,
HotRowProtectionMaxGlobalQueueSize: 1000,
// Allow more than 1 transaction for the same hot row through to have enough
// of them ready in MySQL and profit from a pipelining effect.
HotRowProtectionConcurrentTransactions: 5,

HeartbeatEnable: false,
HeartbeatInterval: 1 * time.Second,
Expand Down Expand Up @@ -218,6 +223,9 @@ func VerifyConfig() error {
if globalSize, size := Config.HotRowProtectionMaxGlobalQueueSize, Config.HotRowProtectionMaxQueueSize; globalSize < size {
return fmt.Errorf("global queue size must be >= per row (range) queue size: -hot_row_protection_max_global_queue_size < hot_row_protection_max_queue_size (%v < %v)", globalSize, size)
}
if v := Config.HotRowProtectionConcurrentTransactions; v <= 0 {
return fmt.Errorf("-hot_row_protection_concurrent_transactions must be > 0 (specified value: %v)", v)
}
return nil
}

Expand Down
138 changes: 134 additions & 4 deletions go/vt/vttablet/tabletserver/tabletserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1475,6 +1475,7 @@ func TestSerializeTransactionsSameRow(t *testing.T) {
testUtils := newTestUtils()
config := testUtils.newQueryServiceConfig()
config.EnableHotRowProtection = true
config.HotRowProtectionConcurrentTransactions = 1
// Reduce the txpool to 2 because we should never consume more than two slots.
config.TransactionCap = 2
tsv := NewTabletServerWithNilTopoServer(config)
Expand Down Expand Up @@ -1512,7 +1513,7 @@ func TestSerializeTransactionsSameRow(t *testing.T) {
db.SetBeforeFunc("update test_table set name_string = 'tx1' where pk in (1) /* _stream test_table (pk ) (1 ); */",
func() {
close(tx1Started)
if err := waitForTxSerializationCount(tsv, "test_table where pk = 1 and name = 1", 2); err != nil {
if err := waitForTxSerializationPendingQueries(tsv, "test_table where pk = 1 and name = 1", 2); err != nil {
t.Fatal(err)
}
})
Expand Down Expand Up @@ -1580,7 +1581,134 @@ func TestSerializeTransactionsSameRow(t *testing.T) {
}
}

func waitForTxSerializationCount(tsv *TabletServer, key string, i int) error {
// TestSerializeTransactionsSameRow_ConcurrentTransactions checks that with
// HotRowProtectionConcurrentTransactions set to 2, exactly one out of three
// concurrent transactions for the same row is recorded as having waited in
// the "TxSerializer" wait stats.
func TestSerializeTransactionsSameRow_ConcurrentTransactions(t *testing.T) {
	// This test runs three transactions in parallel:
	// tx1 | tx2 | tx3
	// Out of these three, two can run in parallel because we increased the
	// ConcurrentTransactions limit to 2.
	// One out of the three transactions will always get serialized though.
	db := setUpTabletServerTest(t)
	defer db.Close()
	testUtils := newTestUtils()
	config := testUtils.newQueryServiceConfig()
	config.EnableHotRowProtection = true
	config.HotRowProtectionConcurrentTransactions = 2
	// Reduce the txpool to 2 because we should never consume more than two slots.
	config.TransactionCap = 2
	tsv := NewTabletServerWithNilTopoServer(config)
	dbconfigs := testUtils.newDBConfigs(db)
	target := querypb.Target{TabletType: topodatapb.TabletType_MASTER}
	if err := tsv.StartService(target, dbconfigs, testUtils.newMysqld(&dbconfigs)); err != nil {
		t.Fatalf("StartService failed: %v", err)
	}
	defer tsv.StopService()
	// Remember the counter value before the test so that only the waits
	// caused by this test are compared at the end.
	countStart := tabletenv.WaitStats.Counts()["TxSerializer"]

	// Fake data.
	q1 := "update test_table set name_string = 'tx1' where pk = :pk and name = :name"
	q2 := "update test_table set name_string = 'tx2' where pk = :pk and name = :name"
	q3 := "update test_table set name_string = 'tx3' where pk = :pk and name = :name"
	// Every request needs their own bind variables to avoid data races.
	bvTx1 := map[string]*querypb.BindVariable{
		"pk":   sqltypes.Int64BindVariable(1),
		"name": sqltypes.Int64BindVariable(1),
	}
	bvTx2 := map[string]*querypb.BindVariable{
		"pk":   sqltypes.Int64BindVariable(1),
		"name": sqltypes.Int64BindVariable(1),
	}
	bvTx3 := map[string]*querypb.BindVariable{
		"pk":   sqltypes.Int64BindVariable(1),
		"name": sqltypes.Int64BindVariable(1),
	}

	// tx1Started is closed once tx1's update reaches the fake DB.
	// allQueriesPending is closed by the test goroutine to let tx1 proceed.
	tx1Started := make(chan struct{})
	allQueriesPending := make(chan struct{})
	db.SetBeforeFunc("update test_table set name_string = 'tx1' where pk in (1) /* _stream test_table (pk ) (1 ); */",
		func() {
			close(tx1Started)
			<-allQueriesPending
		})

	// Run all three transactions.
	//
	// NOTE: The goroutines below report failures with t.Errorf and return
	// instead of calling t.Fatalf, because FailNow (and therefore Fatalf) must
	// only be called from the goroutine running the test function.
	ctx := context.Background()
	wg := sync.WaitGroup{}

	// tx1.
	wg.Add(1)
	go func() {
		defer wg.Done()

		_, tx1, err := tsv.BeginExecute(ctx, &target, q1, bvTx1, nil)
		if err != nil {
			t.Errorf("failed to execute query: %s: %s", q1, err)
			return
		}

		if err := tsv.Commit(ctx, &target, tx1); err != nil {
			t.Errorf("call TabletServer.Commit failed: %v", err)
			return
		}
	}()

	// tx2.
	wg.Add(1)
	go func() {
		defer wg.Done()

		// Wait for tx1 to avoid that this tx could pass tx1, without any contention.
		// In that case, we would see less than 3 pending transactions.
		<-tx1Started

		_, tx2, err := tsv.BeginExecute(ctx, &target, q2, bvTx2, nil)
		if err != nil {
			t.Errorf("failed to execute query: %s: %s", q2, err)
			return
		}

		if err := tsv.Commit(ctx, &target, tx2); err != nil {
			t.Errorf("call TabletServer.Commit failed: %v", err)
			return
		}
	}()

	// tx3.
	wg.Add(1)
	go func() {
		defer wg.Done()

		// Wait for tx1 to avoid that this tx could pass tx1, without any contention.
		// In that case, we would see less than 3 pending transactions.
		<-tx1Started

		_, tx3, err := tsv.BeginExecute(ctx, &target, q3, bvTx3, nil)
		if err != nil {
			t.Errorf("failed to execute query: %s: %s", q3, err)
			return
		}

		if err := tsv.Commit(ctx, &target, tx3); err != nil {
			t.Errorf("call TabletServer.Commit failed: %v", err)
			return
		}
	}()

	// At this point, all three transactions should be blocked in BeginExecute()
	// and therefore count as pending transactions by the Hot Row Protection.
	//
	// NOTE: We are not doing more sophisticated synchronizations between the
	// transactions via db.SetBeforeFunc() for the same reason as mentioned
	// in TestSerializeTransactionsSameRow: The MySQL C client does not seem
	// to allow more than one connection attempt at a time.
	if err := waitForTxSerializationPendingQueries(tsv, "test_table where pk = 1 and name = 1", 3); err != nil {
		// Release tx1 before failing; otherwise its before-func would stay
		// blocked on allQueriesPending while the test is torn down.
		close(allQueriesPending)
		t.Fatal(err)
	}
	close(allQueriesPending)

	wg.Wait()

	got, ok := tabletenv.WaitStats.Counts()["TxSerializer"]
	want := countStart + 1
	if !ok || got != want {
		t.Fatalf("One out of the three transactions must have waited: ok? %v got: %v want: %v", ok, got, want)
	}
}

func waitForTxSerializationPendingQueries(tsv *TabletServer, key string, i int) error {
start := time.Now()
for {
got, want := tsv.qe.txSerializer.Pending(key), i
Expand All @@ -1607,6 +1735,7 @@ func TestSerializeTransactionsSameRow_TooManyPendingRequests(t *testing.T) {
config := testUtils.newQueryServiceConfig()
config.EnableHotRowProtection = true
config.HotRowProtectionMaxQueueSize = 1
config.HotRowProtectionConcurrentTransactions = 1
tsv := NewTabletServerWithNilTopoServer(config)
dbconfigs := testUtils.newDBConfigs(db)
target := querypb.Target{TabletType: topodatapb.TabletType_MASTER}
Expand Down Expand Up @@ -1693,6 +1822,7 @@ func TestSerializeTransactionsSameRow_RequestCanceled(t *testing.T) {
testUtils := newTestUtils()
config := testUtils.newQueryServiceConfig()
config.EnableHotRowProtection = true
config.HotRowProtectionConcurrentTransactions = 1
tsv := NewTabletServerWithNilTopoServer(config)
dbconfigs := testUtils.newDBConfigs(db)
target := querypb.Target{TabletType: topodatapb.TabletType_MASTER}
Expand Down Expand Up @@ -1774,7 +1904,7 @@ func TestSerializeTransactionsSameRow_RequestCanceled(t *testing.T) {
defer wg.Done()

// Wait until tx1 and tx2 are pending to make the test deterministic.
if err := waitForTxSerializationCount(tsv, "test_table where pk = 1 and name = 1", 2); err != nil {
if err := waitForTxSerializationPendingQueries(tsv, "test_table where pk = 1 and name = 1", 2); err != nil {
t.Fatal(err)
}

Expand All @@ -1789,7 +1919,7 @@ func TestSerializeTransactionsSameRow_RequestCanceled(t *testing.T) {
}()

// Wait until tx1, 2 and 3 are pending.
if err := waitForTxSerializationCount(tsv, "test_table where pk = 1 and name = 1", 3); err != nil {
if err := waitForTxSerializationPendingQueries(tsv, "test_table where pk = 1 and name = 1", 3); err != nil {
t.Fatal(err)
}
// Now unblock tx2 and cancel it.
Expand Down
Loading

0 comments on commit 7084913

Please sign in to comment.