From 54f8ac6433c993e0740782862ef35ebb2564cda8 Mon Sep 17 00:00:00 2001 From: Elad Nachmias Date: Fri, 18 Jan 2019 16:42:23 +0700 Subject: [PATCH 01/10] cmd/swarm: add sliding window tests (v1) --- cmd/swarm/swarm-smoke/feed_upload_and_sync.go | 86 ------- cmd/swarm/swarm-smoke/main.go | 19 +- cmd/swarm/swarm-smoke/sliding_window.go | 181 +++++++++++++ cmd/swarm/swarm-smoke/upload_and_sync.go | 149 ----------- cmd/swarm/swarm-smoke/upload_speed.go | 39 +-- cmd/swarm/swarm-smoke/util.go | 240 ++++++++++++++++++ 6 files changed, 438 insertions(+), 276 deletions(-) create mode 100644 cmd/swarm/swarm-smoke/sliding_window.go create mode 100644 cmd/swarm/swarm-smoke/util.go diff --git a/cmd/swarm/swarm-smoke/feed_upload_and_sync.go b/cmd/swarm/swarm-smoke/feed_upload_and_sync.go index 2c5e3fd235c4..59b72380998b 100644 --- a/cmd/swarm/swarm-smoke/feed_upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/feed_upload_and_sync.go @@ -2,13 +2,10 @@ package main import ( "bytes" - "context" "crypto/md5" "fmt" "io" "io/ioutil" - "net/http" - "net/http/httptrace" "os" "os/exec" "strings" @@ -18,13 +15,8 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/metrics" - "github.com/ethereum/go-ethereum/swarm/api/client" - "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/storage/feed" "github.com/ethereum/go-ethereum/swarm/testutil" - colorable "github.com/mattn/go-colorable" - opentracing "github.com/opentracing/opentracing-go" "github.com/pborman/uuid" cli "gopkg.in/urfave/cli.v1" ) @@ -33,27 +25,6 @@ const ( feedRandomDataLength = 8 ) -func cliFeedUploadAndSync(c *cli.Context) error { - metrics.GetOrRegisterCounter("feed-and-sync", nil).Inc(1) - log.Root().SetHandler(log.CallerFileHandler(log.LvlFilterHandler(log.Lvl(verbosity), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true))))) - - errc := make(chan error) - go func() { - errc <- feedUploadAndSync(c) - }() - - select { - case err := <-errc: - if err != nil { - metrics.GetOrRegisterCounter("feed-and-sync.fail", nil).Inc(1) - } - return err - case <-time.After(time.Duration(timeout) * time.Second): - metrics.GetOrRegisterCounter("feed-and-sync.timeout", nil).Inc(1) - return fmt.Errorf("timeout after %v sec", timeout) - } -} - // TODO: retrieve with manifest + extract repeating code func feedUploadAndSync(c *cli.Context) error { defer func(now time.Time) { log.Info("total time", "time", time.Since(now), "size (kb)", filesize) }(time.Now()) @@ -307,60 +278,3 @@ func feedUploadAndSync(c *cli.Context) error { return nil } - -func fetchFeed(topic string, user string, endpoint string, original []byte, ruid string) error { - ctx, sp := spancontext.StartSpan(context.Background(), "feed-and-sync.fetch") - defer sp.Finish() - - log.Trace("sleeping", "ruid", ruid) - time.Sleep(3 * time.Second) - - log.Trace("http get request (feed)", "ruid", ruid, "api", endpoint, "topic", topic, "user", user) - - var tn time.Time - reqUri := endpoint + "/bzz-feed:/?topic=" + topic + "&user=" + user - req, _ := http.NewRequest("GET", reqUri, nil) - - opentracing.GlobalTracer().Inject( - sp.Context(), - opentracing.HTTPHeaders, - opentracing.HTTPHeadersCarrier(req.Header)) - - trace := client.GetClientTrace("feed-and-sync - http get", "feed-and-sync", ruid, &tn) - - req = 
req.WithContext(httptrace.WithClientTrace(ctx, trace)) - transport := http.DefaultTransport - - //transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} - - tn = time.Now() - res, err := transport.RoundTrip(req) - if err != nil { - log.Error(err.Error(), "ruid", ruid) - return err - } - - log.Trace("http get response (feed)", "ruid", ruid, "api", endpoint, "topic", topic, "user", user, "code", res.StatusCode, "len", res.ContentLength) - - if res.StatusCode != 200 { - return fmt.Errorf("expected status code %d, got %v (ruid %v)", 200, res.StatusCode, ruid) - } - - defer res.Body.Close() - - rdigest, err := digest(res.Body) - if err != nil { - log.Warn(err.Error(), "ruid", ruid) - return err - } - - if !bytes.Equal(rdigest, original) { - err := fmt.Errorf("downloaded imported file md5=%x is not the same as the generated one=%x", rdigest, original) - log.Warn(err.Error(), "ruid", ruid) - return err - } - - log.Trace("downloaded file matches random file", "ruid", ruid, "len", res.ContentLength) - - return nil -} diff --git a/cmd/swarm/swarm-smoke/main.go b/cmd/swarm/swarm-smoke/main.go index ebd0c9715505..788021e47ee9 100644 --- a/cmd/swarm/swarm-smoke/main.go +++ b/cmd/swarm/swarm-smoke/main.go @@ -49,6 +49,7 @@ var ( verbosity int timeout int single bool + storeSize int ) func main() { @@ -122,6 +123,12 @@ func main() { Usage: "whether to fetch content from a single node or from all nodes", Destination: &single, }, + cli.IntFlag{ + Name: "store", + Value: 5000, + Usage: "individual node store size", + Destination: &storeSize, + }, } app.Flags = append(app.Flags, []cli.Flag{ @@ -140,19 +147,25 @@ func main() { Name: "upload_and_sync", Aliases: []string{"c"}, Usage: "upload and sync", - Action: cliUploadAndSync, + Action: wrapCliCommand("upload-and-sync", uploadAndSync), }, { Name: "feed_sync", Aliases: []string{"f"}, Usage: "feed update generate, upload and sync", - Action: cliFeedUploadAndSync, + Action: wrapCliCommand("feed-and-sync", feedUploadAndSync), }, { Name: "upload_speed", Aliases: []string{"u"}, Usage: "measure upload speed", - Action: cliUploadSpeed, + Action: wrapCliCommand("upload-speed", uploadSpeed), + }, + { + Name: "sliding_window", + Aliases: []string{"s"}, + Usage: "measure network aggregate capacity", + Action: wrapCliCommand("sliding-window", slidingWindow), }, } diff --git a/cmd/swarm/swarm-smoke/sliding_window.go b/cmd/swarm/swarm-smoke/sliding_window.go new file mode 100644 index 000000000000..3da3f12ce37c --- /dev/null +++ b/cmd/swarm/swarm-smoke/sliding_window.go @@ -0,0 +1,181 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of go-ethereum. +// +// go-ethereum is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// go-ethereum is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with go-ethereum. If not, see . 
+ +package main + +import ( + "bytes" + "fmt" + "math/rand" + "sync" + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/swarm/testutil" + "github.com/pborman/uuid" + + cli "gopkg.in/urfave/cli.v1" +) + +type uploadResult struct { + hash string + digest []byte +} + +func slidingWindow(c *cli.Context) error { + // test dscription: + // 1. upload repeatedly the same file size, maintain a slice in which swarm hashes are stored, first hash at idx=0 + // 2. select a random node, start downloading the hashes, starting with the LAST one first (it should always be availble), till the FIRST hash + // 3. when + + defer func(now time.Time) { + totalTime := time.Since(now) + + log.Info("total time", "time", totalTime) + metrics.GetOrRegisterCounter("sliding-window.total-time", nil).Inc(int64(totalTime)) + }(time.Now()) + + generateEndpoints(scheme, cluster, appName, from, to) + storeSize = storeSize * 4096 //store size is in chunks - transform to bytes + hashes := []uploadResult{} //swarm hashes of the uploads + filesize := storeSize / 7 //each file to upload, bytes + nodes := to - from + networkCapacity := float64(storeSize) * float64(nodes) + const iterationTimeout = 30 * time.Second + log.Info("sliding window test started", "store size(kb)", int(storeSize/1000), "nodes", nodes, "filesize(kb)", int(filesize/1000), "network capacity(kb)", int(networkCapacity/1000), "timeout", timeout) + uploadedBytes := 0 + for uploadedBytes = 0; uploadedBytes <= int(networkCapacity); uploadedBytes += filesize { + seed := int(time.Now().UnixNano() / 1e6) + log.Info("uploading to "+endpoints[0]+" and syncing", "seed", seed) + + randomBytes := testutil.RandomBytes(seed, filesize) + + t1 := time.Now() + hash, err := upload(&randomBytes, endpoints[0]) + if err != nil { + log.Error(err.Error()) + return err + } + metrics.GetOrRegisterCounter("sliding-window.upload-time", nil).Inc(int64(time.Since(t1))) + + fhash, err := digest(bytes.NewReader(randomBytes)) + if err != nil { + log.Error(err.Error()) + return err + } + + log.Info("uploaded successfully", "hash", hash, "digest", fmt.Sprintf("%x", fhash)) + hashes = append(hashes, uploadResult{hash: hash, digest: fhash}) + } + + log.Info("done uploading files", "len(hashes)", len(hashes), "sleep for", syncDelay) + + time.Sleep(time.Duration(syncDelay) * time.Second) + + networkDepth := 0 + var errored int32 = 0 + + timedOut := false + +LOOP: + for i := len(hashes) - 1; i >= 0; i-- { + wg := sync.WaitGroup{} + done := time.After(iterationTimeout) + if single { + rand.Seed(time.Now().UTC().UnixNano()) + randIndex := 1 + rand.Intn(len(endpoints)-1) + ruid := uuid.New()[:8] + wg.Add(1) + go func(endpoint string, ruid string) { + // points to address: + // need to measure min/max/mean for the results when not in single mode (not all nodes would necessarily give the same result, though they should) + defer wg.Done() + + inner: + for { + select { + case <-done: + metrics.GetOrRegisterCounter("sliding-window.single.timeout", nil).Inc(1) + atomic.AddInt32(&errored, 1) + break inner + default: + } + + start := time.Now() + err := fetch(hashes[i].hash, endpoint, hashes[i].digest, ruid) + fetchTime := time.Since(start) + if err != nil { + continue + } + + metrics.GetOrRegisterMeter("sliding-window.single.fetch-time", nil).Mark(int64(fetchTime)) + return + } + + }(endpoints[randIndex], ruid) + } else { + for _, endpoint := range endpoints { + ruid := uuid.New()[:8] 
+ wg.Add(1) + go func(endpoint string, ruid string) { + defer wg.Done() + + inner: + for { + select { + case <-done: + metrics.GetOrRegisterCounter("sliding-window.multi.timeout", nil).Inc(1) + atomic.AddInt32(&errored, 1) + break inner + default: + } + + start := time.Now() + err := fetch(hashes[i].hash, endpoint, hashes[i].digest, ruid) + fetchTime := time.Since(start) + if err != nil { + continue + } + + metrics.GetOrRegisterMeter("sliding-window.each.fetch-time", nil).Mark(int64(fetchTime)) + return + } + }(endpoint, ruid) + } + } + + wg.Wait() + networkDepth = len(hashes) - i + if errored > 0 { + break LOOP + } + select { + case <-done: + timedOut = true + break LOOP + default: + } + } + + log.Info("sliding window test finished", "timed out?", timedOut, "errored?", errored > 0, "networkDepth", networkDepth, "networkDepth(kb)", int(networkDepth*filesize/1000)) + log.Info("stats", "uploadedFiles", len(hashes), "uploadedKb", uploadedBytes/1000, "filesizeKb", filesize/1000, "networkCapacityKb", int(networkCapacity/1000), "networkCapacityMb", int(networkCapacity/1000000)) + + metrics.GetOrRegisterMeter("sliding-window.network-depth", nil).Mark(int64(networkDepth)) + metrics.GetOrRegisterMeter("sliding-window.uploaded-bytes", nil).Mark(int64(uploadedBytes)) + return nil +} diff --git a/cmd/swarm/swarm-smoke/upload_and_sync.go b/cmd/swarm/swarm-smoke/upload_and_sync.go index 0fc86c55d793..567363a17a08 100644 --- a/cmd/swarm/swarm-smoke/upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/upload_and_sync.go @@ -18,75 +18,19 @@ package main import ( "bytes" - "context" - "crypto/md5" - crand "crypto/rand" - "errors" "fmt" - "io" - "io/ioutil" "math/rand" - "net/http" - "net/http/httptrace" - "os" "sync" "time" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" - "github.com/ethereum/go-ethereum/swarm/api" - "github.com/ethereum/go-ethereum/swarm/api/client" - "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/testutil" - opentracing "github.com/opentracing/opentracing-go" "github.com/pborman/uuid" cli "gopkg.in/urfave/cli.v1" ) -func generateEndpoints(scheme string, cluster string, app string, from int, to int) { - if cluster == "prod" { - for port := from; port < to; port++ { - endpoints = append(endpoints, fmt.Sprintf("%s://%v.swarm-gateways.net", scheme, port)) - } - } else if cluster == "private-internal" { - for port := from; port < to; port++ { - endpoints = append(endpoints, fmt.Sprintf("%s://swarm-private-internal-%v:8500", scheme, port)) - } - } else { - for port := from; port < to; port++ { - endpoints = append(endpoints, fmt.Sprintf("%s://%s-%v-%s.stg.swarm-gateways.net", scheme, app, port, cluster)) - } - } - - if includeLocalhost { - endpoints = append(endpoints, "http://localhost:8500") - } -} - -func cliUploadAndSync(c *cli.Context) error { - log.PrintOrigins(true) - log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(verbosity), log.StreamHandler(os.Stdout, log.TerminalFormat(true)))) - - metrics.GetOrRegisterCounter("upload-and-sync", nil).Inc(1) - - errc := make(chan error) - go func() { - errc <- uploadAndSync(c) - }() - - select { - case err := <-errc: - if err != nil { - metrics.GetOrRegisterCounter("upload-and-sync.fail", nil).Inc(1) - } - return err - case <-time.After(time.Duration(timeout) * time.Second): - metrics.GetOrRegisterCounter("upload-and-sync.timeout", nil).Inc(1) - return fmt.Errorf("timeout after %v sec", timeout) - } -} - func 
uploadAndSync(c *cli.Context) error { defer func(now time.Time) { totalTime := time.Since(now) @@ -161,96 +105,3 @@ func uploadAndSync(c *cli.Context) error { return nil } - -// fetch is getting the requested `hash` from the `endpoint` and compares it with the `original` file -func fetch(hash string, endpoint string, original []byte, ruid string) error { - ctx, sp := spancontext.StartSpan(context.Background(), "upload-and-sync.fetch") - defer sp.Finish() - - log.Trace("http get request", "ruid", ruid, "api", endpoint, "hash", hash) - - var tn time.Time - reqUri := endpoint + "/bzz:/" + hash + "/" - req, _ := http.NewRequest("GET", reqUri, nil) - - opentracing.GlobalTracer().Inject( - sp.Context(), - opentracing.HTTPHeaders, - opentracing.HTTPHeadersCarrier(req.Header)) - - trace := client.GetClientTrace("upload-and-sync - http get", "upload-and-sync", ruid, &tn) - - req = req.WithContext(httptrace.WithClientTrace(ctx, trace)) - transport := http.DefaultTransport - - //transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} - - tn = time.Now() - res, err := transport.RoundTrip(req) - if err != nil { - log.Error(err.Error(), "ruid", ruid) - return err - } - log.Trace("http get response", "ruid", ruid, "api", endpoint, "hash", hash, "code", res.StatusCode, "len", res.ContentLength) - - if res.StatusCode != 200 { - err := fmt.Errorf("expected status code %d, got %v", 200, res.StatusCode) - log.Warn(err.Error(), "ruid", ruid) - return err - } - - defer res.Body.Close() - - rdigest, err := digest(res.Body) - if err != nil { - log.Warn(err.Error(), "ruid", ruid) - return err - } - - if !bytes.Equal(rdigest, original) { - err := fmt.Errorf("downloaded imported file md5=%x is not the same as the generated one=%x", rdigest, original) - log.Warn(err.Error(), "ruid", ruid) - return err - } - - log.Trace("downloaded file matches random file", "ruid", ruid, "len", res.ContentLength) - - return nil -} - -// upload is uploading a file `f` to `endpoint` via the `swarm up` cmd -func upload(dataBytes *[]byte, endpoint string) (string, error) { - swarm := client.NewClient(endpoint) - f := &client.File{ - ReadCloser: ioutil.NopCloser(bytes.NewReader(*dataBytes)), - ManifestEntry: api.ManifestEntry{ - ContentType: "text/plain", - Mode: 0660, - Size: int64(len(*dataBytes)), - }, - } - - // upload data to bzz:// and retrieve the content-addressed manifest hash, hex-encoded. 
- return swarm.Upload(f, "", false) -} - -func digest(r io.Reader) ([]byte, error) { - h := md5.New() - _, err := io.Copy(h, r) - if err != nil { - return nil, err - } - return h.Sum(nil), nil -} - -// generates random data in heap buffer -func generateRandomData(datasize int) ([]byte, error) { - b := make([]byte, datasize) - c, err := crand.Read(b) - if err != nil { - return nil, err - } else if c != datasize { - return nil, errors.New("short read") - } - return b, nil -} diff --git a/cmd/swarm/swarm-smoke/upload_speed.go b/cmd/swarm/swarm-smoke/upload_speed.go index 4a355baf8291..ed6256753c92 100644 --- a/cmd/swarm/swarm-smoke/upload_speed.go +++ b/cmd/swarm/swarm-smoke/upload_speed.go @@ -19,7 +19,6 @@ package main import ( "bytes" "fmt" - "os" "time" "github.com/ethereum/go-ethereum/log" @@ -29,42 +28,6 @@ import ( cli "gopkg.in/urfave/cli.v1" ) -var endpoint string - -//just use the first endpoint -func generateEndpoint(scheme string, cluster string, app string, from int) { - if cluster == "prod" { - endpoint = fmt.Sprintf("%s://%v.swarm-gateways.net", scheme, from) - } else if cluster == "private-internal" { - endpoint = fmt.Sprintf("%s://swarm-private-internal-%v:8500", scheme, from) - } else { - endpoint = fmt.Sprintf("%s://%s-%v-%s.stg.swarm-gateways.net", scheme, app, from, cluster) - } -} - -func cliUploadSpeed(c *cli.Context) error { - log.PrintOrigins(true) - log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(verbosity), log.StreamHandler(os.Stdout, log.TerminalFormat(true)))) - - metrics.GetOrRegisterCounter("upload-speed", nil).Inc(1) - - errc := make(chan error) - go func() { - errc <- uploadSpeed(c) - }() - - select { - case err := <-errc: - if err != nil { - metrics.GetOrRegisterCounter("upload-speed.fail", nil).Inc(1) - } - return err - case <-time.After(time.Duration(timeout) * time.Second): - metrics.GetOrRegisterCounter("upload-speed.timeout", nil).Inc(1) - return fmt.Errorf("timeout after %v sec", timeout) - } -} - func uploadSpeed(c *cli.Context) error { defer func(now time.Time) { totalTime := time.Since(now) @@ -73,7 +36,7 @@ func uploadSpeed(c *cli.Context) error { metrics.GetOrRegisterCounter("upload-speed.total-time", nil).Inc(int64(totalTime)) }(time.Now()) - generateEndpoint(scheme, cluster, appName, from) + endpoint := generateEndpoint(scheme, cluster, appName, from) seed := int(time.Now().UnixNano() / 1e6) log.Info("uploading to "+endpoint, "seed", seed) diff --git a/cmd/swarm/swarm-smoke/util.go b/cmd/swarm/swarm-smoke/util.go new file mode 100644 index 000000000000..4e92447b575d --- /dev/null +++ b/cmd/swarm/swarm-smoke/util.go @@ -0,0 +1,240 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of go-ethereum. +// +// go-ethereum is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// go-ethereum is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with go-ethereum. If not, see . 
+ +package main + +import ( + "bytes" + "context" + "crypto/md5" + crand "crypto/rand" + "errors" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/http/httptrace" + "os" + "time" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/swarm/api" + "github.com/ethereum/go-ethereum/swarm/api/client" + "github.com/ethereum/go-ethereum/swarm/spancontext" + opentracing "github.com/opentracing/opentracing-go" + cli "gopkg.in/urfave/cli.v1" +) + +func wrapCliCommand(name string, command func(*cli.Context) error) func(*cli.Context) error { + return func(ctx *cli.Context) error { + log.PrintOrigins(true) + log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(verbosity), log.StreamHandler(os.Stdout, log.TerminalFormat(true)))) + log.Info("smoke test starting", "task", name, "timeout", timeout) + metrics.GetOrRegisterCounter(name, nil).Inc(1) + + errc := make(chan error) + go func() { + errc <- command(ctx) + }() + + select { + case err := <-errc: + if err != nil { + metrics.GetOrRegisterCounter(fmt.Sprintf("%s.fail", name), nil).Inc(1) + } + return err + case <-time.After(time.Duration(timeout) * time.Second): + metrics.GetOrRegisterCounter(fmt.Sprintf("%s.timeout", name), nil).Inc(1) + return fmt.Errorf("timeout after %v sec", timeout) + } + } +} + +func generateEndpoints(scheme string, cluster string, app string, from int, to int) { + if cluster == "prod" { + for port := from; port < to; port++ { + endpoints = append(endpoints, fmt.Sprintf("%s://%v.swarm-gateways.net", scheme, port)) + } + } else { + for port := from; port < to; port++ { + endpoints = append(endpoints, fmt.Sprintf("%s://%s-%v-%s.stg.swarm-gateways.net", scheme, app, port, cluster)) + } + } + + if includeLocalhost { + endpoints = append(endpoints, "http://localhost:8500") + } +} + +//just use the first endpoint +func generateEndpoint(scheme string, cluster string, app string, from int) string { + if cluster == "prod" { + return fmt.Sprintf("%s://%v.swarm-gateways.net", scheme, from) + } else { + return fmt.Sprintf("%s://%s-%v-%s.stg.swarm-gateways.net", scheme, app, from, cluster) + } +} + +func fetchFeed(topic string, user string, endpoint string, original []byte, ruid string) error { + ctx, sp := spancontext.StartSpan(context.Background(), "feed-and-sync.fetch") + defer sp.Finish() + + log.Trace("sleeping", "ruid", ruid) + time.Sleep(3 * time.Second) + + log.Trace("http get request (feed)", "ruid", ruid, "api", endpoint, "topic", topic, "user", user) + + var tn time.Time + reqUri := endpoint + "/bzz-feed:/?topic=" + topic + "&user=" + user + req, _ := http.NewRequest("GET", reqUri, nil) + + opentracing.GlobalTracer().Inject( + sp.Context(), + opentracing.HTTPHeaders, + opentracing.HTTPHeadersCarrier(req.Header)) + + trace := client.GetClientTrace("feed-and-sync - http get", "feed-and-sync", ruid, &tn) + + req = req.WithContext(httptrace.WithClientTrace(ctx, trace)) + transport := http.DefaultTransport + + //transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} + + tn = time.Now() + res, err := transport.RoundTrip(req) + if err != nil { + log.Error(err.Error(), "ruid", ruid) + return err + } + + log.Trace("http get response (feed)", "ruid", ruid, "api", endpoint, "topic", topic, "user", user, "code", res.StatusCode, "len", res.ContentLength) + + if res.StatusCode != 200 { + return fmt.Errorf("expected status code %d, got %v (ruid %v)", 200, res.StatusCode, ruid) + } + + defer res.Body.Close() + + rdigest, err 
:= digest(res.Body) + if err != nil { + log.Warn(err.Error(), "ruid", ruid) + return err + } + + if !bytes.Equal(rdigest, original) { + err := fmt.Errorf("downloaded imported file md5=%x is not the same as the generated one=%x", rdigest, original) + log.Warn(err.Error(), "ruid", ruid) + return err + } + + log.Trace("downloaded file matches random file", "ruid", ruid, "len", res.ContentLength) + + return nil +} + +// fetch is getting the requested `hash` from the `endpoint` and compares it with the `original` file +func fetch(hash string, endpoint string, original []byte, ruid string) error { + ctx, sp := spancontext.StartSpan(context.Background(), "upload-and-sync.fetch") + defer sp.Finish() + + log.Trace("http get request", "ruid", ruid, "api", endpoint, "hash", hash) + + var tn time.Time + reqUri := endpoint + "/bzz:/" + hash + "/" + req, _ := http.NewRequest("GET", reqUri, nil) + + opentracing.GlobalTracer().Inject( + sp.Context(), + opentracing.HTTPHeaders, + opentracing.HTTPHeadersCarrier(req.Header)) + + trace := client.GetClientTrace("upload-and-sync - http get", "upload-and-sync", ruid, &tn) + + req = req.WithContext(httptrace.WithClientTrace(ctx, trace)) + transport := http.DefaultTransport + + //transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} + + tn = time.Now() + res, err := transport.RoundTrip(req) + if err != nil { + log.Error(err.Error(), "ruid", ruid) + return err + } + log.Trace("http get response", "ruid", ruid, "api", endpoint, "hash", hash, "code", res.StatusCode, "len", res.ContentLength) + + if res.StatusCode != 200 { + err := fmt.Errorf("expected status code %d, got %v", 200, res.StatusCode) + log.Warn(err.Error(), "ruid", ruid) + return err + } + + defer res.Body.Close() + + rdigest, err := digest(res.Body) + if err != nil { + log.Warn(err.Error(), "ruid", ruid) + return err + } + + if !bytes.Equal(rdigest, original) { + err := fmt.Errorf("downloaded imported file md5=%x is not the same as the generated one=%x", rdigest, original) + log.Warn(err.Error(), "ruid", ruid) + return err + } + + log.Trace("downloaded file matches random file", "ruid", ruid, "len", res.ContentLength) + + return nil +} + +// upload an arbitrary byte as a plaintext file to `endpoint` using the api client +func upload(dataBytes *[]byte, endpoint string) (string, error) { + swarm := client.NewClient(endpoint) + f := &client.File{ + ReadCloser: ioutil.NopCloser(bytes.NewReader(*dataBytes)), + ManifestEntry: api.ManifestEntry{ + ContentType: "text/plain", + Mode: 0660, + Size: int64(len(*dataBytes)), + }, + } + + // upload data to bzz:// and retrieve the content-addressed manifest hash, hex-encoded. 
+ return swarm.Upload(f, "", false) +} + +func digest(r io.Reader) ([]byte, error) { + h := md5.New() + _, err := io.Copy(h, r) + if err != nil { + return nil, err + } + return h.Sum(nil), nil +} + +// generates random data in heap buffer +func generateRandomData(datasize int) ([]byte, error) { + b := make([]byte, datasize) + c, err := crand.Read(b) + if err != nil { + return nil, err + } else if c != datasize { + return nil, errors.New("short read") + } + return b, nil +} From 7530bdaf56b8d79a64d4ac63d2bdd6feaa7df138 Mon Sep 17 00:00:00 2001 From: Elad Nachmias Date: Thu, 24 Jan 2019 13:38:45 +0700 Subject: [PATCH 02/10] cmd/swarm/swarm-smoke: sliding window test v2 --- cmd/swarm/swarm-smoke/sliding_window.go | 109 +++++------------------- 1 file changed, 20 insertions(+), 89 deletions(-) diff --git a/cmd/swarm/swarm-smoke/sliding_window.go b/cmd/swarm/swarm-smoke/sliding_window.go index 3da3f12ce37c..5d505dea9643 100644 --- a/cmd/swarm/swarm-smoke/sliding_window.go +++ b/cmd/swarm/swarm-smoke/sliding_window.go @@ -20,8 +20,6 @@ import ( "bytes" "fmt" "math/rand" - "sync" - "sync/atomic" "time" "github.com/ethereum/go-ethereum/log" @@ -53,13 +51,16 @@ func slidingWindow(c *cli.Context) error { generateEndpoints(scheme, cluster, appName, from, to) storeSize = storeSize * 4096 //store size is in chunks - transform to bytes hashes := []uploadResult{} //swarm hashes of the uploads - filesize := storeSize / 7 //each file to upload, bytes nodes := to - from networkCapacity := float64(storeSize) * float64(nodes) const iterationTimeout = 30 * time.Second log.Info("sliding window test started", "store size(kb)", int(storeSize/1000), "nodes", nodes, "filesize(kb)", int(filesize/1000), "network capacity(kb)", int(networkCapacity/1000), "timeout", timeout) uploadedBytes := 0 - for uploadedBytes = 0; uploadedBytes <= int(networkCapacity); uploadedBytes += filesize { + networkDepth := 0 + errored := false + +outer: + for { seed := int(time.Now().UnixNano() / 1e6) log.Info("uploading to "+endpoints[0]+" and syncing", "seed", seed) @@ -79,103 +80,33 @@ func slidingWindow(c *cli.Context) error { return err } - log.Info("uploaded successfully", "hash", hash, "digest", fmt.Sprintf("%x", fhash)) + log.Info("uploaded successfully", "hash", hash, "digest", fmt.Sprintf("%x", fhash), "sleeping", syncDelay) hashes = append(hashes, uploadResult{hash: hash, digest: fhash}) - } - - log.Info("done uploading files", "len(hashes)", len(hashes), "sleep for", syncDelay) - - time.Sleep(time.Duration(syncDelay) * time.Second) + time.Sleep(time.Duration(syncDelay) * time.Second) + uploadedBytes += filesize - networkDepth := 0 - var errored int32 = 0 - - timedOut := false - -LOOP: - for i := len(hashes) - 1; i >= 0; i-- { - wg := sync.WaitGroup{} - done := time.After(iterationTimeout) - if single { + for i, v := range hashes { rand.Seed(time.Now().UTC().UnixNano()) randIndex := 1 + rand.Intn(len(endpoints)-1) ruid := uuid.New()[:8] - wg.Add(1) - go func(endpoint string, ruid string) { - // points to address: - // need to measure min/max/mean for the results when not in single mode (not all nodes would necessarily give the same result, though they should) - defer wg.Done() - - inner: - for { - select { - case <-done: - metrics.GetOrRegisterCounter("sliding-window.single.timeout", nil).Inc(1) - atomic.AddInt32(&errored, 1) - break inner - default: - } - - start := time.Now() - err := fetch(hashes[i].hash, endpoint, hashes[i].digest, ruid) - fetchTime := time.Since(start) - if err != nil { - continue - } - - 
metrics.GetOrRegisterMeter("sliding-window.single.fetch-time", nil).Mark(int64(fetchTime)) - return - } - - }(endpoints[randIndex], ruid) - } else { - for _, endpoint := range endpoints { - ruid := uuid.New()[:8] - wg.Add(1) - go func(endpoint string, ruid string) { - defer wg.Done() - - inner: - for { - select { - case <-done: - metrics.GetOrRegisterCounter("sliding-window.multi.timeout", nil).Inc(1) - atomic.AddInt32(&errored, 1) - break inner - default: - } - - start := time.Now() - err := fetch(hashes[i].hash, endpoint, hashes[i].digest, ruid) - fetchTime := time.Since(start) - if err != nil { - continue - } - - metrics.GetOrRegisterMeter("sliding-window.each.fetch-time", nil).Mark(int64(fetchTime)) - return - } - }(endpoint, ruid) + start := time.Now() + err := fetch(v.hash, endpoints[randIndex], v.digest, ruid) + fetchTime := time.Since(start) + if err != nil { + errored = true + log.Error("error retrieving hash", "hash idx", i, "err", err) + metrics.GetOrRegisterCounter("sliding-window.single.error", nil).Inc(1) + networkDepth = i + break outer } - } - wg.Wait() - networkDepth = len(hashes) - i - if errored > 0 { - break LOOP - } - select { - case <-done: - timedOut = true - break LOOP - default: + metrics.GetOrRegisterMeter("sliding-window.single.fetch-time", nil).Mark(int64(fetchTime)) } } - log.Info("sliding window test finished", "timed out?", timedOut, "errored?", errored > 0, "networkDepth", networkDepth, "networkDepth(kb)", int(networkDepth*filesize/1000)) + log.Info("sliding window test finished", "errored?", errored, "networkDepth", networkDepth, "networkDepth(kb)", int(networkDepth*filesize/1000)) log.Info("stats", "uploadedFiles", len(hashes), "uploadedKb", uploadedBytes/1000, "filesizeKb", filesize/1000, "networkCapacityKb", int(networkCapacity/1000), "networkCapacityMb", int(networkCapacity/1000000)) metrics.GetOrRegisterMeter("sliding-window.network-depth", nil).Mark(int64(networkDepth)) - metrics.GetOrRegisterMeter("sliding-window.uploaded-bytes", nil).Mark(int64(uploadedBytes)) return nil } From 2555743a6229c99d0870e16f36e3c347e04a2d56 Mon Sep 17 00:00:00 2001 From: Elad Nachmias Date: Thu, 24 Jan 2019 23:14:18 +0700 Subject: [PATCH 03/10] cmd/swarm: fix filesize of uploaded file --- cmd/swarm/swarm-smoke/sliding_window.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cmd/swarm/swarm-smoke/sliding_window.go b/cmd/swarm/swarm-smoke/sliding_window.go index 5d505dea9643..41eedad00359 100644 --- a/cmd/swarm/swarm-smoke/sliding_window.go +++ b/cmd/swarm/swarm-smoke/sliding_window.go @@ -54,7 +54,7 @@ func slidingWindow(c *cli.Context) error { nodes := to - from networkCapacity := float64(storeSize) * float64(nodes) const iterationTimeout = 30 * time.Second - log.Info("sliding window test started", "store size(kb)", int(storeSize/1000), "nodes", nodes, "filesize(kb)", int(filesize/1000), "network capacity(kb)", int(networkCapacity/1000), "timeout", timeout) + log.Info("sliding window test started", "store size(kb)", int(storeSize/1000), "nodes", nodes, "filesize(kb)", filesize, "network capacity(kb)", int(networkCapacity/1000), "timeout", timeout) uploadedBytes := 0 networkDepth := 0 errored := false @@ -64,7 +64,7 @@ outer: seed := int(time.Now().UnixNano() / 1e6) log.Info("uploading to "+endpoints[0]+" and syncing", "seed", seed) - randomBytes := testutil.RandomBytes(seed, filesize) + randomBytes := testutil.RandomBytes(seed, filesize*1000) t1 := time.Now() hash, err := upload(&randomBytes, endpoints[0]) @@ -83,7 +83,7 @@ outer: 
log.Info("uploaded successfully", "hash", hash, "digest", fmt.Sprintf("%x", fhash), "sleeping", syncDelay) hashes = append(hashes, uploadResult{hash: hash, digest: fhash}) time.Sleep(time.Duration(syncDelay) * time.Second) - uploadedBytes += filesize + uploadedBytes += filesize * 1000 for i, v := range hashes { rand.Seed(time.Now().UTC().UnixNano()) @@ -104,8 +104,8 @@ outer: } } - log.Info("sliding window test finished", "errored?", errored, "networkDepth", networkDepth, "networkDepth(kb)", int(networkDepth*filesize/1000)) - log.Info("stats", "uploadedFiles", len(hashes), "uploadedKb", uploadedBytes/1000, "filesizeKb", filesize/1000, "networkCapacityKb", int(networkCapacity/1000), "networkCapacityMb", int(networkCapacity/1000000)) + log.Info("sliding window test finished", "errored?", errored, "networkDepth", networkDepth, "networkDepth(kb)", int(networkDepth*filesize)) + log.Info("stats", "uploadedFiles", len(hashes), "uploadedKb", uploadedBytes/1000, "filesizeKb", filesize, "networkCapacityKb", int(networkCapacity/1000), "networkCapacityMb", int(networkCapacity/1000000)) metrics.GetOrRegisterMeter("sliding-window.network-depth", nil).Mark(int64(networkDepth)) return nil From ca37153f3a6685c6907e38c91e9f30280b1e5c72 Mon Sep 17 00:00:00 2001 From: Elad Nachmias Date: Fri, 25 Jan 2019 11:31:08 +0700 Subject: [PATCH 04/10] cmd/swarm/swarm-smoke: fix error handling --- cmd/swarm/swarm-smoke/sliding_window.go | 46 +++++++++++++++++-------- 1 file changed, 31 insertions(+), 15 deletions(-) diff --git a/cmd/swarm/swarm-smoke/sliding_window.go b/cmd/swarm/swarm-smoke/sliding_window.go index 41eedad00359..7cf17685a24c 100644 --- a/cmd/swarm/swarm-smoke/sliding_window.go +++ b/cmd/swarm/swarm-smoke/sliding_window.go @@ -20,6 +20,7 @@ import ( "bytes" "fmt" "math/rand" + "sync" "time" "github.com/ethereum/go-ethereum/log" @@ -58,7 +59,6 @@ func slidingWindow(c *cli.Context) error { uploadedBytes := 0 networkDepth := 0 errored := false - outer: for { seed := int(time.Now().UnixNano() / 1e6) @@ -84,23 +84,39 @@ outer: hashes = append(hashes, uploadResult{hash: hash, digest: fhash}) time.Sleep(time.Duration(syncDelay) * time.Second) uploadedBytes += filesize * 1000 - + var wg sync.WaitGroup for i, v := range hashes { - rand.Seed(time.Now().UTC().UnixNano()) - randIndex := 1 + rand.Intn(len(endpoints)-1) - ruid := uuid.New()[:8] - start := time.Now() - err := fetch(v.hash, endpoints[randIndex], v.digest, ruid) - fetchTime := time.Since(start) - if err != nil { - errored = true - log.Error("error retrieving hash", "hash idx", i, "err", err) - metrics.GetOrRegisterCounter("sliding-window.single.error", nil).Inc(1) - networkDepth = i + timeout := time.After(30 * time.Second) + errored = false + wg.Add(1) + go func(i int, v uploadResult) { + defer wg.Done() + for { + select { + case <-timeout: + errored = true + log.Error("error retrieving hash. 
timeout", "hash idx", i, "err", err) + metrics.GetOrRegisterCounter("sliding-window.single.error", nil).Inc(1) + return + default: + } + rand.Seed(time.Now().UTC().UnixNano()) + randIndex := 1 + rand.Intn(len(endpoints)-1) + ruid := uuid.New()[:8] + start := time.Now() + err := fetch(v.hash, endpoints[randIndex], v.digest, ruid) + fetchTime := time.Since(start) + if err != nil { + continue + } + metrics.GetOrRegisterMeter("sliding-window.single.fetch-time", nil).Mark(int64(fetchTime)) + } + }(i, v) + wg.Wait() + if errored { break outer } - - metrics.GetOrRegisterMeter("sliding-window.single.fetch-time", nil).Mark(int64(fetchTime)) + networkDepth = i } } From 75b2b3428a8be6cc9f20d3176c6f8b21cac1cc10 Mon Sep 17 00:00:00 2001 From: Elad Nachmias Date: Mon, 28 Jan 2019 13:20:47 +0700 Subject: [PATCH 05/10] cmd/swarm/swarm-smoke: address PR comments --- cmd/swarm/swarm-smoke/feed_upload_and_sync.go | 12 ++- cmd/swarm/swarm-smoke/sliding_window.go | 79 +++++++++---------- cmd/swarm/swarm-smoke/upload_and_sync.go | 17 ++-- cmd/swarm/swarm-smoke/upload_speed.go | 16 ++-- cmd/swarm/swarm-smoke/util.go | 6 +- 5 files changed, 59 insertions(+), 71 deletions(-) diff --git a/cmd/swarm/swarm-smoke/feed_upload_and_sync.go b/cmd/swarm/swarm-smoke/feed_upload_and_sync.go index 59b72380998b..a322ba89c882 100644 --- a/cmd/swarm/swarm-smoke/feed_upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/feed_upload_and_sync.go @@ -3,6 +3,7 @@ package main import ( "bytes" "crypto/md5" + crand "crypto/rand" "fmt" "io" "io/ioutil" @@ -16,7 +17,6 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/swarm/storage/feed" - "github.com/ethereum/go-ethereum/swarm/testutil" "github.com/pborman/uuid" cli "gopkg.in/urfave/cli.v1" ) @@ -203,9 +203,10 @@ func feedUploadAndSync(c *cli.Context) error { seed := int(time.Now().UnixNano() / 1e6) log.Info("feed uploading to "+endpoints[0]+" and syncing", "seed", seed) - randomBytes := testutil.RandomBytes(seed, filesize*1000) + h = md5.New() + r := io.TeeReader(io.LimitReader(crand.Reader, int64(filesize*1000)), h) - hash, err := upload(&randomBytes, endpoints[0]) + hash, err := upload(r, filesize*1000, endpoints[0]) if err != nil { return err } @@ -214,10 +215,7 @@ func feedUploadAndSync(c *cli.Context) error { return err } multihashHex := hexutil.Encode(hashBytes) - fileHash, err := digest(bytes.NewReader(randomBytes)) - if err != nil { - return err - } + fileHash := h.Sum(nil) log.Info("uploaded successfully", "hash", hash, "digest", fmt.Sprintf("%x", fileHash)) diff --git a/cmd/swarm/swarm-smoke/sliding_window.go b/cmd/swarm/swarm-smoke/sliding_window.go index 7cf17685a24c..d1abd32cc4a8 100644 --- a/cmd/swarm/swarm-smoke/sliding_window.go +++ b/cmd/swarm/swarm-smoke/sliding_window.go @@ -17,31 +17,32 @@ package main import ( - "bytes" + "crypto/md5" + crand "crypto/rand" "fmt" + "io" "math/rand" - "sync" "time" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" - "github.com/ethereum/go-ethereum/swarm/testutil" "github.com/pborman/uuid" cli "gopkg.in/urfave/cli.v1" ) +var seed = time.Now().UTC().UnixNano() + +func init() { + rand.Seed(seed) +} + type uploadResult struct { hash string digest []byte } func slidingWindow(c *cli.Context) error { - // test dscription: - // 1. upload repeatedly the same file size, maintain a slice in which swarm hashes are stored, first hash at idx=0 - // 2. 
select a random node, start downloading the hashes, starting with the LAST one first (it should always be availble), till the FIRST hash - // 3. when - defer func(now time.Time) { totalTime := time.Since(now) @@ -50,69 +51,61 @@ func slidingWindow(c *cli.Context) error { }(time.Now()) generateEndpoints(scheme, cluster, appName, from, to) - storeSize = storeSize * 4096 //store size is in chunks - transform to bytes - hashes := []uploadResult{} //swarm hashes of the uploads + hashes := []uploadResult{} //swarm hashes of the uploads nodes := to - from - networkCapacity := float64(storeSize) * float64(nodes) const iterationTimeout = 30 * time.Second - log.Info("sliding window test started", "store size(kb)", int(storeSize/1000), "nodes", nodes, "filesize(kb)", filesize, "network capacity(kb)", int(networkCapacity/1000), "timeout", timeout) + log.Info("sliding window test started", "nodes", nodes, "filesize(kb)", filesize, "timeout", timeout) uploadedBytes := 0 networkDepth := 0 errored := false + outer: for { - seed := int(time.Now().UnixNano() / 1e6) log.Info("uploading to "+endpoints[0]+" and syncing", "seed", seed) - randomBytes := testutil.RandomBytes(seed, filesize*1000) - + h := md5.New() + r := io.TeeReader(io.LimitReader(crand.Reader, int64(filesize*1000)), h) t1 := time.Now() - hash, err := upload(&randomBytes, endpoints[0]) - if err != nil { - log.Error(err.Error()) - return err - } - metrics.GetOrRegisterCounter("sliding-window.upload-time", nil).Inc(int64(time.Since(t1))) - fhash, err := digest(bytes.NewReader(randomBytes)) + hash, err := upload(r, filesize*1000, endpoints[0]) if err != nil { log.Error(err.Error()) return err } + metrics.GetOrRegisterResettingTimer("sliding-window.upload-time", nil).UpdateSince(t1) + + fhash := h.Sum(nil) + log.Info("uploaded successfully", "hash", hash, "digest", fmt.Sprintf("%x", fhash), "sleeping", syncDelay) hashes = append(hashes, uploadResult{hash: hash, digest: fhash}) time.Sleep(time.Duration(syncDelay) * time.Second) uploadedBytes += filesize * 1000 - var wg sync.WaitGroup + for i, v := range hashes { timeout := time.After(30 * time.Second) errored = false - wg.Add(1) - go func(i int, v uploadResult) { - defer wg.Done() - for { - select { - case <-timeout: - errored = true - log.Error("error retrieving hash. timeout", "hash idx", i, "err", err) - metrics.GetOrRegisterCounter("sliding-window.single.error", nil).Inc(1) - return - default: - } - rand.Seed(time.Now().UTC().UnixNano()) + + inner: + for { + select { + case <-timeout: + errored = true + log.Error("error retrieving hash. 
timeout", "hash idx", i, "err", err) + metrics.GetOrRegisterCounter("sliding-window.single.error", nil).Inc(1) + default: randIndex := 1 + rand.Intn(len(endpoints)-1) ruid := uuid.New()[:8] start := time.Now() err := fetch(v.hash, endpoints[randIndex], v.digest, ruid) - fetchTime := time.Since(start) if err != nil { - continue + continue inner } - metrics.GetOrRegisterMeter("sliding-window.single.fetch-time", nil).Mark(int64(fetchTime)) + metrics.GetOrRegisterResettingTimer("sliding-window.single.fetch-time", nil).UpdateSince(start) + break inner } - }(i, v) - wg.Wait() + } + if errored { break outer } @@ -120,8 +113,8 @@ outer: } } - log.Info("sliding window test finished", "errored?", errored, "networkDepth", networkDepth, "networkDepth(kb)", int(networkDepth*filesize)) - log.Info("stats", "uploadedFiles", len(hashes), "uploadedKb", uploadedBytes/1000, "filesizeKb", filesize, "networkCapacityKb", int(networkCapacity/1000), "networkCapacityMb", int(networkCapacity/1000000)) + log.Info("sliding window test finished", "errored?", errored, "networkDepth", networkDepth, "networkDepth(kb)", networkDepth*filesize) + log.Info("stats", "uploadedFiles", len(hashes), "uploadedKb", uploadedBytes/1000, "filesizeKb", filesize) metrics.GetOrRegisterMeter("sliding-window.network-depth", nil).Mark(int64(networkDepth)) return nil diff --git a/cmd/swarm/swarm-smoke/upload_and_sync.go b/cmd/swarm/swarm-smoke/upload_and_sync.go index 567363a17a08..b0e1f3ee4887 100644 --- a/cmd/swarm/swarm-smoke/upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/upload_and_sync.go @@ -17,15 +17,16 @@ package main import ( - "bytes" + "crypto/md5" + crand "crypto/rand" "fmt" + "io" "math/rand" "sync" "time" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" - "github.com/ethereum/go-ethereum/swarm/testutil" "github.com/pborman/uuid" cli "gopkg.in/urfave/cli.v1" @@ -40,23 +41,21 @@ func uploadAndSync(c *cli.Context) error { generateEndpoints(scheme, cluster, appName, from, to) seed := int(time.Now().UnixNano() / 1e6) + log.Info("uploading to "+endpoints[0]+" and syncing", "seed", seed) - randomBytes := testutil.RandomBytes(seed, filesize*1000) + h := md5.New() + r := io.TeeReader(io.LimitReader(crand.Reader, int64(filesize*1000)), h) t1 := time.Now() - hash, err := upload(&randomBytes, endpoints[0]) + hash, err := upload(r, filesize*1000, endpoints[0]) if err != nil { log.Error(err.Error()) return err } metrics.GetOrRegisterResettingTimer("upload-and-sync.upload-time", nil).UpdateSince(t1) - fhash, err := digest(bytes.NewReader(randomBytes)) - if err != nil { - log.Error(err.Error()) - return err - } + fhash := h.Sum(nil) log.Info("uploaded successfully", "hash", hash, "digest", fmt.Sprintf("%x", fhash)) diff --git a/cmd/swarm/swarm-smoke/upload_speed.go b/cmd/swarm/swarm-smoke/upload_speed.go index ed6256753c92..943c93240ffe 100644 --- a/cmd/swarm/swarm-smoke/upload_speed.go +++ b/cmd/swarm/swarm-smoke/upload_speed.go @@ -17,13 +17,14 @@ package main import ( - "bytes" + "crypto/md5" + crand "crypto/rand" "fmt" + "io" "time" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" - "github.com/ethereum/go-ethereum/swarm/testutil" cli "gopkg.in/urfave/cli.v1" ) @@ -40,21 +41,18 @@ func uploadSpeed(c *cli.Context) error { seed := int(time.Now().UnixNano() / 1e6) log.Info("uploading to "+endpoint, "seed", seed) - randomBytes := testutil.RandomBytes(seed, filesize*1000) + h := md5.New() + r := 
io.TeeReader(io.LimitReader(crand.Reader, int64(filesize*1000)), h) t1 := time.Now() - hash, err := upload(&randomBytes, endpoint) + hash, err := upload(r, filesize*1000, endpoint) if err != nil { log.Error(err.Error()) return err } metrics.GetOrRegisterCounter("upload-speed.upload-time", nil).Inc(int64(time.Since(t1))) - fhash, err := digest(bytes.NewReader(randomBytes)) - if err != nil { - log.Error(err.Error()) - return err - } + fhash := h.Sum(nil) log.Info("uploaded successfully", "hash", hash, "digest", fmt.Sprintf("%x", fhash)) return nil diff --git a/cmd/swarm/swarm-smoke/util.go b/cmd/swarm/swarm-smoke/util.go index 4e92447b575d..21eeffa65fbe 100644 --- a/cmd/swarm/swarm-smoke/util.go +++ b/cmd/swarm/swarm-smoke/util.go @@ -203,14 +203,14 @@ func fetch(hash string, endpoint string, original []byte, ruid string) error { } // upload an arbitrary byte as a plaintext file to `endpoint` using the api client -func upload(dataBytes *[]byte, endpoint string) (string, error) { +func upload(r io.Reader, size int, endpoint string) (string, error) { swarm := client.NewClient(endpoint) f := &client.File{ - ReadCloser: ioutil.NopCloser(bytes.NewReader(*dataBytes)), + ReadCloser: ioutil.NopCloser(r), ManifestEntry: api.ManifestEntry{ ContentType: "text/plain", Mode: 0660, - Size: int64(len(*dataBytes)), + Size: int64(size), }, } From 983eb8c7a3a597807b2b0f04ed5b2f57df8181a8 Mon Sep 17 00:00:00 2001 From: Elad Nachmias Date: Mon, 28 Jan 2019 18:54:35 +0700 Subject: [PATCH 06/10] cmd/swarm/swarm-smoke: more refinements --- cmd/swarm/swarm-smoke/sliding_window.go | 3 ++- cmd/swarm/swarm-smoke/util.go | 5 ++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/cmd/swarm/swarm-smoke/sliding_window.go b/cmd/swarm/swarm-smoke/sliding_window.go index d1abd32cc4a8..182dd071dc69 100644 --- a/cmd/swarm/swarm-smoke/sliding_window.go +++ b/cmd/swarm/swarm-smoke/sliding_window.go @@ -83,7 +83,7 @@ outer: uploadedBytes += filesize * 1000 for i, v := range hashes { - timeout := time.After(30 * time.Second) + timeout := time.After(time.Duration(timeout) * time.Second) errored = false inner: @@ -93,6 +93,7 @@ outer: errored = true log.Error("error retrieving hash. 
timeout", "hash idx", i, "err", err) metrics.GetOrRegisterCounter("sliding-window.single.error", nil).Inc(1) + break inner default: randIndex := 1 + rand.Intn(len(endpoints)-1) ruid := uuid.New()[:8] diff --git a/cmd/swarm/swarm-smoke/util.go b/cmd/swarm/swarm-smoke/util.go index 21eeffa65fbe..e029558d5371 100644 --- a/cmd/swarm/swarm-smoke/util.go +++ b/cmd/swarm/swarm-smoke/util.go @@ -39,11 +39,14 @@ import ( cli "gopkg.in/urfave/cli.v1" ) +var commandName = "" + func wrapCliCommand(name string, command func(*cli.Context) error) func(*cli.Context) error { return func(ctx *cli.Context) error { log.PrintOrigins(true) log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(verbosity), log.StreamHandler(os.Stdout, log.TerminalFormat(true)))) log.Info("smoke test starting", "task", name, "timeout", timeout) + commandName = name metrics.GetOrRegisterCounter(name, nil).Inc(1) errc := make(chan error) @@ -162,7 +165,7 @@ func fetch(hash string, endpoint string, original []byte, ruid string) error { opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(req.Header)) - trace := client.GetClientTrace("upload-and-sync - http get", "upload-and-sync", ruid, &tn) + trace := client.GetClientTrace(commandName+" - http get", commandName, ruid, &tn) req = req.WithContext(httptrace.WithClientTrace(ctx, trace)) transport := http.DefaultTransport From 2bc8addb42578c24d7e7894c055c467d94804edb Mon Sep 17 00:00:00 2001 From: Elad Nachmias Date: Mon, 28 Jan 2019 20:03:57 +0700 Subject: [PATCH 07/10] cmd/swarm/swarm-smoke: address more pr comments --- cmd/swarm/swarm-smoke/util.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/cmd/swarm/swarm-smoke/util.go b/cmd/swarm/swarm-smoke/util.go index e029558d5371..6cd0d294011f 100644 --- a/cmd/swarm/swarm-smoke/util.go +++ b/cmd/swarm/swarm-smoke/util.go @@ -39,7 +39,9 @@ import ( cli "gopkg.in/urfave/cli.v1" ) -var commandName = "" +var ( + commandName = "" +) func wrapCliCommand(name string, command func(*cli.Context) error) func(*cli.Context) error { return func(ctx *cli.Context) error { @@ -72,6 +74,10 @@ func generateEndpoints(scheme string, cluster string, app string, from int, to i for port := from; port < to; port++ { endpoints = append(endpoints, fmt.Sprintf("%s://%v.swarm-gateways.net", scheme, port)) } + } else if cluster == "private-internal" { + for port := from; port < to; port++ { + endpoints = append(endpoints, fmt.Sprintf("%s://swarm-private-internal-%v:8500", scheme, port)) + } } else { for port := from; port < to; port++ { endpoints = append(endpoints, fmt.Sprintf("%s://%s-%v-%s.stg.swarm-gateways.net", scheme, app, port, cluster)) @@ -87,6 +93,8 @@ func generateEndpoints(scheme string, cluster string, app string, from int, to i func generateEndpoint(scheme string, cluster string, app string, from int) string { if cluster == "prod" { return fmt.Sprintf("%s://%v.swarm-gateways.net", scheme, from) + } else if cluster == "private-internal" { + return fmt.Sprintf("%s://swarm-private-internal-%v:8500", scheme, from) } else { return fmt.Sprintf("%s://%s-%v-%s.stg.swarm-gateways.net", scheme, app, from, cluster) } From e3a5773600d994d5c5f08fcc578859318e545386 Mon Sep 17 00:00:00 2001 From: Elad Nachmias Date: Mon, 28 Jan 2019 20:19:23 +0700 Subject: [PATCH 08/10] cmd/swarm/swarm-smoke: dont fail hard on timeout for sliding window --- cmd/swarm/swarm-smoke/main.go | 8 ++++---- cmd/swarm/swarm-smoke/sliding_window.go | 2 +- cmd/swarm/swarm-smoke/util.go | 20 ++++++++++++++++++-- 3 files changed, 23 insertions(+), 7 deletions(-) diff 
--git a/cmd/swarm/swarm-smoke/main.go b/cmd/swarm/swarm-smoke/main.go index 788021e47ee9..15a6911efcaf 100644 --- a/cmd/swarm/swarm-smoke/main.go +++ b/cmd/swarm/swarm-smoke/main.go @@ -147,25 +147,25 @@ func main() { Name: "upload_and_sync", Aliases: []string{"c"}, Usage: "upload and sync", - Action: wrapCliCommand("upload-and-sync", uploadAndSync), + Action: wrapCliCommand("upload-and-sync", true, uploadAndSync), }, { Name: "feed_sync", Aliases: []string{"f"}, Usage: "feed update generate, upload and sync", - Action: wrapCliCommand("feed-and-sync", feedUploadAndSync), + Action: wrapCliCommand("feed-and-sync", true, feedUploadAndSync), }, { Name: "upload_speed", Aliases: []string{"u"}, Usage: "measure upload speed", - Action: wrapCliCommand("upload-speed", uploadSpeed), + Action: wrapCliCommand("upload-speed", true, uploadSpeed), }, { Name: "sliding_window", Aliases: []string{"s"}, Usage: "measure network aggregate capacity", - Action: wrapCliCommand("sliding-window", slidingWindow), + Action: wrapCliCommand("sliding-window", false, slidingWindow), }, } diff --git a/cmd/swarm/swarm-smoke/sliding_window.go b/cmd/swarm/swarm-smoke/sliding_window.go index 182dd071dc69..3dd404c50879 100644 --- a/cmd/swarm/swarm-smoke/sliding_window.go +++ b/cmd/swarm/swarm-smoke/sliding_window.go @@ -111,12 +111,12 @@ outer: break outer } networkDepth = i + metrics.GetOrRegisterGauge("sliding-window.network-depth", nil).Update(int64(networkDepth)) } } log.Info("sliding window test finished", "errored?", errored, "networkDepth", networkDepth, "networkDepth(kb)", networkDepth*filesize) log.Info("stats", "uploadedFiles", len(hashes), "uploadedKb", uploadedBytes/1000, "filesizeKb", filesize) - metrics.GetOrRegisterMeter("sliding-window.network-depth", nil).Mark(int64(networkDepth)) return nil } diff --git a/cmd/swarm/swarm-smoke/util.go b/cmd/swarm/swarm-smoke/util.go index 6cd0d294011f..2a3083bf5755 100644 --- a/cmd/swarm/swarm-smoke/util.go +++ b/cmd/swarm/swarm-smoke/util.go @@ -43,15 +43,31 @@ var ( commandName = "" ) -func wrapCliCommand(name string, command func(*cli.Context) error) func(*cli.Context) error { +func wrapCliCommand(name string, killOnTimeout bool, command func(*cli.Context) error) func(*cli.Context) error { return func(ctx *cli.Context) error { log.PrintOrigins(true) log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(verbosity), log.StreamHandler(os.Stdout, log.TerminalFormat(true)))) + defer func(now time.Time) { + totalTime := time.Since(now) + + log.Info("total time", "time", totalTime) + metrics.GetOrRegisterCounter(name+".total-time", nil).Inc(int64(totalTime)) + }(time.Now()) + log.Info("smoke test starting", "task", name, "timeout", timeout) commandName = name metrics.GetOrRegisterCounter(name, nil).Inc(1) errc := make(chan error) + done := make(chan struct{}) + + if killOnTimeout { + go func() { + <-time.After(time.Duration(timeout) * time.Second) + close(done) + }() + } + go func() { errc <- command(ctx) }() @@ -62,7 +78,7 @@ func wrapCliCommand(name string, command func(*cli.Context) error) func(*cli.Con metrics.GetOrRegisterCounter(fmt.Sprintf("%s.fail", name), nil).Inc(1) } return err - case <-time.After(time.Duration(timeout) * time.Second): + case <-done: metrics.GetOrRegisterCounter(fmt.Sprintf("%s.timeout", name), nil).Inc(1) return fmt.Errorf("timeout after %v sec", timeout) } From c3f46ea7c8c08fc6c5b87d99cf36e546d6261e7d Mon Sep 17 00:00:00 2001 From: Elad Nachmias Date: Tue, 29 Jan 2019 11:29:52 +0700 Subject: [PATCH 09/10] cmd/swarm/swarm-smoke: remove unused store size 
cli flag --- cmd/swarm/swarm-smoke/main.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/cmd/swarm/swarm-smoke/main.go b/cmd/swarm/swarm-smoke/main.go index 15a6911efcaf..8ff1c5f2ff8f 100644 --- a/cmd/swarm/swarm-smoke/main.go +++ b/cmd/swarm/swarm-smoke/main.go @@ -123,12 +123,6 @@ func main() { Usage: "whether to fetch content from a single node or from all nodes", Destination: &single, }, - cli.IntFlag{ - Name: "store", - Value: 5000, - Usage: "individual node store size", - Destination: &storeSize, - }, } app.Flags = append(app.Flags, []cli.Flag{ From 2140921a7c13fe5eb3247ac743e34d843fa6896f Mon Sep 17 00:00:00 2001 From: Elad Nachmias Date: Wed, 30 Jan 2019 08:19:49 +0700 Subject: [PATCH 10/10] cmd/swarm/swarm-smoke: remove unused var --- cmd/swarm/swarm-smoke/main.go | 1 - 1 file changed, 1 deletion(-) diff --git a/cmd/swarm/swarm-smoke/main.go b/cmd/swarm/swarm-smoke/main.go index 8ff1c5f2ff8f..55967146052e 100644 --- a/cmd/swarm/swarm-smoke/main.go +++ b/cmd/swarm/swarm-smoke/main.go @@ -49,7 +49,6 @@ var ( verbosity int timeout int single bool - storeSize int ) func main() {
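
Note on the command wrapper introduced in this series: wrapCliCommand (cmd/swarm/swarm-smoke/util.go) centralises the logging setup, the per-command metrics counters and the timeout handling that each cli* function previously duplicated. After PATCH 08 its second argument decides whether the wrapper itself enforces the shared timeout setting: with true it records "<name>.timeout" and returns an error as soon as the deadline passes; with false (used only by sliding_window) the command keeps running and applies the timeout per retrieval so it can still report partial results. The fragment below is an illustrative sketch of registering a further command through the wrapper; "health_check", "health-check" and healthCheck are placeholder names for illustration only and are not part of this series. It assumes the app.Commands slice and imports already present in cmd/swarm/swarm-smoke/main.go.

	// Hypothetical additional entry for the app.Commands slice in main.go.
	cli.Command{
		Name:  "health_check",
		Usage: "example of a command registered through wrapCliCommand",
		// killOnTimeout = true: the wrapper aborts the run and increments
		// "health-check.timeout" once the shared timeout elapses. Pass false
		// (as sliding_window does) when the command should watch the deadline
		// itself, e.g. per fetch, and still log its partial measurements.
		Action: wrapCliCommand("health-check", true, healthCheck),
	}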