-
Notifications
You must be signed in to change notification settings - Fork 17
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #108 from puerco/import-nozzle-throttle
Add new throttler pkg forked from nozzle/throttler
- Loading branch information
Showing
4 changed files
with
680 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
# Throttler - Intelligent WaitGroups | ||
|
||
[![GoDoc](https://pkg.go.dev/sigs.k8s.io/release-utils/throttler?status.svg)](https://pkg.go.dev/sigs.k8s.io/release-utils/throttler?status.svg) | ||
|
||
__Note:__ This package was adopted by the Kubernetes RelEng team to continue its | ||
maintenance, it was forked from github.com/nozzle/throttle at | ||
[2ea9822](https://github.com/nozzle/throttler/commit/2ea982251481626167b7f83be1434b5c42540c1a). | ||
|
||
Throttler fills the gap between sync.WaitGroup and manually monitoring your | ||
goroutines with channels. The API is almost identical to Wait Groups, but it | ||
allows you to set a max number of workers that can be running simultaneously. | ||
It uses channels internally to block until a job completes by calling Done() or | ||
until all jobs have been completed. It also provides a built in error channel | ||
that captures your goroutine errors and provides access to them as `[]error` | ||
after you exit the loop. | ||
|
||
See a fully functional example of the original module on the playground at http://bit.ly/throttler-v3 | ||
|
||
Compare the Throttler example to the sync.WaitGroup example from http://golang.org/pkg/sync/#example_WaitGroup | ||
|
||
### How to use Throttler | ||
|
||
```golang | ||
// This example fetches several URLs concurrently, | ||
// using a Throttler to block until all the fetches are complete. | ||
// Compare to http://golang.org/pkg/sync/#example_WaitGroup | ||
func ExampleThrottler() { | ||
var urls = []string{ | ||
"http://www.golang.org/", | ||
"http://www.google.com/", | ||
"http://www.somestupidname.com/", | ||
} | ||
// Create a new Throttler that will get 2 urls at a time | ||
t := throttler.New(2, len(urls)) | ||
for _, url := range urls { | ||
// Launch a goroutine to fetch the URL. | ||
go func(url string) { | ||
// Fetch the URL. | ||
err := http.Get(url) | ||
// Let Throttler know when the goroutine completes | ||
// so it can dispatch another worker | ||
t.Done(err) | ||
}(url) | ||
// Pauses until a worker is available or all jobs have been completed | ||
// Returning the total number of goroutines that have errored | ||
// lets you choose to break out of the loop without starting any more | ||
errorCount := t.Throttle() | ||
} | ||
} | ||
``` | ||
|
||
### vs How to use a sync.WaitGroup | ||
|
||
```golang | ||
// This example fetches several URLs concurrently, | ||
// using a WaitGroup to block until all the fetches are complete. | ||
func ExampleWaitGroup() { | ||
var wg sync.WaitGroup | ||
var urls = []string{ | ||
"http://www.golang.org/", | ||
"http://www.google.com/", | ||
"http://www.somestupidname.com/", | ||
} | ||
for _, url := range urls { | ||
// Increment the WaitGroup counter. | ||
wg.Add(1) | ||
// Launch a goroutine to fetch the URL. | ||
go func(url string) { | ||
// Decrement the counter when the goroutine completes. | ||
defer wg.Done() | ||
// Fetch the URL. | ||
http.Get(url) | ||
}(url) | ||
} | ||
// Wait for all HTTP fetches to complete. | ||
wg.Wait() | ||
} | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
/* | ||
Copyright 2024 The Kubernetes Authors. | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
// This package was forked and adapted from the original at | ||
// pkg:golang/github.com/nozzle/throttler@2ea982251481626167b7f83be1434b5c42540c1a | ||
// full commit history has been preserved. | ||
|
||
package throttler | ||
|
||
import ( | ||
"fmt" | ||
"os" | ||
) | ||
|
||
type httpPkg struct{} | ||
|
||
func (httpPkg) Get(_ string) error { return nil } | ||
|
||
var http httpPkg | ||
|
||
// This example fetches several URLs concurrently, | ||
// using a WaitGroup to block until all the fetches are complete. | ||
// | ||
//nolint:testableexamples // TODO - Rewrite examples | ||
func ExampleWaitGroup() { | ||
} | ||
|
||
// This example fetches several URLs concurrently, | ||
// using a Throttler to block until all the fetches are complete. | ||
// Compare to http://golang.org/pkg/sync/#example_WaitGroup | ||
// | ||
//nolint:testableexamples // TODO - Rewrite examples | ||
func ExampleThrottler() { | ||
urls := []string{ | ||
"http://www.golang.org/", | ||
"http://www.google.com/", | ||
"http://www.somestupidname.com/", | ||
} | ||
// Create a new Throttler that will get 2 urls at a time | ||
t := New(2, len(urls)) | ||
for _, url := range urls { | ||
// Launch a goroutine to fetch the URL. | ||
go func(url string) { | ||
// Fetch the URL. | ||
err := http.Get(url) | ||
// Let Throttler know when the goroutine completes | ||
// so it can dispatch another worker | ||
t.Done(err) | ||
}(url) | ||
// Pauses until a worker is available or all jobs have been completed | ||
// Returning the total number of goroutines that have errored | ||
// lets you choose to break out of the loop without starting any more | ||
errorCount := t.Throttle() | ||
if errorCount > 0 { | ||
break | ||
} | ||
} | ||
} | ||
|
||
// This example fetches several URLs concurrently, | ||
// using a Throttler to block until all the fetches are complete | ||
// and checks the errors returned. | ||
// Compare to http://golang.org/pkg/sync/#example_WaitGroup | ||
// | ||
//nolint:testableexamples // TODO - Rewrite examples | ||
func ExampleThrottler_errors() { | ||
urls := []string{ | ||
"http://www.golang.org/", | ||
"http://www.google.com/", | ||
"http://www.somestupidname.com/", | ||
} | ||
// Create a new Throttler that will get 2 urls at a time | ||
t := New(2, len(urls)) | ||
for _, url := range urls { | ||
// Launch a goroutine to fetch the URL. | ||
go func(url string) { | ||
// Let Throttler know when the goroutine completes | ||
// so it can dispatch another worker | ||
defer t.Done(nil) | ||
// Fetch the URL. | ||
if err := http.Get(url); err != nil { | ||
fmt.Fprintf(os.Stderr, "error fetching %q: %v", url, err) | ||
} | ||
}(url) | ||
// Pauses until a worker is available or all jobs have been completed | ||
t.Throttle() | ||
} | ||
|
||
if t.Err() != nil { | ||
// Loop through the errors to see the details | ||
for i, err := range t.Errs() { | ||
fmt.Printf("error #%d: %s", i, err) | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,190 @@ | ||
/* | ||
Copyright 2024 The Kubernetes Authors. | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
// This package was forked and adapted from the original at | ||
// pkg:golang/github.com/nozzle/throttler@2ea982251481626167b7f83be1434b5c42540c1a | ||
// full commit history has been preserved. | ||
|
||
// Package throttler fills the gap between sync.WaitGroup and manually monitoring your goroutines | ||
// with channels. The API is almost identical to Wait Groups, but it allows you to set | ||
// a max number of workers that can be running simultaneously. It uses channels internally | ||
// to block until a job completes by calling Done(err) or until all jobs have been completed. | ||
// | ||
// After exiting the loop where you are using Throttler, you can call the `Err` or `Errs` method to check | ||
// for errors. `Err` will return a single error representative of all the errors Throttler caught. The | ||
// `Errs` method will return all the errors as a slice of errors (`[]error`). | ||
// | ||
// Compare the Throttler example to the sync.WaitGroup example http://golang.org/pkg/sync/#example_WaitGroup | ||
// | ||
// This package was forked and adapted from the original at | ||
// pkg:golang/github.com/nozzle/throttler@2ea982251481626167b7f83be1434b5c42540c1a | ||
// full commit history has been preserved. | ||
// | ||
// See a fully functional example of the original package on the playground at http://bit.ly/throttler-v3 | ||
package throttler | ||
|
||
import ( | ||
"fmt" | ||
"math" | ||
"sync" | ||
"sync/atomic" | ||
) | ||
|
||
// Throttler stores all the information about the number of workers, the active | ||
// workers and error information. | ||
type Throttler struct { | ||
maxWorkers int32 | ||
workerCount int32 | ||
batchingTotal int32 | ||
batchSize int32 | ||
totalJobs int32 | ||
jobsStarted int32 | ||
jobsCompleted int32 | ||
doneChan chan struct{} | ||
errsMutex *sync.Mutex | ||
errs []error | ||
errorCount int32 | ||
} | ||
|
||
// New returns a Throttler that will govern the max number of workers and will | ||
// work with the total number of jobs. It panics if maxWorkers < 1. | ||
func New(maxWorkers, totalJobs int) *Throttler { | ||
if maxWorkers < 1 { | ||
panic("maxWorkers has to be at least 1") | ||
} | ||
return &Throttler{ | ||
maxWorkers: int32(maxWorkers), | ||
batchSize: 1, | ||
totalJobs: int32(totalJobs), | ||
doneChan: make(chan struct{}, totalJobs), | ||
errsMutex: &sync.Mutex{}, | ||
} | ||
} | ||
|
||
// NewBatchedThrottler returns a Throttler (just like New), but also enables batching. | ||
func NewBatchedThrottler(maxWorkers, batchingTotal, batchSize int) *Throttler { | ||
totalJobs := int(math.Ceil(float64(batchingTotal) / float64(batchSize))) | ||
t := New(maxWorkers, totalJobs) | ||
t.batchSize = int32(batchSize) | ||
t.batchingTotal = int32(batchingTotal) | ||
return t | ||
} | ||
|
||
// SetMaxWorkers lets you change the total number of workers that can run concurrently. NOTE: If | ||
// all workers are currently running, this setting is not guaranteed to take effect until one of them | ||
// completes and Throttle() is called again. | ||
func (t *Throttler) SetMaxWorkers(maxWorkers int) { | ||
if maxWorkers < 1 { | ||
panic("maxWorkers has to be at least 1") | ||
} | ||
atomic.StoreInt32(&t.maxWorkers, int32(maxWorkers)) | ||
} | ||
|
||
// Throttle works similarly to sync.WaitGroup, except inside your goroutine dispatch | ||
// loop rather than after. It will not block until the number of active workers | ||
// matches the max number of workers designated in the call to NewThrottler or | ||
// all of the jobs have been dispatched. It stops blocking when Done has been called | ||
// as many times as totalJobs. | ||
func (t *Throttler) Throttle() int { | ||
if atomic.LoadInt32(&t.totalJobs) < 1 { | ||
return int(atomic.LoadInt32(&t.errorCount)) | ||
} | ||
atomic.AddInt32(&t.jobsStarted, 1) | ||
atomic.AddInt32(&t.workerCount, 1) | ||
|
||
// check to see if the current number of workers equals the max number of workers | ||
// if they are equal, wait for one to finish before continuing. | ||
if atomic.LoadInt32(&t.workerCount) == atomic.LoadInt32(&t.maxWorkers) { | ||
atomic.AddInt32(&t.jobsCompleted, 1) | ||
atomic.AddInt32(&t.workerCount, -1) | ||
<-t.doneChan | ||
} | ||
|
||
// check to see if all of the jobs have been started, and if so, wait until all | ||
// jobs have been completed before continuing. | ||
if atomic.LoadInt32(&t.jobsStarted) == atomic.LoadInt32(&t.totalJobs) { | ||
for atomic.LoadInt32(&t.jobsCompleted) < atomic.LoadInt32(&t.totalJobs) { | ||
atomic.AddInt32(&t.jobsCompleted, 1) | ||
<-t.doneChan | ||
} | ||
} | ||
|
||
return int(atomic.LoadInt32(&t.errorCount)) | ||
} | ||
|
||
// Done lets Throttler know that a job has been completed so that another worker | ||
// can be activated. If Done is called less times than totalJobs, | ||
// Throttle will block forever. | ||
func (t *Throttler) Done(err error) { | ||
if err != nil { | ||
t.errsMutex.Lock() | ||
t.errs = append(t.errs, err) | ||
atomic.AddInt32(&t.errorCount, 1) | ||
t.errsMutex.Unlock() | ||
} | ||
t.doneChan <- struct{}{} | ||
} | ||
|
||
// Err returns an error representative of all errors caught by throttler. | ||
func (t *Throttler) Err() error { | ||
t.errsMutex.Lock() | ||
defer t.errsMutex.Unlock() | ||
if atomic.LoadInt32(&t.errorCount) == 0 { | ||
return nil | ||
} | ||
return multiErrors(t.errs) | ||
} | ||
|
||
// Errs returns a slice of any errors that were received from calling Done(). | ||
func (t *Throttler) Errs() []error { | ||
t.errsMutex.Lock() | ||
defer t.errsMutex.Unlock() | ||
return t.errs | ||
} | ||
|
||
type multiErrors []error | ||
|
||
func (te multiErrors) Error() string { | ||
errString := te[0].Error() | ||
if len(te) > 1 { | ||
errString += fmt.Sprintf(" (and %d more errors)", len(te)-1) | ||
} | ||
return errString | ||
} | ||
|
||
// BatchStartIndex returns the starting index for the next batch. The job count isn't modified | ||
// until th.Throttle() is called, so if you don't call Throttle before executing this | ||
// again, it will return the same index as before. | ||
func (t *Throttler) BatchStartIndex() int { | ||
return int(atomic.LoadInt32(&t.jobsStarted) * atomic.LoadInt32(&t.batchSize)) | ||
} | ||
|
||
// BatchEndIndex returns the ending index for the next batch. It either returns the full batch size | ||
// or the remaining amount of jobs. The job count isn't modified | ||
// until th.Throttle() is called, so if you don't call Throttle before executing this | ||
// again, it will return the same index as before. | ||
func (t *Throttler) BatchEndIndex() int { | ||
end := (atomic.LoadInt32(&t.jobsStarted) + 1) * atomic.LoadInt32(&t.batchSize) | ||
if end > atomic.LoadInt32(&t.batchingTotal) { | ||
end = atomic.LoadInt32(&t.batchingTotal) | ||
} | ||
return int(end) | ||
} | ||
|
||
// TotalJobs returns the total number of jobs throttler is performing. | ||
func (t *Throttler) TotalJobs() int { | ||
return int(atomic.LoadInt32(&t.totalJobs)) | ||
} |
Oops, something went wrong.