Skip to content

Commit

Permalink
Merge pull request #2493 from dolthub/fulghum/status_vars
Browse files Browse the repository at this point in the history
Instrument status variables for `Slow_queries`, `Max_used_connections`, `Com_select`, and `Connections`
  • Loading branch information
fulghum authored May 13, 2024
2 parents dc95862 + 5653071 commit 0e38605
Show file tree
Hide file tree
Showing 7 changed files with 239 additions and 44 deletions.
4 changes: 4 additions & 0 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,10 @@ func (e *Engine) QueryWithBindings(ctx *sql.Context, query string, parsed sqlpar
return nil, nil, err
}

if plan.NodeRepresentsSelect(analyzed) {
sql.IncrementStatusVariable(ctx, "Com_select", 1)
}

if bindCtx := binder.BindCtx(); bindCtx != nil {
if unused := bindCtx.UnusedBindings(); len(unused) > 0 {
return nil, nil, fmt.Errorf("invalid arguments. expected: %d, found: %d", len(bindCtx.Bindings)-len(unused), len(bindCtx.Bindings))
Expand Down
24 changes: 24 additions & 0 deletions processlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package sqle
import (
"context"
"errors"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -149,7 +150,14 @@ func (pl *ProcessList) EndQuery(ctx *sql.Context) {
pid := ctx.Pid()
delete(pl.byQueryPid, pid)
p := pl.procs[id]

if p != nil && p.QueryPid == pid {
processTime := time.Now().Sub(p.StartedAt)
longQueryTime := getLongQueryTime()
if longQueryTime > 0 && processTime.Seconds() > longQueryTime {
sql.IncrementStatusVariable(ctx, "Slow_queries", 1)
}

sql.IncrementStatusVariable(ctx, "Threads_running", -1)
p.Command = sql.ProcessCommandSleep
p.Query = ""
Expand Down Expand Up @@ -318,3 +326,19 @@ func (pl *ProcessList) Kill(connID uint32) {
p.Kill()
}
}

// getLongQueryTime returns the value of the long_query_time system variable. If any errors are encountered loading
// the value, then an error is logged and 0 is returned.
func getLongQueryTime() float64 {
_, longQueryTimeValue, ok := sql.SystemVariables.GetGlobal("long_query_time")
if !ok {
logrus.Errorf("unable to find long_query_time system variable")
return 0
}
longQueryTime, ok := longQueryTimeValue.(float64)
if !ok {
logrus.Errorf(fmt.Sprintf("unexpected type for value of long_query_time system variable: %T", longQueryTimeValue))
return 0
}
return longQueryTime
}
37 changes: 37 additions & 0 deletions processlist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"sort"
"testing"
"time"

"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -167,3 +168,39 @@ func TestKillConnection(t *testing.T) {
require.True(t, killed[1])
require.False(t, killed[2])
}

// TestSlowQueryTracking tests that processes that take longer than @@long_query_time increment the
// Slow_queries status variable.
func TestSlowQueryTracking(t *testing.T) {
_, value, ok := sql.StatusVariables.GetGlobal("Slow_queries")
require.True(t, ok)
require.Equal(t, uint64(0), value)

p := NewProcessList()
p.AddConnection(1, "127.0.0.1:34567")
sess := sql.NewBaseSessionWithClientServer("0.0.0.0:3306",
sql.Client{Address: "127.0.0.1:34567", User: "foo"}, 1)
sess.SetCurrentDatabase("test_db")
p.ConnectionReady(sess)
ctx := sql.NewContext(context.Background(), sql.WithPid(1), sql.WithSession(sess))
ctx, err := p.BeginQuery(ctx, "SELECT foo")
require.NoError(t, err)

// Change @@long_query_time so we don't have to wait for 10 seconds
require.NoError(t, sql.SystemVariables.SetGlobal("long_query_time", 1))
time.Sleep(1_500 * time.Millisecond)
p.EndQuery(ctx)

// Status variables are updated asynchronously, so try a few times to find the updated value
found := false
for range 10 {
_, value, ok = sql.StatusVariables.GetGlobal("Slow_queries")
require.True(t, ok)
if value == uint64(1) {
found = true
break
}
time.Sleep(100 * time.Millisecond)
}
require.True(t, found, "Never found Slow_queries value updated")
}
2 changes: 1 addition & 1 deletion server/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (s *SessionManager) nextPid() uint64 {
return s.lastPid
}

// Add a connection to be tracked by the SessionManager. Should be called as
// AddConn adds a connection to be tracked by the SessionManager. Should be called as
// soon as possible after the server has accepted the connection. Results in
// the connection being tracked by ProcessList and being available through
// KillConnection. The connection will be tracked until RemoveConn is called,
Expand Down
75 changes: 51 additions & 24 deletions server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ import (
"github.com/dolthub/go-mysql-server/sql/types"
)

var errConnectionNotFound = errors.NewKind("connection not found: %c")

// ErrRowTimeout will be returned if the wait for the row is longer than the connection timeout
var ErrRowTimeout = errors.NewKind("row read wait bigger than connection timeout")

Expand Down Expand Up @@ -84,6 +82,8 @@ func (h *Handler) NewConnection(c *mysql.Conn) {
}

h.sm.AddConn(c)
updateMaxUsedConnectionsStatusVariable()
sql.StatusVariables.IncrementGlobal("Connections", 1)

c.DisableClientMultiStatements = h.disableMultiStmts
logrus.WithField(sql.ConnectionIdLogField, c.ConnectionID).WithField("DisableClientMultiStatements", c.DisableClientMultiStatements).Infof("NewConnection")
Expand Down Expand Up @@ -725,39 +725,66 @@ func (h *Handler) WarningCount(c *mysql.Conn) uint16 {
return 0
}

func rowToSQL(ctx *sql.Context, s sql.Schema, row sql.Row) ([]sqltypes.Value, error) {
o := make([]sqltypes.Value, len(row))
var err error
for i, v := range row {
if v == nil {
o[i] = sqltypes.NULL
continue
}
// need to make sure the schema is not null as some plan schema is defined as null (e.g. IfElseBlock)
if s != nil {
o[i], err = s[i].Type.SQL(ctx, nil, v)
if err != nil {
return nil, err
}
}
// getMaxUsedConnections returns the maximum number of connections that have been established at the same time for
// this sql-server, as tracked by the Max_used_connections status variable. If any error is encountered, it will be
// logged and 0 will be returned.
func getMaxUsedConnections() uint64 {
_, maxUsedConnectionsValue, ok := sql.StatusVariables.GetGlobal("Max_used_connections")
if !ok {
logrus.Errorf("unable to find Max_used_connections status variable")
return 0
}
maxUsedConnections, ok := maxUsedConnectionsValue.(uint64)
if !ok {
logrus.Errorf("unexpected type for Max_used_connections status variable: %T", maxUsedConnectionsValue)
return 0
}
return maxUsedConnections
}

return o, nil
// getThreadsConnected returns the current number of connected threads, as tracked by the Threads_connected status
// variable. If any error is encountered, it will be logged and 0 will be returned.
func getThreadsConnected() uint64 {
_, threadsConnectedValue, ok := sql.StatusVariables.GetGlobal("Threads_connected")
if !ok {
logrus.Errorf("unable to find Threads_connected status variable")
return 0
}
threadsConnected, ok := threadsConnectedValue.(uint64)
if !ok {
logrus.Errorf("unexpected type for Threads_connected status variable: %T", threadsConnectedValue)
return 0
}
return threadsConnected
}

// updateMaxUsedConnectionsStatusVariable updates the Max_used_connections status
// variables if the current number of connected threads is greater than the current
// value of Max_used_connections.
func updateMaxUsedConnectionsStatusVariable() {
go func() {
maxUsedConnections := getMaxUsedConnections()
threadsConnected := getThreadsConnected()
if threadsConnected > maxUsedConnections {
sql.StatusVariables.SetGlobal("Max_used_connections", threadsConnected)
// TODO: When Max_used_connections is updated, we should also update
// Max_used_connections_time with the current time, but our status
// variables support currently only supports Uint values.
}
}()
}

func row2ToSQL(s sql.Schema, row sql.Row2) ([]sqltypes.Value, error) {
func rowToSQL(ctx *sql.Context, s sql.Schema, row sql.Row) ([]sqltypes.Value, error) {
o := make([]sqltypes.Value, len(row))
var err error
for i := 0; i < row.Len(); i++ {
v := row.GetField(i)
if v.IsNull() {
for i, v := range row {
if v == nil {
o[i] = sqltypes.NULL
continue
}

// need to make sure the schema is not null as some plan schema is defined as null (e.g. IfElseBlock)
if s != nil {
o[i], err = s[i].Type.(sql.Type2).SQL2(v)
o[i], err = s[i].Type.SQL(ctx, nil, v)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 0e38605

Please sign in to comment.