Skip to content

Commit

Permalink
sql: deflake and unskip TestTenantStatementTimeoutAdmissionQueueCance…
Browse files Browse the repository at this point in the history
…lation

Make sure exactly 4 blockers get parked in the admission control queues, and
retry the main query until it gets parked in and then booted from the queue.
Previously we expected the main query to always reach the admission control
queue, but that isn't reliable.

Fixes: #78494
  • Loading branch information
cucaroach committed Mar 30, 2023
1 parent 4d62f8e commit 5bf7940
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 14 deletions.
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,7 @@ go_test(
"@com_github_jackc_pgx_v4//:pgx",
"@com_github_lib_pq//:pq",
"@com_github_lib_pq//oid",
"@com_github_petermattis_goid//:goid",
"@com_github_pmezard_go_difflib//difflib",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
Expand Down
52 changes: 38 additions & 14 deletions pkg/sql/run_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"net/url"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand All @@ -40,6 +41,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/petermattis/goid"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -836,7 +838,6 @@ func getUserConn(t *testing.T, username string, server serverutils.TestServerInt
// main statement with a timeout is blocked.
func TestTenantStatementTimeoutAdmissionQueueCancelation(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.WithIssue(t, 78494, "flaky test")
defer log.Scope(t).Close(t)

skip.UnderStress(t, "times out under stress")
Expand All @@ -845,8 +846,9 @@ func TestTenantStatementTimeoutAdmissionQueueCancelation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tenantID := serverutils.TestTenantID()

var hitMainQuery uint64
numBlockers := 4
var matches int64

// We can't get the tableID programmatically here, checked below with assert.
const tableID = 104
Expand All @@ -865,9 +867,12 @@ func TestTenantStatementTimeoutAdmissionQueueCancelation(t *testing.T) {
matchBatch := func(ctx context.Context, req *kvpb.BatchRequest) bool {
tid, ok := roachpb.ClientTenantFromContext(ctx)
if ok && tid == tenantID && len(req.Requests) > 0 {
scan, ok := req.Requests[0].GetInner().(*kvpb.ScanRequest)
if ok && tableSpan.ContainsKey(scan.Key) {
return true
scan, ok := req.Requests[0].GetInner().(*kvpb.GetRequest)
if ok {
if tableSpan.ContainsKey(scan.Key) {
log.Infof(ctx, "matchBatch %d", goid.Get())
return true
}
}
}
return false
Expand All @@ -885,18 +890,30 @@ func TestTenantStatementTimeoutAdmissionQueueCancelation(t *testing.T) {
Store: &kvserver.StoreTestingKnobs{
TestingRequestFilter: func(ctx context.Context, req *kvpb.BatchRequest) *kvpb.Error {
if matchBatch(ctx, req) {
m := atomic.AddInt64(&matches, 1)
// If any of the blockers get retried just ignore.
if m > int64(numBlockers) {
log.Infof(ctx, "ignoring extra blocker %d", goid.Get())
return nil
}
// Notify we're blocking.
log.Infof(ctx, "blocking %d", goid.Get())
unblockClientCh <- struct{}{}
<-qBlockersCh
}
return nil
},
TestingResponseErrorEvent: func(ctx context.Context, req *kvpb.BatchRequest, err error) {
if matchBatch(ctx, req) {
tid, ok := roachpb.ClientTenantFromContext(ctx)
if ok && tid == tenantID && len(req.Requests) > 0 {
scan, ok := req.Requests[0].GetInner().(*kvpb.ScanRequest)
if ok && tableSpan.ContainsKey(scan.Key) {
cancel()
wg.Done()
log.Infof(ctx, "%s %d", scan, goid.Get())
if ok {
if tableSpan.ContainsKey(scan.Key) && atomic.CompareAndSwapUint64(&hitMainQuery, 0, 1) {
log.Infof(ctx, "got scan request error %d", goid.Get())
cancel()
wg.Done()
}
}
}
},
Expand All @@ -911,8 +928,8 @@ func TestTenantStatementTimeoutAdmissionQueueCancelation(t *testing.T) {
defer db.Close()

r1 := sqlutils.MakeSQLRunner(db)
r1.Exec(t, `CREATE TABLE foo (t int)`)

r1.Exec(t, `SET CLUSTER SETTING sql.stats.automatic_collection.enabled=false`)
r1.Exec(t, `CREATE TABLE foo (t int PRIMARY KEY)`)
row := r1.QueryRow(t, `SELECT id FROM system.namespace WHERE name = 'foo'`)
var id int64
row.Scan(&id)
Expand All @@ -931,18 +948,25 @@ func TestTenantStatementTimeoutAdmissionQueueCancelation(t *testing.T) {
for _, r := range blockers {
go func(r *sqlutils.SQLRunner) {
defer wg.Done()
r.Exec(t, `SELECT * FROM foo`)
r.Exec(t, `SELECT * FROM foo WHERE t = 1234`)
}(r)
}
// Wait till all blockers are parked.
for i := 0; i < numBlockers; i++ {
<-unblockClientCh
}
client.ExpectErr(t, "timeout", `SELECT * FROM foo`)
// Unblock the blockers.
log.Infof(ctx, "blockers parked")
// Because we don't know when statement timeout will happen we have to repeat
// till we get one into the KV layer.
for atomic.LoadUint64(&hitMainQuery) == 0 {
_, err := client.DB.ExecContext(context.Background(), `SELECT * FROM foo`)
require.Error(t, err)
log.Infof(ctx, "main req finished: %v", err)
}
for i := 0; i < numBlockers; i++ {
qBlockersCh <- struct{}{}
}
log.Infof(ctx, "unblocked blockers")
wg.Wait()
require.ErrorIs(t, ctx.Err(), context.Canceled)
}

0 comments on commit 5bf7940

Please sign in to comment.