Skip to content

Commit

Permalink
Merge pull request #7520 from planetscale/more-metrics
Browse files Browse the repository at this point in the history
Add innodb_read_rows as vttablet metric
  • Loading branch information
shlomi-noach authored Feb 22, 2021
2 parents ad933b5 + 891ec0a commit 25baa46
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 9 deletions.
26 changes: 19 additions & 7 deletions go/vt/vttablet/tabletserver/query_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ import (
"testing"
"time"

"vitess.io/vitess/go/mysql"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/cache"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/streamlog"
Expand Down Expand Up @@ -114,12 +115,7 @@ func TestGetPlanPanicDuetoEmptyQuery(t *testing.T) {
}
}

func TestGetMessageStreamPlan(t *testing.T) {
db := fakesqldb.New(t)
defer db.Close()
for query, result := range schematest.Queries() {
db.AddQuery(query, result)
}
func addSchemaEngineQueries(db *fakesqldb.DB) {
db.AddQueryPattern(baseShowTablesPattern, &sqltypes.Result{
Fields: mysql.BaseShowTablesFields,
Rows: [][]sqltypes.Value{
Expand All @@ -129,6 +125,22 @@ func TestGetMessageStreamPlan(t *testing.T) {
mysql.BaseShowTablesRow("seq", false, "vitess_sequence"),
mysql.BaseShowTablesRow("msg", false, "vitess_message,vt_ack_wait=30,vt_purge_after=120,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=30"),
}})
db.AddQuery("show status like 'Innodb_rows_read'", sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"Variable_name|Value",
"varchar|int64"),
"Innodb_rows_read|0",
))
}

func TestGetMessageStreamPlan(t *testing.T) {
db := fakesqldb.New(t)
defer db.Close()
for query, result := range schematest.Queries() {
db.AddQuery(query, result)
}

addSchemaEngineQueries(db)

qe := newTestQueryEngine(10*time.Second, true, newDBConfigs(db))
qe.se.Open()
qe.Open()
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vttablet/tabletserver/query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1203,6 +1203,11 @@ func initQueryExecutorTestDB(db *fakesqldb.DB) {
mysql.BaseShowTablesRow("msg", false, "vitess_message,vt_ack_wait=30,vt_purge_after=120,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=30"),
},
})
db.AddQuery("show status like 'Innodb_rows_read'", sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"Variable_name|Value",
"varchar|int64"),
"Innodb_rows_read|0",
))
}

