Skip to content

Commit

Permalink
decouple olap tx timeout from oltp tx timeout (vitessio#1040)
Browse files Browse the repository at this point in the history
* decouple olap tx timeout from oltp tx timeout

Since workload=olap bypasses the query timeouts
(--queryserver-config-query-timeout) and also row limits, the natural
assumption is that it also bypasses the transaction timeout.

This is not the case, e.g. for a tablet where the
--queryserver-config-transaction-timeout is 10.

This commit:

 * Adds new CLI flag and YAML field to independently configure TX
   timeouts for OLAP workloads (--queryserver-config-olap-transaction-timeout).
 * Decouples TX kill interval from OLTP TX timeout via new CLI flag and
   YAML field (--queryserver-config-transaction-killer-interval).

Signed-off-by: Max Englander <[email protected]>

* decouple ol{a,t}p tx timeouts: pr comments vitessio#1

Signed-off-by: Max Englander <[email protected]>

* decouple ol{a,t}p tx timeouts: pr comments vitessio#2 consolidate timeout logic in sc

Signed-off-by: Max Englander <[email protected]>

* decouple ol{a,t}p tx timeouts: remove unused tx killer flag

Signed-off-by: Max Englander <[email protected]>

* decouple ol{a,t}p tx timeouts: update 15_0_0_summary.md

Signed-off-by: Max Englander <[email protected]>

* decouple ol{a,t}p tx timeouts: fix race cond

Signed-off-by: Max Englander <[email protected]>

* decouple ol{a,t}p tx timeouts: pr comments vitessio#3 -txProps.timeout, +sc.expiryTime

Signed-off-by: Max Englander <[email protected]>

* decouple ol{a,t}p tx timeouts: pr comments vitessio#4 -atomic.Value for expiryTime

Signed-off-by: Max Englander <[email protected]>

* decouple ol{a,t}p tx timeouts: fix race cond (without atomic.Value)

Signed-off-by: Max Englander <[email protected]>

* decouple ol{a,t}p tx timeouts: pr comments vitessio#5 -unused funcs, fix comments, set ticks interval once

Signed-off-by: Max Englander <[email protected]>

* decouple ol{a,t}p tx timeouts: pr comments vitessio#5 +txkill tests

Signed-off-by: Max Englander <[email protected]>

* fix flags

Signed-off-by: Max Englander <[email protected]>

Signed-off-by: Max Englander <[email protected]>
  • Loading branch information
maxenglander authored Sep 15, 2022
1 parent b72f326 commit c5a99ec
Show file tree
Hide file tree
Showing 19 changed files with 389 additions and 193 deletions.
18 changes: 18 additions & 0 deletions doc/releasenotes/15_0_0_summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,24 @@ The reason you cannot change all the values together is because the restore proc
should be used to process the previous backup. Please make sure you have thought out all possible scenarios for restore before transitioning from one
compression engine to another.

#### Independent OLAP and OLTP transactional timeouts

`--queryserver-config-olap-transaction-timeout` specifies the timeout applied
to a transaction created within an OLAP workload. The default value is `30`
seconds, but this can be raised, lowered, or set to zero to disable the timeout
altogether.

Until now, while OLAP queries would bypass the query timeout, transactions
created within an OLAP session would be rolled back
`--queryserver-config-transaction-timeout` seconds after the transaction was
started.

As of now, OLTP and OLAP transaction timeouts can be configured independently of each
other.

The main use case is to run queries spanning a long period of time which
require transactional guarantees such as consistency or atomicity.

### Online DDL changes

#### Concurrent vitess migrations
Expand Down
4 changes: 3 additions & 1 deletion go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,8 @@ Usage of vttablet:
DEPRECATED
--queryserver-config-message-postpone-cap int
query server message postpone cap is the maximum number of messages that can be postponed at any given time. Set this number to substantially lower than transaction cap, so that the transaction pool isn't exhausted by the message subsystem. (default 4)
--queryserver-config-olap-transaction-timeout float
query server transaction timeout (in seconds), after which a transaction in an OLAP session will be killed (default 30)
--queryserver-config-passthrough-dmls
query server pass through all dml statements without rewriting
--queryserver-config-pool-prefill-parallelism int
Expand Down Expand Up @@ -724,7 +726,7 @@ Usage of vttablet:
--queryserver-config-transaction-prefill-parallelism int
query server transaction prefill parallelism, a non-zero value will prefill the pool using the specified parallism.
--queryserver-config-transaction-timeout float
query server transaction timeout (in seconds), a transaction will be killed if it takes longer than this value (default 30)
query server transaction timeout (in seconds), after which a transaction in an OLTP session will be killed (default 30)
--queryserver-config-txpool-timeout float
query server transaction pool timeout, it is how long vttablet waits if tx pool is full (default 1)
--queryserver-config-txpool-waiter-cap int
Expand Down
65 changes: 9 additions & 56 deletions go/pools/numbered.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,9 @@ type Numbered struct {
}

type numberedWrapper struct {
val any
inUse bool
purpose string
timeCreated time.Time
timeUsed time.Time
enforceTimeout bool
val any
inUse bool
purpose string
}

type unregistered struct {
Expand All @@ -62,14 +59,10 @@ func NewNumbered() *Numbered {
// Register starts tracking a resource by the supplied id.
// It does not lock the object.
// It returns an error if the id already exists.
func (nu *Numbered) Register(id int64, val any, enforceTimeout bool) error {
func (nu *Numbered) Register(id int64, val any) error {
// Optimistically assume we're not double registering.
now := time.Now()
resource := &numberedWrapper{
val: val,
timeCreated: now,
timeUsed: now,
enforceTimeout: enforceTimeout,
val: val,
}

nu.mu.Lock()
Expand Down Expand Up @@ -129,16 +122,15 @@ func (nu *Numbered) Get(id int64, purpose string) (val any, err error) {
}

// Put unlocks a resource for someone else to use.
func (nu *Numbered) Put(id int64, updateTime bool) {
func (nu *Numbered) Put(id int64) bool {
nu.mu.Lock()
defer nu.mu.Unlock()
if nw, ok := nu.resources[id]; ok {
nw.inUse = false
nw.purpose = ""
if updateTime {
nw.timeUsed = time.Now()
}
return true
}
return false
}

// GetAll returns the list of all resources in the pool.
Expand All @@ -157,50 +149,11 @@ func (nu *Numbered) GetAll() (vals []any) {
func (nu *Numbered) GetByFilter(purpose string, match func(val any) bool) (vals []any) {
nu.mu.Lock()
defer nu.mu.Unlock()
for _, nw := range nu.resources {
if nw.inUse || !nw.enforceTimeout {
continue
}
if match(nw.val) {
nw.inUse = true
nw.purpose = purpose
vals = append(vals, nw.val)
}
}
return vals
}

// GetOutdated returns a list of resources that are older than age, and locks them.
// It does not return any resources that are already locked.
func (nu *Numbered) GetOutdated(age time.Duration, purpose string) (vals []any) {
nu.mu.Lock()
defer nu.mu.Unlock()
now := time.Now()
for _, nw := range nu.resources {
if nw.inUse || !nw.enforceTimeout {
continue
}
if nw.timeUsed.Add(age).Sub(now) <= 0 {
nw.inUse = true
nw.purpose = purpose
vals = append(vals, nw.val)
}
}
return vals
}

// GetIdle returns a list of resurces that have been idle for longer
// than timeout, and locks them. It does not return any resources that
// are already locked.
func (nu *Numbered) GetIdle(timeout time.Duration, purpose string) (vals []any) {
nu.mu.Lock()
defer nu.mu.Unlock()
now := time.Now()
for _, nw := range nu.resources {
if nw.inUse {
continue
}
if nw.timeUsed.Add(timeout).Sub(now) <= 0 {
if match(nw.val) {
nw.inUse = true
nw.purpose = purpose
vals = append(vals, nw.val)
Expand Down
65 changes: 10 additions & 55 deletions go/pools/numbered_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"math/rand"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -30,10 +29,10 @@ func TestNumberedGeneral(t *testing.T) {
id := int64(0)
p := NewNumbered()

err := p.Register(id, id, true)
err := p.Register(id, id)
require.NoError(t, err)

err = p.Register(id, id, true)
err = p.Register(id, id)
assert.Contains(t, "already present", err.Error())

var v any
Expand All @@ -44,7 +43,7 @@ func TestNumberedGeneral(t *testing.T) {
_, err = p.Get(id, "test1")
assert.Contains(t, "in use: test", err.Error())

p.Put(id, true)
p.Put(id)
_, err = p.Get(1, "test2")
assert.Contains(t, "not found", err.Error())
p.Unregister(1, "test") // Should not fail
Expand All @@ -55,61 +54,17 @@ func TestNumberedGeneral(t *testing.T) {
t.Errorf("want prefix 'ended at' and suffix '(test)', got '%v'", err)
}

id = 0
p.Register(id, id, true)
id = 1
p.Register(id, id, true)
id = 2
p.Register(id, id, false)
time.Sleep(300 * time.Millisecond)
id = 3
p.Register(id, id, true)
time.Sleep(100 * time.Millisecond)

// p has 0, 1, 2, 3 (0, 1, 2 are aged, but 2 is not enforced)
vals := p.GetOutdated(200*time.Millisecond, "by outdated")
if num := len(vals); num != 2 {
t.Errorf("want 2, got %v", num)
if p.Size() != 0 {
t.Errorf("want 0, got %v", p.Size())
}
if _, err = p.Get(vals[0].(int64), "test1"); err.Error() != "in use: by outdated" {
t.Errorf("want 'in use: by outdated', got '%v'", err)
}
for _, v := range vals {
p.Put(v.(int64), true)
}
p.Put(2, true) // put to 2 to ensure it's not idle
time.Sleep(100 * time.Millisecond)

// p has 0, 1, 2 (2 is idle)
vals = p.GetIdle(200*time.Millisecond, "by idle")
if len(vals) != 1 {
t.Errorf("want 1, got %v", len(vals))
}
if _, err = p.Get(vals[0].(int64), "test1"); err.Error() != "in use: by idle" {
t.Errorf("want 'in use: by idle', got '%v'", err)
}
if vals[0].(int64) != 3 {
t.Errorf("want 3, got %v", vals[0])
}
p.Unregister(vals[0].(int64), "test")

// p has 0, 1, and 2
if p.Size() != 3 {
t.Errorf("want 3, got %v", p.Size())
}
go func() {
p.Unregister(0, "test")
p.Unregister(1, "test")
p.Unregister(2, "test")
}()
p.WaitForEmpty()
}

func TestNumberedGetByFilter(t *testing.T) {
p := NewNumbered()
p.Register(1, 1, true)
p.Register(2, 2, true)
p.Register(3, 3, true)
p.Register(1, 1)
p.Register(2, 2)
p.Register(3, 3)
p.Get(1, "locked")

vals := p.GetByFilter("filtered", func(v any) bool {
Expand All @@ -133,7 +88,7 @@ func BenchmarkRegisterUnregister(b *testing.B) {
id := int64(1)
val := "foobarbazdummyval"
for i := 0; i < b.N; i++ {
p.Register(id, val, false)
p.Register(id, val)
p.Unregister(id, "some reason")
}
}
Expand All @@ -145,7 +100,7 @@ func BenchmarkRegisterUnregisterParallel(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
id := rand.Int63()
p.Register(id, val, false)
p.Register(id, val)
p.Unregister(id, "some reason")
}
})
Expand Down
3 changes: 2 additions & 1 deletion go/test/endtoend/tabletgateway/buffer/buffer_test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ func (bt *BufferingTest) createCluster() (*cluster.LocalProcessCluster, int) {
SchemaSQL: sqlSchema,
VSchema: bt.VSchema,
}
clusterInstance.VtTabletExtraArgs = []string{"--health_check_interval", "1s",
clusterInstance.VtTabletExtraArgs = []string{
"--health_check_interval", "1s",
"--queryserver-config-transaction-timeout", "20",
}
if err := clusterInstance.StartUnshardedKeyspace(*keyspace, 1, false); err != nil {
Expand Down
4 changes: 3 additions & 1 deletion go/test/endtoend/vtgate/godriver/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ func TestMain(m *testing.M) {
SchemaSQL: SchemaSQL,
VSchema: VSchema,
}
clusterInstance.VtTabletExtraArgs = []string{"--queryserver-config-transaction-timeout", "3"}
clusterInstance.VtTabletExtraArgs = []string{
"--queryserver-config-transaction-timeout", "3",
}
if err := clusterInstance.StartKeyspace(*Keyspace, []string{"-80", "80-"}, 1, false); err != nil {
log.Fatal(err.Error())
return 1
Expand Down
4 changes: 3 additions & 1 deletion go/test/endtoend/vtgate/readafterwrite/raw_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,9 @@ func TestMain(m *testing.M) {
SchemaSQL: sqlSchema,
VSchema: vSchema,
}
clusterInstance.VtTabletExtraArgs = []string{"--queryserver-config-transaction-timeout", "5"}
clusterInstance.VtTabletExtraArgs = []string{
"--queryserver-config-transaction-timeout", "5",
}
if err := clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 1, false); err != nil {
return 1
}
Expand Down
4 changes: 3 additions & 1 deletion go/test/endtoend/vtgate/reservedconn/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,9 @@ func TestMain(m *testing.M) {
SchemaSQL: sqlSchema,
VSchema: vSchema,
}
clusterInstance.VtTabletExtraArgs = []string{"--queryserver-config-transaction-timeout", "5"}
clusterInstance.VtTabletExtraArgs = []string{
"--queryserver-config-transaction-timeout", "5",
}
if err := clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 1, false); err != nil {
return 1
}
Expand Down
4 changes: 3 additions & 1 deletion go/test/endtoend/vtgate/reservedconn/reconnect2/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ func TestMain(m *testing.M) {
SchemaSQL: sqlSchema,
VSchema: vSchema,
}
clusterInstance.VtTabletExtraArgs = []string{"--queryserver-config-transaction-timeout", "5"}
clusterInstance.VtTabletExtraArgs = []string{
"--queryserver-config-transaction-timeout", "5",
}
if err := clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 1, true); err != nil {
return 1
}
Expand Down
5 changes: 4 additions & 1 deletion go/test/endtoend/vtgate/unsharded/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,10 @@ func TestMain(m *testing.M) {
SchemaSQL: SchemaSQL,
VSchema: VSchema,
}
clusterInstance.VtTabletExtraArgs = []string{"--queryserver-config-transaction-timeout", "3", "--queryserver-config-max-result-size", "30"}
clusterInstance.VtTabletExtraArgs = []string{
"--queryserver-config-transaction-timeout", "3",
"--queryserver-config-max-result-size", "30",
}
if err := clusterInstance.StartUnshardedKeyspace(*Keyspace, 0, false); err != nil {
log.Fatal(err.Error())
return 1
Expand Down
7 changes: 5 additions & 2 deletions go/vt/vttablet/endtoend/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,8 +472,11 @@ func TestShutdownGracePeriodWithReserveExecute(t *testing.T) {

func TestShortTxTimeout(t *testing.T) {
client := framework.NewClient()
defer framework.Server.SetTxTimeout(framework.Server.TxTimeout())
framework.Server.SetTxTimeout(10 * time.Millisecond)
defer framework.Server.Config().SetTxTimeoutForWorkload(
framework.Server.Config().TxTimeoutForWorkload(querypb.ExecuteOptions_OLTP),
querypb.ExecuteOptions_OLTP,
)
framework.Server.Config().SetTxTimeoutForWorkload(10*time.Millisecond, querypb.ExecuteOptions_OLTP)

err := client.Begin(false)
require.NoError(t, err)
Expand Down
21 changes: 21 additions & 0 deletions go/vt/vttablet/tabletserver/stateful_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type StatefulConnection struct {
reservedProps *Properties
tainted bool
enforceTimeout bool
timeout time.Duration
expiryTime time.Time
}

// Properties contains meta information about the connection
Expand Down Expand Up @@ -77,6 +79,16 @@ func (sc *StatefulConnection) IsInTransaction() bool {
return sc.txProps != nil
}

func (sc *StatefulConnection) ElapsedTimeout() bool {
if !sc.enforceTimeout {
return false
}
if sc.timeout <= 0 {
return false
}
return sc.expiryTime.Before(time.Now())
}

// Exec executes the statement in the dedicated connection
func (sc *StatefulConnection) Exec(ctx context.Context, query string, maxrows int, wantfields bool) (*sqltypes.Result, error) {
if sc.IsClosed() {
Expand Down Expand Up @@ -273,6 +285,11 @@ func (sc *StatefulConnection) LogTransaction(reason tx.ReleaseReason) {
tabletenv.TxLogger.Send(sc)
}

func (sc *StatefulConnection) SetTimeout(timeout time.Duration) {
sc.timeout = timeout
sc.resetExpiryTime()
}

// logReservedConn logs reserved connection related stats.
func (sc *StatefulConnection) logReservedConn() {
if sc.reservedProps == nil {
Expand All @@ -292,3 +309,7 @@ func (sc *StatefulConnection) getUsername() string {
}
return callerid.GetUsername(sc.reservedProps.ImmediateCaller)
}

func (sc *StatefulConnection) resetExpiryTime() {
sc.expiryTime = time.Now().Add(sc.timeout)
}
Loading

0 comments on commit c5a99ec

Please sign in to comment.