Skip to content

Commit

Permalink
wip on pace
Browse files Browse the repository at this point in the history
  • Loading branch information
bojand committed Oct 20, 2020
1 parent afa1227 commit 7fb2f0d
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 77 deletions.
6 changes: 5 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,8 @@ issues:

# Debug log Sync() error check in defer in main
- path: cmd/ghz/main.go
text: "Error return value of `logger.Sync` is not checked"
text: "Error return value of `logger.Sync` is not checked"

# sync.once copy in pacer test
- path: load/pacer_test.go
text: "copylocks"
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ require (
go.uber.org/multierr v1.3.0
go.uber.org/zap v1.13.0
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e // indirect
golang.org/x/tools v0.0.0-20200502202811-ed308ab3e770 // indirect
google.golang.org/genproto v0.0.0-20191108220845-16a3f7862a1a // indirect
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
6 changes: 3 additions & 3 deletions load/pacer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (cp *ConstantPacer) String() string {
// Pace determines the length of time to sleep until the next hit is sent.
func (cp *ConstantPacer) Pace(elapsed time.Duration, hits uint64) (time.Duration, bool) {

if hits >= cp.Max {
if cp.Max > 0 && hits >= cp.Max {
return 0, true
}

Expand Down Expand Up @@ -156,7 +156,7 @@ func (p *StepPacer) initialize() {
// Pace determines the length of time to sleep until the next hit is sent.
func (p *StepPacer) Pace(elapsed time.Duration, hits uint64) (time.Duration, bool) {

if hits >= p.Max {
if p.Max > 0 && hits >= p.Max {
return 0, true
}

Expand Down Expand Up @@ -294,7 +294,7 @@ func (p *LinearPacer) initialize() {

// Pace determines the length of time to sleep until the next hit is sent.
func (p *LinearPacer) Pace(elapsed time.Duration, hits uint64) (time.Duration, bool) {
if hits >= p.Max {
if p.Max > 0 && hits >= p.Max {
return 0, true
}

Expand Down
39 changes: 18 additions & 21 deletions load/worker_ticker.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,28 +70,25 @@ func (c *StepWorkerTicker) Run() {

go func() {
fmt.Println("step ticker start")
for {
select {
case <-ticker.C:
fmt.Println("worker ticker", time.Since(begin))
if c.LoadDuration > 0 && time.Since(begin) > c.LoadDuration {
fmt.Println("duration stop reached", wc, time.Since(begin))

if c.Stop > 0 {
c.C <- TickValue{Delta: int(c.Stop - uint(wc))}
}

done <- true
return
} else if (c.Stop > 0 && stepUp && wc >= int(c.Stop)) ||
(!stepUp && wc <= int(c.Stop)) || wc <= 0 {
fmt.Println("stop reached", wc, time.Since(begin))
done <- true
return
} else {
c.C <- TickValue{Delta: c.Step}
wc = wc + c.Step
for range ticker.C {
fmt.Println("worker ticker", time.Since(begin))
if c.LoadDuration > 0 && time.Since(begin) > c.LoadDuration {
fmt.Println("duration stop reached", wc, time.Since(begin))

if c.Stop > 0 {
c.C <- TickValue{Delta: int(c.Stop - uint(wc))}
}

done <- true
return
} else if (c.Stop > 0 && stepUp && wc >= int(c.Stop)) ||
(!stepUp && wc <= int(c.Stop)) || wc <= 0 {
fmt.Println("stop reached", wc, time.Since(begin))
done <- true
return
} else {
c.C <- TickValue{Delta: c.Step}
wc = wc + c.Step
}
}
}()
Expand Down
84 changes: 40 additions & 44 deletions runner/requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ type Requester struct {
stopCh chan bool
start time.Time

qpsTick time.Duration

arrayJSONData []string

lock sync.Mutex
Expand Down Expand Up @@ -190,7 +188,6 @@ func (b *Requester) Run() (*Report, error) {

// Stop stops the test
func (b *Requester) Stop(reason StopReason) {
fmt.Println("stop():", reason)

b.stopCh <- true

Expand Down Expand Up @@ -228,7 +225,12 @@ func (b *Requester) Finish() *Report {
b.config.log.Debug("Finilizing report")
}

return b.reporter.Finalize(b.stopReason, total)
var r StopReason
b.lock.Lock()
r = b.stopReason
b.lock.Unlock()

return b.reporter.Finalize(r, total)
}

func (b *Requester) openClientConns() ([]*grpc.ClientConn, error) {
Expand Down Expand Up @@ -345,49 +347,51 @@ func (b *Requester) runWorkers(wt load.WorkerTicker, p load.Pacer) error {

go func() {
n := 0
wc := 0
for tv := range wct {
fmt.Println(tv)
if tv.Delta > 0 {
wID := "g" + strconv.Itoa(len(b.workers)+1) + "c" + strconv.Itoa(n)

fmt.Println("wid:", wID)
for i := 0; i < tv.Delta; i++ {
wID := "g" + strconv.Itoa(wc) + "c" + strconv.Itoa(n)

if len(b.config.name) > 0 {
wID = b.config.name + ":" + wID
}
if len(b.config.name) > 0 {
wID = b.config.name + ":" + wID
}

if b.config.hasLog {
b.config.log.Debugw("Creating worker with ID: "+wID,
"workerID", wID, "requests per worker")
}
if b.config.hasLog {
b.config.log.Debugw("Creating worker with ID: "+wID,
"workerID", wID, "requests per worker")
}

w := Worker{
ticks: ticks,
active: true,
counter: &counter,
stub: b.stubs[n],
mtd: b.mtd,
config: b.config,
stopCh: make(chan bool),
workerID: wID,
arrayJSONData: b.arrayJSONData,
}
w := Worker{
ticks: ticks,
active: true,
counter: &counter,
stub: b.stubs[n],
mtd: b.mtd,
config: b.config,
stopCh: make(chan bool),
workerID: wID,
arrayJSONData: b.arrayJSONData,
}

n++ // increment connection counter
wc++ // increment worker id

// wrap around connections if needed
if n == b.config.nConns {
n = 0
}
n++ // increment connection counter

wm.Lock()
b.workers = append(b.workers, &w)
wm.Unlock()
// wrap around connections if needed
if n == b.config.nConns {
n = 0
}

go func() {
errC <- w.runWorker()
}()
wm.Lock()
b.workers = append(b.workers, &w)
wm.Unlock()

go func() {
errC <- w.runWorker()
}()
}
} else {
nd := -1 * tv.Delta
wm.Lock()
Expand All @@ -409,8 +413,6 @@ func (b *Requester) runWorkers(wt load.WorkerTicker, p load.Pacer) error {
}()

go func() {
fmt.Println("ticker goroutine")

defer close(ticks)
defer wt.Finish()

Expand Down Expand Up @@ -447,19 +449,13 @@ func (b *Requester) runWorkers(wt load.WorkerTicker, p load.Pacer) error {
}
}()

fmt.Println("waiting for done.")

<-done

fmt.Println("done. waiting for multi err.")

var err error
for i := 0; i < len(b.workers); i++ {
err = multierr.Append(err, <-errC)
}

fmt.Println("done run: ", counter.Get())

return err
}

Expand Down
19 changes: 11 additions & 8 deletions runner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/jhump/protoreflect/dynamic"
"github.com/jhump/protoreflect/dynamic/grpcdynamic"
"go.uber.org/multierr"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/metadata"
Expand All @@ -37,20 +38,22 @@ type Worker struct {

func (w *Worker) runWorker() error {
var err error
g := new(errgroup.Group)

for {
select {
case <-w.stopCh:
return err
if w.config.async {
return g.Wait()
} else {
return err
}
case <-w.ticks:
if w.config.async {
go func() {
rErr := w.makeRequest()
multierr.Append(err, rErr)
}()
g.Go(w.makeRequest)
} else {
rErr := w.makeRequest()
multierr.Append(err, rErr)
err = multierr.Append(err, rErr)
}
}
}
Expand All @@ -76,7 +79,7 @@ func (w *Worker) makeRequest() error {

// try the optimized path for JSON data for non client-streaming
if !w.config.binary && !w.mtd.IsClientStreaming() && len(w.arrayJSONData) > 0 {
indx := int((reqNum - 1) % int64(len(w.arrayJSONData))) // we want to start from inputs[0] so dec reqNum
indx := int(reqNum % int64(len(w.arrayJSONData))) // we want to start from inputs[0] so dec reqNum
if inputs, err = w.getMessages(ctd, []byte(w.arrayJSONData[indx])); err != nil {
return err
}
Expand Down Expand Up @@ -138,7 +141,7 @@ func (w *Worker) makeRequest() error {
if inputsLen == 0 {
return fmt.Errorf("no data provided for request")
}
inputIdx := int((reqNum - 1) % int64(inputsLen)) // we want to start from inputs[0] so dec reqNum
inputIdx := int(reqNum % int64(inputsLen)) // we want to start from inputs[0] so dec reqNum
unaryInput := inputs[inputIdx]

// RPC errors are handled via stats handler
Expand Down

0 comments on commit 7fb2f0d

Please sign in to comment.