func getTestTableFields() []*querypb.Field {
Expand Down
28 changes: 26 additions & 2 deletions go/vt/vttablet/tabletserver/schema/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type Engine struct {

tableFileSizeGauge *stats.GaugesWithSingleLabel
tableAllocatedSizeGauge *stats.GaugesWithSingleLabel
innoDbReadRowsGauge *stats.Gauge
}

// NewEngine creates a new Engine.
Expand All @@ -99,6 +100,7 @@ func NewEngine(env tabletenv.Env) *Engine {
_ = env.Exporter().NewGaugeDurationFunc("SchemaReloadTime", "vttablet keeps table schemas in its own memory and periodically refreshes it from MySQL. This config controls the reload time.", se.ticks.Interval)
se.tableFileSizeGauge = env.Exporter().NewGaugesWithSingleLabel("TableFileSize", "tracks table file size", "Table")
se.tableAllocatedSizeGauge = env.Exporter().NewGaugesWithSingleLabel("TableAllocatedSize", "tracks table allocated size", "Table")
se.innoDbReadRowsGauge = env.Exporter().NewGauge("InnodbRowsRead", "number of rows read by mysql")

env.Exporter().HandleFunc("/debug/schema", se.handleDebugSchema)
env.Exporter().HandleFunc("/schemaz", func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -290,9 +292,7 @@ func (se *Engine) ReloadAt(ctx context.Context, pos mysql.Position) error {

// reload reloads the schema. It can also be used to initialize it.
func (se *Engine) reload(ctx context.Context) error {
//start := time.Now()
defer func() {
//log.Infof("Time taken to load the schema: %v", time.Since(start))
se.env.LogError()
}()

Expand All @@ -316,6 +316,11 @@ func (se *Engine) reload(ctx context.Context) error {
return err
}

err = se.updateInnoDBRowsRead(ctx, conn)
if err != nil {
return err
}

rec := concurrency.AllErrorRecorder{}
// curTables keeps track of tables in the new snapshot so we can detect what was dropped.
curTables := map[string]bool{"dual": true}
Expand Down Expand Up @@ -388,6 +393,25 @@ func (se *Engine) reload(ctx context.Context) error {
return nil
}

func (se *Engine) updateInnoDBRowsRead(ctx context.Context, conn *connpool.DBConn) error {
readRowsData, err := conn.Exec(ctx, "show status like 'Innodb_rows_read'", 10, false)
if err != nil {
return err
}

if len(readRowsData.Rows) == 1 && len(readRowsData.Rows[0]) == 2 {
value, err := evalengine.ToInt64(readRowsData.Rows[0][1])
if err != nil {
return err
}

se.innoDbReadRowsGauge.Set(value)
} else {
log.Warningf("got strange results from 'show status': %v", readRowsData.Rows)
}
return nil
}

func (se *Engine) mysqlTime(ctx context.Context, conn *connpool.DBConn) (int64, error) {
// Keep `SELECT UNIX_TIMESTAMP` is in uppercase because binlog server queries are case sensitive and expect it to be so.
tm, err := conn.Exec(ctx, "SELECT UNIX_TIMESTAMP()", 1, false)
Expand Down
17 changes: 17 additions & 0 deletions go/vt/vttablet/tabletserver/schema/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package schema

import (
"expvar"
"fmt"
"net/http"
"net/http/httptest"
"sort"
Expand Down Expand Up @@ -80,6 +81,8 @@ func TestOpenAndReload(t *testing.T) {
"int64"),
"1427325876",
))
firstReadRowsValue := 12
AddFakeInnoDBReadRowsResult(db, firstReadRowsValue)
se := newEngine(10, 10*time.Second, 10*time.Second, db)
se.Open()
defer se.Close()
Expand All @@ -93,6 +96,7 @@ func TestOpenAndReload(t *testing.T) {
"int64"),
"1427325877",
))
assert.EqualValues(t, firstReadRowsValue, se.innoDbReadRowsGauge.Get())
// Modify test_table_03
// Add test_table_04
// Drop msg
Expand Down Expand Up @@ -144,6 +148,8 @@ func TestOpenAndReload(t *testing.T) {
mysql.ShowPrimaryRow("seq", "id"),
},
})
secondReadRowsValue := 123
AddFakeInnoDBReadRowsResult(db, secondReadRowsValue)

firstTime := true
notifier := func(full map[string]*Table, created, altered, dropped []string) {
Expand All @@ -164,6 +170,8 @@ func TestOpenAndReload(t *testing.T) {
err := se.Reload(context.Background())
require.NoError(t, err)

assert.EqualValues(t, secondReadRowsValue, se.innoDbReadRowsGauge.Get())

want["test_table_03"] = &Table{
Name: sqlparser.NewTableIdent("test_table_03"),
Fields: []*querypb.Field{{
Expand Down Expand Up @@ -338,6 +346,7 @@ func TestOpenFailedDueToTableErr(t *testing.T) {
{sqltypes.NewVarBinary("")},
},
})
AddFakeInnoDBReadRowsResult(db, 0)
se := newEngine(10, 1*time.Second, 1*time.Second, db)
err := se.Open()
want := "Row count exceeded"
Expand Down Expand Up @@ -492,3 +501,11 @@ func initialSchema() map[string]*Table {
},
}
}

func AddFakeInnoDBReadRowsResult(db *fakesqldb.DB, value int) *fakesqldb.ExpectedResult {
return db.AddQuery("show status like 'Innodb_rows_read'", sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"Variable_name|Value",
"varchar|int64"),
fmt.Sprintf("Innodb_rows_read|%d", value),
))
}
1 change: 1 addition & 0 deletions go/vt/vttablet/tabletserver/schema/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func getTestSchemaEngine(t *testing.T) (*Engine, *fakesqldb.DB, func()) {
))
db.AddQueryPattern(baseShowTablesPattern, &sqltypes.Result{})
db.AddQuery(mysql.BaseShowPrimary, &sqltypes.Result{})
AddFakeInnoDBReadRowsResult(db, 1)
se := newEngine(10, 10*time.Second, 10*time.Second, db)
require.NoError(t, se.Open())
cancel := func() {
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vttablet/tabletserver/tabletserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2465,6 +2465,11 @@ func setupFakeDB(t *testing.T) *fakesqldb.DB {
mysql.BaseShowTablesRow("msg", false, "vitess_message,vt_ack_wait=30,vt_purge_after=120,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=30"),
},
})
db.AddQuery("show status like 'Innodb_rows_read'", sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"Variable_name|Value",
"varchar|int64"),
"Innodb_rows_read|0",
))

return db
}
Expand Down

0 comments on commit 25baa46

Please sign in to comment.