Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add innodb_read_rows as vttablet metric #7520

Merged
merged 4 commits into from
Feb 22, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions go/vt/vttablet/tabletserver/query_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ import (
"testing"
"time"

"vitess.io/vitess/go/mysql"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/cache"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/streamlog"
Expand Down Expand Up @@ -114,12 +115,7 @@ func TestGetPlanPanicDuetoEmptyQuery(t *testing.T) {
}
}

func TestGetMessageStreamPlan(t *testing.T) {
db := fakesqldb.New(t)
defer db.Close()
for query, result := range schematest.Queries() {
db.AddQuery(query, result)
}
func addSchemaEngineQueries(db *fakesqldb.DB) {
db.AddQueryPattern(baseShowTablesPattern, &sqltypes.Result{
Fields: mysql.BaseShowTablesFields,
Rows: [][]sqltypes.Value{
Expand All @@ -129,6 +125,22 @@ func TestGetMessageStreamPlan(t *testing.T) {
mysql.BaseShowTablesRow("seq", false, "vitess_sequence"),
mysql.BaseShowTablesRow("msg", false, "vitess_message,vt_ack_wait=30,vt_purge_after=120,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=30"),
}})
db.AddQuery("show status like 'Innodb_rows_read'", sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"Variable_name|Value",
"varchar|int64"),
"Innodb_rows_read|0",
))
}

func TestGetMessageStreamPlan(t *testing.T) {
db := fakesqldb.New(t)
defer db.Close()
for query, result := range schematest.Queries() {
db.AddQuery(query, result)
}

addSchemaEngineQueries(db)

qe := newTestQueryEngine(10*time.Second, true, newDBConfigs(db))
qe.se.Open()
qe.Open()
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vttablet/tabletserver/query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1175,6 +1175,11 @@ func initQueryExecutorTestDB(db *fakesqldb.DB) {
mysql.BaseShowTablesRow("msg", false, "vitess_message,vt_ack_wait=30,vt_purge_after=120,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=30"),
},
})
db.AddQuery("show status like 'Innodb_rows_read'", sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"Variable_name|Value",
"varchar|int64"),
"Innodb_rows_read|0",
))
}

