Skip to content

Commit

Permalink
tools/etcd-dump-metrics: initial commit
Browse files Browse the repository at this point in the history
Signed-off-by: Gyuho Lee <[email protected]>
  • Loading branch information
gyuho committed Jul 21, 2018
1 parent fa1533d commit dc25387
Show file tree
Hide file tree
Showing 3 changed files with 333 additions and 0 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ clean:
rm -rf ./gopath.proto
rm -rf ./release
rm -f ./snapshot/localhost:*
rm -f ./tools/etcd-dump-metrics/localhost:*
rm -f ./integration/127.0.0.1:* ./integration/localhost:*
rm -f ./clientv3/integration/127.0.0.1:* ./clientv3/integration/localhost:*
rm -f ./clientv3/ordering/127.0.0.1:* ./clientv3/ordering/localhost:*
Expand Down
18 changes: 18 additions & 0 deletions tools/etcd-dump-metrics/README
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@

go install -v ./tools/etcd-dump-metrics

# for latest master branch
etcd-dump-metrics > docs/metrics-latest

# download etcd v3.3 to ./bin
goreman start
etcd-dump-metrics -addr http://localhost:2379/metrics > docs/metrics-v3.3

# download etcd v3.2 to ./bin
goreman start
etcd-dump-metrics -addr http://localhost:2379/metrics > docs/metrics-v3.2

# download etcd v3.1 to ./bin
goreman start
etcd-dump-metrics -addr http://localhost:2379/metrics > docs/metrics-v3.1

314 changes: 314 additions & 0 deletions tools/etcd-dump-metrics/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,314 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"context"
"flag"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"os"
"sort"
"strings"
"time"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/embed"
"github.com/coreos/etcd/pkg/transport"

"go.uber.org/zap"
)

var lg *zap.Logger

func init() {
var err error
lg, err = zap.NewProduction()
if err != nil {
panic(err)
}
}

func main() {
addr := flag.String("addr", "", "etcd metrics URL to fetch from (empty to use current git branch)")
enableLog := flag.Bool("server-log", false, "true to enable embedded etcd server logs")
debug := flag.Bool("debug", false, "true to enable debug logging")
flag.Parse()

if *debug {
lg = zap.NewExample()
}

ep := *addr
if ep == "" {
uss := newEmbedURLs(4)
ep = uss[0].String() + "/metrics"

cfgs := []*embed.Config{embed.NewConfig(), embed.NewConfig()}
cfgs[0].Name, cfgs[1].Name = "0", "1"
setupEmbedCfg(cfgs[0], *enableLog, []url.URL{uss[0]}, []url.URL{uss[1]}, []url.URL{uss[1], uss[3]})
setupEmbedCfg(cfgs[1], *enableLog, []url.URL{uss[2]}, []url.URL{uss[3]}, []url.URL{uss[1], uss[3]})
type embedAndError struct {
ec *embed.Etcd
err error
}
ech := make(chan embedAndError)
for _, cfg := range cfgs {
go func(c *embed.Config) {
e, err := embed.StartEtcd(c)
if err != nil {
ech <- embedAndError{err: err}
return
}
<-e.Server.ReadyNotify()
ech <- embedAndError{ec: e}
}(cfg)
}
for range cfgs {
ev := <-ech
if ev.err != nil {
lg.Panic("failed to start embedded etcd", zap.Error(ev.err))
}
defer ev.ec.Close()
}
lg.Debug("started 2-node embedded etcd cluster")
}

lg.Debug("starting etcd-dump-metrics", zap.String("endpoint", ep))

// send client requests to populate gRPC client-side metrics
// TODO: enable default metrics initialization in v3.1 and v3.2
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{strings.Replace(ep, "/metrics", "", 1)}})
if err != nil {
lg.Panic("failed to create client", zap.Error(err))
}
defer cli.Close()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err = cli.Put(ctx, "____test", "")
if err != nil {
lg.Panic("failed to write test key", zap.Error(err))
}
_, err = cli.Get(ctx, "____test")
if err != nil {
lg.Panic("failed to read test key", zap.Error(err))
}
_, err = cli.Delete(ctx, "____test")
if err != nil {
lg.Panic("failed to delete test key", zap.Error(err))
}

fmt.Println(getMetrics(ep))
}

func getMetrics(ep string) (m metricSlice) {
lines, err := fetchMetrics(ep)
if err != nil {
lg.Panic("failed to fetch metrics", zap.Error(err))
}
mss := parse(lines)
sort.Sort(metricSlice(mss))
return mss
}

func (mss metricSlice) String() (s string) {
ver := "unknown"
for i, v := range mss {
if strings.HasPrefix(v.name, "etcd_server_version") {
ver = v.metrics[0]
}
s += v.String()
if i != len(mss)-1 {
s += "\n\n"
}
}
return "# server version: " + ver + "\n\n" + s
}

type metricSlice []metric

func (mss metricSlice) Len() int {
return len(mss)
}

