diff --git a/testplans/graphsync/go.mod b/testplans/graphsync/go.mod index 29fe70ee..c3f52c05 100644 --- a/testplans/graphsync/go.mod +++ b/testplans/graphsync/go.mod @@ -27,6 +27,7 @@ require ( github.com/libp2p/go-libp2p-secio v0.2.2 github.com/libp2p/go-libp2p-tls v0.1.3 github.com/libp2p/go-sockaddr v0.1.0 // indirect + github.com/multiformats/go-multiaddr v0.3.1 github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect github.com/testground/sdk-go v0.2.7-0.20201112151952-8ee00c80c3ec golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208 diff --git a/testplans/graphsync/main.go b/testplans/graphsync/main.go index a1cbbd05..a8168602 100644 --- a/testplans/graphsync/main.go +++ b/testplans/graphsync/main.go @@ -3,13 +3,18 @@ package main import ( "context" "crypto/rand" + "encoding/json" "fmt" "io" + "io/ioutil" + "net" + "net/http" "os" "path/filepath" goruntime "runtime" "runtime/pprof" "strings" + gosync "sync" "time" dgbadger "github.com/dgraph-io/badger/v2" @@ -36,6 +41,7 @@ import ( noise "github.com/libp2p/go-libp2p-noise" secio "github.com/libp2p/go-libp2p-secio" tls "github.com/libp2p/go-libp2p-tls" + ma "github.com/multiformats/go-multiaddr" "github.com/testground/sdk-go/network" "github.com/testground/sdk-go/run" "github.com/testground/sdk-go/runtime" @@ -48,6 +54,60 @@ import ( "github.com/ipfs/go-graphsync/storeutil" ) +type AddrInfo struct { + peerAddr *peer.AddrInfo + ip net.IP +} + +func (pi AddrInfo) MarshalJSON() ([]byte, error) { + out := make(map[string]interface{}) + peerJSON, err := pi.peerAddr.MarshalJSON() + if err != nil { + panic(fmt.Sprintf("error marshaling: %v", err)) + } + out["PEER"] = string(peerJSON) + + ip, err := pi.ip.MarshalText() + if err != nil { + panic(fmt.Sprintf("error marshaling: %v", err)) + } + out["IP"] = string(ip) + return json.Marshal(out) +} + +func (pi *AddrInfo) UnmarshalJSON(b []byte) error { + var data map[string]interface{} + err := json.Unmarshal(b, &data) + if err != nil { + panic(fmt.Sprintf("error unmarshaling: %v", err)) + } + + var pa peer.AddrInfo + pi.peerAddr = &pa + peerAddrData := data["PEER"].(string) + var peerData map[string]interface{} + err = json.Unmarshal([]byte(peerAddrData), &peerData) + if err != nil { + panic(err) + } + pid, err := peer.Decode(peerData["ID"].(string)) + if err != nil { + panic(err) + } + pi.peerAddr.ID = pid + addrs, ok := peerData["Addrs"].([]interface{}) + if ok { + for _, a := range addrs { + pi.peerAddr.Addrs = append(pi.peerAddr.Addrs, ma.StringCast(a.(string))) + } + } + + if err := pi.ip.UnmarshalText([]byte(data["IP"].(string))); err != nil { + panic(fmt.Sprintf("error unmarshaling: %v", err)) + } + return nil +} + var testcases = map[string]interface{}{ "stress": run.InitializedTestCaseFn(runStress), } @@ -80,7 +140,7 @@ func runStress(runenv *runtime.RunEnv, initCtx *run.InitContext) error { initCtx.MustWaitAllInstancesInitialized(ctx) - host, peers, _ := makeHost(ctx, runenv, initCtx) + host, ip, peers, _ := makeHost(ctx, runenv, initCtx) defer host.Close() datastore, err := createDatastore(runenv.BooleanParam("disk_store")) @@ -88,6 +148,9 @@ func runStress(runenv *runtime.RunEnv, initCtx *run.InitContext) error { runenv.RecordMessage("datastore error: %s", err.Error()) return err } + + maxMemoryPerPeer := runenv.SizeParam("max_memory_per_peer") + maxMemoryTotal := runenv.SizeParam("max_memory_total") var ( // make datastore, blockstore, dag service, graphsync bs = blockstore.NewBlockstore(dss.MutexWrap(datastore)) @@ -96,6 +159,8 @@ func runStress(runenv *runtime.RunEnv, initCtx *run.InitContext) error { gsnet.NewFromLibp2pHost(host), storeutil.LoaderForBlockstore(bs), storeutil.StorerForBlockstore(bs), + gsi.MaxMemoryPerPeerResponder(maxMemoryPerPeer), + gsi.MaxMemoryResponder(maxMemoryTotal), ) recorder = &runRecorder{memorySnapshots: memorySnapshots, runenv: runenv} ) @@ -111,13 +176,36 @@ func runStress(runenv *runtime.RunEnv, initCtx *run.InitContext) error { runenv.RecordMessage("we are the provider") defer runenv.RecordMessage("done provider") + startTimes := make(map[struct { + peer.ID + gs.RequestID + }]time.Time) + var startTimesLk gosync.Mutex gsync.RegisterIncomingRequestHook(func(p peer.ID, request gs.RequestData, hookActions gs.IncomingRequestHookActions) { hookActions.ValidateRequest() + startTimesLk.Lock() + startTimes[struct { + peer.ID + gs.RequestID + }{p, request.ID()}] = time.Now() + startTimesLk.Unlock() + }) + gsync.RegisterCompletedResponseListener(func(p peer.ID, request gs.RequestData, status gs.ResponseStatusCode) { + startTimesLk.Lock() + startTime, ok := startTimes[struct { + peer.ID + gs.RequestID + }{p, request.ID()}] + startTimesLk.Unlock() + if ok && status == gs.RequestCompletedFull { + duration := time.Since(startTime) + recorder.recordRun(duration) + } }) gsync.RegisterBlockSentListener(func(p peer.ID, request gs.RequestData, block gs.BlockData) { recorder.recordBlock() }) - err := runProvider(ctx, runenv, initCtx, dagsrv, size, networkParams, concurrency, memorySnapshots, recorder) + err := runProvider(ctx, runenv, initCtx, dagsrv, size, ip, networkParams, concurrency, memorySnapshots, recorder) if err != nil { runenv.RecordMessage("Error running provider: %s", err.Error()) } @@ -129,8 +217,8 @@ func runStress(runenv *runtime.RunEnv, initCtx *run.InitContext) error { recorder.recordBlock() }) - p := *peers[0] - if err := host.Connect(ctx, p); err != nil { + p := peers[0] + if err := host.Connect(ctx, *p.peerAddr); err != nil { return err } runenv.RecordMessage("done dialling provider") @@ -159,8 +247,12 @@ func parseNetworkConfig(runenv *runtime.RunEnv) []networkParams { // prepend bandwidth=0 and latency=0 zero values; the first iteration will // be a control iteration. The sidecar interprets zero values as no // limitation on that attribute. - bandwidths = append([]uint64{0}, bandwidths...) - latencies = append([]time.Duration{0}, latencies...) + if runenv.BooleanParam("unlimited_bandwidth_case") { + bandwidths = append([]uint64{0}, bandwidths...) + } + if runenv.BooleanParam("no_latency_case") { + latencies = append([]time.Duration{0}, latencies...) + } var ret []networkParams for _, bandwidth := range bandwidths { @@ -200,13 +292,15 @@ func parseMemorySnapshotsParam(runenv *runtime.RunEnv) snapshotMode { } } -func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext, gsync gs.GraphExchange, p peer.AddrInfo, dagsrv format.DAGService, networkParams []networkParams, concurrency int, size uint64, memorySnapshots snapshotMode, recorder *runRecorder) error { +func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext, gsync gs.GraphExchange, p *AddrInfo, dagsrv format.DAGService, networkParams []networkParams, concurrency int, size uint64, memorySnapshots snapshotMode, recorder *runRecorder) error { var ( cids []cid.Cid // create a selector for the whole UnixFS dag sel = allselector.AllSelector ) + runHTTPTest := runenv.BooleanParam("compare_http") + for round, np := range networkParams { var ( topicCid = sync.NewTopic(fmt.Sprintf("cid-%d", round), []cid.Cid{}) @@ -215,17 +309,14 @@ func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.Init stateFinish = sync.State(fmt.Sprintf("finish-%d", round)) ) - recorder.beginRun(np, size, concurrency) - // wait for all instances to be ready for the next state. initCtx.SyncClient.MustSignalAndWait(ctx, stateNext, runenv.TestInstanceCount) + recorder.beginRun(np, size, concurrency, round) // clean up previous CIDs to attempt to free memory // TODO does this work? _ = dagsrv.RemoveMany(ctx, cids) - runenv.RecordMessage("===== ROUND %d: latency=%s, bandwidth=%d =====", round, np.latency, np.bandwidth) - sctx, scancel := context.WithCancel(ctx) cidCh := make(chan []cid.Cid, 1) initCtx.SyncClient.MustSubscribe(sctx, topicCid, cidCh) @@ -240,8 +331,7 @@ func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.Init errgrp, grpctx := errgroup.WithContext(ctx) for _, c := range cids { - c := c // capture - np := np // capture + c := c // capture errgrp.Go(func() error { // make a go-ipld-prime link for the root UnixFS node @@ -251,7 +341,7 @@ func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.Init runenv.RecordMessage("\t>>> requesting CID %s", c) start := time.Now() - respCh, errCh := gsync.Request(grpctx, p.ID, clink, sel) + respCh, errCh := gsync.Request(grpctx, p.peerAddr.ID, clink, sel) for range respCh { } for err := range errCh { @@ -259,20 +349,31 @@ func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.Init } dur := time.Since(start) - runenv.RecordMessage("\t<<< request complete with no errors") - runenv.RecordMessage("***** ROUND %d observed duration (lat=%s,bw=%d): %s", round, np.latency, np.bandwidth, dur) - - measurement := fmt.Sprintf("duration.sec,lat=%s,bw=%s,concurrency=%d,size=%s", np.latency, humanize.IBytes(np.bandwidth), concurrency, humanize.Bytes(size)) - measurement = strings.Replace(measurement, " ", "", -1) - runenv.R().RecordPoint(measurement, float64(dur)/float64(time.Second)) - + recorder.recordRun(dur) // verify that we have the CID now. if node, err := dagsrv.Get(grpctx, c); err != nil { return err } else if node == nil { return fmt.Errorf("finished graphsync request, but CID not in store") } - + if runHTTPTest { + // request file directly over http + start = time.Now() + file, err := ioutil.TempFile(os.TempDir(), fmt.Sprintf("%s-", c.String())) + if err != nil { + panic(err) + } + resp, err := http.Get(fmt.Sprintf("http://%s:8080/%s", p.ip.String(), c.String())) + if err != nil { + panic(err) + } + bytesRead, err := io.Copy(file, resp.Body) + if err != nil { + panic(err) + } + dur = time.Since(start) + recorder.recordHTTPRun(dur, bytesRead) + } return nil }) } @@ -292,12 +393,26 @@ func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.Init return nil } -func runProvider(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext, dagsrv format.DAGService, size uint64, networkParams []networkParams, concurrency int, memorySnapshots snapshotMode, recorder *runRecorder) error { +func runProvider(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext, dagsrv format.DAGService, size uint64, ip net.IP, networkParams []networkParams, concurrency int, memorySnapshots snapshotMode, recorder *runRecorder) error { var ( cids []cid.Cid bufferedDS = format.NewBufferedDAG(ctx, dagsrv) ) + runHTTPTest := runenv.BooleanParam("compare_http") + var svr *http.Server + if runHTTPTest { + // start an http server on port 8080 + runenv.RecordMessage("creating http server at http://%s:8080", ip.String()) + svr = &http.Server{Addr: ":8080"} + + go func() { + if err := svr.ListenAndServe(); err != nil { + runenv.RecordMessage("shutdown http server at http://%s:8080", ip.String()) + } + }() + } + for round, np := range networkParams { var ( topicCid = sync.NewTopic(fmt.Sprintf("cid-%d", round), []cid.Cid{}) @@ -305,10 +420,10 @@ func runProvider(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitC stateFinish = sync.State(fmt.Sprintf("finish-%d", round)) stateNet = sync.State(fmt.Sprintf("network-configured-%d", round)) ) - recorder.beginRun(np, size, concurrency) // wait for all instances to be ready for the next state. initCtx.SyncClient.MustSignalAndWait(ctx, stateNext, runenv.TestInstanceCount) + recorder.beginRun(np, size, concurrency, round) // remove the previous CIDs from the dag service; hopefully this // will delete them from the store and free up memory. @@ -317,12 +432,17 @@ func runProvider(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitC } cids = cids[:0] - runenv.RecordMessage("===== ROUND %d: latency=%s, bandwidth=%d =====", round, np.latency, np.bandwidth) - // generate as many random files as the concurrency level. for i := 0; i < concurrency; i++ { // file with random data - file := files.NewReaderFile(io.LimitReader(rand.Reader, int64(size))) + data := files.NewReaderFile(io.LimitReader(rand.Reader, int64(size))) + file, err := ioutil.TempFile(os.TempDir(), "unixfs-") + if err != nil { + panic(err) + } + if _, err := io.Copy(file, data); err != nil { + panic(err) + } unixfsChunkSize := uint64(1) << runenv.IntParam("chunk_size") unixfsLinksPerLevel := runenv.IntParam("links_per_level") @@ -334,6 +454,9 @@ func runProvider(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitC Dagserv: bufferedDS, } + if _, err := file.Seek(0, 0); err != nil { + panic(err) + } db, err := params.New(chunk.NewSizeSplitter(file, int64(unixfsChunkSize))) if err != nil { return fmt.Errorf("unable to setup dag builder: %w", err) @@ -344,6 +467,20 @@ func runProvider(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitC return fmt.Errorf("unable to create unix fs node: %w", err) } + if runHTTPTest { + // set up http server to send file + http.HandleFunc(fmt.Sprintf("/%s", node.Cid()), func(w http.ResponseWriter, r *http.Request) { + fileReader, err := os.Open(file.Name()) + defer fileReader.Close() + if err != nil { + panic(err) + } + _, err = io.Copy(w, fileReader) + if err != nil { + panic(err) + } + }) + } cids = append(cids, node.Cid()) } @@ -382,10 +519,15 @@ func runProvider(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitC } + if runHTTPTest { + if err := svr.Shutdown(ctx); err != nil { + panic(err) + } + } return nil } -func makeHost(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext) (host.Host, []*peer.AddrInfo, *metrics.BandwidthCounter) { +func makeHost(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext) (host.Host, net.IP, []*AddrInfo, *metrics.BandwidthCounter) { secureChannel := runenv.StringParam("secure_channel") var security libp2p.Option @@ -421,18 +563,21 @@ func makeHost(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitCont ai = &peer.AddrInfo{ID: id, Addrs: host.Addrs()} // the peers topic where all instances will advertise their AddrInfo. - peersTopic = sync.NewTopic("peers", new(peer.AddrInfo)) + peersTopic = sync.NewTopic("peers", new(AddrInfo)) // initialize a slice to store the AddrInfos of all other peers in the run. - peers = make([]*peer.AddrInfo, 0, runenv.TestInstanceCount-1) + peers = make([]*AddrInfo, 0, runenv.TestInstanceCount-1) ) // Publish our own. - initCtx.SyncClient.MustPublish(ctx, peersTopic, ai) + initCtx.SyncClient.MustPublish(ctx, peersTopic, &AddrInfo{ + peerAddr: ai, + ip: ip, + }) // Now subscribe to the peers topic and consume all addresses, storing them // in the peers slice. - peersCh := make(chan *peer.AddrInfo) + peersCh := make(chan *AddrInfo) sctx, scancel := context.WithCancel(ctx) defer scancel() @@ -442,7 +587,7 @@ func makeHost(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitCont for len(peers) < cap(peers) { select { case ai := <-peersCh: - if ai.ID == id { + if ai.peerAddr.ID == id { continue // skip over ourselves. } peers = append(peers, ai) @@ -451,7 +596,7 @@ func makeHost(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitCont } } - return host, peers, bwcounter + return host, ip, peers, bwcounter } func createDatastore(diskStore bool) (ds.Datastore, error) { @@ -521,7 +666,9 @@ type runRecorder struct { np networkParams size uint64 concurrency int + round int runenv *runtime.RunEnv + measurement string } func (rr *runRecorder) recordBlock() { @@ -533,9 +680,27 @@ func (rr *runRecorder) recordBlock() { rr.index++ } -func (rr *runRecorder) beginRun(np networkParams, size uint64, concurrency int) { +func (rr *runRecorder) recordRun(duration time.Duration) { + rr.runenv.RecordMessage("\t<<< graphsync request complete with no errors") + rr.runenv.RecordMessage("***** ROUND %d observed duration (lat=%s,bw=%d): %s", rr.round, rr.np.latency, rr.np.bandwidth, duration) + rr.runenv.R().RecordPoint(rr.measurement+",transport=graphsync", float64(duration)/float64(time.Second)) +} + +func (rr *runRecorder) recordHTTPRun(duration time.Duration, bytesRead int64) { + rr.runenv.RecordMessage(fmt.Sprintf("\t<<< http request complete with no errors, read %d bytes", bytesRead)) + rr.runenv.RecordMessage("***** ROUND %d observed http duration (lat=%s,bw=%d): %s", rr.round, rr.np.latency, rr.np.bandwidth, duration) + rr.runenv.R().RecordPoint(rr.measurement+",transport=http", float64(duration)/float64(time.Second)) +} + +func (rr *runRecorder) beginRun(np networkParams, size uint64, concurrency int, round int) { + rr.concurrency = concurrency rr.np = np rr.size = size rr.index = 0 + rr.round = round + rr.runenv.RecordMessage("===== ROUND %d: latency=%s, bandwidth=%d =====", rr.round, rr.np.latency, rr.np.bandwidth) + measurement := fmt.Sprintf("duration.sec,lat=%s,bw=%s,concurrency=%d,size=%s", rr.np.latency, humanize.IBytes(rr.np.bandwidth), rr.concurrency, humanize.Bytes(rr.size)) + measurement = strings.Replace(measurement, " ", "", -1) + rr.measurement = measurement } diff --git a/testplans/graphsync/manifest.toml b/testplans/graphsync/manifest.toml index ba0c289e..a5cfbbe0 100644 --- a/testplans/graphsync/manifest.toml +++ b/testplans/graphsync/manifest.toml @@ -27,3 +27,8 @@ links_per_level = { type = "int", desc = "unixfs links per level", default = "10 raw_leaves = { type = "bool", desc = "should unixfs leaves be left unwrapped", default = "true"} disk_store = { type = "bool", desc = "should data be stored on disk (true) or memory (false)", default = "false"} memory_snapshots = { type = "string", desc = "what kind of memory snapshots to take (none, simple, detailed)", default = "none" } +compare_http = { type = "bool", desc = "run a comparison against http", default = "true"} +max_memory_per_peer = { type = "int", desc = "max memory a responder can queue up per peer", default = "64MiB"} +max_memory_total = { type = "int", desc = "max memory a responder can queue up total", default = "512MiB"} +unlimited_bandwidth_case = { type = "bool", desc = "run a comparison against http", default = "true"} +no_latency_case = { type = "bool", desc = "run a comparison against http", default = "true"} \ No newline at end of file