Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove global variables from spam script #1659

Merged
merged 8 commits into from
Oct 13, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 8 additions & 54 deletions throughput/issuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package throughput
import (
"context"
"fmt"
"strings"
"sync"
"time"

Expand All @@ -30,10 +29,13 @@ type issuer struct {
ws *ws.WebSocketClient
outstandingTxs int
abandoned error

// injected from the spammer
txProcessor *txProcessor
}

func (i *issuer) Start(ctx context.Context) {
issuerWg.Add(1)
i.txProcessor.issuerWg.Add(1)
go func() {
for {
_, wsErr, result, err := i.ws.ListenTx(context.TODO())
Expand All @@ -43,28 +45,14 @@ func (i *issuer) Start(ctx context.Context) {
i.l.Lock()
i.outstandingTxs--
i.l.Unlock()
inflight.Add(-1)
l.Lock()
if result != nil {
if result.Success {
confirmedTxs++
} else {
utils.Outf("{{orange}}on-chain tx failure:{{/}} %s %t\n", string(result.Error), result.Success)
}
} else {
// We can't error match here because we receive it over the wire.
if !strings.Contains(wsErr.Error(), ws.ErrExpired.Error()) {
utils.Outf("{{orange}}pre-execute tx failure:{{/}} %v\n", wsErr)
}
}
totalTxs++
l.Unlock()
i.txProcessor.inflight.Add(-1)
i.txProcessor.logResult(result, wsErr)
}
}()
go func() {
defer func() {
_ = i.ws.Close()
issuerWg.Done()
i.txProcessor.issuerWg.Done()
}()

<-ctx.Done()
Expand Down Expand Up @@ -98,7 +86,7 @@ func (i *issuer) Send(ctx context.Context, actions []chain.Action, factory chain
i.l.Lock()
i.outstandingTxs++
i.l.Unlock()
inflight.Add(1)
i.txProcessor.inflight.Add(1)

// Register transaction and recover upon failure
if err := i.ws.RegisterTx(tx); err != nil {
Expand Down Expand Up @@ -135,37 +123,3 @@ func getRandomIssuer(issuers []*issuer) *issuer {
index := rand.Int() % len(issuers)
return issuers[index]
}

func (i *issuer) logStats(cctx context.Context) {
// Log stats
t := time.NewTicker(1 * time.Second) // ensure no duplicates created
var psent int64
go func() {
defer t.Stop()
for {
select {
case <-t.C:
current := sent.Load()
l.Lock()
if totalTxs > 0 {
unitPrices, err := i.cli.UnitPrices(cctx, false)
if err != nil {
continue
}
utils.Outf(
"{{yellow}}txs seen:{{/}} %d {{yellow}}success rate:{{/}} %.2f%% {{yellow}}inflight:{{/}} %d {{yellow}}issued/s:{{/}} %d {{yellow}}unit prices:{{/}} [%s]\n", //nolint:lll
totalTxs,
float64(confirmedTxs)/float64(totalTxs)*100,
inflight.Load(),
current-psent,
unitPrices,
)
}
l.Unlock()
psent = current
case <-cctx.Done():
return
}
}
}()
}
87 changes: 87 additions & 0 deletions throughput/processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright (C) 2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package throughput

import (
"context"
"encoding/binary"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/ava-labs/hypersdk/api/ws"
"github.com/ava-labs/hypersdk/chain"
"github.com/ava-labs/hypersdk/utils"
)

type txProcessor struct {
samliok marked this conversation as resolved.
Show resolved Hide resolved
issuerWg sync.WaitGroup
inflight atomic.Int64

l sync.Mutex
confirmedTxs int
totalTxs int

sent atomic.Int64
}

func (tp *txProcessor) logResult(
result *chain.Result,
wsErr error,
) {
tp.l.Lock()
if result != nil {
if result.Success {
tp.confirmedTxs++
} else {
utils.Outf("{{orange}}on-chain tx failure:{{/}} %s %t\n", string(result.Error), result.Success)
}
} else {
// We can't error match here because we receive it over the wire.
if !strings.Contains(wsErr.Error(), ws.ErrExpired.Error()) {
utils.Outf("{{orange}}pre-execute tx failure:{{/}} %v\n", wsErr)
}
}
tp.totalTxs++
tp.l.Unlock()
}

func (tp *txProcessor) logIssuerState(ctx context.Context, issuer *issuer) {
samliok marked this conversation as resolved.
Show resolved Hide resolved
// Log stats
t := time.NewTicker(1 * time.Second) // ensure no duplicates created
var psent int64
go func() {
defer t.Stop()
for {
select {
case <-t.C:
current := tp.sent.Load()
tp.l.Lock()
if tp.totalTxs > 0 {
unitPrices, err := issuer.cli.UnitPrices(ctx, false)
if err != nil {
continue
}
utils.Outf(
"{{yellow}}txs seen:{{/}} %d {{yellow}}success rate:{{/}} %.2f%% {{yellow}}inflight:{{/}} %d {{yellow}}issued/s:{{/}} %d {{yellow}}unit prices:{{/}} [%s]\n", //nolint:lll
tp.totalTxs,
float64(tp.confirmedTxs)/float64(tp.totalTxs)*100,
tp.inflight.Load(),
current-psent,
unitPrices,
)
}
tp.l.Unlock()
psent = current
case <-ctx.Done():
return
}
}
}()
}

func (tp *txProcessor) uniqueBytes() []byte {
return binary.BigEndian.AppendUint64(nil, uint64(tp.sent.Add(1)))
}
39 changes: 22 additions & 17 deletions throughput/spam.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"os/signal"
"runtime"
"sync"
"sync/atomic"
"syscall"
"time"

Expand Down Expand Up @@ -41,14 +40,6 @@ const (
// TODO: remove the use of global variables
var (
maxConcurrency = runtime.NumCPU()
issuerWg sync.WaitGroup

l sync.Mutex
confirmedTxs uint64
totalTxs uint64

inflight atomic.Int64
sent atomic.Int64
Comment on lines -44 to -51
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

)

type Spammer struct {
Expand All @@ -69,11 +60,15 @@ type Spammer struct {

// Number of accounts
numAccounts int

// keep track of variables shared across issuers
txProcessor *txProcessor
}

func NewSpammer(sc *Config, sh SpamHelper) (*Spammer, error) {
// Log Zipf participants
zipfSeed := rand.New(rand.NewSource(0)) //nolint:gosec
txProcessor := txProcessor{}
balance, err := sh.LookupBalance(sc.key.Address)
if err != nil {
return nil, err
Expand All @@ -92,6 +87,8 @@ func NewSpammer(sc *Config, sh SpamHelper) (*Spammer, error) {
txsPerSecondStep: sc.txsPerSecondStep,
numClients: sc.numClients,
numAccounts: sc.numAccounts,

txProcessor: &txProcessor,
samliok marked this conversation as resolved.
Show resolved Hide resolved
}, nil
}

Expand Down Expand Up @@ -119,7 +116,7 @@ func (s *Spammer) Spam(ctx context.Context, sh SpamHelper, terminate bool, symbo
if err != nil {
return err
}
actions := sh.GetTransfer(s.key.Address, 0, uniqueBytes())
actions := sh.GetTransfer(s.key.Address, 0, s.txProcessor.uniqueBytes())
maxUnits, err := chain.EstimateUnits(parser.Rules(time.Now().UnixMilli()), actions, factory)
if err != nil {
return err
Expand Down Expand Up @@ -154,7 +151,7 @@ func (s *Spammer) Spam(ctx context.Context, sh SpamHelper, terminate bool, symbo
}

// set logging
issuers[0].logStats(cctx)
s.txProcessor.logIssuerState(cctx, issuers[0])

// broadcast transactions
s.broadcast(cctx, cancel, sh, accounts, funds, factories, issuers, feePerTx, terminate)
Expand Down Expand Up @@ -202,7 +199,7 @@ func (s Spammer) broadcast(
start := time.Now()

// Check to see if we should wait for pending txs
if int64(currentTarget)+inflight.Load() > int64(currentTarget*pendingTargetMultiplier) {
if int64(currentTarget)+s.txProcessor.inflight.Load() > int64(currentTarget*pendingTargetMultiplier) {
consecutiveUnderBacklog = 0
consecutiveAboveBacklog++
if consecutiveAboveBacklog >= failedRunsToDecreaseTarget {
Expand Down Expand Up @@ -247,7 +244,7 @@ func (s Spammer) broadcast(
fundsL.Unlock()

// Send transaction
actions := sh.GetTransfer(recipient, amountToTransfer, uniqueBytes())
actions := sh.GetTransfer(recipient, amountToTransfer, s.txProcessor.uniqueBytes())
return issuer.Send(ctx, actions, factory, feePerTx)
})
}
Expand Down Expand Up @@ -290,7 +287,7 @@ func (s Spammer) broadcast(

// Wait for all issuers to finish
utils.Outf("{{yellow}}waiting for issuers to return{{/}}\n")
issuerWg.Wait()
s.txProcessor.issuerWg.Wait()
}

func (s *Spammer) logZipf(zipfSeed *rand.Rand) {
Expand All @@ -306,14 +303,22 @@ func (s *Spammer) logZipf(zipfSeed *rand.Rand) {
// createIssuer creates an [numClients] transaction issuers for each URI in [uris]
func (s *Spammer) createIssuers(parser chain.Parser) ([]*issuer, error) {
issuers := []*issuer{}

for i := 0; i < len(s.uris); i++ {
for j := 0; j < s.numClients; j++ {
cli := jsonrpc.NewJSONRPCClient(s.uris[i])
webSocketClient, err := ws.NewWebSocketClient(s.uris[i], ws.DefaultHandshakeTimeout, pubsub.MaxPendingMessages, pubsub.MaxReadMessageSize) // we write the max read
if err != nil {
return nil, err
}
issuer := &issuer{i: len(issuers), cli: cli, ws: webSocketClient, parser: parser, uri: s.uris[i]}
issuer := &issuer{
i: len(issuers),
cli: cli,
ws: webSocketClient,
parser: parser,
uri: s.uris[i],
txProcessor: s.txProcessor,
}
issuers = append(issuers, issuer)
}
}
Expand Down Expand Up @@ -362,7 +367,7 @@ func (s *Spammer) distributeFunds(ctx context.Context, cli *jsonrpc.JSONRPCClien
factories[i] = f

// Send funds
actions := sh.GetTransfer(pk.Address, distAmount, uniqueBytes())
actions := sh.GetTransfer(pk.Address, distAmount, s.txProcessor.uniqueBytes())
_, tx, err := cli.GenerateTransactionManual(parser, actions, factory, feePerTx)
if err != nil {
return nil, nil, nil, err
Expand Down Expand Up @@ -416,7 +421,7 @@ func (s *Spammer) returnFunds(ctx context.Context, cli *jsonrpc.JSONRPCClient, p

// Send funds
returnAmt := balance - feePerTx
actions := sh.GetTransfer(s.key.Address, returnAmt, uniqueBytes())
actions := sh.GetTransfer(s.key.Address, returnAmt, s.txProcessor.uniqueBytes())
_, tx, err := cli.GenerateTransactionManual(parser, actions, factories[i], feePerTx)
if err != nil {
return err
Expand Down
12 changes: 0 additions & 12 deletions throughput/utils.go

This file was deleted.

Loading