func (mss metricSlice) Less(i, j int) bool {
return mss[i].name < mss[j].name
}

func (mss metricSlice) Swap(i, j int) {
mss[i], mss[j] = mss[j], mss[i]
}

type metric struct {
// raw data for debugging purposes
raw []string

// metrics name
name string

// metrics description
desc string

// metrics type
tp string

// aggregates of "grpc_server_handled_total"
grpcCodes []string

// keep fist 1 and last 4 if histogram or summary
// otherwise, keep only 1
metrics []string
}

func (m metric) String() (s string) {
s += fmt.Sprintf("# name: %q\n", m.name)
s += fmt.Sprintf("# description: %q\n", m.desc)
s += fmt.Sprintf("# type: %q\n", m.tp)
if len(m.grpcCodes) > 0 {
s += "# gRPC codes: \n"
for _, c := range m.grpcCodes {
s += fmt.Sprintf("# - %q\n", c)
}
}
s += strings.Join(m.metrics, "\n")
return s
}

func parse(lines []string) (mss []metric) {
m := metric{raw: make([]string, 0), metrics: make([]string, 0)}
for _, line := range lines {
if strings.HasPrefix(line, "# HELP ") {
// add previous metric and initialize
if m.name != "" {
mss = append(mss, m)
}
m = metric{raw: make([]string, 0), metrics: make([]string, 0)}

m.raw = append(m.raw, line)
ss := strings.Split(strings.Replace(line, "# HELP ", "", 1), " ")
m.name, m.desc = ss[0], strings.Join(ss[1:], " ")
continue
}

if strings.HasPrefix(line, "# TYPE ") {
m.raw = append(m.raw, line)
m.tp = strings.Split(strings.Replace(line, "# TYPE "+m.tp, "", 1), " ")[1]
continue
}

m.raw = append(m.raw, line)
m.metrics = append(m.metrics, strings.Split(line, " ")[0])
}
if m.name != "" {
mss = append(mss, m)
}

// aggregate
for i := range mss {
if mss[i].tp == "histogram" || mss[i].tp == "summary" {
n := len(mss[i].metrics)
mss[i].metrics = []string{
mss[i].metrics[0], // first bucket
mss[i].metrics[n-4], // second bucket
mss[i].metrics[n-3], // last bucket
mss[i].metrics[n-2], // sum
mss[i].metrics[n-1], // count
}
}

if mss[i].name == "grpc_server_handled_total" {
pfx := `grpc_server_handled_total{grpc_code="`
codes, metrics := make(map[string]struct{}), make(map[string]struct{})
for _, v := range mss[i].metrics {
v2 := strings.Replace(v, pfx, "", 1)
idx := strings.Index(v2, `",grpc_method="`)
code := v2[:idx]
v2 = v2[idx:]
codes[code] = struct{}{}
v2 = pfx + "CODE" + v2
metrics[v2] = struct{}{}
}
cs := make([]string, 0, len(codes))
for k := range codes {
cs = append(cs, k)
}
sort.Strings(cs)
ms := make([]string, 0, len(metrics))
for k := range metrics {
ms = append(ms, k)
}
sort.Strings(ms)
mss[i].grpcCodes = cs
mss[i].metrics = ms
}
}
return mss
}

func newEmbedURLs(n int) (urls []url.URL) {
urls = make([]url.URL, n)
for i := 0; i < n; i++ {
u, _ := url.Parse(fmt.Sprintf("unix://localhost:%d%06d", os.Getpid(), i))
urls[i] = *u
}
return urls
}

func setupEmbedCfg(cfg *embed.Config, enableLog bool, curls, purls, ics []url.URL) {
cfg.Logger = "zap"
cfg.LogOutputs = []string{"/dev/null"}
if enableLog {
cfg.LogOutputs = []string{"stderr"}
}
cfg.Debug = false

var err error
cfg.Dir, err = ioutil.TempDir(os.TempDir(), fmt.Sprintf("%016X", time.Now().UnixNano()))
if err != nil {
panic(err)
}
os.RemoveAll(cfg.Dir)

cfg.ClusterState = "new"
cfg.LCUrls, cfg.ACUrls = curls, curls
cfg.LPUrls, cfg.APUrls = purls, purls

cfg.InitialCluster = ""
for i := range ics {
cfg.InitialCluster += fmt.Sprintf(",%d=%s", i, ics[i].String())
}
cfg.InitialCluster = cfg.InitialCluster[1:]
}

func fetchMetrics(ep string) (lines []string, err error) {
tr, err := transport.NewTimeoutTransport(transport.TLSInfo{}, time.Second, time.Second, time.Second)
if err != nil {
return nil, err
}
cli := &http.Client{Transport: tr}
resp, err := cli.Get(ep)
if err != nil {
return nil, err
}
defer resp.Body.Close()
b, rerr := ioutil.ReadAll(resp.Body)
if rerr != nil {
return nil, rerr
}
lines = strings.Split(string(b), "\n")
return lines, nil
}

0 comments on commit dc25387

Please sign in to comment.