Skip to content

Commit

Permalink
.Done() now requires an error / nil to be passed into it Added an err…
Browse files Browse the repository at this point in the history
…or channel + errorCount

Signed-off-by: Adolfo García Veytia (Puerco) <[email protected]>
  • Loading branch information
derekperkins authored and puerco committed Jul 23, 2024
1 parent da4f04a commit 7d4550d
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 10 deletions.
7 changes: 4 additions & 3 deletions throttler/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
[![GoDoc](https://godoc.org/github.com/nozzle/throttler?status.svg)](http://godoc.org/github.com/nozzle/throttler) [![Coverage Status](https://coveralls.io/repos/nozzle/throttler/badge.svg?branch=master)](https://coveralls.io/r/nozzle/throttler?branch=master) [ ![Codeship Status for nozzle/throttler](https://codeship.com/projects/02d33900-a744-0132-4353-2eb3789e9959/status?branch=master)](https://codeship.com/projects/67187)


Throttler fills the gap between sync.WaitGroup and manually monitoring your goroutines with channels. The API is almost identical to Wait Groups, but it allows you to set a max number of workers that can be running simultaneously. It uses channels internally to block until a job completes by calling Done() or until all jobs have been completed.
Throttler fills the gap between sync.WaitGroup and manually monitoring your goroutines with channels. The API is almost identical to Wait Groups, but it allows you to set a max number of workers that can be running simultaneously. It uses channels internally to block until a job completes by calling Done() or until all jobs have been completed. It also provides a built in error channel that captures your goroutine errors and provides access to them as `[]error` after you exit the loop.

See a fully functional example on the playground at http://bit.ly/throttler-docs
See a fully functional example on the playground at http://bit.ly/throttler-docs-v2

Compare the Throttler example to the sync.WaitGroup example from http://golang.org/pkg/sync/#example_WaitGroup

*100% test coverage*
*3/12 - Breaking change*
Throttler handles errors by default now and `Done` requires an error to be passed into it. If your goroutine doesn't generate errors, just call `Done(nil)` and there won't be any performance impact.

### How to use Throttler

Expand Down
2 changes: 1 addition & 1 deletion throttler/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func ExampleThrottler() {
go func(url string) {
// Let Throttler know when the goroutine completes
// so it can dispatch another worker
defer t.Done()
defer t.Done(nil)
// Fetch the URL.
http.Get(url)
}(url)
Expand Down
32 changes: 27 additions & 5 deletions throttler/throttler.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,28 @@
// Throttler fills the gap between sync.WaitGroup and manually monitoring your goroutines
// with channels. The API is almost identical to Wait Groups, but it allows you to set
// a max number of workers that can be running simultaneously. It uses channels internally
// to block until a job completes by calling Done() or until all jobs have been completed.
// to block until a job completes by calling Done(err) or until all jobs have been completed.
//
// After exiting the loop where you are using Throttler, you can call the .Err method to get
// an array of all the goroutine errors that occurred.
//
// Compare the Throttler example to the sync.WaitGroup example http://golang.org/pkg/sync/#example_WaitGroup
//
// See a fully functional example on the playground at http://bit.ly/throttler-docs
// See a fully functional example on the playground at http://bit.ly/throttler-docs-v2
package throttler

import "sync"

type Throttler struct {
maxWorkers int
workerCount int
totalJobs int
jobsStarted int
jobsCompleted int
doneChan chan struct{}
errsMutex *sync.Mutex
errs []error
errorCount int
}

// New returns a Throttler that will govern the max number of workers and will
Expand All @@ -27,6 +35,7 @@ func New(maxWorkers, totalJobs int) *Throttler {
maxWorkers: maxWorkers,
totalJobs: totalJobs,
doneChan: make(chan struct{}, totalJobs),
errsMutex: &sync.Mutex{},
}
}

Expand All @@ -35,9 +44,9 @@ func New(maxWorkers, totalJobs int) *Throttler {
// matches the max number of workers designated in the call to NewThrottler or
// all of the jobs have been dispatched. It stops blocking when Done has been called
// as many times as totalJobs.
func (t *Throttler) Throttle() {
func (t *Throttler) Throttle() int {
if t.totalJobs < 1 {
return
return t.errorCount
}
t.jobsStarted++
t.workerCount++
Expand All @@ -54,11 +63,24 @@ func (t *Throttler) Throttle() {
t.jobsCompleted++
}
}

return t.errorCount
}

// Done lets Throttler know that a job has been completed so that another worker
// can be activated. If Done is called less times than totalJobs,
// Throttle will block forever
func (t *Throttler) Done() {
func (t *Throttler) Done(err error) {
t.doneChan <- struct{}{}
if err != nil {
t.errsMutex.Lock()
t.errs = append(t.errs, err)
t.errorCount++
t.errsMutex.Unlock()
}
}

// Err returns a slice of any errors that were received from calling Done()
func (t *Throttler) Err() []error {
return t.errs
}
48 changes: 47 additions & 1 deletion throttler/throttler_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package throttler

import (
"fmt"
"math/rand"
"strconv"
"testing"
"time"
)
Expand Down Expand Up @@ -51,14 +53,58 @@ func TestThrottle(t *testing.T) {
th := New(test.MaxWorkers, totalJobs)
for _, job := range test.Jobs {
go func(job string, th *Throttler) {
defer th.Done()
defer th.Done(nil)
time.Sleep(time.Duration(rand.Intn(5)) * time.Millisecond)
}(job, th)
th.Throttle()
}
}
}

func TestThrottleWithErrors(t *testing.T) {
var tests = []struct {
Desc string
Jobs []string
MaxWorkers int
TotalJobs int
}{
{
"Standard implementation",
[]string{"job01", "job02", "job03", "job04", "job05", "job06", "job07", "job08", "job09", "job10",
"job11", "job12", "job13", "job14", "job15", "job16", "job17", "job18", "job19", "job20",
"job21", "job22", "job23", "job24", "job25", "job26", "job27", "job28", "job29", "job30",
"job31", "job32", "job33", "job34", "job35", "job36", "job37", "job38", "job39", "job40",
"job41", "job42", "job43", "job44", "job45", "job46", "job47", "job48", "job49", "job50"},
5,
-1,
},
}

for _, test := range tests {
totalJobs := len(test.Jobs)
if test.TotalJobs != -1 {
totalJobs = test.TotalJobs
}
th := New(test.MaxWorkers, totalJobs)
for _, job := range test.Jobs {
go func(job string, th *Throttler) {
jobNum, _ := strconv.ParseInt(job[len(job)-2:], 10, 8)
var err error
if jobNum%2 != 0 {
err = fmt.Errorf("Error on %s", job)
}
defer th.Done(err)

time.Sleep(time.Duration(rand.Intn(5)) * time.Millisecond)
}(job, th)
th.Throttle()
}
if len(th.Err()) != totalJobs/2 {
t.Fatal()
}
}
}

func TestThrottlePanic(t *testing.T) {
defer func() {
if r := recover(); r == nil {
Expand Down

0 comments on commit 7d4550d

Please sign in to comment.