Merge pull request #9185 from spzala/checkdatabase
etcdctl/check: create new check command for memory usage
gyuho authored Feb 20, 2018
2 parents 0b5c660 + 53d2a2e commit e19df69
Showing 2 changed files with 204 additions and 0 deletions.
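The new check datascale subcommand writes a configurable number of key-value pairs to an endpoint and reports the approximate resident memory the server used to hold them, scraping process_resident_memory_bytes from the endpoint's /metrics page before and after the writes. A sketch of an invocation using the flags defined below (workload models: s, m, l, xl; the endpoint defaults to localhost):

    etcdctl check datascale --load m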
161 changes: 161 additions & 0 deletions etcdctl/ctlv3/command/check.go
@@ -21,6 +21,7 @@ import (
"math"
"math/rand"
"os"
"strconv"
"sync"
"time"

@@ -37,6 +38,8 @@ var (
checkPerfPrefix string
checkPerfAutoCompact bool
checkPerfAutoDefrag bool
checkDatascaleLoad string
checkDatascalePrefix string
)

type checkPerfCfg struct {
@@ -69,6 +72,36 @@ var checkPerfCfgMap = map[string]checkPerfCfg{
},
}

type checkDatascaleCfg struct {
limit int
kvSize int
clients int
}

var checkDatascaleCfgMap = map[string]checkDatascaleCfg{
"s": {
limit: 10000,
kvSize: 1024,
clients: 50,
},
"m": {
limit: 100000,
kvSize: 1024,
clients: 200,
},
"l": {
limit: 1000000,
kvSize: 1024,
clients: 500,
},
"xl": {
// xl tries to hit the upper bound aggressively, which is 3 versions of 1M objects (3M in total)
limit: 3000000,
kvSize: 1024,
clients: 1000,
},
}
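// Rough raw data volume per model above (limit x kvSize):
// s ~10 MB, m ~100 MB, l ~1 GB, xl ~3 GB.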

// NewCheckCommand returns the cobra command for "check".
func NewCheckCommand() *cobra.Command {
cc := &cobra.Command{
@@ -77,6 +110,7 @@
}

cc.AddCommand(NewCheckPerfCommand())
cc.AddCommand(NewCheckDatascaleCommand())

return cc
}
@@ -252,3 +286,130 @@ func defrag(c *v3.Client, ep string) {
}
fmt.Printf("Defragmented %q\n", ep)
}

// NewCheckDatascaleCommand returns the cobra command for "check datascale".
func NewCheckDatascaleCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "datascale [options]",
Short: "Check the memory usage of holding data for different workloads on a given server endpoint.",
Long: "If no endpoint is provided, localhost will be used. If multiple endpoints are provided, the first endpoint will be used.",
Run: newCheckDatascaleCommand,
}

cmd.Flags().StringVar(&checkDatascaleLoad, "load", "s", "The datascale check's workload model. Accepted workloads: s(small), m(medium), l(large), xl(xLarge)")
cmd.Flags().StringVar(&checkDatascalePrefix, "prefix", "/etcdctl-check-datascale/", "The prefix for writing the datascale check's keys.")

return cmd
}

// newCheckDatascaleCommand executes the "check datascale" command.
func newCheckDatascaleCommand(cmd *cobra.Command, args []string) {
var checkDatascaleAlias = map[string]string{
"s": "s", "small": "s",
"m": "m", "medium": "m",
"l": "l", "large": "l",
"xl": "xl", "xLarge": "xl",
}

model, ok := checkDatascaleAlias[checkDatascaleLoad]
if !ok {
ExitWithError(ExitBadFeature, fmt.Errorf("unknown load option %v", checkDatascaleLoad))
}
cfg := checkDatascaleCfgMap[model]

requests := make(chan v3.Op, cfg.clients)

cc := clientConfigFromCmd(cmd)
clients := make([]*v3.Client, cfg.clients)
for i := 0; i < cfg.clients; i++ {
clients[i] = cc.mustClient()
}

// get endpoints
eps, errEndpoints := endpointsFromCmd(cmd)
if errEndpoints != nil {
ExitWithError(ExitError, errEndpoints)
}

ctx, cancel := context.WithCancel(context.Background())
resp, err := clients[0].Get(ctx, checkDatascalePrefix, v3.WithPrefix(), v3.WithLimit(1))
cancel()
if err != nil {
ExitWithError(ExitError, err)
}
if len(resp.Kvs) > 0 {
ExitWithError(ExitInvalidInput, fmt.Errorf("prefix %q has keys. Delete with etcdctl del --prefix %s first.", checkDatascalePrefix, checkDatascalePrefix))
}

ksize, vsize := 512, 512
k, v := make([]byte, ksize), string(make([]byte, vsize))
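// The 512-byte key plus 512-byte value total 1024 bytes per pair,
// matching the kvSize of every workload model.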

r := report.NewReport("%4.4f")
var wg sync.WaitGroup
wg.Add(len(clients))

// read the process_resident_memory_bytes metric before the put operations
bytesBefore := endpointMemoryMetrics(eps[0])
if bytesBefore == 0 {
fmt.Println("FAIL: Could not read process_resident_memory_bytes before the put operations.")
os.Exit(ExitError)
}

fmt.Printf("Start data scale check for workload [%v key-value pairs, %v bytes per key-value, %v concurrent clients].\n", cfg.limit, cfg.kvSize, cfg.clients)
for i := range clients {
go func(c *v3.Client) {
defer wg.Done()
for op := range requests {
st := time.Now()
_, derr := c.Do(context.Background(), op)
r.Results() <- report.Result{Err: derr, Start: st, End: time.Now()}
}
}(clients[i])
}

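// Producer: generate cfg.limit randomized keys under the check prefix and
// feed the puts to the client goroutines above via the shared requests channel.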
go func() {
for i := 0; i < cfg.limit; i++ {
binary.PutVarint(k, int64(rand.Int63n(math.MaxInt64)))
requests <- v3.OpPut(checkDatascalePrefix+string(k), v)
}
close(requests)
}()

sc := r.Stats()
wg.Wait()
close(r.Results())
s := <-sc

// read the process_resident_memory_bytes metric after the put operations
bytesAfter := endpointMemoryMetrics(eps[0])
if bytesAfter == 0 {
fmt.Println("FAIL: Could not read process_resident_memory_bytes after the put operations.")
os.Exit(ExitError)
}

// delete the created kv pairs
ctx, cancel = context.WithCancel(context.Background())
_, err = clients[0].Delete(ctx, checkDatascalePrefix, v3.WithPrefix())
cancel()
if err != nil {
ExitWithError(ExitError, err)
}

bytesUsed := bytesAfter - bytesBefore
mbUsed := bytesUsed / (1024 * 1024)
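// Illustrative (hypothetical) numbers: if resident memory reads 120 MB before
// the puts and 1144 MB after, the check reports ~1024 MB used.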

if len(s.ErrorDist) != 0 {
fmt.Println("FAIL: too many errors")
for k, v := range s.ErrorDist {
fmt.Printf("FAIL: ERROR(%v) -> %d\n", k, v)
}
os.Exit(ExitError)
} else {
fmt.Printf("PASS: Approximate system memory used: %s MB.\n", strconv.FormatFloat(mbUsed, 'f', 2, 64))
}
}
43 changes: 43 additions & 0 deletions etcdctl/ctlv3/command/util.go
@@ -18,7 +18,11 @@ import (
"context"
"encoding/hex"
"fmt"
"io/ioutil"
"net/http"
"regexp"
"strconv"
"strings"

pb "github.com/coreos/etcd/internal/mvcc/mvccpb"

@@ -75,3 +79,42 @@ func commandCtx(cmd *cobra.Command) (context.Context, context.CancelFunc) {
}
return context.WithTimeout(context.Background(), timeOut)
}

// endpointMemoryMetrics returns the process_resident_memory_bytes value scraped from <host>/metrics, or 0 on failure.
func endpointMemoryMetrics(host string) float64 {
residentMemoryKey := "process_resident_memory_bytes"
var residentMemoryValue string
if !strings.HasPrefix(host, `http://`) {
host = "http://" + host
}
url := host + "/metrics"
resp, err := http.Get(url)
if err != nil {
fmt.Printf("fetch error: %v\n", err)
return 0.0
}
byts, readerr := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if readerr != nil {
fmt.Printf("fetch error: reading %s: %v\n", url, readerr)
return 0.0
}

for _, line := range strings.Split(string(byts), "\n") {
if strings.HasPrefix(line, residentMemoryKey) {
residentMemoryValue = strings.TrimSpace(strings.TrimPrefix(line, residentMemoryKey))
break
}
}
if residentMemoryValue == "" {
fmt.Printf("could not find: %v\n", residentMemoryKey)
return 0.0
}
residentMemoryBytes, parseErr := strconv.ParseFloat(residentMemoryValue, 64)
if parseErr != nil {
fmt.Printf("parse error: %v\n", parseErr)
return 0.0
}

return residentMemoryBytes
}
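For reference, etcd's /metrics endpoint serves the Prometheus text exposition format; the line this helper scans for looks like the following (the value shown is illustrative):

    process_resident_memory_bytes 3.4762752e+07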
