Skip to content

Commit

Permalink
etcdctl/check: create new check command for memory usage
Browse files Browse the repository at this point in the history
Create a new command similar to check perf that can check the memory
consumption for putting different workloads on a given endpoint. If no endpoint
is provided, localhost will be used. Return user with a message that whether
there are enough memory for a given workload with pass or fail.

Fixed #9121
  • Loading branch information
spzala committed Feb 14, 2018
1 parent c9ebe6c commit d5dd233
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 51 deletions.
78 changes: 27 additions & 51 deletions etcdctl/ctlv3/command/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,11 @@ import (
"context"
"encoding/binary"
"fmt"
"io/ioutil"
"math"
"math/rand"
"net/http"
"os"
"strconv"
"strings"
"sync"
"syscall"
"time"

v3 "github.com/coreos/etcd/clientv3"
Expand Down Expand Up @@ -258,8 +254,9 @@ func newCheckPerfCommand(cmd *cobra.Command, args []string) {
// NewCheckDatascaleCommand returns the cobra command for "check datascale".
func NewCheckDatascaleCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "datascale",
Short: "Check the availability of enough memory for holding data",
Use: "datascale [options]",
Short: "Check the availability of memory for holding data on a given server endpoint.",
Long: "If no endpoint is provided, localhost will be used. If multiple endpoints are provided, first endpoint will be used.",
Run: newCheckDatascaleCommand,
}

Expand Down Expand Up @@ -292,6 +289,12 @@ func newCheckDatascaleCommand(cmd *cobra.Command, args []string) {
clients[i] = cc.mustClient()
}

// get endpoints
eps, errEndpoints := endpointsFromCmd(cmd)
if errEndpoints != nil {
ExitWithError(ExitError, errEndpoints)
}

ctx, cancel := context.WithCancel(context.Background())
resp, err := clients[0].Get(ctx, checkDatascalePrefix, v3.WithPrefix(), v3.WithLimit(1))
cancel()
Expand All @@ -309,21 +312,16 @@ func newCheckDatascaleCommand(cmd *cobra.Command, args []string) {
var wg sync.WaitGroup
wg.Add(len(clients))

// get the available system memory before the put operations
systemInfoBefore := &syscall.Sysinfo_t{}
sysErrBefore := syscall.Sysinfo(systemInfoBefore)
if sysErrBefore != nil {
fmt.Println("FAIL: Could not read system memory.")
os.Exit(ExitError)
}
availableFreeMemory := systemInfoBefore.Freeram

// get the process_resident_memory_bytes before the put operations
bytesBefore := processMemoryResidentBytes("127.0.0.1")
// get the process_resident_memory_bytes and process_virtual_memory_bytes before the put operations
bytesBefore, virtualMemoryAvailable := endpointMemoryMetrics(eps[0])
if bytesBefore == 0 {
fmt.Println("FAIL: Could not read process_resident_memory_bytes before the put operations.")
os.Exit(ExitError)
}
if virtualMemoryAvailable == 0 {
fmt.Println("FAIL: Could not read process_virtual_memory_bytes.")
os.Exit(ExitError)
}

fmt.Println(fmt.Sprintf("Start data scale check for work load [%v key-value pairs, %v bytes per key-value, %v concurrent clients].", cfg.limit, cfg.kvSize, cfg.clients))
for i := range clients {
Expand Down Expand Up @@ -351,12 +349,16 @@ func newCheckDatascaleCommand(cmd *cobra.Command, args []string) {
s := <-sc

// get the process_resident_memory_bytes after the put operations
bytesAfter := processMemoryResidentBytes("127.0.0.1")
bytesAfter, _ := endpointMemoryMetrics(eps[0])
if bytesAfter == 0 {
fmt.Println("FAIL: Could not read process_resident_memory_bytes after the put operations.")
os.Exit(ExitError)
}

// delete the created kv pairs
ctx, cancel = context.WithCancel(context.Background())
_, err = clients[0].Delete(ctx, checkDatascalePrefix, v3.WithPrefix())
cancel()
defer cancel()
if err != nil {
ExitWithError(ExitError, err)
}
Expand All @@ -365,8 +367,9 @@ func newCheckDatascaleCommand(cmd *cobra.Command, args []string) {
fmt.Println("FAIL: Could not read process_resident_memory_bytes after the put operations.")
os.Exit(ExitError)
}

bytesUsed := bytesAfter - bytesBefore
percentUsed := (bytesUsed * 100) / float64(availableFreeMemory)
percentUsed := (bytesUsed * 100) / float64(virtualMemoryAvailable)

ok = true
if len(s.ErrorDist) != 0 {
Expand All @@ -378,44 +381,17 @@ func newCheckDatascaleCommand(cmd *cobra.Command, args []string) {
}

mbUsed := bytesUsed / (1024 * 1024)
mbTotal := availableFreeMemory / (1024 * 1024 * 1024)
mbTotal := virtualMemoryAvailable / (1024 * 1024 * 1024)

if int(percentUsed) > 60 { // leaves less than 40 percent of memory
ok = false
}

fmt.Println(fmt.Sprintf("INFO: Approximate total system memory used : %v MB out of %v GB.", mbUsed, mbTotal))
fmt.Println(fmt.Sprintf("INFO: Approximate total system memory used : %v MB out of %v GB.", strconv.FormatFloat(float64(mbUsed), 'f', 2, 64), strconv.FormatFloat(float64(mbTotal), 'f', 2, 64)))
if ok {
fmt.Println(fmt.Sprintf("PASS: Memory usage is %v%% of available. Expected less than %v%%.", percentUsed, 60))
fmt.Println(fmt.Sprintf("PASS: Memory usage is %v%% of available. Expected less than %v%%.", strconv.FormatFloat(float64(percentUsed), 'f', 2, 64), 60))
} else {
fmt.Println(fmt.Sprintf("FAIL: Memory usage is %v%% of available. Expected less than %v%%.", percentUsed, 60))
fmt.Println(fmt.Sprintf("FAIL: Memory usage is %v%% of available. Expected less than %v%%.", strconv.FormatFloat(float64(percentUsed), 'f', 2, 64), 60))
os.Exit(ExitError)
}
}

// get the process_resident_memory_bytes from <server:2379>/metrics
func processMemoryResidentBytes(host string) float64 {
var processBytes string
url := "http://" + host + ":2379/metrics"
resp, err := http.Get(url)
if err != nil {
fmt.Println("fetch error: %v", err)
return 0
}
byts, readerr := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if readerr != nil {
fmt.Println("fetch error: reading %s: %v", url, readerr)
return 0
}

for _, line := range strings.Split(string(byts), "\n") {
if strings.HasPrefix(line, `process_resident_memory_bytes`) {
processBytes = strings.TrimSpace(strings.TrimPrefix(line, `process_resident_memory_bytes`))
break
}
}
memBytes, _ := strconv.ParseFloat(processBytes, 64)

return memBytes
}
44 changes: 44 additions & 0 deletions etcdctl/ctlv3/command/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ import (
"context"
"encoding/hex"
"fmt"
"io/ioutil"
"net/http"
"regexp"
"strconv"
"strings"

pb "github.com/coreos/etcd/mvcc/mvccpb"

Expand Down Expand Up @@ -75,3 +79,43 @@ func commandCtx(cmd *cobra.Command) (context.Context, context.CancelFunc) {
}
return context.WithTimeout(context.Background(), timeOut)
}

// get the resident and virtual memory from <server:2379>/metrics
func endpointMemoryMetrics(host string) (float64, float64) {
var residentMemory string
var virtualMemory string
if !strings.HasPrefix(host, `http://`) {
host = "http://" + host
}
url := host + "/metrics"
resp, err := http.Get(url)
if err != nil {
fmt.Println(fmt.Sprintf("fetch error: %v", err))
return 0.0, 0.0
}
byts, readerr := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if readerr != nil {
fmt.Println(fmt.Sprintf("fetch error: reading %s: %v", url, readerr))
return 0.0, 0.0
}

for _, line := range strings.Split(string(byts), "\n") {
if strings.HasPrefix(line, `process_resident_memory_bytes`) {
residentMemory = strings.TrimSpace(strings.TrimPrefix(line, `process_resident_memory_bytes`))
}
if strings.HasPrefix(line, `process_virtual_memory_bytes`) {
virtualMemory = strings.TrimSpace(strings.TrimPrefix(line, `process_virtual_memory_bytes`))
}
if residentMemory != "" && virtualMemory != "" {
break
}
}
if residentMemory == "" || virtualMemory == "" {
return 0.0, 0.0
}
residentMemoryBytes, _ := strconv.ParseFloat(residentMemory, 64)
virtualMemoryBytes, _ := strconv.ParseFloat(virtualMemory, 64)

return residentMemoryBytes, virtualMemoryBytes
}

0 comments on commit d5dd233

Please sign in to comment.