Skip to content

Commit

Permalink
Remove global variables from spam script (#1659)
Browse files Browse the repository at this point in the history
  • Loading branch information
samliok authored Oct 13, 2024
1 parent bca7739 commit e3d911a
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 83 deletions.
62 changes: 8 additions & 54 deletions throughput/issuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package throughput
import (
"context"
"fmt"
"strings"
"sync"
"time"

Expand All @@ -30,10 +29,13 @@ type issuer struct {
ws *ws.WebSocketClient
outstandingTxs int
abandoned error

// injected from the spammer
tracker *tracker
}

func (i *issuer) Start(ctx context.Context) {
issuerWg.Add(1)
i.tracker.issuerWg.Add(1)
go func() {
for {
_, wsErr, result, err := i.ws.ListenTx(context.TODO())
Expand All @@ -43,28 +45,14 @@ func (i *issuer) Start(ctx context.Context) {
i.l.Lock()
i.outstandingTxs--
i.l.Unlock()
inflight.Add(-1)
l.Lock()
if result != nil {
if result.Success {
confirmedTxs++
} else {
utils.Outf("{{orange}}on-chain tx failure:{{/}} %s %t\n", string(result.Error), result.Success)
}
} else {
// We can't error match here because we receive it over the wire.
if !strings.Contains(wsErr.Error(), ws.ErrExpired.Error()) {
utils.Outf("{{orange}}pre-execute tx failure:{{/}} %v\n", wsErr)
}
}
totalTxs++
l.Unlock()
i.tracker.inflight.Add(-1)
i.tracker.logResult(result, wsErr)
}
}()
go func() {
defer func() {
_ = i.ws.Close()
issuerWg.Done()
i.tracker.issuerWg.Done()
}()

<-ctx.Done()
Expand Down Expand Up @@ -98,7 +86,7 @@ func (i *issuer) Send(ctx context.Context, actions []chain.Action, factory chain
i.l.Lock()
i.outstandingTxs++
i.l.Unlock()
inflight.Add(1)
i.tracker.inflight.Add(1)

// Register transaction and recover upon failure
if err := i.ws.RegisterTx(tx); err != nil {
Expand Down Expand Up @@ -135,37 +123,3 @@ func getRandomIssuer(issuers []*issuer) *issuer {
index := rand.Int() % len(issuers)
return issuers[index]
}

func (i *issuer) logStats(cctx context.Context) {
// Log stats
t := time.NewTicker(1 * time.Second) // ensure no duplicates created
var psent int64
go func() {
defer t.Stop()
for {
select {
case <-t.C:
current := sent.Load()
l.Lock()
if totalTxs > 0 {
unitPrices, err := i.cli.UnitPrices(cctx, false)
if err != nil {
continue
}
utils.Outf(
"{{yellow}}txs seen:{{/}} %d {{yellow}}success rate:{{/}} %.2f%% {{yellow}}inflight:{{/}} %d {{yellow}}issued/s:{{/}} %d {{yellow}}unit prices:{{/}} [%s]\n", //nolint:lll
totalTxs,
float64(confirmedTxs)/float64(totalTxs)*100,
inflight.Load(),
current-psent,
unitPrices,
)
}
l.Unlock()
psent = current
case <-cctx.Done():
return
}
}
}()
}
39 changes: 22 additions & 17 deletions throughput/spam.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"os/signal"
"runtime"
"sync"
"sync/atomic"
"syscall"
"time"

Expand Down Expand Up @@ -41,14 +40,6 @@ const (
// TODO: remove the use of global variables
var (
maxConcurrency = runtime.NumCPU()
issuerWg sync.WaitGroup

l sync.Mutex
confirmedTxs uint64
totalTxs uint64

inflight atomic.Int64
sent atomic.Int64
)

type Spammer struct {
Expand All @@ -69,11 +60,15 @@ type Spammer struct {

// Number of accounts
numAccounts int

// keep track of variables shared across issuers
tracker *tracker
}

func NewSpammer(sc *Config, sh SpamHelper) (*Spammer, error) {
// Log Zipf participants
zipfSeed := rand.New(rand.NewSource(0)) //nolint:gosec
tracker := &tracker{}
balance, err := sh.LookupBalance(sc.key.Address)
if err != nil {
return nil, err
Expand All @@ -92,6 +87,8 @@ func NewSpammer(sc *Config, sh SpamHelper) (*Spammer, error) {
txsPerSecondStep: sc.txsPerSecondStep,
numClients: sc.numClients,
numAccounts: sc.numAccounts,

tracker: tracker,
}, nil
}

Expand Down Expand Up @@ -119,7 +116,7 @@ func (s *Spammer) Spam(ctx context.Context, sh SpamHelper, terminate bool, symbo
if err != nil {
return err
}
actions := sh.GetTransfer(s.key.Address, 0, uniqueBytes())
actions := sh.GetTransfer(s.key.Address, 0, s.tracker.uniqueBytes())
maxUnits, err := chain.EstimateUnits(parser.Rules(time.Now().UnixMilli()), actions, factory)
if err != nil {
return err
Expand Down Expand Up @@ -154,7 +151,7 @@ func (s *Spammer) Spam(ctx context.Context, sh SpamHelper, terminate bool, symbo
}

// set logging
issuers[0].logStats(cctx)
s.tracker.logState(cctx, issuers[0].cli)

// broadcast transactions
s.broadcast(cctx, cancel, sh, accounts, funds, factories, issuers, feePerTx, terminate)
Expand Down Expand Up @@ -202,7 +199,7 @@ func (s Spammer) broadcast(
start := time.Now()

// Check to see if we should wait for pending txs
if int64(currentTarget)+inflight.Load() > int64(currentTarget*pendingTargetMultiplier) {
if int64(currentTarget)+s.tracker.inflight.Load() > int64(currentTarget*pendingTargetMultiplier) {
consecutiveUnderBacklog = 0
consecutiveAboveBacklog++
if consecutiveAboveBacklog >= failedRunsToDecreaseTarget {
Expand Down Expand Up @@ -247,7 +244,7 @@ func (s Spammer) broadcast(
fundsL.Unlock()

// Send transaction
actions := sh.GetTransfer(recipient, amountToTransfer, uniqueBytes())
actions := sh.GetTransfer(recipient, amountToTransfer, s.tracker.uniqueBytes())
return issuer.Send(ctx, actions, factory, feePerTx)
})
}
Expand Down Expand Up @@ -290,7 +287,7 @@ func (s Spammer) broadcast(

// Wait for all issuers to finish
utils.Outf("{{yellow}}waiting for issuers to return{{/}}\n")
issuerWg.Wait()
s.tracker.issuerWg.Wait()
}

func (s *Spammer) logZipf(zipfSeed *rand.Rand) {
Expand All @@ -306,14 +303,22 @@ func (s *Spammer) logZipf(zipfSeed *rand.Rand) {
// createIssuer creates an [numClients] transaction issuers for each URI in [uris]
func (s *Spammer) createIssuers(parser chain.Parser) ([]*issuer, error) {
issuers := []*issuer{}

for i := 0; i < len(s.uris); i++ {
for j := 0; j < s.numClients; j++ {
cli := jsonrpc.NewJSONRPCClient(s.uris[i])
webSocketClient, err := ws.NewWebSocketClient(s.uris[i], ws.DefaultHandshakeTimeout, pubsub.MaxPendingMessages, pubsub.MaxReadMessageSize) // we write the max read
if err != nil {
return nil, err
}
issuer := &issuer{i: len(issuers), cli: cli, ws: webSocketClient, parser: parser, uri: s.uris[i]}
issuer := &issuer{
i: len(issuers),
cli: cli,
ws: webSocketClient,
parser: parser,
uri: s.uris[i],
tracker: s.tracker,
}
issuers = append(issuers, issuer)
}
}
Expand Down Expand Up @@ -362,7 +367,7 @@ func (s *Spammer) distributeFunds(ctx context.Context, cli *jsonrpc.JSONRPCClien
factories[i] = f

// Send funds
actions := sh.GetTransfer(pk.Address, distAmount, uniqueBytes())
actions := sh.GetTransfer(pk.Address, distAmount, s.tracker.uniqueBytes())
_, tx, err := cli.GenerateTransactionManual(parser, actions, factory, feePerTx)
if err != nil {
return nil, nil, nil, err
Expand Down Expand Up @@ -416,7 +421,7 @@ func (s *Spammer) returnFunds(ctx context.Context, cli *jsonrpc.JSONRPCClient, p

// Send funds
returnAmt := balance - feePerTx
actions := sh.GetTransfer(s.key.Address, returnAmt, uniqueBytes())
actions := sh.GetTransfer(s.key.Address, returnAmt, s.tracker.uniqueBytes())
_, tx, err := cli.GenerateTransactionManual(parser, actions, factories[i], feePerTx)
if err != nil {
return err
Expand Down
88 changes: 88 additions & 0 deletions throughput/tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright (C) 2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package throughput

import (
"context"
"encoding/binary"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/ava-labs/hypersdk/api/jsonrpc"
"github.com/ava-labs/hypersdk/api/ws"
"github.com/ava-labs/hypersdk/chain"
"github.com/ava-labs/hypersdk/utils"
)

type tracker struct {
issuerWg sync.WaitGroup
inflight atomic.Int64

l sync.Mutex
confirmedTxs int
totalTxs int

sent atomic.Int64
}

func (t *tracker) logResult(
result *chain.Result,
wsErr error,
) {
t.l.Lock()
if result != nil {
if result.Success {
t.confirmedTxs++
} else {
utils.Outf("{{orange}}on-chain tx failure:{{/}} %s %t\n", string(result.Error), result.Success)
}
} else {
// We can't error match here because we receive it over the wire.
if !strings.Contains(wsErr.Error(), ws.ErrExpired.Error()) {
utils.Outf("{{orange}}pre-execute tx failure:{{/}} %v\n", wsErr)
}
}
t.totalTxs++
t.l.Unlock()
}

func (t *tracker) logState(ctx context.Context, cli *jsonrpc.JSONRPCClient) {
// Log stats
tick := time.NewTicker(1 * time.Second) // ensure no duplicates created
var psent int64
go func() {
defer tick.Stop()
for {
select {
case <-tick.C:
current := t.sent.Load()
t.l.Lock()
if t.totalTxs > 0 {
unitPrices, err := cli.UnitPrices(ctx, false)
if err != nil {
continue
}
utils.Outf(
"{{yellow}}txs seen:{{/}} %d {{yellow}}success rate:{{/}} %.2f%% {{yellow}}inflight:{{/}} %d {{yellow}}issued/s:{{/}} %d {{yellow}}unit prices:{{/}} [%s]\n", //nolint:lll
t.totalTxs,
float64(t.confirmedTxs)/float64(t.totalTxs)*100,
t.inflight.Load(),
current-psent,
unitPrices,
)
}
t.l.Unlock()
psent = current
case <-ctx.Done():
return
}
}
}()
}

func (t *tracker) uniqueBytes() []byte {
return binary.BigEndian.AppendUint64(nil, uint64(t.sent.Add(1)))
}
12 changes: 0 additions & 12 deletions throughput/utils.go

This file was deleted.

0 comments on commit e3d911a

Please sign in to comment.