Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make connection killing resilient to MySQL hangs (#14500) #104

Merged
merged 2 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions go/mysql/fakesqldb/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,11 +375,11 @@ func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.R
}
key := strings.ToLower(query)
db.mu.Lock()
defer db.mu.Unlock()
db.queryCalled[key]++
db.querylog = append(db.querylog, key)
// Check if we should close the connection and provoke errno 2013.
if db.shouldClose.Load() {
defer db.mu.Unlock()
c.Close()

//log error
Expand All @@ -393,7 +393,9 @@ func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.R
// The driver may send this at connection time, and we don't want it to
// interfere.
if key == "set names utf8" || strings.HasPrefix(key, "set collation_connection = ") {
//log error
defer db.mu.Unlock()

// log error
if err := callback(&sqltypes.Result{}); err != nil {
log.Errorf("callback failed : %v", err)
}
Expand All @@ -402,12 +404,14 @@ func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.R

// check if we should reject it.
if err, ok := db.rejectedData[key]; ok {
db.mu.Unlock()
return err
}

// Check explicit queries from AddQuery().
result, ok := db.data[key]
if ok {
db.mu.Unlock()
if f := result.BeforeFunc; f != nil {
f()
}
Expand All @@ -418,6 +422,7 @@ func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.R
for _, pat := range db.patternData {
if pat.expr.MatchString(query) {
userCallback, ok := db.queryPatternUserCallback[pat.expr]
db.mu.Unlock()
if ok {
userCallback(query)
}
Expand All @@ -428,6 +433,8 @@ func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.R
}
}

defer db.mu.Unlock()

if db.neverFail.Load() {
return callback(&sqltypes.Result{})
}
Expand Down
163 changes: 90 additions & 73 deletions go/vt/vttablet/tabletserver/connpool/dbconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

"vitess.io/vitess/go/pools"
Expand All @@ -30,7 +31,6 @@ import (

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/dbconnpool"
"vitess.io/vitess/go/vt/log"
Expand All @@ -41,6 +41,8 @@ import (
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

const defaultKillTimeout = 5 * time.Second

// DBConn is a db connection for tabletserver.
// It performs automatic reconnects as needed.
// Its Execute function has a timeout that can kill
Expand All @@ -52,14 +54,16 @@ type DBConn struct {
pool *Pool
dbaPool *dbconnpool.ConnectionPool
stats *tabletenv.Stats
current sync2.AtomicString
current atomic.Pointer[string]
timeCreated time.Time
setting string
resetSetting string

// err will be set if a query is killed through a Kill.
errmu sync.Mutex
err error

killTimeout time.Duration
}

// NewDBConn creates a new DBConn. It triggers a CheckMySQL if creation fails.
Expand All @@ -78,6 +82,7 @@ func NewDBConn(ctx context.Context, cp *Pool, appParams dbconfigs.Connector) (*D
info: appParams,
pool: cp,
dbaPool: cp.dbaPool,
killTimeout: defaultKillTimeout,
timeCreated: time.Now(),
stats: cp.env.Stats(),
}, nil
Expand All @@ -95,6 +100,7 @@ func NewDBConnNoPool(ctx context.Context, params dbconfigs.Connector, dbaPool *d
dbaPool: dbaPool,
pool: nil,
timeCreated: time.Now(),
killTimeout: defaultKillTimeout,
stats: tabletenv.NewStats(servenv.NewExporter("Temp", "Tablet")),
}
if setting == nil {
Expand Down Expand Up @@ -157,8 +163,8 @@ func (dbc *DBConn) Exec(ctx context.Context, query string, maxrows int, wantfiel
}

func (dbc *DBConn) execOnce(ctx context.Context, query string, maxrows int, wantfields bool) (*sqltypes.Result, error) {
dbc.current.Set(query)
defer dbc.current.Set("")
dbc.current.Store(&query)
defer dbc.current.Store(nil)

// Check if the context is already past its deadline before
// trying to execute the query.
Expand All @@ -168,19 +174,33 @@ func (dbc *DBConn) execOnce(ctx context.Context, query string, maxrows int, want
default:
}

defer dbc.stats.MySQLTimings.Record("Exec", time.Now())

done, wg := dbc.setDeadline(ctx)
qr, err := dbc.conn.ExecuteFetch(query, maxrows, wantfields)
now := time.Now()
defer dbc.stats.MySQLTimings.Record("Exec", now)

if done != nil {
close(done)
wg.Wait()
type execResult struct {
result *sqltypes.Result
err error
}
if dbcerr := dbc.Err(); dbcerr != nil {
return nil, dbcerr

ch := make(chan execResult)
go func() {
result, err := dbc.conn.ExecuteFetch(query, maxrows, wantfields)
ch <- execResult{result, err}
}()

select {
case <-ctx.Done():
killCtx, cancel := context.WithTimeout(context.Background(), dbc.killTimeout)
defer cancel()

_ = dbc.KillWithContext(killCtx, ctx.Err().Error(), time.Since(now))
return nil, dbc.Err()
case r := <-ch:
if dbcErr := dbc.Err(); dbcErr != nil {
return nil, dbcErr
}
return r.result, r.err
}
return qr, err
}

// ExecOnce executes the specified query, but does not retry on connection errors.
Expand Down Expand Up @@ -260,22 +280,30 @@ func (dbc *DBConn) Stream(ctx context.Context, query string, callback func(*sqlt
}

func (dbc *DBConn) streamOnce(ctx context.Context, query string, callback func(*sqltypes.Result) error, alloc func() *sqltypes.Result, streamBufferSize int) error {
defer dbc.stats.MySQLTimings.Record("ExecStream", time.Now())
dbc.current.Store(&query)
defer dbc.current.Store(nil)

dbc.current.Set(query)
defer dbc.current.Set("")
now := time.Now()
defer dbc.stats.MySQLTimings.Record("ExecStream", now)

done, wg := dbc.setDeadline(ctx)
err := dbc.conn.ExecuteStreamFetch(query, callback, alloc, streamBufferSize)
ch := make(chan error)
go func() {
ch <- dbc.conn.ExecuteStreamFetch(query, callback, alloc, streamBufferSize)
}()

if done != nil {
close(done)
wg.Wait()
}
if dbcerr := dbc.Err(); dbcerr != nil {
return dbcerr
select {
case <-ctx.Done():
killCtx, cancel := context.WithTimeout(context.Background(), dbc.killTimeout)
defer cancel()

_ = dbc.KillWithContext(killCtx, ctx.Err().Error(), time.Since(now))
return dbc.Err()
case err := <-ch:
if dbcErr := dbc.Err(); dbcErr != nil {
return dbcErr
}
return err
}
return err
}

// StreamOnce executes the query and streams the results. But, does not retry on connection errors.
Expand Down Expand Up @@ -414,6 +442,16 @@ func (dbc *DBConn) Taint() {
// and on the connection side. If no query is executing, it's a no-op.
// Kill will also not kill a query more than once.
func (dbc *DBConn) Kill(reason string, elapsed time.Duration) error {
return dbc.KillWithContext(context.Background(), reason, elapsed)
}

// KillWithContext kills the currently executing query both on MySQL side
// and on the connection side. If no query is executing, it's a no-op.
// Kill will also not kill a query more than once.
func (dbc *DBConn) KillWithContext(ctx context.Context, reason string, elapsed time.Duration) error {
if cause := context.Cause(ctx); cause != nil {
return cause
}
dbc.stats.KillCounters.Add("Queries", 1)
log.Infof("Due to %s, elapsed time: %v, killing query ID %v %s", reason, elapsed, dbc.conn.ID(), dbc.CurrentForLogging())

Expand All @@ -424,25 +462,43 @@ func (dbc *DBConn) Kill(reason string, elapsed time.Duration) error {
dbc.conn.Close()

// Server side action. Kill the session.
killConn, err := dbc.dbaPool.Get(context.TODO())
killConn, err := dbc.dbaPool.Get(ctx)
if err != nil {
log.Warningf("Failed to get conn from dba pool: %v", err)
return err
}
defer killConn.Recycle()

ch := make(chan error)
sql := fmt.Sprintf("kill %d", dbc.conn.ID())
_, err = killConn.ExecuteFetch(sql, 10000, false)
if err != nil {
log.Errorf("Could not kill query ID %v %s: %v", dbc.conn.ID(),
dbc.CurrentForLogging(), err)
return err
go func() {
_, err := killConn.ExecuteFetch(sql, -1, false)
ch <- err
}()

select {
case <-ctx.Done():
killConn.Close()

dbc.stats.InternalErrors.Add("HungQuery", 1)
log.Warningf("Query may be hung: %s", dbc.CurrentForLogging())

return context.Cause(ctx)
case err := <-ch:
if err != nil {
log.Errorf("Could not kill query ID %v %s: %v", dbc.conn.ID(), dbc.CurrentForLogging(), err)
return err
}
return nil
}
return nil
}

// Current returns the currently executing query.
func (dbc *DBConn) Current() string {
return dbc.current.Get()
if q := dbc.current.Load(); q != nil {
return *q
}
return ""
}

// ID returns the connection id.
Expand Down Expand Up @@ -480,45 +536,6 @@ func (dbc *DBConn) reconnect(ctx context.Context) error {
return nil
}

// setDeadline starts a goroutine that will kill the currently executing query
// if the deadline is exceeded. It returns a channel and a waitgroup. After the
// query is done executing, the caller is required to close the done channel
// and wait for the waitgroup to make sure that the necessary cleanup is done.
func (dbc *DBConn) setDeadline(ctx context.Context) (chan bool, *sync.WaitGroup) {
if ctx.Done() == nil {
return nil, nil
}
done := make(chan bool)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
startTime := time.Now()
select {
case <-ctx.Done():
dbc.Kill(ctx.Err().Error(), time.Since(startTime))
case <-done:
return
}
elapsed := time.Since(startTime)

// Give 2x the elapsed time and some buffer as grace period
// for the query to get killed.
tmr2 := time.NewTimer(2*elapsed + 5*time.Second)
defer tmr2.Stop()
select {
case <-tmr2.C:
dbc.stats.InternalErrors.Add("HungQuery", 1)
log.Warningf("Query may be hung: %s", dbc.CurrentForLogging())
case <-done:
return
}
<-done
log.Warningf("Hung query returned")
}()
return done, &wg
}

// CurrentForLogging applies transformations to the query making it suitable to log.
// It applies sanitization rules based on tablet settings and limits the max length of
// queries.
Expand Down
Loading
Loading