Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewrite processlist collector #603

Merged
merged 3 commits into from
Jan 5, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
239 changes: 89 additions & 150 deletions collector/info_schema_processlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"context"
"database/sql"
"fmt"
"reflect"
"sort"
"strings"

"github.com/go-kit/log"
Expand All @@ -30,16 +32,15 @@ const infoSchemaProcesslistQuery = `
SELECT
user,
SUBSTRING_INDEX(host, ':', 1) AS host,
COALESCE(command,'') AS command,
COALESCE(state,'') AS state,
count(*) AS processes,
sum(time) AS seconds
COALESCE(command, '') AS command,
COALESCE(state, '') AS state,
COUNT(*) AS processes,
SUM(time) AS seconds
FROM information_schema.processlist
WHERE ID != connection_id()
AND TIME >= %d
GROUP BY user,SUBSTRING_INDEX(host, ':', 1),command,state
ORDER BY null
`
GROUP BY user, SUBSTRING_INDEX(host, ':', 1), command, state
`

// Tunable flags.
var (
Expand All @@ -60,104 +61,23 @@ var (
// Metric descriptors.
var (
processlistCountDesc = prometheus.NewDesc(
prometheus.BuildFQName(namespace, informationSchema, "threads"),
"The number of threads (connections) split by current state.",
[]string{"state"}, nil)
prometheus.BuildFQName(namespace, informationSchema, "processlist_total"),
roman-vynar marked this conversation as resolved.
Show resolved Hide resolved
"The number of threads split by current state.",
[]string{"command", "state"}, nil)
processlistTimeDesc = prometheus.NewDesc(
prometheus.BuildFQName(namespace, informationSchema, "threads_seconds"),
"The number of seconds threads (connections) have used split by current state.",
[]string{"state"}, nil)
prometheus.BuildFQName(namespace, informationSchema, "processlist_seconds_total"),
roman-vynar marked this conversation as resolved.
Show resolved Hide resolved
"The number of seconds threads have used split by current state.",
[]string{"command", "state"}, nil)
processesByUserDesc = prometheus.NewDesc(
prometheus.BuildFQName(namespace, informationSchema, "processes_by_user"),
prometheus.BuildFQName(namespace, informationSchema, "processlist_by_user"),
roman-vynar marked this conversation as resolved.
Show resolved Hide resolved
"The number of processes by user.",
[]string{"mysql_user"}, nil)
processesByHostDesc = prometheus.NewDesc(
prometheus.BuildFQName(namespace, informationSchema, "processes_by_host"),
prometheus.BuildFQName(namespace, informationSchema, "processlist_by_host"),
"The number of processes by host.",
[]string{"client_host"}, nil)
)

// whitelist for connection/process states in SHOW PROCESSLIST
// tokudb uses the state column for "Queried about _______ rows"
var (
// TODO: might need some more keys for other MySQL versions or other storage engines
// see https://dev.mysql.com/doc/refman/5.7/en/general-thread-states.html
threadStateCounterMap = map[string]uint32{
"after create": uint32(0),
"altering table": uint32(0),
"analyzing": uint32(0),
"checking permissions": uint32(0),
"checking table": uint32(0),
"cleaning up": uint32(0),
"closing tables": uint32(0),
"converting heap to myisam": uint32(0),
"copying to tmp table": uint32(0),
"creating sort index": uint32(0),
"creating table": uint32(0),
"creating tmp table": uint32(0),
"deleting": uint32(0),
"executing": uint32(0),
"execution of init_command": uint32(0),
"end": uint32(0),
"freeing items": uint32(0),
"flushing tables": uint32(0),
"fulltext initialization": uint32(0),
"idle": uint32(0),
"init": uint32(0),
"killed": uint32(0),
"waiting for lock": uint32(0),
"logging slow query": uint32(0),
"login": uint32(0),
"manage keys": uint32(0),
"opening tables": uint32(0),
"optimizing": uint32(0),
"preparing": uint32(0),
"reading from net": uint32(0),
"removing duplicates": uint32(0),
"removing tmp table": uint32(0),
"reopen tables": uint32(0),
"repair by sorting": uint32(0),
"repair done": uint32(0),
"repair with keycache": uint32(0),
"replication master": uint32(0),
"rolling back": uint32(0),
"searching rows for update": uint32(0),
"sending data": uint32(0),
"sorting for group": uint32(0),
"sorting for order": uint32(0),
"sorting index": uint32(0),
"sorting result": uint32(0),
"statistics": uint32(0),
"updating": uint32(0),
"waiting for tables": uint32(0),
"waiting for table flush": uint32(0),
"waiting on cond": uint32(0),
"writing to net": uint32(0),
"other": uint32(0),
}
threadStateMapping = map[string]string{
"user sleep": "idle",
"creating index": "altering table",
"committing alter table to storage engine": "altering table",
"discard or import tablespace": "altering table",
"rename": "altering table",
"setup": "altering table",
"renaming result table": "altering table",
"preparing for alter table": "altering table",
"copying to group table": "copying to tmp table",
"copy to tmp table": "copying to tmp table",
"query end": "end",
"update": "updating",
"updating main table": "updating",
"updating reference tables": "updating",
"system lock": "waiting for lock",
"user lock": "waiting for lock",
"table lock": "waiting for lock",
"deleting from main table": "deleting",
"deleting from reference tables": "deleting",
}
)

// ScrapeProcesslist collects from `information_schema.processlist`.
type ScrapeProcesslist struct{}

Expand Down Expand Up @@ -189,83 +109,102 @@ func (ScrapeProcesslist) Scrape(ctx context.Context, db *sql.DB, ch chan<- prome
defer processlistRows.Close()

var (
user string
host string
command string
state string
processes uint32
time uint32
user string
host string
command string
state string
count uint32
time uint32
)
stateCounts := make(map[string]uint32, len(threadStateCounterMap))
stateTime := make(map[string]uint32, len(threadStateCounterMap))
hostCount := make(map[string]uint32)
userCount := make(map[string]uint32)
for k, v := range threadStateCounterMap {
stateCounts[k] = v
stateTime[k] = v
}
// Define maps
stateCounts := make(map[string]map[string]uint32)
stateTime := make(map[string]map[string]uint32)
stateHostCounts := make(map[string]uint32)
stateUserCounts := make(map[string]uint32)

for processlistRows.Next() {
err = processlistRows.Scan(&user, &host, &command, &state, &processes, &time)
err = processlistRows.Scan(&user, &host, &command, &state, &count, &time)
if err != nil {
return err
}
realState := deriveThreadState(command, state)
stateCounts[realState] += processes
stateTime[realState] += time
hostCount[host] = hostCount[host] + processes
userCount[user] = userCount[user] + processes
}
command = sanitizeState(command)
state = sanitizeState(state)
if host == "" {
host = "blank"
roman-vynar marked this conversation as resolved.
Show resolved Hide resolved
}

if *processesByHostFlag {
for host, processes := range hostCount {
ch <- prometheus.MustNewConstMetric(processesByHostDesc, prometheus.GaugeValue, float64(processes), host)
// Init maps
if _, ok := stateCounts[command]; !ok {
stateCounts[command] = make(map[string]uint32)
stateTime[command] = make(map[string]uint32)
}
if _, ok := stateCounts[command][state]; !ok {
stateCounts[command][state] = 0
stateTime[command][state] = 0
}
if _, ok := stateHostCounts[host]; !ok {
stateHostCounts[host] = 0
}
if _, ok := stateUserCounts[user]; !ok {
stateUserCounts[user] = 0
}

stateCounts[command][state] += count
stateTime[command][state] += time
stateHostCounts[host] += count
stateUserCounts[user] += count
}

if *processesByUserFlag {
for user, processes := range userCount {
ch <- prometheus.MustNewConstMetric(processesByUserDesc, prometheus.GaugeValue, float64(processes), user)
for _, command := range sortedMapKeys(stateCounts) {
for _, state := range sortedMapKeys(stateCounts[command]) {
ch <- prometheus.MustNewConstMetric(processlistCountDesc, prometheus.GaugeValue, float64(stateCounts[command][state]), command, state)
ch <- prometheus.MustNewConstMetric(processlistTimeDesc, prometheus.GaugeValue, float64(stateTime[command][state]), command, state)
}
}

for state, processes := range stateCounts {
ch <- prometheus.MustNewConstMetric(processlistCountDesc, prometheus.GaugeValue, float64(processes), state)
if *processesByHostFlag {
for _, host := range sortedMapKeys(stateHostCounts) {
ch <- prometheus.MustNewConstMetric(processesByHostDesc, prometheus.GaugeValue, float64(stateHostCounts[host]), host)
}
}
for state, time := range stateTime {
ch <- prometheus.MustNewConstMetric(processlistTimeDesc, prometheus.GaugeValue, float64(time), state)
if *processesByUserFlag {
for _, user := range sortedMapKeys(stateUserCounts) {
ch <- prometheus.MustNewConstMetric(processesByUserDesc, prometheus.GaugeValue, float64(stateUserCounts[user]), user)
}
}

return nil
}

func deriveThreadState(command string, state string) string {
var normCmd = strings.Replace(strings.ToLower(command), "_", " ", -1)
var normState = strings.Replace(strings.ToLower(state), "_", " ", -1)
// check if it's already a valid state
_, knownState := threadStateCounterMap[normState]
if knownState {
return normState
}
// check if plain mapping applies
mappedState, canMap := threadStateMapping[normState]
if canMap {
return mappedState
}
// check special waiting for XYZ lock
if strings.Contains(normState, "waiting for") && strings.Contains(normState, "lock") {
return "waiting for lock"
func sortedMapKeys(m interface{}) []string {
v := reflect.ValueOf(m)
keys := make([]string, 0, len(v.MapKeys()))
for _, key := range v.MapKeys() {
keys = append(keys, key.String())
}
if normCmd == "sleep" && normState == "" {
return "idle"
sort.Strings(keys)
return keys
}

func sanitizeState(state string) string {
if state == "" {
state = "blank"
}
if normCmd == "query" {
return "executing"
state = strings.ToLower(state)
replacements := map[string]string{
";": "",
",": "",
":": "",
".": "",
"(": "",
")": "",
" ": "_",
"-": "_",
}
if normCmd == "binlog dump" {
return "replication master"
for r := range replacements {
state = strings.Replace(state, r, replacements[r], -1)
}
return "other"
return state
}

// check interface
Expand Down
77 changes: 77 additions & 0 deletions collector/info_schema_processlist_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2018 The Prometheus Authors
roman-vynar marked this conversation as resolved.
Show resolved Hide resolved
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collector

import (
"context"
"fmt"
"testing"

"github.com/DATA-DOG/go-sqlmock"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/smartystreets/goconvey/convey"
)

func TestScrapeProcesslist(t *testing.T) {
roman-vynar marked this conversation as resolved.
Show resolved Hide resolved
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("error opening a stub database connection: %s", err)
}
defer db.Close()

query := fmt.Sprintf(infoSchemaProcesslistQuery, 0)
columns := []string{"user", "host", "command", "state", "processes", "seconds"}
rows := sqlmock.NewRows(columns).
AddRow("manager", "10.0.7.234", "Sleep", "", 10, 87).
AddRow("foobar", "10.0.7.154", "Sleep", "", 8, 842).
AddRow("root", "10.0.7.253", "Sleep", "", 1, 20).
AddRow("feedback", "10.0.7.179", "Sleep", "", 2, 14).
AddRow("system user", "", "Connect", "waiting for handler commit", 1, 7271248).
AddRow("message", "10.0.7.234", "Sleep", "", 4, 62).
AddRow("system user", "", "Query", "Slave has read all relay log; waiting for more updates", 1, 7271248).
AddRow("event_scheduler", "localhost", "Daemon", "Waiting on empty queue", 1, 7271248)
mock.ExpectQuery(sanitizeQuery(query)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
if err = (ScrapeProcesslist{}).Scrape(context.Background(), db, ch, log.NewNopLogger()); err != nil {
t.Errorf("error calling function on test: %s", err)
}
close(ch)
}()

expected := []MetricResult{
{labels: labelMap{"command": "connect", "state": "waiting_for_handler_commit"}, value: 1, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"command": "connect", "state": "waiting_for_handler_commit"}, value: 7271248, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"command": "daemon", "state": "waiting_on_empty_queue"}, value: 1, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"command": "daemon", "state": "waiting_on_empty_queue"}, value: 7271248, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"command": "query", "state": "slave_has_read_all_relay_log_waiting_for_more_updates"}, value: 1, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"command": "query", "state": "slave_has_read_all_relay_log_waiting_for_more_updates"}, value: 7271248, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"command": "sleep", "state": "blank"}, value: 25, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"command": "sleep", "state": "blank"}, value: 1025, metricType: dto.MetricType_GAUGE},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
got := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, got)
}
})

// Ensure all SQL queries were executed
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}