Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
Signed-off-by: Adolfo García Veytia (Puerco) <[email protected]>
  • Loading branch information
derekperkins authored and puerco committed Jul 23, 2024
1 parent 557c7f2 commit ae4180e
Show file tree
Hide file tree
Showing 5 changed files with 245 additions and 1 deletion.
6 changes: 6 additions & 0 deletions throttler/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
.glue
!empty
.idea
server.command
*.bak
*.out
70 changes: 69 additions & 1 deletion throttler/README.md
Original file line number Diff line number Diff line change
@@ -1 +1,69 @@
# throttler
# Throttler - intelligent WaitGroups

[![GoDoc](https://godoc.org/github.com/nozzle/throttler?status.svg)](http://godoc.org/github.com/nozzle/throttler)


Throttler fills the gap between sync.WaitGroup and manually monitoring your goroutines with channels. The API is almost identical to Wait Groups, but it allows you to set a max number of workers that can be running simultaneously. It uses channels internally to block until a job completes by calling Done() or until all jobs have been completed.

See a fully functional example on the playground at http://bit.ly/throttler-docs

Compare the Throttler example to the sync.WaitGroup example from http://golang.org/pkg/sync/#example_WaitGroup

*100% test coverage*

### How to use Throttler

```
// This example fetches several URLs concurrently,
// using a Throttler to block until all the fetches are complete.
// Compare to http://golang.org/pkg/sync/#example_WaitGroup
func ExampleThrottler() {
var urls = []string{
"http://www.golang.org/",
"http://www.google.com/",
"http://www.somestupidname.com/",
}
// Create a new Throttler that will get 2 urls at a time
t := NewThrottler(2, len(urls))
for _, url := range urls {
// Launch a goroutine to fetch the URL.
go func(url string) {
// Let Throttler know when the goroutine completes
// so it can dispatch another worker
defer t.Done()
// Fetch the URL.
http.Get(url)
}(url)
// Pauses until a worker is available or all jobs have been completed
t.Throttle()
}
}
```

### vs How to use a sync.WaitGroup

```
// This example fetches several URLs concurrently,
// using a WaitGroup to block until all the fetches are complete.
func ExampleWaitGroup() {
var wg sync.WaitGroup
var urls = []string{
"http://www.golang.org/",
"http://www.google.com/",
"http://www.somestupidname.com/",
}
for _, url := range urls {
// Increment the WaitGroup counter.
wg.Add(1)
// Launch a goroutine to fetch the URL.
go func(url string) {
// Decrement the counter when the goroutine completes.
defer wg.Done()
// Fetch the URL.
http.Get(url)
}(url)
}
// Wait for all HTTP fetches to complete.
wg.Wait()
}
```
37 changes: 37 additions & 0 deletions throttler/example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package throttler

type httpPkg struct{}

func (httpPkg) Get(url string) {}

var http httpPkg

// This example fetches several URLs concurrently,
// using a WaitGroup to block until all the fetches are complete.
func ExampleWaitGroup() {
}

// This example fetches several URLs concurrently,
// using a Throttler to block until all the fetches are complete.
// Compare to http://golang.org/pkg/sync/#example_WaitGroup
func ExampleThrottler() {
var urls = []string{
"http://www.golang.org/",
"http://www.google.com/",
"http://www.somestupidname.com/",
}
// Create a new Throttler that will get 2 urls at a time
t := NewThrottler(2, len(urls))
for _, url := range urls {
// Launch a goroutine to fetch the URL.
go func(url string) {
// Let Throttler know when the goroutine completes
// so it can dispatch another worker
defer t.Done()
// Fetch the URL.
http.Get(url)
}(url)
// Pauses until a worker is available or all jobs have been completed
t.Throttle()
}
}
64 changes: 64 additions & 0 deletions throttler/throttler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Throttler fills the gap between sync.WaitGroup and manually monitoring your goroutines
// with channels. The API is almost identical to Wait Groups, but it allows you to set
// a max number of workers that can be running simultaneously. It uses channels internally
// to block until a job completes by calling Done() or until all jobs have been completed.
//
// Compare the Throttler example to the sync.WaitGroup example http://golang.org/pkg/sync/#example_WaitGroup
//
// See a fully functional example on the playground at http://bit.ly/throttler-docs
package throttler

type Throttler struct {
maxWorkers int
workerCount int
totalJobs int
jobsStarted int
jobsCompleted int
doneChan chan bool
}

// NewThrottler returns a Throttler that will govern the max number of workers and will
// work with the total number of jobs. It panics if maxWorkers < 1.
func NewThrottler(maxWorkers, totalJobs int) *Throttler {
if maxWorkers < 1 {
panic("maxWorkers has to be at least 1")
}
return &Throttler{
maxWorkers: maxWorkers,
totalJobs: totalJobs,
doneChan: make(chan bool, totalJobs),
}
}

// Throttle works similarly to sync.WaitGroup, except inside your goroutine dispatch
// loop rather than after. It will not block until the number of active workers
// matches the max number of workers designated in the call to NewThrottler or
// all of the jobs have been dispatched. It stops blocking when Done has been called
// as many times as totalJobs.
func (t *Throttler) Throttle() {
if t.totalJobs < 1 {
return
}
t.jobsStarted++
t.workerCount++

if t.workerCount == t.maxWorkers {
<-t.doneChan
t.jobsCompleted++
t.workerCount--
}

if t.jobsStarted == t.totalJobs {
for t.jobsCompleted < t.totalJobs {
<-t.doneChan
t.jobsCompleted++
}
}
}

// Done lets Throttler know that a job has been completed so that another worker
// can be activated. If Done is called less times than totalJobs,
// Throttle will block forever
func (t *Throttler) Done() {
t.doneChan <- true
}
69 changes: 69 additions & 0 deletions throttler/throttler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package throttler

import (
"math/rand"
"testing"
"time"
)

func TestThrottle(t *testing.T) {
var tests = []struct {
Desc string
Jobs []string
MaxWorkers int
TotalJobs int
}{
{
"Standard implementation",
[]string{"job01", "job02", "job03", "job04", "job05", "job06", "job07", "job08", "job09", "job10",
"job11", "job12", "job13", "job14", "job15", "job16", "job17", "job18", "job19", "job20",
"job21", "job22", "job23", "job24", "job25", "job26", "job27", "job28", "job29", "job30",
"job31", "job32", "job33", "job34", "job35", "job36", "job37", "job38", "job39", "job40",
"job41", "job42", "job43", "job44", "job45", "job46", "job47", "job48", "job49", "job50"},
5,
-1,
}, {
"Incorrectly has 0 as TotalWorkers",
[]string{"job01", "job02", "job03", "job04", "job05", "job06", "job07", "job08", "job09", "job10",
"job11", "job12", "job13", "job14", "job15", "job16", "job17", "job18", "job19", "job20",
"job21", "job22", "job23", "job24", "job25", "job26", "job27", "job28", "job29", "job30",
"job31", "job32", "job33", "job34", "job35", "job36", "job37", "job38", "job39", "job40",
"job41", "job42", "job43", "job44", "job45", "job46", "job47", "job48", "job49", "job50"},
5,
0,
}, {
"More workers than jobs",
[]string{"job01", "job02", "job03", "job04", "job05", "job06", "job07", "job08", "job09", "job10",
"job11", "job12", "job13", "job14", "job15", "job16", "job17", "job18", "job19", "job20",
"job21", "job22", "job23", "job24", "job25", "job26", "job27", "job28", "job29", "job30",
"job31", "job32", "job33", "job34", "job35", "job36", "job37", "job38", "job39", "job40",
"job41", "job42", "job43", "job44", "job45", "job46", "job47", "job48", "job49", "job50"},
50000,
-1,
},
}

for _, test := range tests {
totalJobs := len(test.Jobs)
if test.TotalJobs != -1 {
totalJobs = test.TotalJobs
}
th := NewThrottler(test.MaxWorkers, totalJobs)
for _, job := range test.Jobs {
go func(job string, th *Throttler) {
defer th.Done()
time.Sleep(time.Duration(rand.Intn(5)) * time.Millisecond)
}(job, th)
th.Throttle()
}
}
}

func TestThrottlePanic(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Fatal()
}
}()
NewThrottler(0, 100)
}

0 comments on commit ae4180e

Please sign in to comment.