diff --git a/go.mod b/go.mod index 5b2078a64fc..3a266ea6e3e 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,8 @@ require ( github.com/blang/semver v3.5.1+incompatible // indirect github.com/davecgh/go-spew v1.1.1 github.com/dustinkirkland/golang-petname v0.0.0-20170921220637-d3c2ba80e75e // indirect + github.com/gammazero/deque v0.0.0-20180920172122-f6adf94963e4 // indirect + github.com/gammazero/workerpool v0.0.0-20181230203049-86a96b5d5d92 github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c // indirect github.com/googleapis/gax-go v2.0.2+incompatible // indirect diff --git a/go.sum b/go.sum index aa0952efe82..b34685eb2eb 100644 --- a/go.sum +++ b/go.sum @@ -32,6 +32,10 @@ github.com/dustinkirkland/golang-petname v0.0.0-20170921220637-d3c2ba80e75e/go.m github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/gammazero/deque v0.0.0-20180920172122-f6adf94963e4 h1:R+19WKQClnfMXS60cP5BmMe1wjZ4u0evY2p2Ar0ZTXo= +github.com/gammazero/deque v0.0.0-20180920172122-f6adf94963e4/go.mod h1:GeIq9qoE43YdGnDXURnmKTnGg15pQz4mYkXSTChbneI= +github.com/gammazero/workerpool v0.0.0-20181230203049-86a96b5d5d92 h1:EipXK6U05IQ2wtuFRn4k3h0+2lXypzItoXGVyf4r9Io= +github.com/gammazero/workerpool v0.0.0-20181230203049-86a96b5d5d92/go.mod h1:w9RqFVO2BM3xwWEcAB8Fwp0OviTBBEiRmSBDfbXnd3w= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-test/deep v1.0.1/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= diff --git a/google/resource_storage_bucket.go b/google/resource_storage_bucket.go index 34d6e559241..1d3b913154f 100644 --- a/google/resource_storage_bucket.go +++ b/google/resource_storage_bucket.go @@ -5,10 +5,12 @@ import ( "errors" "fmt" "log" + "runtime" "strconv" "strings" "time" + "github.com/gammazero/workerpool" "github.com/hashicorp/terraform/helper/hashcode" "github.com/hashicorp/terraform/helper/resource" "github.com/hashicorp/terraform/helper/schema" @@ -516,22 +518,46 @@ func resourceStorageBucketDelete(d *schema.ResourceData, meta interface{}) error if len(res.Items) != 0 { if d.Get("force_destroy").(bool) { - // purge the bucket... + // GCS requires that a bucket be empty (have no objects or object + // versions) before it can be deleted. log.Printf("[DEBUG] GCS Bucket attempting to forceDestroy\n\n") + // Create a workerpool for parallel deletion of resources. In the + // future, it would be great to expose Terraform's global parallelism + // flag here, but that's currently reserved for core use. Testing + // shows that NumCPUs-1 is the most performant on average networks. + // + // The challenge with making this user-configurable is that the + // configuration would reside in the Terraform configuration file, + // decreasing its portability. Ideally we'd want this to connect to + // Terraform's top-level -parallelism flag, but that's not plumbed nor + // is it scheduled to be plumbed to individual providers. + wp := workerpool.New(runtime.NumCPU() - 1) + for _, object := range res.Items { log.Printf("[DEBUG] Found %s", object.Name) - if err := config.clientStorage.Objects.Delete(bucket, object.Name).Generation(object.Generation).Do(); err != nil { - log.Fatalf("Error trying to delete object: %s %s\n\n", object.Name, err) - } else { - log.Printf("Object deleted: %s \n\n", object.Name) - } + object := object + + wp.Submit(func() { + log.Printf("[TRACE] Attempting to delete %s", object.Name) + if err := config.clientStorage.Objects.Delete(bucket, object.Name).Generation(object.Generation).Do(); err != nil { + // We should really return an error here, but it doesn't really + // matter since the following step (bucket deletion) will fail + // with an error indicating objects are still present, and this + // log line will point to that object. + log.Printf("[ERR] Failed to delete storage object %s: %s", object.Name, err) + } else { + log.Printf("[TRACE] Successfully deleted %s", object.Name) + } + }) } + // Wait for everything to finish. + wp.StopWait() } else { - delete_err := errors.New("Error trying to delete a bucket containing objects without `force_destroy` set to true") - log.Printf("Error! %s : %s\n\n", bucket, delete_err) - return delete_err + deleteErr := errors.New("Error trying to delete a bucket containing objects without `force_destroy` set to true") + log.Printf("Error! %s : %s\n\n", bucket, deleteErr) + return deleteErr } } else { break // 0 items, bucket empty diff --git a/vendor/github.com/gammazero/deque/.gitignore b/vendor/github.com/gammazero/deque/.gitignore new file mode 100644 index 00000000000..b33406fb097 --- /dev/null +++ b/vendor/github.com/gammazero/deque/.gitignore @@ -0,0 +1,26 @@ +*~ + +# Compiled Object files, Static and Dynamic libs (Shared Objects) +*.o +*.a +*.so + +# Folders +_obj +_test + +# Architecture specific extensions/prefixes +*.[568vq] +[568vq].out + +*.cgo1.go +*.cgo2.c +_cgo_defun.c +_cgo_gotypes.go +_cgo_export.* + +_testmain.go + +*.exe +*.test +*.prof diff --git a/vendor/github.com/gammazero/deque/.travis.yml b/vendor/github.com/gammazero/deque/.travis.yml new file mode 100644 index 00000000000..9608f4cbd5e --- /dev/null +++ b/vendor/github.com/gammazero/deque/.travis.yml @@ -0,0 +1,17 @@ +language: go + +go: + - "1.8" + - "1.9" + - "1.10" + - "1.11" + - "tip" + +before_script: + - go vet ./... + +script: + - ./go.test.sh + +after_success: + - bash <(curl -s https://codecov.io/bash) diff --git a/vendor/github.com/gammazero/deque/LICENSE b/vendor/github.com/gammazero/deque/LICENSE new file mode 100644 index 00000000000..0566f2661ba --- /dev/null +++ b/vendor/github.com/gammazero/deque/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2018 Andrew J. Gillis + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/github.com/gammazero/deque/README.md b/vendor/github.com/gammazero/deque/README.md new file mode 100644 index 00000000000..0cc979e1fc4 --- /dev/null +++ b/vendor/github.com/gammazero/deque/README.md @@ -0,0 +1,73 @@ +# deque + +[![Build Status](https://travis-ci.org/gammazero/deque.svg)](https://travis-ci.org/gammazero/deque) +[![Go Report Card](https://goreportcard.com/badge/github.com/gammazero/deque)](https://goreportcard.com/report/github.com/gammazero/deque) +[![codecov](https://codecov.io/gh/gammazero/deque/branch/master/graph/badge.svg)](https://codecov.io/gh/gammazero/deque) +[![License](https://img.shields.io/badge/License-MIT-blue.svg)](LICENSE) + +Extremely fast ring-buffer deque ([double-ended queue](https://en.wikipedia.org/wiki/Double-ended_queue)) implementation. + +[![GoDoc](https://godoc.org/github.com/gammazero/deque?status.svg)](https://godoc.org/github.com/gammazero/deque) + +For a pictorial description, see the [Deque diagram](https://github.com/gammazero/deque/wiki) + +## Installation + +``` +$ go get github.com/gammazero/deque +``` + +## Deque data structure + +Deque generalizes a queue and a stack, to efficiently add and remove items at either end with O(1) performance. [Queue](https://en.wikipedia.org/wiki/Queue_(abstract_data_type)) (FIFO) operations are supported using `PushBack()` and `PopFront()`. [Stack](https://en.wikipedia.org/wiki/Stack_(abstract_data_type)) (LIFO) operations are supported using `PushBack()` and `PopBack()`. + +## Ring-buffer Performance + +This deque implementation is optimized for CPU and GC performance. The circular buffer automatically re-sizes by powers of two, growing when additional capacity is needed and shrinking when only a quarter of the capacity is used, and uses bitwise arithmetic for all calculations. Since growth is by powers of two, adding elements will only cause O(log n) allocations. + +The ring-buffer implementation significantly improves memory and time performance with fewer GC pauses, compared to implementations based on slices and linked lists. By wrapping around the buffer, previously used space is reused, making allocation unnecessary until all buffer capacity is used. + +For maximum speed, this deque implementation leaves concurrency safety up to the application to provide, however the application chooses, if needed at all. + +## Reading Empty Deque + +Since it is OK for the deque to contain a nil value, it is necessary to either panic or return a second boolean value to indicate the deque is empty, when reading or removing an element. This deque panics when reading from an empty deque. This is a run-time check to help catch programming errors, which may be missed if a second return value is ignored. Simply check Deque.Len() before reading from the deque. + +## Example + +```go +package main + +import ( + "fmt" + "github.com/gammazero/deque" +) + +func main() { + var q deque.Deque + q.PushBack("foo") + q.PushBack("bar") + q.PushBack("baz") + + fmt.Println(q.Len()) // Prints: 3 + fmt.Println(q.Front()) // Prints: foo + fmt.Println(q.Back()) // Prints: baz + + q.PopFront() // remove "foo" + q.PopBack() // remove "baz" + + q.PushFront("hello") + q.PushBack("world") + + // Consume deque and print elements. + for q.Len() != 0 { + fmt.Println(q.PopFront()) + } +} +``` + +## Uses + +Deque can be used as both a: +- [Queue](https://en.wikipedia.org/wiki/Queue_(abstract_data_type)) using `PushBack` and `PopFront` +- [Stack](https://en.wikipedia.org/wiki/Stack_(abstract_data_type)) using `PushBack` and `PopBack` diff --git a/vendor/github.com/gammazero/deque/deque.go b/vendor/github.com/gammazero/deque/deque.go new file mode 100644 index 00000000000..d9d9863c101 --- /dev/null +++ b/vendor/github.com/gammazero/deque/deque.go @@ -0,0 +1,224 @@ +package deque + +// minCapacity is the smallest capacity that deque may have. +// Must be power of 2 for bitwise modulus: x % n == x & (n - 1). +const minCapacity = 16 + +// Deque represents a single instance of the deque data structure. +type Deque struct { + buf []interface{} + head int + tail int + count int +} + +// Len returns the number of elements currently stored in the queue. +func (q *Deque) Len() int { + return q.count +} + +// PushBack appends an element to the back of the queue. Implements FIFO when +// elements are removed with PopFront(), and LIFO when elements are removed +// with PopBack(). +func (q *Deque) PushBack(elem interface{}) { + q.growIfFull() + + q.buf[q.tail] = elem + // Calculate new tail position. + q.tail = q.next(q.tail) + q.count++ +} + +// PushFront prepends an element to the front of the queue. +func (q *Deque) PushFront(elem interface{}) { + q.growIfFull() + + // Calculate new head position. + q.head = q.prev(q.head) + q.buf[q.head] = elem + q.count++ +} + +// PopFront removes and returns the element from the front of the queue. +// Implements FIFO when used with PushBack(). If the queue is empty, the call +// panics. +func (q *Deque) PopFront() interface{} { + if q.count <= 0 { + panic("deque: PopFront() called on empty queue") + } + ret := q.buf[q.head] + q.buf[q.head] = nil + // Calculate new head position. + q.head = q.next(q.head) + q.count-- + + q.shrinkIfExcess() + return ret +} + +// PopBack removes and returns the element from the back of the queue. +// Implements LIFO when used with PushBack(). If the queue is empty, the call +// panics. +func (q *Deque) PopBack() interface{} { + if q.count <= 0 { + panic("deque: PopBack() called on empty queue") + } + + // Calculate new tail position + q.tail = q.prev(q.tail) + + // Remove value at tail. + ret := q.buf[q.tail] + q.buf[q.tail] = nil + q.count-- + + q.shrinkIfExcess() + return ret +} + +// Front returns the element at the front of the queue. This is the element +// that would be returned by PopFront(). This call panics if the queue is +// empty. +func (q *Deque) Front() interface{} { + if q.count <= 0 { + panic("deque: Front() called when empty") + } + return q.buf[q.head] +} + +// Back returns the element at the back of the queue. This is the element +// that would be returned by PopBack(). This call panics if the queue is +// empty. +func (q *Deque) Back() interface{} { + if q.count <= 0 { + panic("deque: Back() called when empty") + } + return q.buf[q.prev(q.tail)] +} + +// At returns the element at index i in the queue without removing the element +// from the queue. This method accepts only non-negative index values. At(0) +// refers to the first element and is the same as Front(). At(Len()-1) refers +// to the last element and is the same as Back(). If the index is invalid, the +// call panics. +// +// The purpose of At is to allow Deque to serve as a more general purpose +// circular buffer, where items are only added to and removed from the the ends +// of the deque, but may be read from any place within the deque. Consider the +// case of a fixed-size circular log buffer: A new entry is pushed onto one end +// and when full the oldest is popped from the other end. All the log entries +// in the buffer must be readable without altering the buffer contents. +func (q *Deque) At(i int) interface{} { + if i < 0 || i >= q.count { + panic("deque: At() called with index out of range") + } + // bitwise modulus + return q.buf[(q.head+i)&(len(q.buf)-1)] +} + +// Clear removes all elements from the queue, but retains the current capacity. +// This is useful when repeatedly reusing the queue at high frequency to avoid +// GC during reuse. The queue will not be resized smaller as long as items are +// only added. Only when items are removed is the queue subject to getting +// resized smaller. +func (q *Deque) Clear() { + // bitwise modulus + modBits := len(q.buf) - 1 + for h := q.head; h != q.tail; h = (h + 1) & modBits { + q.buf[h] = nil + } + q.head = 0 + q.tail = 0 + q.count = 0 +} + +// Rotate rotates the deque n steps front-to-back. If n is negative, rotates +// back-to-front. Having Deque provide Rotate() avoids resizing that could +// happen if implementing rotation using only Pop and Push methods. +func (q *Deque) Rotate(n int) { + if q.count <= 1 { + return + } + // Rotating a multiple of q.count is same as no rotation. + n %= q.count + if n == 0 { + return + } + + modBits := len(q.buf) - 1 + // If no empty space in buffer, only move head and tail indexes. + if q.head == q.tail { + // Calculate new head and tail using bitwise modulus. + q.head = (q.head + n) & modBits + q.tail = (q.tail + n) & modBits + return + } + + if n < 0 { + // Rotate back to front. + for ; n < 0; n++ { + // Calculate new head and tail using bitwise modulus. + q.head = (q.head - 1) & modBits + q.tail = (q.tail - 1) & modBits + // Put tail value at head and remove value at tail. + q.buf[q.head] = q.buf[q.tail] + q.buf[q.tail] = nil + } + return + } + + // Rotate front to back. + for ; n > 0; n-- { + // Put head value at tail and remove value at head. + q.buf[q.tail] = q.buf[q.head] + q.buf[q.head] = nil + // Calculate new head and tail using bitwise modulus. + q.head = (q.head + 1) & modBits + q.tail = (q.tail + 1) & modBits + } +} + +// prev returns the previous buffer position wrapping around buffer. +func (q *Deque) prev(i int) int { + return (i - 1) & (len(q.buf) - 1) // bitwise modulus +} + +// next returns the next buffer position wrapping around buffer. +func (q *Deque) next(i int) int { + return (i + 1) & (len(q.buf) - 1) // bitwise modulus +} + +// growIfFull resizes up if the buffer is full. +func (q *Deque) growIfFull() { + if len(q.buf) == 0 { + q.buf = make([]interface{}, minCapacity) + return + } + if q.count == len(q.buf) { + q.resize() + } +} + +// shrinkIfExcess resize down if the buffer 1/4 full. +func (q *Deque) shrinkIfExcess() { + if len(q.buf) > minCapacity && (q.count<<2) == len(q.buf) { + q.resize() + } +} + +// resize resizes the deque to fit exactly twice its current contents. This is +// used to grow the queue when it is full, and also to shrink it when it is +// only a quarter full. +func (q *Deque) resize() { + newBuf := make([]interface{}, q.count<<1) + if q.tail > q.head { + copy(newBuf, q.buf[q.head:q.tail]) + } else { + n := copy(newBuf, q.buf[q.head:]) + copy(newBuf[n:], q.buf[:q.tail]) + } + + q.head = 0 + q.tail = q.count + q.buf = newBuf +} diff --git a/vendor/github.com/gammazero/deque/doc.go b/vendor/github.com/gammazero/deque/doc.go new file mode 100644 index 00000000000..c9647f983c2 --- /dev/null +++ b/vendor/github.com/gammazero/deque/doc.go @@ -0,0 +1,34 @@ +/* +Package deque provides a fast ring-buffer deque (double-ended queue) +implementation. + +Deque generalizes a queue and a stack, to efficiently add and remove items at +either end with O(1) performance. Queue (FIFO) operations are supported using +PushBack() and PopFront(). Stack (LIFO) operations are supported using +PushBack() and PopBack(). + +Ring-buffer Performance + +The ring-buffer automatically resizes by +powers of two, growing when additional capacity is needed and shrinking when +only a quarter of the capacity is used, and uses bitwise arithmetic for all +calculations. + +The ring-buffer implementation significantly improves memory and time +performance with fewer GC pauses, compared to implementations based on slices +and linked lists. + +For maximum speed, this deque implementation leaves concurrency safety up to +the application to provide, however the application chooses, if needed at all. + +Reading Empty Deque + +Since it is OK for the deque to contain a nil value, it is necessary to either +panic or return a second boolean value to indicate the deque is empty, when +reading or removing an element. This deque panics when reading from an empty +deque. This is a run-time check to help catch programming errors, which may be +missed if a second return value is ignored. Simply check Deque.Len() before +reading from the deque. + +*/ +package deque diff --git a/vendor/github.com/gammazero/deque/go.test.sh b/vendor/github.com/gammazero/deque/go.test.sh new file mode 100644 index 00000000000..494b176ba10 --- /dev/null +++ b/vendor/github.com/gammazero/deque/go.test.sh @@ -0,0 +1,12 @@ +#!/usr/bin/env bash + +set -e +echo "" > coverage.txt + +for d in $(go list ./... | grep -v vendor); do + go test -coverprofile=profile.out -covermode=atomic $d + if [ -f profile.out ]; then + cat profile.out >> coverage.txt + rm profile.out + fi +done diff --git a/vendor/github.com/gammazero/workerpool/.gitignore b/vendor/github.com/gammazero/workerpool/.gitignore new file mode 100644 index 00000000000..b33406fb097 --- /dev/null +++ b/vendor/github.com/gammazero/workerpool/.gitignore @@ -0,0 +1,26 @@ +*~ + +# Compiled Object files, Static and Dynamic libs (Shared Objects) +*.o +*.a +*.so + +# Folders +_obj +_test + +# Architecture specific extensions/prefixes +*.[568vq] +[568vq].out + +*.cgo1.go +*.cgo2.c +_cgo_defun.c +_cgo_gotypes.go +_cgo_export.* + +_testmain.go + +*.exe +*.test +*.prof diff --git a/vendor/github.com/gammazero/workerpool/.travis.yml b/vendor/github.com/gammazero/workerpool/.travis.yml new file mode 100644 index 00000000000..9608f4cbd5e --- /dev/null +++ b/vendor/github.com/gammazero/workerpool/.travis.yml @@ -0,0 +1,17 @@ +language: go + +go: + - "1.8" + - "1.9" + - "1.10" + - "1.11" + - "tip" + +before_script: + - go vet ./... + +script: + - ./go.test.sh + +after_success: + - bash <(curl -s https://codecov.io/bash) diff --git a/vendor/github.com/gammazero/workerpool/LICENSE b/vendor/github.com/gammazero/workerpool/LICENSE new file mode 100644 index 00000000000..f6ff6ce9da5 --- /dev/null +++ b/vendor/github.com/gammazero/workerpool/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2016 Andrew J. Gillis + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/github.com/gammazero/workerpool/README.md b/vendor/github.com/gammazero/workerpool/README.md new file mode 100644 index 00000000000..2a75ca6f588 --- /dev/null +++ b/vendor/github.com/gammazero/workerpool/README.md @@ -0,0 +1,44 @@ +# workerpool +[![Build Status](https://travis-ci.org/gammazero/workerpool.svg)](https://travis-ci.org/gammazero/workerpool) +[![Go Report Card](https://goreportcard.com/badge/github.com/gammazero/workerpool)](https://goreportcard.com/report/github.com/gammazero/workerpool) +[![codecov](https://codecov.io/gh/gammazero/workerpool/branch/master/graph/badge.svg)](https://codecov.io/gh/gammazero/workerpool) +[![License](https://img.shields.io/badge/License-MIT-blue.svg)](https://github.com/gammazero/workerpool/blob/master/LICENSE) + +Concurrency limiting goroutine pool. Limits the concurrency of task execution, not the number of tasks queued. Never blocks submitting tasks, no matter how many tasks are queued. + +[![GoDoc](https://godoc.org/github.com/gammazero/workerpool?status.svg)](https://godoc.org/github.com/gammazero/workerpool) + +This implementation builds on ideas from the following: + +- http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang +- http://nesv.github.io/golang/2014/02/25/worker-queues-in-go.html + +## Installation +To install this package, you need to setup your Go workspace. The simplest way to install the library is to run: +``` +$ go get github.com/gammazero/workerpool +``` + +## Example +```go +package main + +import ( + "fmt" + "github.com/gammazero/workerpool" +) + +func main() { + wp := workerpool.New(2) + requests := []string{"alpha", "beta", "gamma", "delta", "epsilon"} + + for _, r := range requests { + r := r + wp.Submit(func() { + fmt.Println("Handling request:", r) + }) + } + + wp.StopWait() +} +``` diff --git a/vendor/github.com/gammazero/workerpool/doc.go b/vendor/github.com/gammazero/workerpool/doc.go new file mode 100644 index 00000000000..2b24ea461dd --- /dev/null +++ b/vendor/github.com/gammazero/workerpool/doc.go @@ -0,0 +1,66 @@ +/* +Package workerpool queues work to a limited number of goroutines. + +The purpose of the worker pool is to limit the concurrency of tasks +executed by the workers. This is useful when performing tasks that require +sufficient resources (CPU, memory, etc.), and running too many tasks at the +same time would exhaust resources. + +Non-blocking task submission + +A task is a function submitted to the worker pool for execution. Submitting +tasks to this worker pool will not block, regardless of the number of tasks. +Tasks read from the input task queue are immediately dispatched to an available +worker. If no worker is immediately available, or there are already tasks +waiting for an available worker, then the task is put on a waiting queue to +wait for an available worker. This clears the incoming task from the input +task queue immediately, whether or not a worker is currently available, and +will not block the submission of tasks. + +The intent of the worker pool is to limit the concurrency of task execution, +not limit the number of tasks queued to be executed. Therefore, this unbounded +input of tasks is acceptable as the tasks cannot be discarded. If the number +of inbound tasks is too many to even queue for pending processing, then the +solution is outside the scope of workerpool, and should be solved by +distributing load over multiple systems, and/or storing input for pending +processing in intermediate storage such as a database, file system, distributed +message queue, etc. + +Dispatcher + +This worker pool uses a single dispatcher goroutine to read tasks from the +input task queue and dispatch them to a worker goroutine. This allows for a +small input channel, and lets the dispatcher queue as many tasks as are +submitted when there are no available workers. Additionally, the dispatcher +can adjust the number of workers as appropriate for the work load, without +having to utilize locked counters and checks incurred on task submission. + +When no tasks have been submitted for a period of time, a worker is removed by +the dispatcher. This is done until there are no more workers to remove. The +minimum number of workers is always zero, because the time to start new workers +is insignificant. + +Usage note + +It is advisable to use different worker pools for tasks that are bound by +different resources, or that have different resource use patterns. For +example, tasks that use X Mb of memory may need different concurrency limits +than tasks that use Y Mb of memory. + +Waiting queue vs goroutines + +When there are no available workers to handle incoming tasks, the tasks are put +on a waiting queue. In previous versions of workerpool, these tasks were +passed to goroutines. Using a queue is faster and has less memory overhead +than creating a separate goroutine for each waiting task, allowing a much +higher number of waiting tasks. + +Credits + +This implementation builds on ideas from the following: + +http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang +http://nesv.github.io/golang/2014/02/25/worker-queues-in-go.html + +*/ +package workerpool diff --git a/vendor/github.com/gammazero/workerpool/go.test.sh b/vendor/github.com/gammazero/workerpool/go.test.sh new file mode 100644 index 00000000000..34dbbfb3161 --- /dev/null +++ b/vendor/github.com/gammazero/workerpool/go.test.sh @@ -0,0 +1,12 @@ +#!/usr/bin/env bash + +set -e +echo "" > coverage.txt + +for d in $(go list ./... | grep -v vendor); do + go test -race -coverprofile=profile.out -covermode=atomic $d + if [ -f profile.out ]; then + cat profile.out >> coverage.txt + rm profile.out + fi +done diff --git a/vendor/github.com/gammazero/workerpool/workerpool.go b/vendor/github.com/gammazero/workerpool/workerpool.go new file mode 100644 index 00000000000..2993411ca70 --- /dev/null +++ b/vendor/github.com/gammazero/workerpool/workerpool.go @@ -0,0 +1,276 @@ +package workerpool + +import ( + "github.com/gammazero/deque" + "sync" + "time" +) + +const ( + // This value is the size of the queue that workers register their + // availability to the dispatcher. There may be hundreds of workers, but + // only a small channel is needed to register some of the workers. + readyQueueSize = 16 + + // If worker pool receives no new work for this period of time, then stop + // a worker goroutine. + idleTimeoutSec = 5 +) + +// New creates and starts a pool of worker goroutines. +// +// The maxWorkers parameter specifies the maximum number of workers that will +// execute tasks concurrently. After each timeout period, a worker goroutine +// is stopped until there are no remaining workers. +func New(maxWorkers int) *WorkerPool { + // There must be at least one worker. + if maxWorkers < 1 { + maxWorkers = 1 + } + + pool := &WorkerPool{ + taskQueue: make(chan func(), 1), + maxWorkers: maxWorkers, + readyWorkers: make(chan chan func(), readyQueueSize), + timeout: time.Second * idleTimeoutSec, + stoppedChan: make(chan struct{}), + } + + // Start the task dispatcher. + go pool.dispatch() + + return pool +} + +// WorkerPool is a collection of goroutines, where the number of concurrent +// goroutines processing requests does not exceed the specified maximum. +type WorkerPool struct { + maxWorkers int + timeout time.Duration + taskQueue chan func() + readyWorkers chan chan func() + stoppedChan chan struct{} + waitingQueue deque.Deque + stopMutex sync.Mutex + stopped bool +} + +// Stop stops the worker pool and waits for only currently running tasks to +// complete. Pending tasks that are not currently running are abandoned. +// Tasks must not be submitted to the worker pool after calling stop. +// +// Since creating the worker pool starts at least one goroutine, for the +// dispatcher, Stop() or StopWait() should be called when the worker pool is no +// longer needed. +func (p *WorkerPool) Stop() { + p.stop(false) +} + +// StopWait stops the worker pool and waits for all queued tasks tasks to +// complete. No additional tasks may be submitted, but all pending tasks are +// executed by workers before this function returns. +func (p *WorkerPool) StopWait() { + p.stop(true) +} + +// Stopped returns true if this worker pool has been stopped. +func (p *WorkerPool) Stopped() bool { + p.stopMutex.Lock() + defer p.stopMutex.Unlock() + return p.stopped +} + +// Submit enqueues a function for a worker to execute. +// +// Any external values needed by the task function must be captured in a +// closure. Any return values should be returned over a channel that is +// captured in the task function closure. +// +// Submit will not block regardless of the number of tasks submitted. Each +// task is immediately given to an available worker or passed to a goroutine to +// be given to the next available worker. If there are no available workers, +// the dispatcher adds a worker, until the maximum number of workers is +// running. +// +// After the maximum number of workers are running, and no workers are +// available, incoming tasks are put onto a queue and will be executed as +// workers become available. +// +// When no new tasks have been submitted for time period and a worker is +// available, the worker is shutdown. As long as no new tasks arrive, one +// available worker is shutdown each time period until there are no more idle +// workers. Since the time to start new goroutines is not significant, there +// is no need to retain idle workers. +func (p *WorkerPool) Submit(task func()) { + if task != nil { + p.taskQueue <- task + } +} + +// SubmitWait enqueues the given function and waits for it to be executed. +func (p *WorkerPool) SubmitWait(task func()) { + if task == nil { + return + } + doneChan := make(chan struct{}) + p.taskQueue <- func() { + task() + close(doneChan) + } + <-doneChan +} + +// dispatch sends the next queued task to an available worker. +func (p *WorkerPool) dispatch() { + defer close(p.stoppedChan) + timeout := time.NewTimer(p.timeout) + var ( + workerCount int + task func() + ok, wait bool + workerTaskChan chan func() + ) + startReady := make(chan chan func()) +Loop: + for { + // As long as tasks are in the waiting queue, remove and execute these + // tasks as workers become available, and place new incoming tasks on + // the queue. Once the queue is empty, then go back to submitting + // incoming tasks directly to available workers. + if p.waitingQueue.Len() != 0 { + select { + case task, ok = <-p.taskQueue: + if !ok { + break Loop + } + if task == nil { + wait = true + break Loop + } + p.waitingQueue.PushBack(task) + case workerTaskChan = <-p.readyWorkers: + // A worker is ready, so give task to worker. + workerTaskChan <- p.waitingQueue.PopFront().(func()) + } + continue + } + timeout.Reset(p.timeout) + select { + case task, ok = <-p.taskQueue: + if !ok || task == nil { + break Loop + } + // Got a task to do. + select { + case workerTaskChan = <-p.readyWorkers: + // A worker is ready, so give task to worker. + workerTaskChan <- task + default: + // No workers ready. + // Create a new worker, if not at max. + if workerCount < p.maxWorkers { + workerCount++ + go func(t func()) { + startWorker(startReady, p.readyWorkers) + // Submit the task when the new worker. + taskChan := <-startReady + taskChan <- t + }(task) + } else { + // Enqueue task to be executed by next available worker. + p.waitingQueue.PushBack(task) + } + } + case <-timeout.C: + // Timed out waiting for work to arrive. Kill a ready worker. + if workerCount > 0 { + select { + case workerTaskChan = <-p.readyWorkers: + // A worker is ready, so kill. + close(workerTaskChan) + workerCount-- + default: + // No work, but no ready workers. All workers are busy. + } + } + } + } + + // If instructed to wait for all queued tasks, then remove from queue and + // give to workers until queue is empty. + if wait { + for p.waitingQueue.Len() != 0 { + workerTaskChan = <-p.readyWorkers + // A worker is ready, so give task to worker. + workerTaskChan <- p.waitingQueue.PopFront().(func()) + } + } + + // Stop all remaining workers as they become ready. + for workerCount > 0 { + workerTaskChan = <-p.readyWorkers + close(workerTaskChan) + workerCount-- + } +} + +// startWorker starts a goroutine that executes tasks given by the dispatcher. +// +// When a new worker starts, it registers its availability on the startReady +// channel. This ensures that the goroutine associated with starting the +// worker gets to use the worker to execute its task. Otherwise, the main +// dispatcher loop could steal the new worker and not know to start up another +// worker for the waiting goroutine. The task would then have to wait for +// another existing worker to become available, even though capacity is +// available to start additional workers. +// +// A worker registers that is it available to do work by putting its task +// channel on the readyWorkers channel. The dispatcher reads a worker's task +// channel from the readyWorkers channel, and writes a task to the worker over +// the worker's task channel. To stop a worker, the dispatcher closes a +// worker's task channel, instead of writing a task to it. +func startWorker(startReady, readyWorkers chan chan func()) { + go func() { + taskChan := make(chan func()) + var task func() + var ok bool + // Register availability on starReady channel. + startReady <- taskChan + for { + // Read task from dispatcher. + task, ok = <-taskChan + if !ok { + // Dispatcher has told worker to stop. + break + } + + // Execute the task. + task() + + // Register availability on readyWorkers channel. + readyWorkers <- taskChan + } + }() +} + +// stop tells the dispatcher to exit, and whether or not to complete queued +// tasks. +func (p *WorkerPool) stop(wait bool) { + p.stopMutex.Lock() + defer p.stopMutex.Unlock() + if p.stopped { + return + } + p.stopped = true + if wait { + p.taskQueue <- nil + } + // Close task queue and wait for currently running tasks to finish. + close(p.taskQueue) + <-p.stoppedChan +} + +// WaitingQueueSize will return the size of the waiting queue +func (p *WorkerPool) WaitingQueueSize() int { + return p.waitingQueue.Len() +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 5d7f6ec9fe3..e49af58ce88 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -62,6 +62,10 @@ github.com/davecgh/go-spew/spew github.com/dustinkirkland/golang-petname # github.com/fatih/color v1.7.0 github.com/fatih/color +# github.com/gammazero/deque v0.0.0-20180920172122-f6adf94963e4 +github.com/gammazero/deque +# github.com/gammazero/workerpool v0.0.0-20181230203049-86a96b5d5d92 +github.com/gammazero/workerpool # github.com/golang/protobuf v1.2.0 github.com/golang/protobuf/proto github.com/golang/protobuf/ptypes