diff --git a/etcdctl/ctlv3/command/check.go b/etcdctl/ctlv3/command/check.go index 5e10a48d31f..c83cda9bbf3 100644 --- a/etcdctl/ctlv3/command/check.go +++ b/etcdctl/ctlv3/command/check.go @@ -21,6 +21,7 @@ import ( "math" "math/rand" "os" + "strconv" "sync" "time" @@ -37,6 +38,8 @@ var ( checkPerfPrefix string checkPerfAutoCompact bool checkPerfAutoDefrag bool + checkDatascaleLoad string + checkDatascalePrefix string ) type checkPerfCfg struct { @@ -69,6 +72,36 @@ var checkPerfCfgMap = map[string]checkPerfCfg{ }, } +type checkDatascaleCfg struct { + limit int + kvSize int + clients int +} + +var checkDatascaleCfgMap = map[string]checkDatascaleCfg{ + "s": { + limit: 10000, + kvSize: 1024, + clients: 50, + }, + "m": { + limit: 100000, + kvSize: 1024, + clients: 200, + }, + "l": { + limit: 1000000, + kvSize: 1024, + clients: 500, + }, + "xl": { + // xl tries to hit the upper bound aggressively which is 3 versions of 1M objects (3M in total) + limit: 30000000, + kvSize: 1024, + clients: 1000, + }, +} + // NewCheckCommand returns the cobra command for "check". func NewCheckCommand() *cobra.Command { cc := &cobra.Command{ @@ -77,6 +110,7 @@ func NewCheckCommand() *cobra.Command { } cc.AddCommand(NewCheckPerfCommand()) + cc.AddCommand(NewCheckDatascaleCommand()) return cc } @@ -252,3 +286,130 @@ func defrag(c *v3.Client, ep string) { } fmt.Printf("Defragmented %q\n", ep) } + +// NewCheckDatascaleCommand returns the cobra command for "check datascale". +func NewCheckDatascaleCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "datascale [options]", + Short: "Check the memory usage of holding data for diferent workloads on a given server endpoint.", + Long: "If no endpoint is provided, localhost will be used. If multiple endpoints are provided, first endpoint will be used.", + Run: newCheckDatascaleCommand, + } + + cmd.Flags().StringVar(&checkDatascaleLoad, "load", "s", "The datascale check's workload model. Accepted workloads: s(small), m(medium), l(large), xl(xLarge)") + cmd.Flags().StringVar(&checkDatascalePrefix, "prefix", "/etcdctl-check-datascale/", "The prefix for writing the datascale check's keys.") + + return cmd +} + +// newCheckDatascaleCommand executes the "check datascale" command. +func newCheckDatascaleCommand(cmd *cobra.Command, args []string) { + var checkDatascaleAlias = map[string]string{ + "s": "s", "small": "s", + "m": "m", "medium": "m", + "l": "l", "large": "l", + "xl": "xl", "xLarge": "xl", + } + + model, ok := checkDatascaleAlias[checkDatascaleLoad] + if !ok { + ExitWithError(ExitBadFeature, fmt.Errorf("unknown load option %v", checkDatascaleLoad)) + } + cfg := checkDatascaleCfgMap[model] + + requests := make(chan v3.Op, cfg.clients) + + cc := clientConfigFromCmd(cmd) + clients := make([]*v3.Client, cfg.clients) + for i := 0; i < cfg.clients; i++ { + clients[i] = cc.mustClient() + } + + // get endpoints + eps, errEndpoints := endpointsFromCmd(cmd) + if errEndpoints != nil { + ExitWithError(ExitError, errEndpoints) + } + + ctx, cancel := context.WithCancel(context.Background()) + resp, err := clients[0].Get(ctx, checkDatascalePrefix, v3.WithPrefix(), v3.WithLimit(1)) + cancel() + if err != nil { + ExitWithError(ExitError, err) + } + if len(resp.Kvs) > 0 { + ExitWithError(ExitInvalidInput, fmt.Errorf("prefix %q has keys. Delete with etcdctl del --prefix %s first.", checkDatascalePrefix, checkDatascalePrefix)) + } + + ksize, vsize := 512, 512 + k, v := make([]byte, ksize), string(make([]byte, vsize)) + + r := report.NewReport("%4.4f") + var wg sync.WaitGroup + wg.Add(len(clients)) + + // get the process_resident_memory_bytes and process_virtual_memory_bytes before the put operations + bytesBefore := endpointMemoryMetrics(eps[0]) + if bytesBefore == 0 { + fmt.Println("FAIL: Could not read process_resident_memory_bytes before the put operations.") + os.Exit(ExitError) + } + + fmt.Println(fmt.Sprintf("Start data scale check for work load [%v key-value pairs, %v bytes per key-value, %v concurrent clients].", cfg.limit, cfg.kvSize, cfg.clients)) + for i := range clients { + go func(c *v3.Client) { + defer wg.Done() + for op := range requests { + st := time.Now() + _, derr := c.Do(context.Background(), op) + r.Results() <- report.Result{Err: derr, Start: st, End: time.Now()} + } + }(clients[i]) + } + + go func() { + for i := 0; i < cfg.limit; i++ { + binary.PutVarint(k, int64(rand.Int63n(math.MaxInt64))) + requests <- v3.OpPut(checkDatascalePrefix+string(k), v) + } + close(requests) + }() + + sc := r.Stats() + wg.Wait() + close(r.Results()) + s := <-sc + + // get the process_resident_memory_bytes after the put operations + bytesAfter := endpointMemoryMetrics(eps[0]) + if bytesAfter == 0 { + fmt.Println("FAIL: Could not read process_resident_memory_bytes after the put operations.") + os.Exit(ExitError) + } + + // delete the created kv pairs + ctx, cancel = context.WithCancel(context.Background()) + _, err = clients[0].Delete(ctx, checkDatascalePrefix, v3.WithPrefix()) + defer cancel() + if err != nil { + ExitWithError(ExitError, err) + } + + if bytesAfter == 0 { + fmt.Println("FAIL: Could not read process_resident_memory_bytes after the put operations.") + os.Exit(ExitError) + } + + bytesUsed := bytesAfter - bytesBefore + mbUsed := bytesUsed / (1024 * 1024) + + if len(s.ErrorDist) != 0 { + fmt.Println("FAIL: too many errors") + for k, v := range s.ErrorDist { + fmt.Printf("FAIL: ERROR(%v) -> %d\n", k, v) + } + os.Exit(ExitError) + } else { + fmt.Println(fmt.Sprintf("PASS: Approximate system memory used : %v MB.", strconv.FormatFloat(float64(mbUsed), 'f', 2, 64))) + } +} diff --git a/etcdctl/ctlv3/command/util.go b/etcdctl/ctlv3/command/util.go index 8c94a2df94a..64a4a16f4c0 100644 --- a/etcdctl/ctlv3/command/util.go +++ b/etcdctl/ctlv3/command/util.go @@ -18,7 +18,11 @@ import ( "context" "encoding/hex" "fmt" + "io/ioutil" + "net/http" "regexp" + "strconv" + "strings" pb "github.com/coreos/etcd/internal/mvcc/mvccpb" @@ -75,3 +79,42 @@ func commandCtx(cmd *cobra.Command) (context.Context, context.CancelFunc) { } return context.WithTimeout(context.Background(), timeOut) } + +// get the process_resident_memory_bytes from /metrics +func endpointMemoryMetrics(host string) float64 { + residentMemoryKey := "process_resident_memory_bytes" + var residentMemoryValue string + if !strings.HasPrefix(host, `http://`) { + host = "http://" + host + } + url := host + "/metrics" + resp, err := http.Get(url) + if err != nil { + fmt.Println(fmt.Sprintf("fetch error: %v", err)) + return 0.0 + } + byts, readerr := ioutil.ReadAll(resp.Body) + resp.Body.Close() + if readerr != nil { + fmt.Println(fmt.Sprintf("fetch error: reading %s: %v", url, readerr)) + return 0.0 + } + + for _, line := range strings.Split(string(byts), "\n") { + if strings.HasPrefix(line, residentMemoryKey) { + residentMemoryValue = strings.TrimSpace(strings.TrimPrefix(line, residentMemoryKey)) + break + } + } + if residentMemoryValue == "" { + fmt.Println(fmt.Sprintf("could not find: %v", residentMemoryKey)) + return 0.0 + } + residentMemoryBytes, parseErr := strconv.ParseFloat(residentMemoryValue, 64) + if parseErr != nil { + fmt.Println(fmt.Sprintf("parse error: %v", parseErr)) + return 0.0 + } + + return residentMemoryBytes +}