func getTestTableFields() []*querypb.Field {
Expand Down
26 changes: 24 additions & 2 deletions go/vt/vttablet/tabletserver/schema/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type Engine struct {

tableFileSizeGauge *stats.GaugesWithSingleLabel
tableAllocatedSizeGauge *stats.GaugesWithSingleLabel
innoDbReadRowsGauge *stats.GaugesWithSingleLabel
}

// NewEngine creates a new Engine.
Expand All @@ -99,6 +100,7 @@ func NewEngine(env tabletenv.Env) *Engine {
_ = env.Exporter().NewGaugeDurationFunc("SchemaReloadTime", "vttablet keeps table schemas in its own memory and periodically refreshes it from MySQL. This config controls the reload time.", se.ticks.Interval)
se.tableFileSizeGauge = env.Exporter().NewGaugesWithSingleLabel("TableFileSize", "tracks table file size", "Table")
se.tableAllocatedSizeGauge = env.Exporter().NewGaugesWithSingleLabel("TableAllocatedSize", "tracks table allocated size", "Table")
se.innoDbReadRowsGauge = env.Exporter().NewGaugesWithSingleLabel("InnodbRowsRead", "number of rows read by mysql", "Database")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need a label? Why can't we simply use NewGauge?


env.Exporter().HandleFunc("/debug/schema", se.handleDebugSchema)
env.Exporter().HandleFunc("/schemaz", func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -290,9 +292,7 @@ func (se *Engine) ReloadAt(ctx context.Context, pos mysql.Position) error {

// reload reloads the schema. It can also be used to initialize it.
func (se *Engine) reload(ctx context.Context) error {
//start := time.Now()
defer func() {
//log.Infof("Time taken to load the schema: %v", time.Since(start))
se.env.LogError()
}()

Expand All @@ -316,6 +316,11 @@ func (se *Engine) reload(ctx context.Context) error {
return err
}

err = se.updateInnoDBRowsRead(ctx, conn)
if err != nil {
return err
}

rec := concurrency.AllErrorRecorder{}
// curTables keeps track of tables in the new snapshot so we can detect what was dropped.
curTables := map[string]bool{"dual": true}
Expand Down Expand Up @@ -388,6 +393,23 @@ func (se *Engine) reload(ctx context.Context) error {
return nil
}

func (se *Engine) updateInnoDBRowsRead(ctx context.Context, conn *connpool.DBConn) error {
readRowsData, err := conn.Exec(ctx, "show status like 'Innodb_rows_read'", 10, false)
if err != nil {
return err
}

if len(readRowsData.Rows) == 1 && len(readRowsData.Rows[0]) == 2 {
value, err := evalengine.ToInt64(readRowsData.Rows[0][1])
if err != nil {
return err
}

se.innoDbReadRowsGauge.Set("read_rows", value)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should log a message or return an error if the condition is not met.

return nil
}

func (se *Engine) mysqlTime(ctx context.Context, conn *connpool.DBConn) (int64, error) {
// Keep `SELECT UNIX_TIMESTAMP` is in uppercase because binlog server queries are case sensitive and expect it to be so.
tm, err := conn.Exec(ctx, "SELECT UNIX_TIMESTAMP()", 1, false)
Expand Down
19 changes: 19 additions & 0 deletions go/vt/vttablet/tabletserver/schema/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package schema

import (
"expvar"
"fmt"
"net/http"
"net/http/httptest"
"sort"
Expand Down Expand Up @@ -80,6 +81,8 @@ func TestOpenAndReload(t *testing.T) {
"int64"),
"1427325876",
))
firstReadRowsValue := 12
AddFakeInnoDBReadRowsResult(db, firstReadRowsValue)
se := newEngine(10, 10*time.Second, 10*time.Second, db)
se.Open()
defer se.Close()
Expand All @@ -93,6 +96,8 @@ func TestOpenAndReload(t *testing.T) {
"int64"),
"1427325877",
))
assert.Contains(t, se.innoDbReadRowsGauge.Counts(), "read_rows")
assert.EqualValues(t, firstReadRowsValue, se.innoDbReadRowsGauge.Counts()["read_rows"])
// Modify test_table_03
// Add test_table_04
// Drop msg
Expand Down Expand Up @@ -144,6 +149,8 @@ func TestOpenAndReload(t *testing.T) {
mysql.ShowPrimaryRow("seq", "id"),
},
})
secondReadRowsValue := 123
AddFakeInnoDBReadRowsResult(db, secondReadRowsValue)

firstTime := true
notifier := func(full map[string]*Table, created, altered, dropped []string) {
Expand All @@ -164,6 +171,9 @@ func TestOpenAndReload(t *testing.T) {
err := se.Reload(context.Background())
require.NoError(t, err)

assert.Contains(t, se.innoDbReadRowsGauge.Counts(), "read_rows")
assert.EqualValues(t, secondReadRowsValue, se.innoDbReadRowsGauge.Counts()["read_rows"])

want["test_table_03"] = &Table{
Name: sqlparser.NewTableIdent("test_table_03"),
Fields: []*querypb.Field{{
Expand Down Expand Up @@ -338,6 +348,7 @@ func TestOpenFailedDueToTableErr(t *testing.T) {
{sqltypes.NewVarBinary("")},
},
})
AddFakeInnoDBReadRowsResult(db, 0)
se := newEngine(10, 1*time.Second, 1*time.Second, db)
err := se.Open()
want := "Row count exceeded"
Expand Down Expand Up @@ -492,3 +503,11 @@ func initialSchema() map[string]*Table {
},
}
}

func AddFakeInnoDBReadRowsResult(db *fakesqldb.DB, value int) *fakesqldb.ExpectedResult {
return db.AddQuery("show status like 'Innodb_rows_read'", sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"Variable_name|Value",
"varchar|int64"),
fmt.Sprintf("Innodb_rows_read|%d", value),
))
}
1 change: 1 addition & 0 deletions go/vt/vttablet/tabletserver/schema/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func getTestSchemaEngine(t *testing.T) (*Engine, *fakesqldb.DB, func()) {
))
db.AddQueryPattern(baseShowTablesPattern, &sqltypes.Result{})
db.AddQuery(mysql.BaseShowPrimary, &sqltypes.Result{})
AddFakeInnoDBReadRowsResult(db, 1)
se := newEngine(10, 10*time.Second, 10*time.Second, db)
require.NoError(t, se.Open())
cancel := func() {
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vttablet/tabletserver/tabletserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2465,6 +2465,11 @@ func setupFakeDB(t *testing.T) *fakesqldb.DB {
mysql.BaseShowTablesRow("msg", false, "vitess_message,vt_ack_wait=30,vt_purge_after=120,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=30"),
},
})
db.AddQuery("show status like 'Innodb_rows_read'", sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"Variable_name|Value",
"varchar|int64"),
"Innodb_rows_read|0",
))

return db
}
Expand Down