diff --git a/throughput/issuer.go b/throughput/issuer.go index f74f2e3209..5f5443040b 100644 --- a/throughput/issuer.go +++ b/throughput/issuer.go @@ -6,7 +6,6 @@ package throughput import ( "context" "fmt" - "strings" "sync" "time" @@ -30,10 +29,13 @@ type issuer struct { ws *ws.WebSocketClient outstandingTxs int abandoned error + + // injected from the spammer + tracker *tracker } func (i *issuer) Start(ctx context.Context) { - issuerWg.Add(1) + i.tracker.issuerWg.Add(1) go func() { for { _, wsErr, result, err := i.ws.ListenTx(context.TODO()) @@ -43,28 +45,14 @@ func (i *issuer) Start(ctx context.Context) { i.l.Lock() i.outstandingTxs-- i.l.Unlock() - inflight.Add(-1) - l.Lock() - if result != nil { - if result.Success { - confirmedTxs++ - } else { - utils.Outf("{{orange}}on-chain tx failure:{{/}} %s %t\n", string(result.Error), result.Success) - } - } else { - // We can't error match here because we receive it over the wire. - if !strings.Contains(wsErr.Error(), ws.ErrExpired.Error()) { - utils.Outf("{{orange}}pre-execute tx failure:{{/}} %v\n", wsErr) - } - } - totalTxs++ - l.Unlock() + i.tracker.inflight.Add(-1) + i.tracker.logResult(result, wsErr) } }() go func() { defer func() { _ = i.ws.Close() - issuerWg.Done() + i.tracker.issuerWg.Done() }() <-ctx.Done() @@ -98,7 +86,7 @@ func (i *issuer) Send(ctx context.Context, actions []chain.Action, factory chain i.l.Lock() i.outstandingTxs++ i.l.Unlock() - inflight.Add(1) + i.tracker.inflight.Add(1) // Register transaction and recover upon failure if err := i.ws.RegisterTx(tx); err != nil { @@ -135,37 +123,3 @@ func getRandomIssuer(issuers []*issuer) *issuer { index := rand.Int() % len(issuers) return issuers[index] } - -func (i *issuer) logStats(cctx context.Context) { - // Log stats - t := time.NewTicker(1 * time.Second) // ensure no duplicates created - var psent int64 - go func() { - defer t.Stop() - for { - select { - case <-t.C: - current := sent.Load() - l.Lock() - if totalTxs > 0 { - unitPrices, err := i.cli.UnitPrices(cctx, false) - if err != nil { - continue - } - utils.Outf( - "{{yellow}}txs seen:{{/}} %d {{yellow}}success rate:{{/}} %.2f%% {{yellow}}inflight:{{/}} %d {{yellow}}issued/s:{{/}} %d {{yellow}}unit prices:{{/}} [%s]\n", //nolint:lll - totalTxs, - float64(confirmedTxs)/float64(totalTxs)*100, - inflight.Load(), - current-psent, - unitPrices, - ) - } - l.Unlock() - psent = current - case <-cctx.Done(): - return - } - } - }() -} diff --git a/throughput/spam.go b/throughput/spam.go index 4a6aaf22c8..7a06486528 100644 --- a/throughput/spam.go +++ b/throughput/spam.go @@ -11,7 +11,6 @@ import ( "os/signal" "runtime" "sync" - "sync/atomic" "syscall" "time" @@ -41,14 +40,6 @@ const ( // TODO: remove the use of global variables var ( maxConcurrency = runtime.NumCPU() - issuerWg sync.WaitGroup - - l sync.Mutex - confirmedTxs uint64 - totalTxs uint64 - - inflight atomic.Int64 - sent atomic.Int64 ) type Spammer struct { @@ -69,11 +60,15 @@ type Spammer struct { // Number of accounts numAccounts int + + // keep track of variables shared across issuers + tracker *tracker } func NewSpammer(sc *Config, sh SpamHelper) (*Spammer, error) { // Log Zipf participants zipfSeed := rand.New(rand.NewSource(0)) //nolint:gosec + tracker := &tracker{} balance, err := sh.LookupBalance(sc.key.Address) if err != nil { return nil, err @@ -92,6 +87,8 @@ func NewSpammer(sc *Config, sh SpamHelper) (*Spammer, error) { txsPerSecondStep: sc.txsPerSecondStep, numClients: sc.numClients, numAccounts: sc.numAccounts, + + tracker: tracker, }, nil } @@ -119,7 +116,7 @@ func (s *Spammer) Spam(ctx context.Context, sh SpamHelper, terminate bool, symbo if err != nil { return err } - actions := sh.GetTransfer(s.key.Address, 0, uniqueBytes()) + actions := sh.GetTransfer(s.key.Address, 0, s.tracker.uniqueBytes()) maxUnits, err := chain.EstimateUnits(parser.Rules(time.Now().UnixMilli()), actions, factory) if err != nil { return err @@ -154,7 +151,7 @@ func (s *Spammer) Spam(ctx context.Context, sh SpamHelper, terminate bool, symbo } // set logging - issuers[0].logStats(cctx) + s.tracker.logState(cctx, issuers[0].cli) // broadcast transactions s.broadcast(cctx, cancel, sh, accounts, funds, factories, issuers, feePerTx, terminate) @@ -202,7 +199,7 @@ func (s Spammer) broadcast( start := time.Now() // Check to see if we should wait for pending txs - if int64(currentTarget)+inflight.Load() > int64(currentTarget*pendingTargetMultiplier) { + if int64(currentTarget)+s.tracker.inflight.Load() > int64(currentTarget*pendingTargetMultiplier) { consecutiveUnderBacklog = 0 consecutiveAboveBacklog++ if consecutiveAboveBacklog >= failedRunsToDecreaseTarget { @@ -247,7 +244,7 @@ func (s Spammer) broadcast( fundsL.Unlock() // Send transaction - actions := sh.GetTransfer(recipient, amountToTransfer, uniqueBytes()) + actions := sh.GetTransfer(recipient, amountToTransfer, s.tracker.uniqueBytes()) return issuer.Send(ctx, actions, factory, feePerTx) }) } @@ -290,7 +287,7 @@ func (s Spammer) broadcast( // Wait for all issuers to finish utils.Outf("{{yellow}}waiting for issuers to return{{/}}\n") - issuerWg.Wait() + s.tracker.issuerWg.Wait() } func (s *Spammer) logZipf(zipfSeed *rand.Rand) { @@ -306,6 +303,7 @@ func (s *Spammer) logZipf(zipfSeed *rand.Rand) { // createIssuer creates an [numClients] transaction issuers for each URI in [uris] func (s *Spammer) createIssuers(parser chain.Parser) ([]*issuer, error) { issuers := []*issuer{} + for i := 0; i < len(s.uris); i++ { for j := 0; j < s.numClients; j++ { cli := jsonrpc.NewJSONRPCClient(s.uris[i]) @@ -313,7 +311,14 @@ func (s *Spammer) createIssuers(parser chain.Parser) ([]*issuer, error) { if err != nil { return nil, err } - issuer := &issuer{i: len(issuers), cli: cli, ws: webSocketClient, parser: parser, uri: s.uris[i]} + issuer := &issuer{ + i: len(issuers), + cli: cli, + ws: webSocketClient, + parser: parser, + uri: s.uris[i], + tracker: s.tracker, + } issuers = append(issuers, issuer) } } @@ -362,7 +367,7 @@ func (s *Spammer) distributeFunds(ctx context.Context, cli *jsonrpc.JSONRPCClien factories[i] = f // Send funds - actions := sh.GetTransfer(pk.Address, distAmount, uniqueBytes()) + actions := sh.GetTransfer(pk.Address, distAmount, s.tracker.uniqueBytes()) _, tx, err := cli.GenerateTransactionManual(parser, actions, factory, feePerTx) if err != nil { return nil, nil, nil, err @@ -416,7 +421,7 @@ func (s *Spammer) returnFunds(ctx context.Context, cli *jsonrpc.JSONRPCClient, p // Send funds returnAmt := balance - feePerTx - actions := sh.GetTransfer(s.key.Address, returnAmt, uniqueBytes()) + actions := sh.GetTransfer(s.key.Address, returnAmt, s.tracker.uniqueBytes()) _, tx, err := cli.GenerateTransactionManual(parser, actions, factories[i], feePerTx) if err != nil { return err diff --git a/throughput/tracker.go b/throughput/tracker.go new file mode 100644 index 0000000000..c0fc520a7d --- /dev/null +++ b/throughput/tracker.go @@ -0,0 +1,88 @@ +// Copyright (C) 2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package throughput + +import ( + "context" + "encoding/binary" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/ava-labs/hypersdk/api/jsonrpc" + "github.com/ava-labs/hypersdk/api/ws" + "github.com/ava-labs/hypersdk/chain" + "github.com/ava-labs/hypersdk/utils" +) + +type tracker struct { + issuerWg sync.WaitGroup + inflight atomic.Int64 + + l sync.Mutex + confirmedTxs int + totalTxs int + + sent atomic.Int64 +} + +func (t *tracker) logResult( + result *chain.Result, + wsErr error, +) { + t.l.Lock() + if result != nil { + if result.Success { + t.confirmedTxs++ + } else { + utils.Outf("{{orange}}on-chain tx failure:{{/}} %s %t\n", string(result.Error), result.Success) + } + } else { + // We can't error match here because we receive it over the wire. + if !strings.Contains(wsErr.Error(), ws.ErrExpired.Error()) { + utils.Outf("{{orange}}pre-execute tx failure:{{/}} %v\n", wsErr) + } + } + t.totalTxs++ + t.l.Unlock() +} + +func (t *tracker) logState(ctx context.Context, cli *jsonrpc.JSONRPCClient) { + // Log stats + tick := time.NewTicker(1 * time.Second) // ensure no duplicates created + var psent int64 + go func() { + defer tick.Stop() + for { + select { + case <-tick.C: + current := t.sent.Load() + t.l.Lock() + if t.totalTxs > 0 { + unitPrices, err := cli.UnitPrices(ctx, false) + if err != nil { + continue + } + utils.Outf( + "{{yellow}}txs seen:{{/}} %d {{yellow}}success rate:{{/}} %.2f%% {{yellow}}inflight:{{/}} %d {{yellow}}issued/s:{{/}} %d {{yellow}}unit prices:{{/}} [%s]\n", //nolint:lll + t.totalTxs, + float64(t.confirmedTxs)/float64(t.totalTxs)*100, + t.inflight.Load(), + current-psent, + unitPrices, + ) + } + t.l.Unlock() + psent = current + case <-ctx.Done(): + return + } + } + }() +} + +func (t *tracker) uniqueBytes() []byte { + return binary.BigEndian.AppendUint64(nil, uint64(t.sent.Add(1))) +} diff --git a/throughput/utils.go b/throughput/utils.go deleted file mode 100644 index c1e072db2f..0000000000 --- a/throughput/utils.go +++ /dev/null @@ -1,12 +0,0 @@ -// Copyright (C) 2024, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package throughput - -import ( - "encoding/binary" -) - -func uniqueBytes() []byte { - return binary.BigEndian.AppendUint64(nil, uint64(sent.Add(1))) -}