diff --git a/go/pools/resource_pool.go b/go/pools/resource_pool.go index 63b8309137b..d3459b06d53 100644 --- a/go/pools/resource_pool.go +++ b/go/pools/resource_pool.go @@ -71,6 +71,7 @@ type ResourcePool struct { resources chan resourceWrapper factory Factory idleTimer *timer.Timer + logWait func(time.Time) } type resourceWrapper struct { @@ -89,7 +90,7 @@ type resourceWrapper struct { // An idleTimeout of 0 means that there is no timeout. // A non-zero value of prefillParallelism causes the pool to be pre-filled. // The value specifies how many resources can be opened in parallel. -func NewResourcePool(factory Factory, capacity, maxCap int, idleTimeout time.Duration, prefillParallelism int) *ResourcePool { +func NewResourcePool(factory Factory, capacity, maxCap int, idleTimeout time.Duration, prefillParallelism int, logWait func(time.Time)) *ResourcePool { if capacity <= 0 || maxCap <= 0 || capacity > maxCap { panic(errors.New("invalid/out of range capacity")) } @@ -99,6 +100,7 @@ func NewResourcePool(factory Factory, capacity, maxCap int, idleTimeout time.Dur available: sync2.NewAtomicInt64(int64(capacity)), capacity: sync2.NewAtomicInt64(int64(capacity)), idleTimeout: sync2.NewAtomicDuration(idleTimeout), + logWait: logWait, } for i := 0; i < capacity; i++ { rp.resources <- resourceWrapper{} @@ -325,6 +327,7 @@ func (rp *ResourcePool) SetCapacity(capacity int) error { func (rp *ResourcePool) recordWait(start time.Time) { rp.waitCount.Add(1) rp.waitTime.Add(time.Since(start)) + rp.logWait(start) } // SetIdleTimeout sets the idle timeout. It can only be used if there was an diff --git a/go/pools/resource_pool_flaky_test.go b/go/pools/resource_pool_flaky_test.go index f3950e5e23e..eeb3f8eda9a 100644 --- a/go/pools/resource_pool_flaky_test.go +++ b/go/pools/resource_pool_flaky_test.go @@ -26,6 +26,7 @@ import ( ) var lastID, count sync2.AtomicInt64 +var waitStarts []time.Time type TestResource struct { num int64 @@ -39,6 +40,10 @@ func (tr *TestResource) Close() { } } +func logWait(start time.Time) { + waitStarts = append(waitStarts, start) +} + func PoolFactory() (Resource, error) { count.Add(1) return &TestResource{lastID.Add(1), false}, nil @@ -57,7 +62,9 @@ func TestOpen(t *testing.T) { ctx := context.Background() lastID.Set(0) count.Set(0) - p := NewResourcePool(PoolFactory, 6, 6, time.Second, 0) + waitStarts = waitStarts[:0] + + p := NewResourcePool(PoolFactory, 6, 6, time.Second, 0, logWait) p.SetCapacity(5) var resources [10]Resource @@ -74,6 +81,9 @@ func TestOpen(t *testing.T) { if p.WaitCount() != 0 { t.Errorf("expecting 0, received %d", p.WaitCount()) } + if len(waitStarts) != 0 { + t.Errorf("expecting 0, received %d", len(waitStarts)) + } if p.WaitTime() != 0 { t.Errorf("expecting 0, received %d", p.WaitTime()) } @@ -109,6 +119,15 @@ func TestOpen(t *testing.T) { if p.WaitCount() != 5 { t.Errorf("Expecting 5, received %d", p.WaitCount()) } + if int64(len(waitStarts)) != p.WaitCount() { + t.Errorf("expecting %d, received %d", p.WaitCount(), len(waitStarts)) + } + // verify start times are monotonic increasing + for i := 1; i < len(waitStarts); i++ { + if waitStarts[i].Before(waitStarts[i-1]) { + t.Errorf("Expecting monotonic increasing start times") + } + } if p.WaitTime() == 0 { t.Errorf("Expecting non-zero") } @@ -198,12 +217,12 @@ func TestOpen(t *testing.T) { func TestPrefill(t *testing.T) { lastID.Set(0) count.Set(0) - p := NewResourcePool(PoolFactory, 5, 5, time.Second, 1) + p := NewResourcePool(PoolFactory, 5, 5, time.Second, 1, logWait) defer p.Close() if p.Active() != 5 { t.Errorf("p.Active(): %d, want 5", p.Active()) } - p = NewResourcePool(FailFactory, 5, 5, time.Second, 1) + p = NewResourcePool(FailFactory, 5, 5, time.Second, 1, logWait) defer p.Close() if p.Active() != 0 { t.Errorf("p.Active(): %d, want 0", p.Active()) @@ -218,7 +237,7 @@ func TestPrefillTimeout(t *testing.T) { defer func() { prefillTimeout = saveTimeout }() start := time.Now() - p := NewResourcePool(SlowFailFactory, 5, 5, time.Second, 1) + p := NewResourcePool(SlowFailFactory, 5, 5, time.Second, 1, logWait) defer p.Close() if elapsed := time.Since(start); elapsed > 20*time.Millisecond { t.Errorf("elapsed: %v, should be around 10ms", elapsed) @@ -232,7 +251,9 @@ func TestShrinking(t *testing.T) { ctx := context.Background() lastID.Set(0) count.Set(0) - p := NewResourcePool(PoolFactory, 5, 5, time.Second, 0) + waitStarts = waitStarts[:0] + + p := NewResourcePool(PoolFactory, 5, 5, time.Second, 0, logWait) var resources [10]Resource // Leave one empty slot in the pool for i := 0; i < 4; i++ { @@ -315,6 +336,9 @@ func TestShrinking(t *testing.T) { if p.WaitCount() != 1 { t.Errorf("Expecting 1, received %d", p.WaitCount()) } + if int64(len(waitStarts)) != p.WaitCount() { + t.Errorf("Expecting %d, received %d", p.WaitCount(), len(waitStarts)) + } if count.Get() != 2 { t.Errorf("Expecting 2, received %d", count.Get()) } @@ -371,7 +395,7 @@ func TestClosing(t *testing.T) { ctx := context.Background() lastID.Set(0) count.Set(0) - p := NewResourcePool(PoolFactory, 5, 5, time.Second, 0) + p := NewResourcePool(PoolFactory, 5, 5, time.Second, 0, logWait) var resources [10]Resource for i := 0; i < 5; i++ { r, err := p.Get(ctx) @@ -425,7 +449,7 @@ func TestIdleTimeout(t *testing.T) { ctx := context.Background() lastID.Set(0) count.Set(0) - p := NewResourcePool(PoolFactory, 1, 1, 10*time.Millisecond, 0) + p := NewResourcePool(PoolFactory, 1, 1, 10*time.Millisecond, 0, logWait) defer p.Close() r, err := p.Get(ctx) @@ -536,7 +560,7 @@ func TestIdleTimeoutCreateFail(t *testing.T) { ctx := context.Background() lastID.Set(0) count.Set(0) - p := NewResourcePool(PoolFactory, 1, 1, 10*time.Millisecond, 0) + p := NewResourcePool(PoolFactory, 1, 1, 10*time.Millisecond, 0, logWait) defer p.Close() r, err := p.Get(ctx) if err != nil { @@ -557,7 +581,7 @@ func TestCreateFail(t *testing.T) { ctx := context.Background() lastID.Set(0) count.Set(0) - p := NewResourcePool(FailFactory, 5, 5, time.Second, 0) + p := NewResourcePool(FailFactory, 5, 5, time.Second, 0, logWait) defer p.Close() if _, err := p.Get(ctx); err.Error() != "Failed" { t.Errorf("Expecting Failed, received %v", err) @@ -573,7 +597,7 @@ func TestCreateFailOnPut(t *testing.T) { ctx := context.Background() lastID.Set(0) count.Set(0) - p := NewResourcePool(PoolFactory, 5, 5, time.Second, 0) + p := NewResourcePool(PoolFactory, 5, 5, time.Second, 0, logWait) defer p.Close() _, err := p.Get(ctx) if err != nil { @@ -590,7 +614,7 @@ func TestSlowCreateFail(t *testing.T) { ctx := context.Background() lastID.Set(0) count.Set(0) - p := NewResourcePool(SlowFailFactory, 2, 2, time.Second, 0) + p := NewResourcePool(SlowFailFactory, 2, 2, time.Second, 0, logWait) defer p.Close() ch := make(chan bool) // The third Get should not wait indefinitely @@ -612,7 +636,7 @@ func TestTimeout(t *testing.T) { ctx := context.Background() lastID.Set(0) count.Set(0) - p := NewResourcePool(PoolFactory, 1, 1, time.Second, 0) + p := NewResourcePool(PoolFactory, 1, 1, time.Second, 0, logWait) defer p.Close() r, err := p.Get(ctx) if err != nil { @@ -631,7 +655,7 @@ func TestTimeout(t *testing.T) { func TestExpired(t *testing.T) { lastID.Set(0) count.Set(0) - p := NewResourcePool(PoolFactory, 1, 1, time.Second, 0) + p := NewResourcePool(PoolFactory, 1, 1, time.Second, 0, logWait) defer p.Close() ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-1*time.Second)) r, err := p.Get(ctx) diff --git a/go/vt/dbconnpool/connection_pool.go b/go/vt/dbconnpool/connection_pool.go index 76520467665..76466592f64 100644 --- a/go/vt/dbconnpool/connection_pool.go +++ b/go/vt/dbconnpool/connection_pool.go @@ -65,12 +65,13 @@ type ConnectionPool struct { hostIsNotIP bool mysqlStats *stats.Timings + name string } // NewConnectionPool creates a new ConnectionPool. The name is used // to publish stats only. func NewConnectionPool(name string, capacity int, idleTimeout time.Duration, dnsResolutionFrequency time.Duration) *ConnectionPool { - cp := &ConnectionPool{capacity: capacity, idleTimeout: idleTimeout, resolutionFrequency: dnsResolutionFrequency} + cp := &ConnectionPool{name: name, capacity: capacity, idleTimeout: idleTimeout, resolutionFrequency: dnsResolutionFrequency} if name == "" || usedNames[name] { return cp } @@ -146,7 +147,7 @@ func (cp *ConnectionPool) Open(info dbconfigs.Connector, mysqlStats *stats.Timin defer cp.mu.Unlock() cp.info = info cp.mysqlStats = mysqlStats - cp.connections = pools.NewResourcePool(cp.connect, cp.capacity, cp.capacity, cp.idleTimeout, 0) + cp.connections = pools.NewResourcePool(cp.connect, cp.capacity, cp.capacity, cp.idleTimeout, 0, cp.getLogWaitCallback()) // Check if we need to resolve a hostname (The Host is not just an IP address). if cp.resolutionFrequency > 0 && net.ParseIP(info.Host()) == nil { cp.hostIsNotIP = true @@ -168,6 +169,15 @@ func (cp *ConnectionPool) Open(info dbconfigs.Connector, mysqlStats *stats.Timin } } +func (cp *ConnectionPool) getLogWaitCallback() func(time.Time) { + if cp.name == "" { + return func(start time.Time) {} // no op + } + return func(start time.Time) { + cp.mysqlStats.Record(cp.name+"ResourceWaitTime", start) + } +} + // connect is used by the resource pool to create a new Resource. func (cp *ConnectionPool) connect() (pools.Resource, error) { c, err := NewDBConnection(cp.info, cp.mysqlStats) diff --git a/go/vt/vttablet/endtoend/misc_test.go b/go/vt/vttablet/endtoend/misc_test.go index 39634869519..4bd440929f9 100644 --- a/go/vt/vttablet/endtoend/misc_test.go +++ b/go/vt/vttablet/endtoend/misc_test.go @@ -296,10 +296,6 @@ func TestConsolidation(t *testing.T) { wg.Wait() vend := framework.DebugVars() - if err := compareIntDiff(vend, "Waits/TotalCount", vstart, 1); err != nil { - t.Logf("DebugVars Waits/TotalCount not incremented with sleep=%v", sleep) - continue - } if err := compareIntDiff(vend, "Waits/Histograms/Consolidations/Count", vstart, 1); err != nil { t.Logf("DebugVars Waits/Histograms/Consolidations/Count not incremented with sleep=%v", sleep) continue diff --git a/go/vt/vttablet/endtoend/transaction_test.go b/go/vt/vttablet/endtoend/transaction_test.go index 78e811bb574..280c8acb23b 100644 --- a/go/vt/vttablet/endtoend/transaction_test.go +++ b/go/vt/vttablet/endtoend/transaction_test.go @@ -20,6 +20,7 @@ import ( "fmt" "reflect" "strings" + "sync" "testing" "time" @@ -812,3 +813,44 @@ func TestManualTwopcz(t *testing.T) { fmt.Print("Sleeping for 30 seconds\n") time.Sleep(30 * time.Second) } + +func TestTransactionPoolResourceWaitTime(t *testing.T) { + defer framework.Server.SetPoolSize(framework.Server.TxPoolSize()) + defer framework.Server.SetTxPoolTimeout(framework.Server.TxPoolTimeout()) + framework.Server.SetTxPoolSize(1) + framework.Server.SetTxPoolTimeout(10 * time.Second) + debugVarPath := "Waits/Histograms/TransactionPoolResourceWaitTime/Count" + + for sleep := 0.1; sleep < 10.0; sleep *= 2 { + vstart := framework.DebugVars() + var wg sync.WaitGroup + wg.Add(2) + + transactionFunc := func() { + client := framework.NewClient() + + bv := map[string]*querypb.BindVariable{} + query := fmt.Sprintf("select sleep(%v) from dual", sleep) + if _, err := client.BeginExecute(query, bv); err != nil { + t.Error(err) + return + } + if err := client.Rollback(); err != nil { + t.Error(err) + return + } + wg.Done() + } + go transactionFunc() + go transactionFunc() + wg.Wait() + vend := framework.DebugVars() + if err := compareIntDiff(vend, debugVarPath, vstart, 1); err != nil { + t.Logf("DebugVars %v not incremented with sleep=%v", debugVarPath, sleep) + continue + } + t.Logf("DebugVars %v properly incremented with sleep=%v", debugVarPath, sleep) + return + } + t.Errorf("DebugVars %v not incremented", debugVarPath) +} diff --git a/go/vt/vttablet/tabletserver/connpool/pool.go b/go/vt/vttablet/tabletserver/connpool/pool.go index f9fc025150d..aa48333ab92 100644 --- a/go/vt/vttablet/tabletserver/connpool/pool.go +++ b/go/vt/vttablet/tabletserver/connpool/pool.go @@ -122,12 +122,21 @@ func (cp *Pool) Open(appParams, dbaParams, appDebugParams dbconfigs.Connector) { f := func() (pools.Resource, error) { return NewDBConn(cp, appParams) } - cp.connections = pools.NewResourcePool(f, cp.capacity, cp.capacity, cp.idleTimeout, cp.prefillParallelism) + cp.connections = pools.NewResourcePool(f, cp.capacity, cp.capacity, cp.idleTimeout, cp.prefillParallelism, cp.getLogWaitCallback()) cp.appDebugParams = appDebugParams cp.dbaPool.Open(dbaParams, tabletenv.MySQLStats) } +func (cp *Pool) getLogWaitCallback() func(time.Time) { + if cp.name == "" { + return func(start time.Time) {} // no op + } + return func(start time.Time) { + tabletenv.WaitStats.Record(cp.name+"ResourceWaitTime", start) + } +} + // Close will close the pool and wait for connections to be returned before // exiting. func (cp *Pool) Close() {