Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove global variables from spam script #1659

Merged
merged 8 commits into from
Oct 13, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 11 additions & 54 deletions throughput/issuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ package throughput
import (
"context"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

"golang.org/x/exp/rand"
Expand All @@ -30,10 +30,15 @@ type issuer struct {
ws *ws.WebSocketClient
outstandingTxs int
abandoned error

// injected from the spammer
issuerWg *sync.WaitGroup
inflight *atomic.Int64
logger *logger
}

func (i *issuer) Start(ctx context.Context) {
issuerWg.Add(1)
i.issuerWg.Add(1)
go func() {
for {
_, wsErr, result, err := i.ws.ListenTx(context.TODO())
Expand All @@ -43,28 +48,14 @@ func (i *issuer) Start(ctx context.Context) {
i.l.Lock()
i.outstandingTxs--
i.l.Unlock()
inflight.Add(-1)
l.Lock()
if result != nil {
if result.Success {
confirmedTxs++
} else {
utils.Outf("{{orange}}on-chain tx failure:{{/}} %s %t\n", string(result.Error), result.Success)
}
} else {
// We can't error match here because we receive it over the wire.
if !strings.Contains(wsErr.Error(), ws.ErrExpired.Error()) {
utils.Outf("{{orange}}pre-execute tx failure:{{/}} %v\n", wsErr)
}
}
totalTxs++
l.Unlock()
i.inflight.Add(-1)
i.logger.Log(result, wsErr)
}
}()
go func() {
defer func() {
_ = i.ws.Close()
issuerWg.Done()
i.issuerWg.Done()
}()

<-ctx.Done()
Expand Down Expand Up @@ -98,7 +89,7 @@ func (i *issuer) Send(ctx context.Context, actions []chain.Action, factory chain
i.l.Lock()
i.outstandingTxs++
i.l.Unlock()
inflight.Add(1)
i.inflight.Add(1)

// Register transaction and recover upon failure
if err := i.ws.RegisterTx(tx); err != nil {
Expand Down Expand Up @@ -135,37 +126,3 @@ func getRandomIssuer(issuers []*issuer) *issuer {
index := rand.Int() % len(issuers)
return issuers[index]
}

func (i *issuer) logStats(cctx context.Context) {
// Log stats
t := time.NewTicker(1 * time.Second) // ensure no duplicates created
var psent int64
go func() {
defer t.Stop()
for {
select {
case <-t.C:
current := sent.Load()
l.Lock()
if totalTxs > 0 {
unitPrices, err := i.cli.UnitPrices(cctx, false)
if err != nil {
continue
}
utils.Outf(
"{{yellow}}txs seen:{{/}} %d {{yellow}}success rate:{{/}} %.2f%% {{yellow}}inflight:{{/}} %d {{yellow}}issued/s:{{/}} %d {{yellow}}unit prices:{{/}} [%s]\n", //nolint:lll
totalTxs,
float64(confirmedTxs)/float64(totalTxs)*100,
inflight.Load(),
current-psent,
unitPrices,
)
}
l.Unlock()
psent = current
case <-cctx.Done():
return
}
}
}()
}
78 changes: 78 additions & 0 deletions throughput/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright (C) 2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package throughput

import (
"context"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/ava-labs/hypersdk/api/ws"
"github.com/ava-labs/hypersdk/chain"
"github.com/ava-labs/hypersdk/utils"
)

type logger struct {
l sync.Mutex

confirmedTxs int
totalTxs int
}

func (l *logger) Log(
samliok marked this conversation as resolved.
Show resolved Hide resolved
result *chain.Result,
wsErr error,
) {
l.l.Lock()
if result != nil {
if result.Success {
l.confirmedTxs++
} else {
utils.Outf("{{orange}}on-chain tx failure:{{/}} %s %t\n", string(result.Error), result.Success)
}
} else {
// We can't error match here because we receive it over the wire.
if !strings.Contains(wsErr.Error(), ws.ErrExpired.Error()) {
utils.Outf("{{orange}}pre-execute tx failure:{{/}} %v\n", wsErr)
}
}
l.totalTxs++
l.l.Unlock()
}

func (l *logger) logStats(ctx context.Context, issuer *issuer, inflight *atomic.Int64, sent *atomic.Int64) {
// Log stats
t := time.NewTicker(1 * time.Second) // ensure no duplicates created
var psent int64
go func() {
defer t.Stop()
for {
select {
case <-t.C:
current := sent.Load()
l.l.Lock()
if l.totalTxs > 0 {
unitPrices, err := issuer.cli.UnitPrices(ctx, false)
if err != nil {
continue
}
utils.Outf(
"{{yellow}}txs seen:{{/}} %d {{yellow}}success rate:{{/}} %.2f%% {{yellow}}inflight:{{/}} %d {{yellow}}issued/s:{{/}} %d {{yellow}}unit prices:{{/}} [%s]\n", //nolint:lll
l.totalTxs,
float64(l.confirmedTxs)/float64(l.totalTxs)*100,
inflight.Load(),
current-psent,
unitPrices,
)
}
l.l.Unlock()
psent = current
case <-ctx.Done():
return
}
}
}()
}
70 changes: 40 additions & 30 deletions throughput/spam.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package throughput

import (
"context"
"encoding/binary"
"fmt"
"math/rand"
"os"
Expand Down Expand Up @@ -41,14 +42,6 @@ const (
// TODO: remove the use of global variables
var (
maxConcurrency = runtime.NumCPU()
issuerWg sync.WaitGroup

l sync.Mutex
confirmedTxs uint64
totalTxs uint64

inflight atomic.Int64
sent atomic.Int64
Comment on lines -44 to -51
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

)

type Spammer struct {
Expand All @@ -69,6 +62,11 @@ type Spammer struct {

// Number of accounts
numAccounts int

// keep track of variables shared across issuers
sent atomic.Int64
inflight atomic.Int64
issuerWg sync.WaitGroup
}

func NewSpammer(
Expand All @@ -80,19 +78,18 @@ func NewSpammer(
) *Spammer {
// Log Zipf participants
zipfSeed := rand.New(rand.NewSource(0)) //nolint:gosec

return &Spammer{
uris,
key,
balance,
zipfSeed,
sZipf,
vZipf,
txsPerSecond,
minTxsPerSecond,
txsPerSecondStep,
numClients,
numAccounts,
uris: uris,
key: key,
balance: balance,
zipfSeed: zipfSeed,
sZipf: sZipf,
vZipf: vZipf,
txsPerSecond: txsPerSecond,
minTxsPerSecond: minTxsPerSecond,
txsPerSecondStep: txsPerSecond,
numClients: numClients,
numAccounts: numAccounts,
}
}

Expand All @@ -104,6 +101,8 @@ func NewSpammer(
// [terminate] if true, the spammer will stop after reaching the target TPS.
// [symbol] and [decimals] are used to format the output.
func (s *Spammer) Spam(ctx context.Context, sh SpamHelper, terminate bool, symbol string) error {
logger := &logger{}

// log distribution
s.logZipf(s.zipfSeed)

Expand All @@ -120,7 +119,7 @@ func (s *Spammer) Spam(ctx context.Context, sh SpamHelper, terminate bool, symbo
if err != nil {
return err
}
actions := sh.GetTransfer(s.key.Address, 0, uniqueBytes())
actions := sh.GetTransfer(s.key.Address, 0, s.uniqueBytes())
maxUnits, err := chain.EstimateUnits(parser.Rules(time.Now().UnixMilli()), actions, factory)
if err != nil {
return err
Expand All @@ -143,7 +142,7 @@ func (s *Spammer) Spam(ctx context.Context, sh SpamHelper, terminate bool, symbo
}

// create issuers
issuers, err := s.createIssuers(parser)
issuers, err := s.createIssuers(parser, logger)
if err != nil {
return err
}
Expand All @@ -159,7 +158,7 @@ func (s *Spammer) Spam(ctx context.Context, sh SpamHelper, terminate bool, symbo
}

// set logging
issuers[0].logStats(cctx)
logger.logStats(cctx, issuers[0], &s.inflight, &s.sent)

// Broadcast txs
var (
Expand All @@ -180,7 +179,7 @@ func (s *Spammer) Spam(ctx context.Context, sh SpamHelper, terminate bool, symbo
start := time.Now()

// Check to see if we should wait for pending txs
if int64(currentTarget)+inflight.Load() > int64(currentTarget*pendingTargetMultiplier) {
if int64(currentTarget)+s.inflight.Load() > int64(currentTarget*pendingTargetMultiplier) {
consecutiveUnderBacklog = 0
consecutiveAboveBacklog++
if consecutiveAboveBacklog >= failedRunsToDecreaseTarget {
Expand Down Expand Up @@ -225,7 +224,7 @@ func (s *Spammer) Spam(ctx context.Context, sh SpamHelper, terminate bool, symbo
fundsL.Unlock()

// Send transaction
actions := sh.GetTransfer(recipient, amountToTransfer, uniqueBytes())
actions := sh.GetTransfer(recipient, amountToTransfer, s.uniqueBytes())
return issuer.Send(cctx, actions, factory, feePerTx)
})
}
Expand Down Expand Up @@ -267,7 +266,8 @@ func (s *Spammer) Spam(ctx context.Context, sh SpamHelper, terminate bool, symbo
}
// Wait for all issuers to finish
utils.Outf("{{yellow}}waiting for issuers to return{{/}}\n")
issuerWg.Wait()
s.issuerWg.Wait()

maxUnits, err = chain.EstimateUnits(parser.Rules(time.Now().UnixMilli()), actions, factory)
if err != nil {
return err
Expand All @@ -286,16 +286,22 @@ func (s *Spammer) logZipf(zipfSeed *rand.Rand) {
}

// createIssuer creates an [numClients] transaction issuers for each URI in [uris]
func (s *Spammer) createIssuers(parser chain.Parser) ([]*issuer, error) {
func (s *Spammer) createIssuers(parser chain.Parser, logger *logger) ([]*issuer, error) {
issuers := []*issuer{}

for i := 0; i < len(s.uris); i++ {
for j := 0; j < s.numClients; j++ {
cli := jsonrpc.NewJSONRPCClient(s.uris[i])
webSocketClient, err := ws.NewWebSocketClient(s.uris[i], ws.DefaultHandshakeTimeout, pubsub.MaxPendingMessages, pubsub.MaxReadMessageSize) // we write the max read
if err != nil {
return nil, err
}
issuer := &issuer{i: len(issuers), cli: cli, ws: webSocketClient, parser: parser, uri: s.uris[i]}
issuer := &issuer{
i: len(issuers), cli: cli, ws: webSocketClient, parser: parser, uri: s.uris[i],
logger: logger,
issuerWg: &s.issuerWg,
inflight: &s.inflight,
}
issuers = append(issuers, issuer)
}
}
Expand Down Expand Up @@ -344,7 +350,7 @@ func (s *Spammer) distributeFunds(ctx context.Context, cli *jsonrpc.JSONRPCClien
factories[i] = f

// Send funds
actions := sh.GetTransfer(pk.Address, distAmount, uniqueBytes())
actions := sh.GetTransfer(pk.Address, distAmount, s.uniqueBytes())
_, tx, err := cli.GenerateTransactionManual(parser, actions, factory, feePerTx)
if err != nil {
return nil, nil, nil, err
Expand Down Expand Up @@ -398,7 +404,7 @@ func (s *Spammer) returnFunds(ctx context.Context, cli *jsonrpc.JSONRPCClient, p

// Send funds
returnAmt := balance - feePerTx
actions := sh.GetTransfer(s.key.Address, returnAmt, uniqueBytes())
actions := sh.GetTransfer(s.key.Address, returnAmt, s.uniqueBytes())
_, tx, err := cli.GenerateTransactionManual(parser, actions, factories[i], feePerTx)
if err != nil {
return err
Expand All @@ -424,3 +430,7 @@ func (s *Spammer) returnFunds(ctx context.Context, cli *jsonrpc.JSONRPCClient, p
)
return nil
}

func (s *Spammer) uniqueBytes() []byte {
return binary.BigEndian.AppendUint64(nil, uint64(s.sent.Add(1)))
}
12 changes: 0 additions & 12 deletions throughput/utils.go

This file was deleted.