From 404c871c51864869a8e635cc46219debc626235b Mon Sep 17 00:00:00 2001 From: Manuel de Brito Fontes Date: Tue, 13 Feb 2018 14:54:06 -0300 Subject: [PATCH 1/2] Use a ring channel to avoid blocking write of events --- internal/ingress/controller/nginx.go | 21 ++++++----- .../ingress/controller/store/backend_ssl.go | 2 +- internal/ingress/controller/store/store.go | 31 ++++++++-------- .../ingress/controller/store/store_test.go | 36 ++++++++++--------- 4 files changed, 50 insertions(+), 40 deletions(-) diff --git a/internal/ingress/controller/nginx.go b/internal/ingress/controller/nginx.go index efee0b9ee8..9a8aa262df 100644 --- a/internal/ingress/controller/nginx.go +++ b/internal/ingress/controller/nginx.go @@ -33,6 +33,7 @@ import ( "github.com/golang/glog" proxyproto "github.com/armon/go-proxyproto" + "github.com/eapache/channels" apiv1 "k8s.io/api/core/v1" extensions "k8s.io/api/extensions/v1beta1" "k8s.io/client-go/kubernetes/scheme" @@ -106,7 +107,7 @@ func NewNGINXController(config *Configuration, fs file.Filesystem) *NGINXControl }), stopCh: make(chan struct{}), - updateCh: make(chan store.Event, 1024), + updateCh: channels.NewRingChannel(1024), stopLock: &sync.Mutex{}, @@ -209,7 +210,7 @@ type NGINXController struct { stopLock *sync.Mutex stopCh chan struct{} - updateCh chan store.Event + updateCh *channels.RingChannel // ngxErrCh channel used to detect errors with the nginx processes ngxErrCh chan error @@ -290,16 +291,20 @@ func (n *NGINXController) Start() { // start a new nginx master process if the controller is not being stopped n.start(cmd) } - case evt := <-n.updateCh: + case event := <-n.updateCh.Out(): if n.isShuttingDown { break } - glog.V(3).Infof("Event %v received - object %v", evt.Type, evt.Obj) - if evt.Type == store.ConfigurationEvent { - n.SetForceReload(true) - } + if evt, ok := event.(store.Event); ok { + glog.V(3).Infof("Event %v received - object %v", evt.Type, evt.Obj) + if evt.Type == store.ConfigurationEvent { + n.SetForceReload(true) + } - n.syncQueue.Enqueue(evt.Obj) + n.syncQueue.Enqueue(evt.Obj) + } else { + glog.Warningf("unexpected event type received %T", event) + } case <-n.stopCh: break } diff --git a/internal/ingress/controller/store/backend_ssl.go b/internal/ingress/controller/store/backend_ssl.go index 8d8158f8eb..9f4fbc0ee2 100644 --- a/internal/ingress/controller/store/backend_ssl.go +++ b/internal/ingress/controller/store/backend_ssl.go @@ -226,7 +226,7 @@ func (s k8sStore) ReadSecrets(ing *extensions.Ingress) { // sendDummyEvent sends a dummy event to trigger an update // This is used in when a secret change func (s *k8sStore) sendDummyEvent() { - s.updateCh <- Event{ + s.updateCh.In() <- Event{ Type: UpdateEvent, Obj: &extensions.Ingress{ ObjectMeta: metav1.ObjectMeta{ diff --git a/internal/ingress/controller/store/store.go b/internal/ingress/controller/store/store.go index b6b26499d5..a45a5d57b7 100644 --- a/internal/ingress/controller/store/store.go +++ b/internal/ingress/controller/store/store.go @@ -24,6 +24,7 @@ import ( "sync" "time" + "github.com/eapache/channels" "github.com/golang/glog" apiv1 "k8s.io/api/core/v1" @@ -194,7 +195,7 @@ type k8sStore struct { filesystem file.Filesystem // updateCh - updateCh chan Event + updateCh *channels.RingChannel // mu mutex used to avoid simultaneous incovations to syncSecret mu *sync.Mutex @@ -208,7 +209,7 @@ func New(checkOCSP bool, resyncPeriod time.Duration, client clientset.Interface, fs file.Filesystem, - updateCh chan Event) Storer { + updateCh *channels.RingChannel) Storer { store := &k8sStore{ isOCSPCheckEnabled: checkOCSP, @@ -246,7 +247,7 @@ func New(checkOCSP bool, store.extractAnnotations(addIng) recorder.Eventf(addIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", addIng.Namespace, addIng.Name)) - updateCh <- Event{ + updateCh.In() <- Event{ Type: CreateEvent, Obj: obj, } @@ -272,7 +273,7 @@ func New(checkOCSP bool, } recorder.Eventf(delIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", delIng.Namespace, delIng.Name)) store.listers.IngressAnnotation.Delete(delIng) - updateCh <- Event{ + updateCh.In() <- Event{ Type: DeleteEvent, Obj: obj, } @@ -293,7 +294,7 @@ func New(checkOCSP bool, } store.extractAnnotations(curIng) - updateCh <- Event{ + updateCh.In() <- Event{ Type: UpdateEvent, Obj: cur, } @@ -312,7 +313,7 @@ func New(checkOCSP bool, _, err := store.GetLocalSecret(k8s.MetaNamespaceKey(sec)) if err == nil { store.syncSecret(key) - updateCh <- Event{ + updateCh.In() <- Event{ Type: UpdateEvent, Obj: cur, } @@ -323,7 +324,7 @@ func New(checkOCSP bool, store.extractAnnotations(ing) } - updateCh <- Event{ + updateCh.In() <- Event{ Type: ConfigurationEvent, Obj: cur, } @@ -346,7 +347,7 @@ func New(checkOCSP bool, } } store.sslStore.Delete(k8s.MetaNamespaceKey(sec)) - updateCh <- Event{ + updateCh.In() <- Event{ Type: DeleteEvent, Obj: obj, } @@ -362,7 +363,7 @@ func New(checkOCSP bool, } } - updateCh <- Event{ + updateCh.In() <- Event{ Type: ConfigurationEvent, Obj: sec, } @@ -372,13 +373,13 @@ func New(checkOCSP bool, eventHandler := cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - updateCh <- Event{ + updateCh.In() <- Event{ Type: CreateEvent, Obj: obj, } }, DeleteFunc: func(obj interface{}) { - updateCh <- Event{ + updateCh.In() <- Event{ Type: DeleteEvent, Obj: obj, } @@ -387,7 +388,7 @@ func New(checkOCSP bool, oep := old.(*apiv1.Endpoints) ocur := cur.(*apiv1.Endpoints) if !reflect.DeepEqual(ocur.Subsets, oep.Subsets) { - updateCh <- Event{ + updateCh.In() <- Event{ Type: UpdateEvent, Obj: cur, } @@ -402,7 +403,7 @@ func New(checkOCSP bool, if mapKey == configmap { glog.V(2).Infof("adding configmap %v to backend", mapKey) store.setConfig(m) - updateCh <- Event{ + updateCh.In() <- Event{ Type: ConfigurationEvent, Obj: obj, } @@ -415,7 +416,7 @@ func New(checkOCSP bool, if mapKey == configmap { recorder.Eventf(m, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", mapKey)) store.setConfig(m) - updateCh <- Event{ + updateCh.In() <- Event{ Type: ConfigurationEvent, Obj: cur, } @@ -423,7 +424,7 @@ func New(checkOCSP bool, // updates to configuration configmaps can trigger an update if mapKey == tcp || mapKey == udp { recorder.Eventf(m, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", mapKey)) - updateCh <- Event{ + updateCh.In() <- Event{ Type: ConfigurationEvent, Obj: cur, } diff --git a/internal/ingress/controller/store/store_test.go b/internal/ingress/controller/store/store_test.go index 9522f13d07..9f506aa7af 100644 --- a/internal/ingress/controller/store/store_test.go +++ b/internal/ingress/controller/store/store_test.go @@ -23,6 +23,7 @@ import ( "testing" "time" + "github.com/eapache/channels" apiv1 "k8s.io/api/core/v1" "k8s.io/api/extensions/v1beta1" extensions "k8s.io/api/extensions/v1beta1" @@ -56,11 +57,11 @@ func TestStore(t *testing.T) { defer deleteNamespace(ns, clientSet, t) stopCh := make(chan struct{}) - updateCh := make(chan Event, 1024) + updateCh := channels.NewRingChannel(1024) - go func(ch chan Event) { + go func(ch *channels.RingChannel) { for { - <-ch + <-ch.Out() } }(updateCh) @@ -111,7 +112,7 @@ func TestStore(t *testing.T) { t.Errorf("expected an Ingres but none returned") } - close(updateCh) + updateCh.Close() close(stopCh) }) @@ -120,19 +121,20 @@ func TestStore(t *testing.T) { defer deleteNamespace(ns, clientSet, t) stopCh := make(chan struct{}) - updateCh := make(chan Event, 1024) + updateCh := channels.NewRingChannel(1024) var add uint64 var upd uint64 var del uint64 - go func(ch chan Event) { + go func(ch *channels.RingChannel) { for { - e, ok := <-ch + evt, ok := <-ch.Out() if !ok { return } + e := evt.(Event) if e.Obj == nil { continue } @@ -254,7 +256,7 @@ func TestStore(t *testing.T) { t.Errorf("expected 1 event of type Delete but %v occurred", del) } - close(updateCh) + updateCh.Close() close(stopCh) }) @@ -263,19 +265,20 @@ func TestStore(t *testing.T) { defer deleteNamespace(ns, clientSet, t) stopCh := make(chan struct{}) - updateCh := make(chan Event, 1024) + updateCh := channels.NewRingChannel(1024) var add uint64 var upd uint64 var del uint64 - go func(ch chan Event) { + go func(ch *channels.RingChannel) { for { - e, ok := <-ch + evt, ok := <-ch.Out() if !ok { return } + e := evt.(Event) if e.Obj == nil { continue } @@ -339,7 +342,7 @@ func TestStore(t *testing.T) { t.Errorf("expected 1 events of type Delete but %v occurred", del) } - close(updateCh) + updateCh.Close() close(stopCh) }) @@ -348,19 +351,20 @@ func TestStore(t *testing.T) { defer deleteNamespace(ns, clientSet, t) stopCh := make(chan struct{}) - updateCh := make(chan Event, 1024) + updateCh := channels.NewRingChannel(1024) var add uint64 var upd uint64 var del uint64 - go func(ch <-chan Event) { + go func(ch *channels.RingChannel) { for { - e, ok := <-ch + evt, ok := <-ch.Out() if !ok { return } + e := evt.(Event) if e.Obj == nil { continue } @@ -478,7 +482,7 @@ func TestStore(t *testing.T) { } }) - close(updateCh) + updateCh.Close() close(stopCh) }) From 17f23cbf9aa5c60d48074e01faf89ee43bcdb40a Mon Sep 17 00:00:00 2001 From: Manuel de Brito Fontes Date: Tue, 13 Feb 2018 15:50:01 -0300 Subject: [PATCH 2/2] Add eapache/channels dependency --- Gopkg.lock | 558 ++++++++++++++++-- Gopkg.toml | 6 +- vendor/github.com/eapache/channels/.gitignore | 22 + .../github.com/eapache/channels/.travis.yml | 11 + .../github.com/eapache/channels/CHANGELOG.md | 17 + vendor/github.com/eapache/channels/LICENSE | 20 + vendor/github.com/eapache/channels/README.md | 27 + .../eapache/channels/batching_channel.go | 87 +++ .../eapache/channels/batching_channel_test.go | 45 ++ .../github.com/eapache/channels/black_hole.go | 54 ++ .../eapache/channels/black_hole_test.go | 26 + .../github.com/eapache/channels/channels.go | 277 +++++++++ .../eapache/channels/channels_test.go | 265 +++++++++ .../eapache/channels/infinite_channel.go | 72 +++ .../eapache/channels/infinite_channel_test.go | 48 ++ .../eapache/channels/native_channel.go | 92 +++ .../eapache/channels/native_channel_test.go | 58 ++ .../eapache/channels/overflowing_channel.go | 113 ++++ .../channels/overflowing_channel_test.go | 49 ++ .../eapache/channels/resizable_channel.go | 109 ++++ .../channels/resizable_channel_test.go | 61 ++ .../eapache/channels/ring_channel.go | 114 ++++ .../eapache/channels/ring_channel_test.go | 49 ++ .../eapache/channels/shared_buffer.go | 167 ++++++ .../eapache/channels/shared_buffer_test.go | 127 ++++ vendor/github.com/eapache/queue/.gitignore | 23 + vendor/github.com/eapache/queue/.travis.yml | 7 + vendor/github.com/eapache/queue/LICENSE | 21 + vendor/github.com/eapache/queue/README.md | 16 + vendor/github.com/eapache/queue/queue.go | 102 ++++ vendor/github.com/eapache/queue/queue_test.go | 178 ++++++ 31 files changed, 2783 insertions(+), 38 deletions(-) create mode 100644 vendor/github.com/eapache/channels/.gitignore create mode 100644 vendor/github.com/eapache/channels/.travis.yml create mode 100644 vendor/github.com/eapache/channels/CHANGELOG.md create mode 100644 vendor/github.com/eapache/channels/LICENSE create mode 100644 vendor/github.com/eapache/channels/README.md create mode 100644 vendor/github.com/eapache/channels/batching_channel.go create mode 100644 vendor/github.com/eapache/channels/batching_channel_test.go create mode 100644 vendor/github.com/eapache/channels/black_hole.go create mode 100644 vendor/github.com/eapache/channels/black_hole_test.go create mode 100644 vendor/github.com/eapache/channels/channels.go create mode 100644 vendor/github.com/eapache/channels/channels_test.go create mode 100644 vendor/github.com/eapache/channels/infinite_channel.go create mode 100644 vendor/github.com/eapache/channels/infinite_channel_test.go create mode 100644 vendor/github.com/eapache/channels/native_channel.go create mode 100644 vendor/github.com/eapache/channels/native_channel_test.go create mode 100644 vendor/github.com/eapache/channels/overflowing_channel.go create mode 100644 vendor/github.com/eapache/channels/overflowing_channel_test.go create mode 100644 vendor/github.com/eapache/channels/resizable_channel.go create mode 100644 vendor/github.com/eapache/channels/resizable_channel_test.go create mode 100644 vendor/github.com/eapache/channels/ring_channel.go create mode 100644 vendor/github.com/eapache/channels/ring_channel_test.go create mode 100644 vendor/github.com/eapache/channels/shared_buffer.go create mode 100644 vendor/github.com/eapache/channels/shared_buffer_test.go create mode 100644 vendor/github.com/eapache/queue/.gitignore create mode 100644 vendor/github.com/eapache/queue/.travis.yml create mode 100644 vendor/github.com/eapache/queue/LICENSE create mode 100644 vendor/github.com/eapache/queue/README.md create mode 100644 vendor/github.com/eapache/queue/queue.go create mode 100644 vendor/github.com/eapache/queue/queue_test.go diff --git a/Gopkg.lock b/Gopkg.lock index 1965b66cc3..4eb4aed138 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -9,7 +9,12 @@ [[projects]] name = "github.com/Azure/go-autorest" - packages = ["autorest","autorest/adal","autorest/azure","autorest/date"] + packages = [ + "autorest", + "autorest/adal", + "autorest/azure", + "autorest/date" + ] revision = "ab5671379918d9af294b6f0e3d8aaa98c829416d" version = "v9.4.0" @@ -51,18 +56,39 @@ [[projects]] name = "github.com/docker/distribution" - packages = ["digestset","reference"] + packages = [ + "digestset", + "reference" + ] revision = "edc3ab29cdff8694dd6feb85cfeb4b5f1b38ed9c" [[projects]] branch = "master" name = "github.com/docker/spdystream" - packages = [".","spdy"] + packages = [ + ".", + "spdy" + ] revision = "bc6354cbbc295e925e4c611ffe90c1f287ee54db" +[[projects]] + name = "github.com/eapache/channels" + packages = ["."] + revision = "47238d5aae8c0fefd518ef2bee46290909cf8263" + version = "v1.1.0" + +[[projects]] + name = "github.com/eapache/queue" + packages = ["."] + revision = "44cc805cf13205b55f69e14bcb69867d1ae92f98" + version = "v1.1.0" + [[projects]] name = "github.com/emicklei/go-restful" - packages = [".","log"] + packages = [ + ".", + "log" + ] revision = "5741799b275a3c4a5a9623a993576d7545cf7b5c" version = "v2.4.0" @@ -110,7 +136,12 @@ [[projects]] name = "github.com/gogo/protobuf" - packages = ["gogoproto","proto","protoc-gen-gogo/descriptor","sortkeys"] + packages = [ + "gogoproto", + "proto", + "protoc-gen-gogo/descriptor", + "sortkeys" + ] revision = "342cbe0a04158f6dcb03ca0079991a51a4248c02" version = "v0.5" @@ -129,7 +160,13 @@ [[projects]] branch = "master" name = "github.com/golang/protobuf" - packages = ["proto","ptypes","ptypes/any","ptypes/duration","ptypes/timestamp"] + packages = [ + "proto", + "ptypes", + "ptypes/any", + "ptypes/duration", + "ptypes/timestamp" + ] revision = "1643683e1b54a9e88ad26d98f81400c8c9d9f4f9" [[projects]] @@ -146,26 +183,44 @@ [[projects]] name = "github.com/googleapis/gnostic" - packages = ["OpenAPIv2","compiler","extensions"] + packages = [ + "OpenAPIv2", + "compiler", + "extensions" + ] revision = "ee43cbb60db7bd22502942cccbc39059117352ab" version = "v0.1.0" [[projects]] branch = "master" name = "github.com/gophercloud/gophercloud" - packages = [".","openstack","openstack/identity/v2/tenants","openstack/identity/v2/tokens","openstack/identity/v3/tokens","openstack/utils","pagination"] + packages = [ + ".", + "openstack", + "openstack/identity/v2/tenants", + "openstack/identity/v2/tokens", + "openstack/identity/v3/tokens", + "openstack/utils", + "pagination" + ] revision = "0b6b13c4dd9e07a89f83cbe4617c13ad646d6362" [[projects]] branch = "master" name = "github.com/gregjones/httpcache" - packages = [".","diskcache"] + packages = [ + ".", + "diskcache" + ] revision = "22a0b1feae53974ed4cfe27bcce70dba061cc5fd" [[projects]] branch = "master" name = "github.com/hashicorp/golang-lru" - packages = [".","simplelru"] + packages = [ + ".", + "simplelru" + ] revision = "0a025b7e63adc15a622f29b0b2c4c3848243bbf6" [[projects]] @@ -201,13 +256,20 @@ [[projects]] branch = "master" name = "github.com/kylelemons/godebug" - packages = ["diff","pretty"] + packages = [ + "diff", + "pretty" + ] revision = "d65d576e9348f5982d7f6d83682b694e731a45c6" [[projects]] branch = "master" name = "github.com/mailru/easyjson" - packages = ["buffer","jlexer","jwriter"] + packages = [ + "buffer", + "jlexer", + "jwriter" + ] revision = "5f62e4f3afa2f576dc86531b7df4d966b19ef8f8" [[projects]] @@ -237,19 +299,54 @@ [[projects]] branch = "master" name = "github.com/ncabatoff/process-exporter" - packages = [".","proc"] + packages = [ + ".", + "proc" + ] revision = "5917bc766b95a1fa3c2ae85340f4de02a6b7e15e" source = "github.com/aledbf/process-exporter" [[projects]] name = "github.com/onsi/ginkgo" - packages = [".","config","internal/codelocation","internal/containernode","internal/failer","internal/leafnodes","internal/remote","internal/spec","internal/spec_iterator","internal/specrunner","internal/suite","internal/testingtproxy","internal/writer","reporters","reporters/stenographer","reporters/stenographer/support/go-colorable","reporters/stenographer/support/go-isatty","types"] + packages = [ + ".", + "config", + "internal/codelocation", + "internal/containernode", + "internal/failer", + "internal/leafnodes", + "internal/remote", + "internal/spec", + "internal/spec_iterator", + "internal/specrunner", + "internal/suite", + "internal/testingtproxy", + "internal/writer", + "reporters", + "reporters/stenographer", + "reporters/stenographer/support/go-colorable", + "reporters/stenographer/support/go-isatty", + "types" + ] revision = "9eda700730cba42af70d53180f9dcce9266bc2bc" version = "v1.4.0" [[projects]] name = "github.com/onsi/gomega" - packages = [".","format","internal/assertion","internal/asyncassertion","internal/oraclematcher","internal/testingtsupport","matchers","matchers/support/goraph/bipartitegraph","matchers/support/goraph/edge","matchers/support/goraph/node","matchers/support/goraph/util","types"] + packages = [ + ".", + "format", + "internal/assertion", + "internal/asyncassertion", + "internal/oraclematcher", + "internal/testingtsupport", + "matchers", + "matchers/support/goraph/bipartitegraph", + "matchers/support/goraph/edge", + "matchers/support/goraph/node", + "matchers/support/goraph/util", + "types" + ] revision = "c893efa28eb45626cdaa76c9f653b62488858837" version = "v1.2.0" @@ -297,7 +394,10 @@ [[projects]] name = "github.com/prometheus/client_golang" - packages = ["prometheus","prometheus/promhttp"] + packages = [ + "prometheus", + "prometheus/promhttp" + ] revision = "c5b7fccd204277076155f10851dad72b76a49317" version = "v0.8.0" @@ -310,18 +410,28 @@ [[projects]] branch = "master" name = "github.com/prometheus/common" - packages = ["expfmt","internal/bitbucket.org/ww/goautoneg","model"] + packages = [ + "expfmt", + "internal/bitbucket.org/ww/goautoneg", + "model" + ] revision = "e3fb1a1acd7605367a2b378bc2e2f893c05174b7" [[projects]] branch = "master" name = "github.com/prometheus/procfs" - packages = [".","xfs"] + packages = [ + ".", + "xfs" + ] revision = "a6e9df898b1336106c743392c48ee0b71f5c4efa" [[projects]] name = "github.com/spf13/afero" - packages = [".","mem"] + packages = [ + ".", + "mem" + ] revision = "8d919cbe7e2627e417f3e45c3c0e489a5b7e2536" version = "v1.0.0" @@ -346,30 +456,91 @@ [[projects]] branch = "master" name = "golang.org/x/net" - packages = ["context","context/ctxhttp","html","html/atom","html/charset","http2","http2/hpack","idna","internal/timeseries","lex/httplex","publicsuffix","trace"] + packages = [ + "context", + "context/ctxhttp", + "html", + "html/atom", + "html/charset", + "http2", + "http2/hpack", + "idna", + "internal/timeseries", + "lex/httplex", + "publicsuffix", + "trace" + ] revision = "a337091b0525af65de94df2eb7e98bd9962dcbe2" [[projects]] branch = "master" name = "golang.org/x/oauth2" - packages = [".","google","internal","jws","jwt"] + packages = [ + ".", + "google", + "internal", + "jws", + "jwt" + ] revision = "9ff8ebcc8e241d46f52ecc5bff0e5a2f2dbef402" [[projects]] branch = "master" name = "golang.org/x/sys" - packages = ["unix","windows"] + packages = [ + "unix", + "windows" + ] revision = "1e2299c37cc91a509f1b12369872d27be0ce98a6" [[projects]] branch = "master" name = "golang.org/x/text" - packages = ["collate","collate/build","encoding","encoding/charmap","encoding/htmlindex","encoding/internal","encoding/internal/identifier","encoding/japanese","encoding/korean","encoding/simplifiedchinese","encoding/traditionalchinese","encoding/unicode","internal/colltab","internal/gen","internal/tag","internal/triegen","internal/ucd","internal/utf8internal","language","runes","secure/bidirule","transform","unicode/bidi","unicode/cldr","unicode/norm","unicode/rangetable","width"] + packages = [ + "collate", + "collate/build", + "encoding", + "encoding/charmap", + "encoding/htmlindex", + "encoding/internal", + "encoding/internal/identifier", + "encoding/japanese", + "encoding/korean", + "encoding/simplifiedchinese", + "encoding/traditionalchinese", + "encoding/unicode", + "internal/colltab", + "internal/gen", + "internal/tag", + "internal/triegen", + "internal/ucd", + "internal/utf8internal", + "language", + "runes", + "secure/bidirule", + "transform", + "unicode/bidi", + "unicode/cldr", + "unicode/norm", + "unicode/rangetable", + "width" + ] revision = "88f656faf3f37f690df1a32515b479415e1a6769" [[projects]] name = "google.golang.org/appengine" - packages = [".","internal","internal/app_identity","internal/base","internal/datastore","internal/log","internal/modules","internal/remote_api","internal/urlfetch","urlfetch"] + packages = [ + ".", + "internal", + "internal/app_identity", + "internal/base", + "internal/datastore", + "internal/log", + "internal/modules", + "internal/remote_api", + "internal/urlfetch", + "urlfetch" + ] revision = "150dc57a1b433e64154302bdc40b6bb8aefa313a" version = "v1.0.0" @@ -381,7 +552,25 @@ [[projects]] name = "google.golang.org/grpc" - packages = [".","balancer","codes","connectivity","credentials","grpclb/grpc_lb_v1/messages","grpclog","internal","keepalive","metadata","naming","peer","resolver","stats","status","tap","transport"] + packages = [ + ".", + "balancer", + "codes", + "connectivity", + "credentials", + "grpclb/grpc_lb_v1/messages", + "grpclog", + "internal", + "keepalive", + "metadata", + "naming", + "peer", + "resolver", + "stats", + "status", + "tap", + "transport" + ] revision = "5ffe3083946d5603a0578721101dc8165b1d5b5f" version = "v1.7.2" @@ -412,42 +601,345 @@ [[projects]] branch = "release-1.9" name = "k8s.io/api" - packages = ["admissionregistration/v1alpha1","admissionregistration/v1beta1","apps/v1","apps/v1beta1","apps/v1beta2","authentication/v1","authentication/v1beta1","authorization/v1","authorization/v1beta1","autoscaling/v1","autoscaling/v2beta1","batch/v1","batch/v1beta1","batch/v2alpha1","certificates/v1beta1","core/v1","events/v1beta1","extensions/v1beta1","networking/v1","policy/v1beta1","rbac/v1","rbac/v1alpha1","rbac/v1beta1","scheduling/v1alpha1","settings/v1alpha1","storage/v1","storage/v1alpha1","storage/v1beta1"] + packages = [ + "admissionregistration/v1alpha1", + "admissionregistration/v1beta1", + "apps/v1", + "apps/v1beta1", + "apps/v1beta2", + "authentication/v1", + "authentication/v1beta1", + "authorization/v1", + "authorization/v1beta1", + "autoscaling/v1", + "autoscaling/v2beta1", + "batch/v1", + "batch/v1beta1", + "batch/v2alpha1", + "certificates/v1beta1", + "core/v1", + "events/v1beta1", + "extensions/v1beta1", + "networking/v1", + "policy/v1beta1", + "rbac/v1", + "rbac/v1alpha1", + "rbac/v1beta1", + "scheduling/v1alpha1", + "settings/v1alpha1", + "storage/v1", + "storage/v1alpha1", + "storage/v1beta1" + ] revision = "006a217681ae70cbacdd66a5e2fca1a61a8ff28e" [[projects]] branch = "master" name = "k8s.io/apiextensions-apiserver" - packages = ["pkg/apis/apiextensions","pkg/apis/apiextensions/v1beta1","pkg/client/clientset/clientset","pkg/client/clientset/clientset/scheme","pkg/client/clientset/clientset/typed/apiextensions/v1beta1","pkg/features"] + packages = [ + "pkg/apis/apiextensions", + "pkg/apis/apiextensions/v1beta1", + "pkg/client/clientset/clientset", + "pkg/client/clientset/clientset/scheme", + "pkg/client/clientset/clientset/typed/apiextensions/v1beta1", + "pkg/features" + ] revision = "51a1910459f074162eb4e25233e461fe91e99405" [[projects]] branch = "release-1.9" name = "k8s.io/apimachinery" - packages = ["pkg/api/equality","pkg/api/errors","pkg/api/meta","pkg/api/resource","pkg/api/validation","pkg/apimachinery","pkg/apimachinery/announced","pkg/apimachinery/registered","pkg/apis/meta/internalversion","pkg/apis/meta/v1","pkg/apis/meta/v1/unstructured","pkg/apis/meta/v1/validation","pkg/apis/meta/v1alpha1","pkg/conversion","pkg/conversion/queryparams","pkg/fields","pkg/labels","pkg/runtime","pkg/runtime/schema","pkg/runtime/serializer","pkg/runtime/serializer/json","pkg/runtime/serializer/protobuf","pkg/runtime/serializer/recognizer","pkg/runtime/serializer/streaming","pkg/runtime/serializer/versioning","pkg/selection","pkg/types","pkg/util/cache","pkg/util/clock","pkg/util/diff","pkg/util/errors","pkg/util/framer","pkg/util/httpstream","pkg/util/httpstream/spdy","pkg/util/intstr","pkg/util/json","pkg/util/mergepatch","pkg/util/net","pkg/util/remotecommand","pkg/util/runtime","pkg/util/sets","pkg/util/strategicpatch","pkg/util/uuid","pkg/util/validation","pkg/util/validation/field","pkg/util/wait","pkg/util/yaml","pkg/version","pkg/watch","third_party/forked/golang/json","third_party/forked/golang/netutil","third_party/forked/golang/reflect"] + packages = [ + "pkg/api/equality", + "pkg/api/errors", + "pkg/api/meta", + "pkg/api/resource", + "pkg/api/validation", + "pkg/apimachinery", + "pkg/apimachinery/announced", + "pkg/apimachinery/registered", + "pkg/apis/meta/internalversion", + "pkg/apis/meta/v1", + "pkg/apis/meta/v1/unstructured", + "pkg/apis/meta/v1/validation", + "pkg/apis/meta/v1alpha1", + "pkg/conversion", + "pkg/conversion/queryparams", + "pkg/fields", + "pkg/labels", + "pkg/runtime", + "pkg/runtime/schema", + "pkg/runtime/serializer", + "pkg/runtime/serializer/json", + "pkg/runtime/serializer/protobuf", + "pkg/runtime/serializer/recognizer", + "pkg/runtime/serializer/streaming", + "pkg/runtime/serializer/versioning", + "pkg/selection", + "pkg/types", + "pkg/util/cache", + "pkg/util/clock", + "pkg/util/diff", + "pkg/util/errors", + "pkg/util/framer", + "pkg/util/httpstream", + "pkg/util/httpstream/spdy", + "pkg/util/intstr", + "pkg/util/json", + "pkg/util/mergepatch", + "pkg/util/net", + "pkg/util/remotecommand", + "pkg/util/runtime", + "pkg/util/sets", + "pkg/util/strategicpatch", + "pkg/util/uuid", + "pkg/util/validation", + "pkg/util/validation/field", + "pkg/util/wait", + "pkg/util/yaml", + "pkg/version", + "pkg/watch", + "third_party/forked/golang/json", + "third_party/forked/golang/netutil", + "third_party/forked/golang/reflect" + ] revision = "68f9c3a1feb3140df59c67ced62d3a5df8e6c9c2" [[projects]] branch = "master" name = "k8s.io/apiserver" - packages = ["pkg/authentication/authenticator","pkg/authentication/serviceaccount","pkg/authentication/user","pkg/features","pkg/server/healthz","pkg/util/feature","pkg/util/logs"] + packages = [ + "pkg/authentication/authenticator", + "pkg/authentication/serviceaccount", + "pkg/authentication/user", + "pkg/features", + "pkg/server/healthz", + "pkg/util/feature", + "pkg/util/logs" + ] revision = "7001bc4df8883d4a0ec84cd4b2117655a0009b6c" [[projects]] name = "k8s.io/client-go" - packages = ["discovery","discovery/fake","informers","informers/admissionregistration","informers/admissionregistration/v1alpha1","informers/admissionregistration/v1beta1","informers/apps","informers/apps/v1","informers/apps/v1beta1","informers/apps/v1beta2","informers/autoscaling","informers/autoscaling/v1","informers/autoscaling/v2beta1","informers/batch","informers/batch/v1","informers/batch/v1beta1","informers/batch/v2alpha1","informers/certificates","informers/certificates/v1beta1","informers/core","informers/core/v1","informers/events","informers/events/v1beta1","informers/extensions","informers/extensions/v1beta1","informers/internalinterfaces","informers/networking","informers/networking/v1","informers/policy","informers/policy/v1beta1","informers/rbac","informers/rbac/v1","informers/rbac/v1alpha1","informers/rbac/v1beta1","informers/scheduling","informers/scheduling/v1alpha1","informers/settings","informers/settings/v1alpha1","informers/storage","informers/storage/v1","informers/storage/v1alpha1","informers/storage/v1beta1","kubernetes","kubernetes/fake","kubernetes/scheme","kubernetes/typed/admissionregistration/v1alpha1","kubernetes/typed/admissionregistration/v1alpha1/fake","kubernetes/typed/admissionregistration/v1beta1","kubernetes/typed/admissionregistration/v1beta1/fake","kubernetes/typed/apps/v1","kubernetes/typed/apps/v1/fake","kubernetes/typed/apps/v1beta1","kubernetes/typed/apps/v1beta1/fake","kubernetes/typed/apps/v1beta2","kubernetes/typed/apps/v1beta2/fake","kubernetes/typed/authentication/v1","kubernetes/typed/authentication/v1/fake","kubernetes/typed/authentication/v1beta1","kubernetes/typed/authentication/v1beta1/fake","kubernetes/typed/authorization/v1","kubernetes/typed/authorization/v1/fake","kubernetes/typed/authorization/v1beta1","kubernetes/typed/authorization/v1beta1/fake","kubernetes/typed/autoscaling/v1","kubernetes/typed/autoscaling/v1/fake","kubernetes/typed/autoscaling/v2beta1","kubernetes/typed/autoscaling/v2beta1/fake","kubernetes/typed/batch/v1","kubernetes/typed/batch/v1/fake","kubernetes/typed/batch/v1beta1","kubernetes/typed/batch/v1beta1/fake","kubernetes/typed/batch/v2alpha1","kubernetes/typed/batch/v2alpha1/fake","kubernetes/typed/certificates/v1beta1","kubernetes/typed/certificates/v1beta1/fake","kubernetes/typed/core/v1","kubernetes/typed/core/v1/fake","kubernetes/typed/events/v1beta1","kubernetes/typed/events/v1beta1/fake","kubernetes/typed/extensions/v1beta1","kubernetes/typed/extensions/v1beta1/fake","kubernetes/typed/networking/v1","kubernetes/typed/networking/v1/fake","kubernetes/typed/policy/v1beta1","kubernetes/typed/policy/v1beta1/fake","kubernetes/typed/rbac/v1","kubernetes/typed/rbac/v1/fake","kubernetes/typed/rbac/v1alpha1","kubernetes/typed/rbac/v1alpha1/fake","kubernetes/typed/rbac/v1beta1","kubernetes/typed/rbac/v1beta1/fake","kubernetes/typed/scheduling/v1alpha1","kubernetes/typed/scheduling/v1alpha1/fake","kubernetes/typed/settings/v1alpha1","kubernetes/typed/settings/v1alpha1/fake","kubernetes/typed/storage/v1","kubernetes/typed/storage/v1/fake","kubernetes/typed/storage/v1alpha1","kubernetes/typed/storage/v1alpha1/fake","kubernetes/typed/storage/v1beta1","kubernetes/typed/storage/v1beta1/fake","listers/admissionregistration/v1alpha1","listers/admissionregistration/v1beta1","listers/apps/v1","listers/apps/v1beta1","listers/apps/v1beta2","listers/autoscaling/v1","listers/autoscaling/v2beta1","listers/batch/v1","listers/batch/v1beta1","listers/batch/v2alpha1","listers/certificates/v1beta1","listers/core/v1","listers/events/v1beta1","listers/extensions/v1beta1","listers/networking/v1","listers/policy/v1beta1","listers/rbac/v1","listers/rbac/v1alpha1","listers/rbac/v1beta1","listers/scheduling/v1alpha1","listers/settings/v1alpha1","listers/storage/v1","listers/storage/v1alpha1","listers/storage/v1beta1","pkg/version","plugin/pkg/client/auth","plugin/pkg/client/auth/azure","plugin/pkg/client/auth/gcp","plugin/pkg/client/auth/oidc","plugin/pkg/client/auth/openstack","rest","rest/watch","testing","third_party/forked/golang/template","tools/auth","tools/cache","tools/clientcmd","tools/clientcmd/api","tools/clientcmd/api/latest","tools/clientcmd/api/v1","tools/leaderelection","tools/leaderelection/resourcelock","tools/metrics","tools/pager","tools/record","tools/reference","tools/remotecommand","transport","transport/spdy","util/buffer","util/cert","util/cert/triple","util/exec","util/flowcontrol","util/homedir","util/integer","util/jsonpath","util/retry","util/workqueue"] + packages = [ + "discovery", + "discovery/fake", + "informers", + "informers/admissionregistration", + "informers/admissionregistration/v1alpha1", + "informers/admissionregistration/v1beta1", + "informers/apps", + "informers/apps/v1", + "informers/apps/v1beta1", + "informers/apps/v1beta2", + "informers/autoscaling", + "informers/autoscaling/v1", + "informers/autoscaling/v2beta1", + "informers/batch", + "informers/batch/v1", + "informers/batch/v1beta1", + "informers/batch/v2alpha1", + "informers/certificates", + "informers/certificates/v1beta1", + "informers/core", + "informers/core/v1", + "informers/events", + "informers/events/v1beta1", + "informers/extensions", + "informers/extensions/v1beta1", + "informers/internalinterfaces", + "informers/networking", + "informers/networking/v1", + "informers/policy", + "informers/policy/v1beta1", + "informers/rbac", + "informers/rbac/v1", + "informers/rbac/v1alpha1", + "informers/rbac/v1beta1", + "informers/scheduling", + "informers/scheduling/v1alpha1", + "informers/settings", + "informers/settings/v1alpha1", + "informers/storage", + "informers/storage/v1", + "informers/storage/v1alpha1", + "informers/storage/v1beta1", + "kubernetes", + "kubernetes/fake", + "kubernetes/scheme", + "kubernetes/typed/admissionregistration/v1alpha1", + "kubernetes/typed/admissionregistration/v1alpha1/fake", + "kubernetes/typed/admissionregistration/v1beta1", + "kubernetes/typed/admissionregistration/v1beta1/fake", + "kubernetes/typed/apps/v1", + "kubernetes/typed/apps/v1/fake", + "kubernetes/typed/apps/v1beta1", + "kubernetes/typed/apps/v1beta1/fake", + "kubernetes/typed/apps/v1beta2", + "kubernetes/typed/apps/v1beta2/fake", + "kubernetes/typed/authentication/v1", + "kubernetes/typed/authentication/v1/fake", + "kubernetes/typed/authentication/v1beta1", + "kubernetes/typed/authentication/v1beta1/fake", + "kubernetes/typed/authorization/v1", + "kubernetes/typed/authorization/v1/fake", + "kubernetes/typed/authorization/v1beta1", + "kubernetes/typed/authorization/v1beta1/fake", + "kubernetes/typed/autoscaling/v1", + "kubernetes/typed/autoscaling/v1/fake", + "kubernetes/typed/autoscaling/v2beta1", + "kubernetes/typed/autoscaling/v2beta1/fake", + "kubernetes/typed/batch/v1", + "kubernetes/typed/batch/v1/fake", + "kubernetes/typed/batch/v1beta1", + "kubernetes/typed/batch/v1beta1/fake", + "kubernetes/typed/batch/v2alpha1", + "kubernetes/typed/batch/v2alpha1/fake", + "kubernetes/typed/certificates/v1beta1", + "kubernetes/typed/certificates/v1beta1/fake", + "kubernetes/typed/core/v1", + "kubernetes/typed/core/v1/fake", + "kubernetes/typed/events/v1beta1", + "kubernetes/typed/events/v1beta1/fake", + "kubernetes/typed/extensions/v1beta1", + "kubernetes/typed/extensions/v1beta1/fake", + "kubernetes/typed/networking/v1", + "kubernetes/typed/networking/v1/fake", + "kubernetes/typed/policy/v1beta1", + "kubernetes/typed/policy/v1beta1/fake", + "kubernetes/typed/rbac/v1", + "kubernetes/typed/rbac/v1/fake", + "kubernetes/typed/rbac/v1alpha1", + "kubernetes/typed/rbac/v1alpha1/fake", + "kubernetes/typed/rbac/v1beta1", + "kubernetes/typed/rbac/v1beta1/fake", + "kubernetes/typed/scheduling/v1alpha1", + "kubernetes/typed/scheduling/v1alpha1/fake", + "kubernetes/typed/settings/v1alpha1", + "kubernetes/typed/settings/v1alpha1/fake", + "kubernetes/typed/storage/v1", + "kubernetes/typed/storage/v1/fake", + "kubernetes/typed/storage/v1alpha1", + "kubernetes/typed/storage/v1alpha1/fake", + "kubernetes/typed/storage/v1beta1", + "kubernetes/typed/storage/v1beta1/fake", + "listers/admissionregistration/v1alpha1", + "listers/admissionregistration/v1beta1", + "listers/apps/v1", + "listers/apps/v1beta1", + "listers/apps/v1beta2", + "listers/autoscaling/v1", + "listers/autoscaling/v2beta1", + "listers/batch/v1", + "listers/batch/v1beta1", + "listers/batch/v2alpha1", + "listers/certificates/v1beta1", + "listers/core/v1", + "listers/events/v1beta1", + "listers/extensions/v1beta1", + "listers/networking/v1", + "listers/policy/v1beta1", + "listers/rbac/v1", + "listers/rbac/v1alpha1", + "listers/rbac/v1beta1", + "listers/scheduling/v1alpha1", + "listers/settings/v1alpha1", + "listers/storage/v1", + "listers/storage/v1alpha1", + "listers/storage/v1beta1", + "pkg/version", + "plugin/pkg/client/auth", + "plugin/pkg/client/auth/azure", + "plugin/pkg/client/auth/gcp", + "plugin/pkg/client/auth/oidc", + "plugin/pkg/client/auth/openstack", + "rest", + "rest/watch", + "testing", + "third_party/forked/golang/template", + "tools/auth", + "tools/cache", + "tools/clientcmd", + "tools/clientcmd/api", + "tools/clientcmd/api/latest", + "tools/clientcmd/api/v1", + "tools/leaderelection", + "tools/leaderelection/resourcelock", + "tools/metrics", + "tools/pager", + "tools/record", + "tools/reference", + "tools/remotecommand", + "transport", + "transport/spdy", + "util/buffer", + "util/cert", + "util/cert/triple", + "util/exec", + "util/flowcontrol", + "util/homedir", + "util/integer", + "util/jsonpath", + "util/retry", + "util/workqueue" + ] revision = "78700dec6369ba22221b72770783300f143df150" version = "v6.0.0" [[projects]] branch = "master" name = "k8s.io/kube-openapi" - packages = ["pkg/common","pkg/util/proto"] + packages = [ + "pkg/common", + "pkg/util/proto" + ] revision = "39a7bf85c140f972372c2a0d1ee40adbf0c8bfe1" [[projects]] name = "k8s.io/kubernetes" - packages = ["pkg/api/legacyscheme","pkg/api/service","pkg/api/v1/pod","pkg/apis/autoscaling","pkg/apis/core","pkg/apis/core/helper","pkg/apis/core/install","pkg/apis/core/pods","pkg/apis/core/v1","pkg/apis/core/v1/helper","pkg/apis/core/validation","pkg/apis/extensions","pkg/apis/networking","pkg/capabilities","pkg/cloudprovider","pkg/controller","pkg/features","pkg/fieldpath","pkg/kubelet/apis","pkg/kubelet/apis/cri/v1alpha1/runtime","pkg/kubelet/container","pkg/kubelet/types","pkg/kubelet/util/format","pkg/kubelet/util/ioutils","pkg/kubelet/util/sliceutils","pkg/security/apparmor","pkg/serviceaccount","pkg/util/file","pkg/util/filesystem","pkg/util/hash","pkg/util/io","pkg/util/mount","pkg/util/net/sets","pkg/util/nsenter","pkg/util/parsers","pkg/util/pointer","pkg/util/sysctl","pkg/util/taints","pkg/volume","pkg/volume/util","third_party/forked/golang/expansion"] + packages = [ + "pkg/api/legacyscheme", + "pkg/api/service", + "pkg/api/v1/pod", + "pkg/apis/autoscaling", + "pkg/apis/core", + "pkg/apis/core/helper", + "pkg/apis/core/install", + "pkg/apis/core/pods", + "pkg/apis/core/v1", + "pkg/apis/core/v1/helper", + "pkg/apis/core/validation", + "pkg/apis/extensions", + "pkg/apis/networking", + "pkg/capabilities", + "pkg/cloudprovider", + "pkg/controller", + "pkg/features", + "pkg/fieldpath", + "pkg/kubelet/apis", + "pkg/kubelet/apis/cri/v1alpha1/runtime", + "pkg/kubelet/container", + "pkg/kubelet/types", + "pkg/kubelet/util/format", + "pkg/kubelet/util/ioutils", + "pkg/kubelet/util/sliceutils", + "pkg/security/apparmor", + "pkg/serviceaccount", + "pkg/util/file", + "pkg/util/filesystem", + "pkg/util/hash", + "pkg/util/io", + "pkg/util/mount", + "pkg/util/net/sets", + "pkg/util/nsenter", + "pkg/util/parsers", + "pkg/util/pointer", + "pkg/util/sysctl", + "pkg/util/taints", + "pkg/volume", + "pkg/volume/util", + "third_party/forked/golang/expansion" + ] revision = "3a1c9449a956b6026f075fa3134ff92f7d55f812" version = "v1.9.1" @@ -460,6 +952,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "8cbe0f3a256af6b405000523955abdc552a8cc35b9a3dfbcb2816851de6a5037" + inputs-digest = "085f93508ceea89a502baf7d00c4649c9fdfde67442b7588d9b39a0520abe147" solver-name = "gps-cdcl" solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index ef78a712cf..5b769efd66 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -20,12 +20,8 @@ # name = "github.com/x/y" # version = "2.4.0" -[[override]] - name = "github.com/docker/distribution" - revision = "edc3ab29cdff8694dd6feb85cfeb4b5f1b38ed9c" - [[constraint]] - name = "github.com/opencontainers/go-digest" + name = "github.com/eapache/channels" branch = "master" [[constraint]] diff --git a/vendor/github.com/eapache/channels/.gitignore b/vendor/github.com/eapache/channels/.gitignore new file mode 100644 index 0000000000..00268614f0 --- /dev/null +++ b/vendor/github.com/eapache/channels/.gitignore @@ -0,0 +1,22 @@ +# Compiled Object files, Static and Dynamic libs (Shared Objects) +*.o +*.a +*.so + +# Folders +_obj +_test + +# Architecture specific extensions/prefixes +*.[568vq] +[568vq].out + +*.cgo1.go +*.cgo2.c +_cgo_defun.c +_cgo_gotypes.go +_cgo_export.* + +_testmain.go + +*.exe diff --git a/vendor/github.com/eapache/channels/.travis.yml b/vendor/github.com/eapache/channels/.travis.yml new file mode 100644 index 0000000000..b072a4c851 --- /dev/null +++ b/vendor/github.com/eapache/channels/.travis.yml @@ -0,0 +1,11 @@ +language: go +sudo: false + +script: go test -v -race -timeout 10s ./... + +go: + - 1.1 + - 1.2 + - 1.3 + - 1.4 + - 1.5 diff --git a/vendor/github.com/eapache/channels/CHANGELOG.md b/vendor/github.com/eapache/channels/CHANGELOG.md new file mode 100644 index 0000000000..63825cd2ff --- /dev/null +++ b/vendor/github.com/eapache/channels/CHANGELOG.md @@ -0,0 +1,17 @@ +# Changelog + +#### Version 1.1.0 (2015-11-22) + +Bug Fixes: + - The `Len()` and `Cap()` methods on several implementations were racy + ([#18](https://github.com/eapache/channels/issues/18)). + +Note: Fixing the above issue led to a fairly substantial performance hit +(anywhere from 10-25% in benchmarks depending on use case) and involved fairly +major refactoring, which is why this is being released as v1.1.0 instead +of v1.0.1. + +#### Version 1.0.0 (2015-01-24) + +Version 1.0.0 is the first tagged release. All core functionality was available +at this point. diff --git a/vendor/github.com/eapache/channels/LICENSE b/vendor/github.com/eapache/channels/LICENSE new file mode 100644 index 0000000000..8c4bddf755 --- /dev/null +++ b/vendor/github.com/eapache/channels/LICENSE @@ -0,0 +1,20 @@ +The MIT License (MIT) + +Copyright (c) 2013 Evan Huus + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/vendor/github.com/eapache/channels/README.md b/vendor/github.com/eapache/channels/README.md new file mode 100644 index 0000000000..aab2a53a3b --- /dev/null +++ b/vendor/github.com/eapache/channels/README.md @@ -0,0 +1,27 @@ +channels +======== + +[![Build Status](https://travis-ci.org/eapache/channels.svg?branch=master)](https://travis-ci.org/eapache/channels) +[![GoDoc](https://godoc.org/github.com/eapache/channels?status.png)](https://godoc.org/github.com/eapache/channels) +[![Code of Conduct](https://img.shields.io/badge/code%20of%20conduct-active-blue.svg)](https://eapache.github.io/conduct.html) + +A collection of helper functions and special types for working with and +extending [Go](https://golang.org/)'s existing channels. Due to limitations +of Go's type system, importing this library directly is often not practical for +production code. It serves equally well, however, as a reference guide and +template for implementing many common idioms; if you use it in this way I would +appreciate the inclusion of some sort of credit in the resulting code. + +See https://godoc.org/github.com/eapache/channels for full documentation or +https://gopkg.in/eapache/channels.v1 for a versioned import path. + +Requires Go version 1.1 or later, as certain necessary elements of the `reflect` +package were not present in 1.0. + +Most of the buffered channel types in this package are backed by a very fast +queue implementation that used to be built into this package but has now been +extracted into its own package at https://github.com/eapache/queue. + +*Note:* Several types in this package provide so-called "infinite" buffers. Be +very careful using these, as no buffer is truly infinite. If such a buffer +grows too large your program will run out of memory and crash. Caveat emptor. diff --git a/vendor/github.com/eapache/channels/batching_channel.go b/vendor/github.com/eapache/channels/batching_channel.go new file mode 100644 index 0000000000..5be622f2f3 --- /dev/null +++ b/vendor/github.com/eapache/channels/batching_channel.go @@ -0,0 +1,87 @@ +package channels + +// BatchingChannel implements the Channel interface, with the change that instead of producing individual elements +// on Out(), it batches together the entire internal buffer each time. Trying to construct an unbuffered batching channel +// will panic, that configuration is not supported (and provides no benefit over an unbuffered NativeChannel). +type BatchingChannel struct { + input, output chan interface{} + length chan int + buffer []interface{} + size BufferCap +} + +func NewBatchingChannel(size BufferCap) *BatchingChannel { + if size == None { + panic("channels: BatchingChannel does not support unbuffered behaviour") + } + if size < 0 && size != Infinity { + panic("channels: invalid negative size in NewBatchingChannel") + } + ch := &BatchingChannel{ + input: make(chan interface{}), + output: make(chan interface{}), + length: make(chan int), + size: size, + } + go ch.batchingBuffer() + return ch +} + +func (ch *BatchingChannel) In() chan<- interface{} { + return ch.input +} + +// Out returns a <-chan interface{} in order that BatchingChannel conforms to the standard Channel interface provided +// by this package, however each output value is guaranteed to be of type []interface{} - a slice collecting the most +// recent batch of values sent on the In channel. The slice is guaranteed to not be empty or nil. In practice the net +// result is that you need an additional type assertion to access the underlying values. +func (ch *BatchingChannel) Out() <-chan interface{} { + return ch.output +} + +func (ch *BatchingChannel) Len() int { + return <-ch.length +} + +func (ch *BatchingChannel) Cap() BufferCap { + return ch.size +} + +func (ch *BatchingChannel) Close() { + close(ch.input) +} + +func (ch *BatchingChannel) batchingBuffer() { + var input, output, nextInput chan interface{} + nextInput = ch.input + input = nextInput + + for input != nil || output != nil { + select { + case elem, open := <-input: + if open { + ch.buffer = append(ch.buffer, elem) + } else { + input = nil + nextInput = nil + } + case output <- ch.buffer: + ch.buffer = nil + case ch.length <- len(ch.buffer): + } + + if len(ch.buffer) == 0 { + input = nextInput + output = nil + } else if ch.size != Infinity && len(ch.buffer) >= int(ch.size) { + input = nil + output = ch.output + } else { + input = nextInput + output = ch.output + } + } + + close(ch.output) + close(ch.length) +} diff --git a/vendor/github.com/eapache/channels/batching_channel_test.go b/vendor/github.com/eapache/channels/batching_channel_test.go new file mode 100644 index 0000000000..8fa329c9c8 --- /dev/null +++ b/vendor/github.com/eapache/channels/batching_channel_test.go @@ -0,0 +1,45 @@ +package channels + +import "testing" + +func testBatches(t *testing.T, ch Channel) { + go func() { + for i := 0; i < 1000; i++ { + ch.In() <- i + } + ch.Close() + }() + + i := 0 + for val := range ch.Out() { + for _, elem := range val.([]interface{}) { + if i != elem.(int) { + t.Fatal("batching channel expected", i, "but got", elem.(int)) + } + i++ + } + } +} + +func TestBatchingChannel(t *testing.T) { + ch := NewBatchingChannel(Infinity) + testBatches(t, ch) + + ch = NewBatchingChannel(2) + testBatches(t, ch) + + ch = NewBatchingChannel(1) + testChannelConcurrentAccessors(t, "batching channel", ch) +} + +func TestBatchingChannelCap(t *testing.T) { + ch := NewBatchingChannel(Infinity) + if ch.Cap() != Infinity { + t.Error("incorrect capacity on infinite channel") + } + + ch = NewBatchingChannel(5) + if ch.Cap() != 5 { + t.Error("incorrect capacity on infinite channel") + } +} diff --git a/vendor/github.com/eapache/channels/black_hole.go b/vendor/github.com/eapache/channels/black_hole.go new file mode 100644 index 0000000000..0d1ba97b3d --- /dev/null +++ b/vendor/github.com/eapache/channels/black_hole.go @@ -0,0 +1,54 @@ +package channels + +// BlackHole implements the InChannel interface and provides an analogue for the "Discard" variable in +// the ioutil package - it never blocks, and simply discards every value it reads. The number of items +// discarded in this way is counted and returned from Len. +type BlackHole struct { + input chan interface{} + length chan int + count int +} + +func NewBlackHole() *BlackHole { + ch := &BlackHole{ + input: make(chan interface{}), + length: make(chan int), + } + go ch.discard() + return ch +} + +func (ch *BlackHole) In() chan<- interface{} { + return ch.input +} + +func (ch *BlackHole) Len() int { + val, open := <-ch.length + if open { + return val + } else { + return ch.count + } +} + +func (ch *BlackHole) Cap() BufferCap { + return Infinity +} + +func (ch *BlackHole) Close() { + close(ch.input) +} + +func (ch *BlackHole) discard() { + for { + select { + case _, open := <-ch.input: + if !open { + close(ch.length) + return + } + ch.count++ + case ch.length <- ch.count: + } + } +} diff --git a/vendor/github.com/eapache/channels/black_hole_test.go b/vendor/github.com/eapache/channels/black_hole_test.go new file mode 100644 index 0000000000..c149da6b03 --- /dev/null +++ b/vendor/github.com/eapache/channels/black_hole_test.go @@ -0,0 +1,26 @@ +package channels + +import "testing" + +func TestBlackHole(t *testing.T) { + discard := NewBlackHole() + + for i := 0; i < 1000; i++ { + discard.In() <- i + } + + discard.Close() + + if discard.Len() != 1000 { + t.Error("blackhole expected 1000 was", discard.Len()) + } + + // no asserts here, this is just for the race detector's benefit + ch := NewBlackHole() + go ch.Len() + go ch.Cap() + + go func() { + ch.In() <- nil + }() +} diff --git a/vendor/github.com/eapache/channels/channels.go b/vendor/github.com/eapache/channels/channels.go new file mode 100644 index 0000000000..efcb2b5c50 --- /dev/null +++ b/vendor/github.com/eapache/channels/channels.go @@ -0,0 +1,277 @@ +/* +Package channels provides a collection of helper functions, interfaces and implementations for +working with and extending the capabilities of golang's existing channels. The main interface of +interest is Channel, though sub-interfaces are also provided for cases where the full Channel interface +cannot be met (for example, InChannel for write-only channels). + +For integration with native typed golang channels, functions Wrap and Unwrap are provided which do the +appropriate type conversions. The NativeChannel, NativeInChannel and NativeOutChannel type definitions +are also provided for use with native channels which already carry values of type interface{}. + +The heart of the package consists of several distinct implementations of the Channel interface, including +channels backed by special buffers (resizable, infinite, ring buffers, etc) and other useful types. A +"black hole" channel for discarding unwanted values (similar in purpose to ioutil.Discard or /dev/null) +rounds out the set. + +Helper functions for operating on Channels include Pipe and Tee (which behave much like their Unix +namesakes), as well as Multiplex and Distribute. "Weak" versions of these functions also exist, which +do not close their output channel(s) on completion. + +Due to limitations of Go's type system, importing this library directly is often not practical for +production code. It serves equally well, however, as a reference guide and template for implementing +many common idioms; if you use it in this way I would appreciate the inclusion of some sort of credit +in the resulting code. + +Warning: several types in this package provide so-called "infinite" buffers. Be *very* careful using +these, as no buffer is truly infinite - if such a buffer grows too large your program will run out of +memory and crash. Caveat emptor. +*/ +package channels + +import "reflect" + +// BufferCap represents the capacity of the buffer backing a channel. Valid values consist of all +// positive integers, as well as the special values below. +type BufferCap int + +const ( + // None is the capacity for channels that have no buffer at all. + None BufferCap = 0 + // Infinity is the capacity for channels with no limit on their buffer size. + Infinity BufferCap = -1 +) + +// Buffer is an interface for any channel that provides access to query the state of its buffer. +// Even unbuffered channels can implement this interface by simply returning 0 from Len() and None from Cap(). +type Buffer interface { + Len() int // The number of elements currently buffered. + Cap() BufferCap // The maximum number of elements that can be buffered. +} + +// SimpleInChannel is an interface representing a writeable channel that does not necessarily +// implement the Buffer interface. +type SimpleInChannel interface { + In() chan<- interface{} // The writeable end of the channel. + Close() // Closes the channel. It is an error to write to In() after calling Close(). +} + +// InChannel is an interface representing a writeable channel with a buffer. +type InChannel interface { + SimpleInChannel + Buffer +} + +// SimpleOutChannel is an interface representing a readable channel that does not necessarily +// implement the Buffer interface. +type SimpleOutChannel interface { + Out() <-chan interface{} // The readable end of the channel. +} + +// OutChannel is an interface representing a readable channel implementing the Buffer interface. +type OutChannel interface { + SimpleOutChannel + Buffer +} + +// SimpleChannel is an interface representing a channel that is both readable and writeable, +// but does not necessarily implement the Buffer interface. +type SimpleChannel interface { + SimpleInChannel + SimpleOutChannel +} + +// Channel is an interface representing a channel that is readable, writeable and implements +// the Buffer interface +type Channel interface { + SimpleChannel + Buffer +} + +func pipe(input SimpleOutChannel, output SimpleInChannel, closeWhenDone bool) { + for elem := range input.Out() { + output.In() <- elem + } + if closeWhenDone { + output.Close() + } +} + +func multiplex(output SimpleInChannel, inputs []SimpleOutChannel, closeWhenDone bool) { + inputCount := len(inputs) + cases := make([]reflect.SelectCase, inputCount) + for i := range cases { + cases[i].Dir = reflect.SelectRecv + cases[i].Chan = reflect.ValueOf(inputs[i].Out()) + } + for inputCount > 0 { + chosen, recv, recvOK := reflect.Select(cases) + if recvOK { + output.In() <- recv.Interface() + } else { + cases[chosen].Chan = reflect.ValueOf(nil) + inputCount-- + } + } + if closeWhenDone { + output.Close() + } +} + +func tee(input SimpleOutChannel, outputs []SimpleInChannel, closeWhenDone bool) { + cases := make([]reflect.SelectCase, len(outputs)) + for i := range cases { + cases[i].Dir = reflect.SelectSend + } + for elem := range input.Out() { + for i := range cases { + cases[i].Chan = reflect.ValueOf(outputs[i].In()) + cases[i].Send = reflect.ValueOf(elem) + } + for _ = range cases { + chosen, _, _ := reflect.Select(cases) + cases[chosen].Chan = reflect.ValueOf(nil) + } + } + if closeWhenDone { + for i := range outputs { + outputs[i].Close() + } + } +} + +func distribute(input SimpleOutChannel, outputs []SimpleInChannel, closeWhenDone bool) { + cases := make([]reflect.SelectCase, len(outputs)) + for i := range cases { + cases[i].Dir = reflect.SelectSend + cases[i].Chan = reflect.ValueOf(outputs[i].In()) + } + for elem := range input.Out() { + for i := range cases { + cases[i].Send = reflect.ValueOf(elem) + } + reflect.Select(cases) + } + if closeWhenDone { + for i := range outputs { + outputs[i].Close() + } + } +} + +// Pipe connects the input channel to the output channel so that +// they behave as if a single channel. +func Pipe(input SimpleOutChannel, output SimpleInChannel) { + go pipe(input, output, true) +} + +// Multiplex takes an arbitrary number of input channels and multiplexes their output into a single output +// channel. When all input channels have been closed, the output channel is closed. Multiplex with a single +// input channel is equivalent to Pipe (though slightly less efficient). +func Multiplex(output SimpleInChannel, inputs ...SimpleOutChannel) { + if len(inputs) == 0 { + panic("channels: Multiplex requires at least one input") + } + go multiplex(output, inputs, true) +} + +// Tee (like its Unix namesake) takes a single input channel and an arbitrary number of output channels +// and duplicates each input into every output. When the input channel is closed, all outputs channels are closed. +// Tee with a single output channel is equivalent to Pipe (though slightly less efficient). +func Tee(input SimpleOutChannel, outputs ...SimpleInChannel) { + if len(outputs) == 0 { + panic("channels: Tee requires at least one output") + } + go tee(input, outputs, true) +} + +// Distribute takes a single input channel and an arbitrary number of output channels and duplicates each input +// into *one* available output. If multiple outputs are waiting for a value, one is chosen at random. When the +// input channel is closed, all outputs channels are closed. Distribute with a single output channel is +// equivalent to Pipe (though slightly less efficient). +func Distribute(input SimpleOutChannel, outputs ...SimpleInChannel) { + if len(outputs) == 0 { + panic("channels: Distribute requires at least one output") + } + go distribute(input, outputs, true) +} + +// WeakPipe behaves like Pipe (connecting the two channels) except that it does not close +// the output channel when the input channel is closed. +func WeakPipe(input SimpleOutChannel, output SimpleInChannel) { + go pipe(input, output, false) +} + +// WeakMultiplex behaves like Multiplex (multiplexing multiple inputs into a single output) except that it does not close +// the output channel when the input channels are closed. +func WeakMultiplex(output SimpleInChannel, inputs ...SimpleOutChannel) { + if len(inputs) == 0 { + panic("channels: WeakMultiplex requires at least one input") + } + go multiplex(output, inputs, false) +} + +// WeakTee behaves like Tee (duplicating a single input into multiple outputs) except that it does not close +// the output channels when the input channel is closed. +func WeakTee(input SimpleOutChannel, outputs ...SimpleInChannel) { + if len(outputs) == 0 { + panic("channels: WeakTee requires at least one output") + } + go tee(input, outputs, false) +} + +// WeakDistribute behaves like Distribute (distributing a single input amongst multiple outputs) except that +// it does not close the output channels when the input channel is closed. +func WeakDistribute(input SimpleOutChannel, outputs ...SimpleInChannel) { + if len(outputs) == 0 { + panic("channels: WeakDistribute requires at least one output") + } + go distribute(input, outputs, false) +} + +// Wrap takes any readable channel type (chan or <-chan but not chan<-) and +// exposes it as a SimpleOutChannel for easy integration with existing channel sources. +// It panics if the input is not a readable channel. +func Wrap(ch interface{}) SimpleOutChannel { + t := reflect.TypeOf(ch) + if t.Kind() != reflect.Chan || t.ChanDir()&reflect.RecvDir == 0 { + panic("channels: input to Wrap must be readable channel") + } + realChan := make(chan interface{}) + + go func() { + v := reflect.ValueOf(ch) + for { + x, ok := v.Recv() + if !ok { + close(realChan) + return + } + realChan <- x.Interface() + } + }() + + return NativeOutChannel(realChan) +} + +// Unwrap takes a SimpleOutChannel and uses reflection to pipe it to a typed native channel for +// easy integration with existing channel sources. Output can be any writable channel type (chan or chan<-). +// It panics if the output is not a writable channel, or if a value is received that cannot be sent on the +// output channel. +func Unwrap(input SimpleOutChannel, output interface{}) { + t := reflect.TypeOf(output) + if t.Kind() != reflect.Chan || t.ChanDir()&reflect.SendDir == 0 { + panic("channels: input to Unwrap must be readable channel") + } + + go func() { + v := reflect.ValueOf(output) + for { + x, ok := <-input.Out() + if !ok { + v.Close() + return + } + v.Send(reflect.ValueOf(x)) + } + }() +} diff --git a/vendor/github.com/eapache/channels/channels_test.go b/vendor/github.com/eapache/channels/channels_test.go new file mode 100644 index 0000000000..1ca17a0eec --- /dev/null +++ b/vendor/github.com/eapache/channels/channels_test.go @@ -0,0 +1,265 @@ +package channels + +import ( + "math/rand" + "testing" + "time" +) + +func testChannel(t *testing.T, name string, ch Channel) { + go func() { + for i := 0; i < 1000; i++ { + ch.In() <- i + } + ch.Close() + }() + for i := 0; i < 1000; i++ { + val := <-ch.Out() + if i != val.(int) { + t.Fatal(name, "expected", i, "but got", val.(int)) + } + } +} + +func testChannelPair(t *testing.T, name string, in InChannel, out OutChannel) { + go func() { + for i := 0; i < 1000; i++ { + in.In() <- i + } + in.Close() + }() + for i := 0; i < 1000; i++ { + val := <-out.Out() + if i != val.(int) { + t.Fatal("pair", name, "expected", i, "but got", val.(int)) + } + } +} + +func testChannelConcurrentAccessors(t *testing.T, name string, ch Channel) { + // no asserts here, this is just for the race detector's benefit + go ch.Len() + go ch.Cap() + + go func() { + ch.In() <- nil + }() + + go func() { + <-ch.Out() + }() +} + +func TestPipe(t *testing.T) { + a := NewNativeChannel(None) + b := NewNativeChannel(None) + + Pipe(a, b) + + testChannelPair(t, "pipe", a, b) +} + +func TestWeakPipe(t *testing.T) { + a := NewNativeChannel(None) + b := NewNativeChannel(None) + + WeakPipe(a, b) + + testChannelPair(t, "pipe", a, b) +} + +func testMultiplex(t *testing.T, multi func(output SimpleInChannel, inputs ...SimpleOutChannel)) { + a := NewNativeChannel(None) + b := NewNativeChannel(None) + + multi(b, a) + + testChannelPair(t, "simple multiplex", a, b) + + a = NewNativeChannel(None) + inputs := []Channel{ + NewNativeChannel(None), + NewNativeChannel(None), + NewNativeChannel(None), + NewNativeChannel(None), + } + + multi(a, inputs[0], inputs[1], inputs[2], inputs[3]) + + go func() { + rand.Seed(time.Now().Unix()) + for i := 0; i < 1000; i++ { + inputs[rand.Intn(len(inputs))].In() <- i + } + for i := range inputs { + inputs[i].Close() + } + }() + for i := 0; i < 1000; i++ { + val := <-a.Out() + if i != val.(int) { + t.Fatal("multiplexing expected", i, "but got", val.(int)) + } + } +} + +func TestMultiplex(t *testing.T) { + testMultiplex(t, Multiplex) +} + +func TestWeakMultiplex(t *testing.T) { + testMultiplex(t, WeakMultiplex) +} + +func testTee(t *testing.T, tee func(input SimpleOutChannel, outputs ...SimpleInChannel)) { + a := NewNativeChannel(None) + b := NewNativeChannel(None) + + tee(a, b) + + testChannelPair(t, "simple tee", a, b) + + a = NewNativeChannel(None) + outputs := []Channel{ + NewNativeChannel(None), + NewNativeChannel(None), + NewNativeChannel(None), + NewNativeChannel(None), + } + + tee(a, outputs[0], outputs[1], outputs[2], outputs[3]) + + go func() { + for i := 0; i < 1000; i++ { + a.In() <- i + } + a.Close() + }() + for i := 0; i < 1000; i++ { + for _, output := range outputs { + val := <-output.Out() + if i != val.(int) { + t.Fatal("teeing expected", i, "but got", val.(int)) + } + } + } +} + +func TestTee(t *testing.T) { + testTee(t, Tee) +} + +func TestWeakTee(t *testing.T) { + testTee(t, WeakTee) +} + +func testDistribute(t *testing.T, dist func(input SimpleOutChannel, outputs ...SimpleInChannel)) { + a := NewNativeChannel(None) + b := NewNativeChannel(None) + + dist(a, b) + + testChannelPair(t, "simple distribute", a, b) + + a = NewNativeChannel(None) + outputs := []Channel{ + NewNativeChannel(None), + NewNativeChannel(None), + NewNativeChannel(None), + NewNativeChannel(None), + } + + dist(a, outputs[0], outputs[1], outputs[2], outputs[3]) + + go func() { + for i := 0; i < 1000; i++ { + a.In() <- i + } + a.Close() + }() + + received := make([]bool, 1000) + for _ = range received { + var val interface{} + select { + case val = <-outputs[0].Out(): + case val = <-outputs[1].Out(): + case val = <-outputs[2].Out(): + case val = <-outputs[3].Out(): + } + if received[val.(int)] { + t.Fatal("distribute got value twice", val.(int)) + } + received[val.(int)] = true + } + for i := range received { + if !received[i] { + t.Fatal("distribute missed", i) + } + } +} + +func TestDistribute(t *testing.T) { + testDistribute(t, Distribute) +} + +func TestWeakDistribute(t *testing.T) { + testDistribute(t, WeakDistribute) +} + +func TestWrap(t *testing.T) { + rawChan := make(chan int, 5) + ch := Wrap(rawChan) + + for i := 0; i < 5; i++ { + rawChan <- i + } + close(rawChan) + + for i := 0; i < 5; i++ { + x := (<-ch.Out()).(int) + if x != i { + t.Error("Wrapped value", x, "was expecting", i) + } + } + _, ok := <-ch.Out() + if ok { + t.Error("Wrapped channel didn't close") + } +} + +func TestUnwrap(t *testing.T) { + rawChan := make(chan int) + ch := NewNativeChannel(5) + Unwrap(ch, rawChan) + + for i := 0; i < 5; i++ { + ch.In() <- i + } + ch.Close() + + for i := 0; i < 5; i++ { + x := <-rawChan + if x != i { + t.Error("Unwrapped value", x, "was expecting", i) + } + } + _, ok := <-rawChan + if ok { + t.Error("Unwrapped channel didn't close") + } +} + +func ExampleChannel() { + var ch Channel + + ch = NewInfiniteChannel() + + for i := 0; i < 10; i++ { + ch.In() <- nil + } + + for i := 0; i < 10; i++ { + <-ch.Out() + } +} diff --git a/vendor/github.com/eapache/channels/infinite_channel.go b/vendor/github.com/eapache/channels/infinite_channel.go new file mode 100644 index 0000000000..3aa9e8e7eb --- /dev/null +++ b/vendor/github.com/eapache/channels/infinite_channel.go @@ -0,0 +1,72 @@ +package channels + +import "github.com/eapache/queue" + +// InfiniteChannel implements the Channel interface with an infinite buffer between the input and the output. +type InfiniteChannel struct { + input, output chan interface{} + length chan int + buffer *queue.Queue +} + +func NewInfiniteChannel() *InfiniteChannel { + ch := &InfiniteChannel{ + input: make(chan interface{}), + output: make(chan interface{}), + length: make(chan int), + buffer: queue.New(), + } + go ch.infiniteBuffer() + return ch +} + +func (ch *InfiniteChannel) In() chan<- interface{} { + return ch.input +} + +func (ch *InfiniteChannel) Out() <-chan interface{} { + return ch.output +} + +func (ch *InfiniteChannel) Len() int { + return <-ch.length +} + +func (ch *InfiniteChannel) Cap() BufferCap { + return Infinity +} + +func (ch *InfiniteChannel) Close() { + close(ch.input) +} + +func (ch *InfiniteChannel) infiniteBuffer() { + var input, output chan interface{} + var next interface{} + input = ch.input + + for input != nil || output != nil { + select { + case elem, open := <-input: + if open { + ch.buffer.Add(elem) + } else { + input = nil + } + case output <- next: + ch.buffer.Remove() + case ch.length <- ch.buffer.Length(): + } + + if ch.buffer.Length() > 0 { + output = ch.output + next = ch.buffer.Peek() + } else { + output = nil + next = nil + } + } + + close(ch.output) + close(ch.length) +} diff --git a/vendor/github.com/eapache/channels/infinite_channel_test.go b/vendor/github.com/eapache/channels/infinite_channel_test.go new file mode 100644 index 0000000000..a3b2a297ed --- /dev/null +++ b/vendor/github.com/eapache/channels/infinite_channel_test.go @@ -0,0 +1,48 @@ +package channels + +import "testing" + +func TestInfiniteChannel(t *testing.T) { + var ch Channel + + ch = NewInfiniteChannel() + testChannel(t, "infinite channel", ch) + + ch = NewInfiniteChannel() + testChannelPair(t, "infinite channel", ch, ch) + + ch = NewInfiniteChannel() + testChannelConcurrentAccessors(t, "infinite channel", ch) +} + +func BenchmarkInfiniteChannelSerial(b *testing.B) { + ch := NewInfiniteChannel() + for i := 0; i < b.N; i++ { + ch.In() <- nil + } + for i := 0; i < b.N; i++ { + <-ch.Out() + } +} + +func BenchmarkInfiniteChannelParallel(b *testing.B) { + ch := NewInfiniteChannel() + go func() { + for i := 0; i < b.N; i++ { + <-ch.Out() + } + ch.Close() + }() + for i := 0; i < b.N; i++ { + ch.In() <- nil + } + <-ch.Out() +} + +func BenchmarkInfiniteChannelTickTock(b *testing.B) { + ch := NewInfiniteChannel() + for i := 0; i < b.N; i++ { + ch.In() <- nil + <-ch.Out() + } +} diff --git a/vendor/github.com/eapache/channels/native_channel.go b/vendor/github.com/eapache/channels/native_channel.go new file mode 100644 index 0000000000..3807a19915 --- /dev/null +++ b/vendor/github.com/eapache/channels/native_channel.go @@ -0,0 +1,92 @@ +package channels + +// NativeInChannel implements the InChannel interface by wrapping a native go write-only channel. +type NativeInChannel chan<- interface{} + +func (ch NativeInChannel) In() chan<- interface{} { + return ch +} + +func (ch NativeInChannel) Len() int { + return len(ch) +} + +func (ch NativeInChannel) Cap() BufferCap { + return BufferCap(cap(ch)) +} + +func (ch NativeInChannel) Close() { + close(ch) +} + +// NativeOutChannel implements the OutChannel interface by wrapping a native go read-only channel. +type NativeOutChannel <-chan interface{} + +func (ch NativeOutChannel) Out() <-chan interface{} { + return ch +} + +func (ch NativeOutChannel) Len() int { + return len(ch) +} + +func (ch NativeOutChannel) Cap() BufferCap { + return BufferCap(cap(ch)) +} + +// NativeChannel implements the Channel interface by wrapping a native go channel. +type NativeChannel chan interface{} + +// NewNativeChannel makes a new NativeChannel with the given buffer size. Just a convenience wrapper +// to avoid having to cast the result of make(). +func NewNativeChannel(size BufferCap) NativeChannel { + return make(chan interface{}, size) +} + +func (ch NativeChannel) In() chan<- interface{} { + return ch +} + +func (ch NativeChannel) Out() <-chan interface{} { + return ch +} + +func (ch NativeChannel) Len() int { + return len(ch) +} + +func (ch NativeChannel) Cap() BufferCap { + return BufferCap(cap(ch)) +} + +func (ch NativeChannel) Close() { + close(ch) +} + +// DeadChannel is a placeholder implementation of the Channel interface with no buffer +// that is never ready for reading or writing. Closing a dead channel is a no-op. +// Behaves almost like NativeChannel(nil) except that closing a nil NativeChannel will panic. +type DeadChannel struct{} + +func NewDeadChannel() DeadChannel { + return DeadChannel{} +} + +func (ch DeadChannel) In() chan<- interface{} { + return nil +} + +func (ch DeadChannel) Out() <-chan interface{} { + return nil +} + +func (ch DeadChannel) Len() int { + return 0 +} + +func (ch DeadChannel) Cap() BufferCap { + return BufferCap(0) +} + +func (ch DeadChannel) Close() { +} diff --git a/vendor/github.com/eapache/channels/native_channel_test.go b/vendor/github.com/eapache/channels/native_channel_test.go new file mode 100644 index 0000000000..c871bd65e2 --- /dev/null +++ b/vendor/github.com/eapache/channels/native_channel_test.go @@ -0,0 +1,58 @@ +package channels + +import "testing" + +func TestNativeChannels(t *testing.T) { + var ch Channel + + ch = NewNativeChannel(None) + testChannel(t, "bufferless native channel", ch) + + ch = NewNativeChannel(None) + testChannelPair(t, "bufferless native channel", ch, ch) + + ch = NewNativeChannel(5) + testChannel(t, "5-buffer native channel", ch) + + ch = NewNativeChannel(5) + testChannelPair(t, "5-buffer native channel", ch, ch) + + ch = NewNativeChannel(None) + testChannelConcurrentAccessors(t, "native channel", ch) +} + +func TestNativeInOutChannels(t *testing.T) { + ch1 := make(chan interface{}) + ch2 := make(chan interface{}) + + Pipe(NativeOutChannel(ch1), NativeInChannel(ch2)) + NativeInChannel(ch1).Close() +} + +func TestDeadChannel(t *testing.T) { + ch := NewDeadChannel() + + if ch.Len() != 0 { + t.Error("dead channel length not 0") + } + if ch.Cap() != 0 { + t.Error("dead channel cap not 0") + } + + select { + case <-ch.Out(): + t.Error("read from a dead channel") + default: + } + + select { + case ch.In() <- nil: + t.Error("wrote to a dead channel") + default: + } + + ch.Close() + + ch = NewDeadChannel() + testChannelConcurrentAccessors(t, "dead channel", ch) +} diff --git a/vendor/github.com/eapache/channels/overflowing_channel.go b/vendor/github.com/eapache/channels/overflowing_channel.go new file mode 100644 index 0000000000..35090f8e85 --- /dev/null +++ b/vendor/github.com/eapache/channels/overflowing_channel.go @@ -0,0 +1,113 @@ +package channels + +import "github.com/eapache/queue" + +// OverflowingChannel implements the Channel interface in a way that never blocks the writer. +// Specifically, if a value is written to an OverflowingChannel when its buffer is full +// (or, in an unbuffered case, when the recipient is not ready) then that value is simply discarded. +// Note that Go's scheduler can cause discarded values when they could be avoided, simply by scheduling +// the writer before the reader, so caveat emptor. +// For the opposite behaviour (discarding the oldest element, not the newest) see RingChannel. +type OverflowingChannel struct { + input, output chan interface{} + length chan int + buffer *queue.Queue + size BufferCap +} + +func NewOverflowingChannel(size BufferCap) *OverflowingChannel { + if size < 0 && size != Infinity { + panic("channels: invalid negative size in NewOverflowingChannel") + } + ch := &OverflowingChannel{ + input: make(chan interface{}), + output: make(chan interface{}), + length: make(chan int), + size: size, + } + if size == None { + go ch.overflowingDirect() + } else { + ch.buffer = queue.New() + go ch.overflowingBuffer() + } + return ch +} + +func (ch *OverflowingChannel) In() chan<- interface{} { + return ch.input +} + +func (ch *OverflowingChannel) Out() <-chan interface{} { + return ch.output +} + +func (ch *OverflowingChannel) Len() int { + if ch.size == None { + return 0 + } else { + return <-ch.length + } +} + +func (ch *OverflowingChannel) Cap() BufferCap { + return ch.size +} + +func (ch *OverflowingChannel) Close() { + close(ch.input) +} + +// for entirely unbuffered cases +func (ch *OverflowingChannel) overflowingDirect() { + for elem := range ch.input { + // if we can't write it immediately, drop it and move on + select { + case ch.output <- elem: + default: + } + } + close(ch.output) +} + +// for all buffered cases +func (ch *OverflowingChannel) overflowingBuffer() { + var input, output chan interface{} + var next interface{} + input = ch.input + + for input != nil || output != nil { + select { + // Prefer to write if possible, which is surprisingly effective in reducing + // dropped elements due to overflow. The naive read/write select chooses randomly + // when both channels are ready, which produces unnecessary drops 50% of the time. + case output <- next: + ch.buffer.Remove() + default: + select { + case elem, open := <-input: + if open { + if ch.size == Infinity || ch.buffer.Length() < int(ch.size) { + ch.buffer.Add(elem) + } + } else { + input = nil + } + case output <- next: + ch.buffer.Remove() + case ch.length <- ch.buffer.Length(): + } + } + + if ch.buffer.Length() > 0 { + output = ch.output + next = ch.buffer.Peek() + } else { + output = nil + next = nil + } + } + + close(ch.output) + close(ch.length) +} diff --git a/vendor/github.com/eapache/channels/overflowing_channel_test.go b/vendor/github.com/eapache/channels/overflowing_channel_test.go new file mode 100644 index 0000000000..46837a7936 --- /dev/null +++ b/vendor/github.com/eapache/channels/overflowing_channel_test.go @@ -0,0 +1,49 @@ +package channels + +import "testing" + +func TestOverflowingChannel(t *testing.T) { + var ch Channel + + ch = NewOverflowingChannel(Infinity) // yes this is rather silly, but it should work + testChannel(t, "infinite overflowing channel", ch) + + ch = NewOverflowingChannel(None) + go func() { + for i := 0; i < 1000; i++ { + ch.In() <- i + } + ch.Close() + }() + prev := -1 + for i := range ch.Out() { + if prev >= i.(int) { + t.Fatal("overflowing channel prev", prev, "but got", i.(int)) + } + } + + ch = NewOverflowingChannel(10) + for i := 0; i < 1000; i++ { + ch.In() <- i + } + ch.Close() + for i := 0; i < 10; i++ { + val := <-ch.Out() + if i != val.(int) { + t.Fatal("overflowing channel expected", i, "but got", val.(int)) + } + } + if val, open := <-ch.Out(); open == true { + t.Fatal("overflowing channel expected closed but got", val) + } + + ch = NewOverflowingChannel(None) + ch.In() <- 0 + ch.Close() + if val, open := <-ch.Out(); open == true { + t.Fatal("overflowing channel expected closed but got", val) + } + + ch = NewOverflowingChannel(2) + testChannelConcurrentAccessors(t, "overflowing channel", ch) +} diff --git a/vendor/github.com/eapache/channels/resizable_channel.go b/vendor/github.com/eapache/channels/resizable_channel.go new file mode 100644 index 0000000000..fafed0a29b --- /dev/null +++ b/vendor/github.com/eapache/channels/resizable_channel.go @@ -0,0 +1,109 @@ +package channels + +import "github.com/eapache/queue" + +// ResizableChannel implements the Channel interface with a resizable buffer between the input and the output. +// The channel initially has a buffer size of 1, but can be resized by calling Resize(). +// +// Resizing to a buffer capacity of None is, unfortunately, not supported and will panic +// (see https://github.com/eapache/channels/issues/1). +// Resizing back and forth between a finite and infinite buffer is fully supported. +type ResizableChannel struct { + input, output chan interface{} + length chan int + capacity, resize chan BufferCap + size BufferCap + buffer *queue.Queue +} + +func NewResizableChannel() *ResizableChannel { + ch := &ResizableChannel{ + input: make(chan interface{}), + output: make(chan interface{}), + length: make(chan int), + capacity: make(chan BufferCap), + resize: make(chan BufferCap), + size: 1, + buffer: queue.New(), + } + go ch.magicBuffer() + return ch +} + +func (ch *ResizableChannel) In() chan<- interface{} { + return ch.input +} + +func (ch *ResizableChannel) Out() <-chan interface{} { + return ch.output +} + +func (ch *ResizableChannel) Len() int { + return <-ch.length +} + +func (ch *ResizableChannel) Cap() BufferCap { + val, open := <-ch.capacity + if open { + return val + } else { + return ch.size + } +} + +func (ch *ResizableChannel) Close() { + close(ch.input) +} + +func (ch *ResizableChannel) Resize(newSize BufferCap) { + if newSize == None { + panic("channels: ResizableChannel does not support unbuffered behaviour") + } + if newSize < 0 && newSize != Infinity { + panic("channels: invalid negative size trying to resize channel") + } + ch.resize <- newSize +} + +func (ch *ResizableChannel) magicBuffer() { + var input, output, nextInput chan interface{} + var next interface{} + nextInput = ch.input + input = nextInput + + for input != nil || output != nil { + select { + case elem, open := <-input: + if open { + ch.buffer.Add(elem) + } else { + input = nil + nextInput = nil + } + case output <- next: + ch.buffer.Remove() + case ch.size = <-ch.resize: + case ch.length <- ch.buffer.Length(): + case ch.capacity <- ch.size: + } + + if ch.buffer.Length() == 0 { + output = nil + next = nil + } else { + output = ch.output + next = ch.buffer.Peek() + } + + if ch.size != Infinity && ch.buffer.Length() >= int(ch.size) { + input = nil + } else { + input = nextInput + } + } + + close(ch.output) + close(ch.resize) + close(ch.length) + close(ch.capacity) +} diff --git a/vendor/github.com/eapache/channels/resizable_channel_test.go b/vendor/github.com/eapache/channels/resizable_channel_test.go new file mode 100644 index 0000000000..58b5fd19e6 --- /dev/null +++ b/vendor/github.com/eapache/channels/resizable_channel_test.go @@ -0,0 +1,61 @@ +package channels + +import ( + "math/rand" + "testing" +) + +func TestResizableChannel(t *testing.T) { + var ch *ResizableChannel + + ch = NewResizableChannel() + testChannel(t, "default resizable channel", ch) + + ch = NewResizableChannel() + testChannelPair(t, "default resizable channel", ch, ch) + + ch = NewResizableChannel() + ch.Resize(Infinity) + testChannel(t, "infinite resizable channel", ch) + + ch = NewResizableChannel() + ch.Resize(Infinity) + testChannelPair(t, "infinite resizable channel", ch, ch) + + ch = NewResizableChannel() + ch.Resize(5) + testChannel(t, "5-buffer resizable channel", ch) + + ch = NewResizableChannel() + ch.Resize(5) + testChannelPair(t, "5-buffer resizable channel", ch, ch) + + ch = NewResizableChannel() + testChannelConcurrentAccessors(t, "resizable channel", ch) +} + +func TestResizableChannelOnline(t *testing.T) { + stopper := make(chan bool) + ch := NewResizableChannel() + go func() { + for i := 0; i < 1000; i++ { + ch.In() <- i + } + <-stopper + ch.Close() + }() + + go func() { + for i := 0; i < 1000; i++ { + ch.Resize(BufferCap(rand.Intn(50) + 1)) + } + close(stopper) + }() + + for i := 0; i < 1000; i++ { + val := <-ch.Out() + if i != val.(int) { + t.Fatal("resizable channel expected", i, "but got", val.(int)) + } + } +} diff --git a/vendor/github.com/eapache/channels/ring_channel.go b/vendor/github.com/eapache/channels/ring_channel.go new file mode 100644 index 0000000000..7aec207bdf --- /dev/null +++ b/vendor/github.com/eapache/channels/ring_channel.go @@ -0,0 +1,114 @@ +package channels + +import "github.com/eapache/queue" + +// RingChannel implements the Channel interface in a way that never blocks the writer. +// Specifically, if a value is written to a RingChannel when its buffer is full then the oldest +// value in the buffer is discarded to make room (just like a standard ring-buffer). +// Note that Go's scheduler can cause discarded values when they could be avoided, simply by scheduling +// the writer before the reader, so caveat emptor. +// For the opposite behaviour (discarding the newest element, not the oldest) see OverflowingChannel. +type RingChannel struct { + input, output chan interface{} + length chan int + buffer *queue.Queue + size BufferCap +} + +func NewRingChannel(size BufferCap) *RingChannel { + if size < 0 && size != Infinity { + panic("channels: invalid negative size in NewRingChannel") + } + ch := &RingChannel{ + input: make(chan interface{}), + output: make(chan interface{}), + buffer: queue.New(), + size: size, + } + if size == None { + go ch.overflowingDirect() + } else { + ch.length = make(chan int) + go ch.ringBuffer() + } + return ch +} + +func (ch *RingChannel) In() chan<- interface{} { + return ch.input +} + +func (ch *RingChannel) Out() <-chan interface{} { + return ch.output +} + +func (ch *RingChannel) Len() int { + if ch.size == None { + return 0 + } else { + return <-ch.length + } +} + +func (ch *RingChannel) Cap() BufferCap { + return ch.size +} + +func (ch *RingChannel) Close() { + close(ch.input) +} + +// for entirely unbuffered cases +func (ch *RingChannel) overflowingDirect() { + for elem := range ch.input { + // if we can't write it immediately, drop it and move on + select { + case ch.output <- elem: + default: + } + } + close(ch.output) +} + +// for all buffered cases +func (ch *RingChannel) ringBuffer() { + var input, output chan interface{} + var next interface{} + input = ch.input + + for input != nil || output != nil { + select { + // Prefer to write if possible, which is surprisingly effective in reducing + // dropped elements due to overflow. The naive read/write select chooses randomly + // when both channels are ready, which produces unnecessary drops 50% of the time. + case output <- next: + ch.buffer.Remove() + default: + select { + case elem, open := <-input: + if open { + ch.buffer.Add(elem) + if ch.size != Infinity && ch.buffer.Length() > int(ch.size) { + ch.buffer.Remove() + } + } else { + input = nil + } + case output <- next: + ch.buffer.Remove() + case ch.length <- ch.buffer.Length(): + } + } + + if ch.buffer.Length() > 0 { + output = ch.output + next = ch.buffer.Peek() + } else { + output = nil + next = nil + } + } + + close(ch.output) + close(ch.length) +} diff --git a/vendor/github.com/eapache/channels/ring_channel_test.go b/vendor/github.com/eapache/channels/ring_channel_test.go new file mode 100644 index 0000000000..92b851a55e --- /dev/null +++ b/vendor/github.com/eapache/channels/ring_channel_test.go @@ -0,0 +1,49 @@ +package channels + +import "testing" + +func TestRingChannel(t *testing.T) { + var ch Channel + + ch = NewRingChannel(Infinity) // yes this is rather silly, but it should work + testChannel(t, "infinite ring-buffer channel", ch) + + ch = NewRingChannel(None) + go func() { + for i := 0; i < 1000; i++ { + ch.In() <- i + } + ch.Close() + }() + prev := -1 + for i := range ch.Out() { + if prev >= i.(int) { + t.Fatal("ring channel prev", prev, "but got", i.(int)) + } + } + + ch = NewRingChannel(10) + for i := 0; i < 1000; i++ { + ch.In() <- i + } + ch.Close() + for i := 990; i < 1000; i++ { + val := <-ch.Out() + if i != val.(int) { + t.Fatal("ring channel expected", i, "but got", val.(int)) + } + } + if val, open := <-ch.Out(); open == true { + t.Fatal("ring channel expected closed but got", val) + } + + ch = NewRingChannel(None) + ch.In() <- 0 + ch.Close() + if val, open := <-ch.Out(); open == true { + t.Fatal("ring channel expected closed but got", val) + } + + ch = NewRingChannel(2) + testChannelConcurrentAccessors(t, "ring channel", ch) +} diff --git a/vendor/github.com/eapache/channels/shared_buffer.go b/vendor/github.com/eapache/channels/shared_buffer.go new file mode 100644 index 0000000000..556dc190a1 --- /dev/null +++ b/vendor/github.com/eapache/channels/shared_buffer.go @@ -0,0 +1,167 @@ +package channels + +import ( + "reflect" + + "github.com/eapache/queue" +) + +//sharedBufferChannel implements SimpleChannel and is created by the public +//SharedBuffer type below +type sharedBufferChannel struct { + in chan interface{} + out chan interface{} + buf *queue.Queue + closed bool +} + +func (sch *sharedBufferChannel) In() chan<- interface{} { + return sch.in +} + +func (sch *sharedBufferChannel) Out() <-chan interface{} { + return sch.out +} + +func (sch *sharedBufferChannel) Close() { + close(sch.in) +} + +//SharedBuffer implements the Buffer interface, and permits multiple SimpleChannel instances to "share" a single buffer. +//Each channel spawned by NewChannel has its own internal queue (so values flowing through do not get mixed up with +//other channels) but the total number of elements buffered by all spawned channels is limited to a single capacity. This +//means *all* such channels block and unblock for writing together. The primary use case is for implementing pipeline-style +//parallelism with goroutines, limiting the total number of elements in the pipeline without limiting the number of elements +//at any particular step. +type SharedBuffer struct { + cases []reflect.SelectCase // 2n+1 of these; [0] is for control, [1,3,5...] for recv, [2,4,6...] for send + chans []*sharedBufferChannel // n of these + count int + size BufferCap + in chan *sharedBufferChannel +} + +func NewSharedBuffer(size BufferCap) *SharedBuffer { + if size < 0 && size != Infinity { + panic("channels: invalid negative size in NewSharedBuffer") + } else if size == None { + panic("channels: SharedBuffer does not support unbuffered behaviour") + } + + buf := &SharedBuffer{ + size: size, + in: make(chan *sharedBufferChannel), + } + + buf.cases = append(buf.cases, reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(buf.in), + }) + + go buf.mainLoop() + + return buf +} + +//NewChannel spawns and returns a new channel sharing the underlying buffer. +func (buf *SharedBuffer) NewChannel() SimpleChannel { + ch := &sharedBufferChannel{ + in: make(chan interface{}), + out: make(chan interface{}), + buf: queue.New(), + } + buf.in <- ch + return ch +} + +//Close shuts down the SharedBuffer. It is an error to call Close while channels are still using +//the buffer (I'm not really sure what would happen if you do so). +func (buf *SharedBuffer) Close() { + // TODO: what if there are still active channels using this buffer? + close(buf.in) +} + +func (buf *SharedBuffer) mainLoop() { + for { + i, val, ok := reflect.Select(buf.cases) + + if i == 0 { + if !ok { + //Close was called on the SharedBuffer itself + return + } + + //NewChannel was called on the SharedBuffer + ch := val.Interface().(*sharedBufferChannel) + buf.chans = append(buf.chans, ch) + buf.cases = append(buf.cases, + reflect.SelectCase{Dir: reflect.SelectRecv}, + reflect.SelectCase{Dir: reflect.SelectSend}, + ) + if buf.size == Infinity || buf.count < int(buf.size) { + buf.cases[len(buf.cases)-2].Chan = reflect.ValueOf(ch.in) + } + } else if i%2 == 0 { + //Send + if buf.count == int(buf.size) { + //room in the buffer again, re-enable all recv cases + for j := range buf.chans { + if !buf.chans[j].closed { + buf.cases[(j*2)+1].Chan = reflect.ValueOf(buf.chans[j].in) + } + } + } + buf.count-- + ch := buf.chans[(i-1)/2] + if ch.buf.Length() > 0 { + buf.cases[i].Send = reflect.ValueOf(ch.buf.Peek()) + ch.buf.Remove() + } else { + //nothing left for this channel to send, disable sending + buf.cases[i].Chan = reflect.Value{} + buf.cases[i].Send = reflect.Value{} + if ch.closed { + // and it was closed, so close the output channel + //TODO: shrink slice + close(ch.out) + } + } + } else { + ch := buf.chans[i/2] + if ok { + //Receive + buf.count++ + if ch.buf.Length() == 0 && !buf.cases[i+1].Chan.IsValid() { + //this channel now has something to send + buf.cases[i+1].Chan = reflect.ValueOf(ch.out) + buf.cases[i+1].Send = val + } else { + ch.buf.Add(val.Interface()) + } + if buf.count == int(buf.size) { + //buffer full, disable recv cases + for j := range buf.chans { + buf.cases[(j*2)+1].Chan = reflect.Value{} + } + } + } else { + //Close + buf.cases[i].Chan = reflect.Value{} + ch.closed = true + if ch.buf.Length() == 0 && !buf.cases[i+1].Chan.IsValid() { + //nothing pending, close the out channel right away + //TODO: shrink slice + close(ch.out) + } + } + } + } +} + +func (buf *SharedBuffer) Len() int { + return buf.count +} + +func (buf *SharedBuffer) Cap() BufferCap { + return buf.size +} diff --git a/vendor/github.com/eapache/channels/shared_buffer_test.go b/vendor/github.com/eapache/channels/shared_buffer_test.go new file mode 100644 index 0000000000..79a17b3108 --- /dev/null +++ b/vendor/github.com/eapache/channels/shared_buffer_test.go @@ -0,0 +1,127 @@ +package channels + +import "testing" + +func TestSharedBufferSingleton(t *testing.T) { + buf := NewSharedBuffer(3) + + ch := buf.NewChannel() + for i := 0; i < 5; i++ { + ch.In() <- (*int)(nil) + ch.In() <- (*int)(nil) + ch.In() <- (*int)(nil) + select { + case ch.In() <- (*int)(nil): + t.Error("Wrote to full shared-buffer") + default: + } + + <-ch.Out() + <-ch.Out() + <-ch.Out() + select { + case <-ch.Out(): + t.Error("Read from empty shared-buffer") + default: + } + } + + ch.Close() + buf.Close() +} + +func TestSharedBufferMultiple(t *testing.T) { + buf := NewSharedBuffer(3) + + ch1 := buf.NewChannel() + ch2 := buf.NewChannel() + + ch1.In() <- (*int)(nil) + ch1.In() <- (*int)(nil) + ch1.In() <- (*int)(nil) + + select { + case ch2.In() <- (*int)(nil): + t.Error("Wrote to full shared-buffer") + case <-ch2.Out(): + t.Error("Read from empty channel") + default: + } + + <-ch1.Out() + + for i := 0; i < 10; i++ { + ch2.In() <- (*int)(nil) + + select { + case ch1.In() <- (*int)(nil): + t.Error("Wrote to full shared-buffer") + case ch2.In() <- (*int)(nil): + t.Error("Wrote to full shared-buffer") + default: + } + + <-ch2.Out() + } + + <-ch1.Out() + <-ch1.Out() + + ch1.Close() + ch2.Close() + buf.Close() +} + +func TestSharedBufferConcurrent(t *testing.T) { + const threads = 10 + const iters = 200 + + buf := NewSharedBuffer(3) + done := make(chan bool) + + for i := 0; i < threads; i++ { + go func() { + ch := buf.NewChannel() + for i := 0; i < iters; i++ { + ch.In() <- i + val := <-ch.Out() + if val.(int) != i { + t.Error("Mismatched value out of channel") + } + } + ch.Close() + done <- true + }() + } + + for i := 0; i < threads; i++ { + <-done + } + close(done) + buf.Close() +} + +func ExampleSharedBuffer() { + // never more than 3 elements in the pipeline at once + buf := NewSharedBuffer(3) + + ch1 := buf.NewChannel() + ch2 := buf.NewChannel() + + // or, instead of a straight pipe, implement your pipeline step + Pipe(ch1, ch2) + + // inputs + go func() { + for i := 0; i < 20; i++ { + ch1.In() <- i + } + ch1.Close() + }() + + for _ = range ch2.Out() { + // outputs + } + + buf.Close() +} diff --git a/vendor/github.com/eapache/queue/.gitignore b/vendor/github.com/eapache/queue/.gitignore new file mode 100644 index 0000000000..836562412f --- /dev/null +++ b/vendor/github.com/eapache/queue/.gitignore @@ -0,0 +1,23 @@ +# Compiled Object files, Static and Dynamic libs (Shared Objects) +*.o +*.a +*.so + +# Folders +_obj +_test + +# Architecture specific extensions/prefixes +*.[568vq] +[568vq].out + +*.cgo1.go +*.cgo2.c +_cgo_defun.c +_cgo_gotypes.go +_cgo_export.* + +_testmain.go + +*.exe +*.test diff --git a/vendor/github.com/eapache/queue/.travis.yml b/vendor/github.com/eapache/queue/.travis.yml new file mode 100644 index 0000000000..235a40a493 --- /dev/null +++ b/vendor/github.com/eapache/queue/.travis.yml @@ -0,0 +1,7 @@ +language: go +sudo: false + +go: + - 1.2 + - 1.3 + - 1.4 diff --git a/vendor/github.com/eapache/queue/LICENSE b/vendor/github.com/eapache/queue/LICENSE new file mode 100644 index 0000000000..d5f36dbcaa --- /dev/null +++ b/vendor/github.com/eapache/queue/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2014 Evan Huus + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/vendor/github.com/eapache/queue/README.md b/vendor/github.com/eapache/queue/README.md new file mode 100644 index 0000000000..8e782335cd --- /dev/null +++ b/vendor/github.com/eapache/queue/README.md @@ -0,0 +1,16 @@ +Queue +===== + +[![Build Status](https://travis-ci.org/eapache/queue.svg)](https://travis-ci.org/eapache/queue) +[![GoDoc](https://godoc.org/github.com/eapache/queue?status.png)](https://godoc.org/github.com/eapache/queue) +[![Code of Conduct](https://img.shields.io/badge/code%20of%20conduct-active-blue.svg)](https://eapache.github.io/conduct.html) + +A fast Golang queue using a ring-buffer, based on the version suggested by Dariusz Górecki. +Using this instead of other, simpler, queue implementations (slice+append or linked list) provides +substantial memory and time benefits, and fewer GC pauses. + +The queue implemented here is as fast as it is in part because it is *not* thread-safe. + +Follows semantic versioning using https://gopkg.in/ - import from +[`gopkg.in/eapache/queue.v1`](https://gopkg.in/eapache/queue.v1) +for guaranteed API stability. diff --git a/vendor/github.com/eapache/queue/queue.go b/vendor/github.com/eapache/queue/queue.go new file mode 100644 index 0000000000..71d1acdf27 --- /dev/null +++ b/vendor/github.com/eapache/queue/queue.go @@ -0,0 +1,102 @@ +/* +Package queue provides a fast, ring-buffer queue based on the version suggested by Dariusz Górecki. +Using this instead of other, simpler, queue implementations (slice+append or linked list) provides +substantial memory and time benefits, and fewer GC pauses. + +The queue implemented here is as fast as it is for an additional reason: it is *not* thread-safe. +*/ +package queue + +// minQueueLen is smallest capacity that queue may have. +// Must be power of 2 for bitwise modulus: x % n == x & (n - 1). +const minQueueLen = 16 + +// Queue represents a single instance of the queue data structure. +type Queue struct { + buf []interface{} + head, tail, count int +} + +// New constructs and returns a new Queue. +func New() *Queue { + return &Queue{ + buf: make([]interface{}, minQueueLen), + } +} + +// Length returns the number of elements currently stored in the queue. +func (q *Queue) Length() int { + return q.count +} + +// resizes the queue to fit exactly twice its current contents +// this can result in shrinking if the queue is less than half-full +func (q *Queue) resize() { + newBuf := make([]interface{}, q.count<<1) + + if q.tail > q.head { + copy(newBuf, q.buf[q.head:q.tail]) + } else { + n := copy(newBuf, q.buf[q.head:]) + copy(newBuf[n:], q.buf[:q.tail]) + } + + q.head = 0 + q.tail = q.count + q.buf = newBuf +} + +// Add puts an element on the end of the queue. +func (q *Queue) Add(elem interface{}) { + if q.count == len(q.buf) { + q.resize() + } + + q.buf[q.tail] = elem + // bitwise modulus + q.tail = (q.tail + 1) & (len(q.buf) - 1) + q.count++ +} + +// Peek returns the element at the head of the queue. This call panics +// if the queue is empty. +func (q *Queue) Peek() interface{} { + if q.count <= 0 { + panic("queue: Peek() called on empty queue") + } + return q.buf[q.head] +} + +// Get returns the element at index i in the queue. If the index is +// invalid, the call will panic. This method accepts both positive and +// negative index values. Index 0 refers to the first element, and +// index -1 refers to the last. +func (q *Queue) Get(i int) interface{} { + // If indexing backwards, convert to positive index. + if i < 0 { + i += q.count + } + if i < 0 || i >= q.count { + panic("queue: Get() called with index out of range") + } + // bitwise modulus + return q.buf[(q.head+i)&(len(q.buf)-1)] +} + +// Remove removes and returns the element from the front of the queue. If the +// queue is empty, the call will panic. +func (q *Queue) Remove() interface{} { + if q.count <= 0 { + panic("queue: Remove() called on empty queue") + } + ret := q.buf[q.head] + q.buf[q.head] = nil + // bitwise modulus + q.head = (q.head + 1) & (len(q.buf) - 1) + q.count-- + // Resize down if buffer 1/4 full. + if len(q.buf) > minQueueLen && (q.count<<2) == len(q.buf) { + q.resize() + } + return ret +} diff --git a/vendor/github.com/eapache/queue/queue_test.go b/vendor/github.com/eapache/queue/queue_test.go new file mode 100644 index 0000000000..a87584883e --- /dev/null +++ b/vendor/github.com/eapache/queue/queue_test.go @@ -0,0 +1,178 @@ +package queue + +import "testing" + +func TestQueueSimple(t *testing.T) { + q := New() + + for i := 0; i < minQueueLen; i++ { + q.Add(i) + } + for i := 0; i < minQueueLen; i++ { + if q.Peek().(int) != i { + t.Error("peek", i, "had value", q.Peek()) + } + x := q.Remove() + if x != i { + t.Error("remove", i, "had value", x) + } + } +} + +func TestQueueWrapping(t *testing.T) { + q := New() + + for i := 0; i < minQueueLen; i++ { + q.Add(i) + } + for i := 0; i < 3; i++ { + q.Remove() + q.Add(minQueueLen + i) + } + + for i := 0; i < minQueueLen; i++ { + if q.Peek().(int) != i+3 { + t.Error("peek", i, "had value", q.Peek()) + } + q.Remove() + } +} + +func TestQueueLength(t *testing.T) { + q := New() + + if q.Length() != 0 { + t.Error("empty queue length not 0") + } + + for i := 0; i < 1000; i++ { + q.Add(i) + if q.Length() != i+1 { + t.Error("adding: queue with", i, "elements has length", q.Length()) + } + } + for i := 0; i < 1000; i++ { + q.Remove() + if q.Length() != 1000-i-1 { + t.Error("removing: queue with", 1000-i-i, "elements has length", q.Length()) + } + } +} + +func TestQueueGet(t *testing.T) { + q := New() + + for i := 0; i < 1000; i++ { + q.Add(i) + for j := 0; j < q.Length(); j++ { + if q.Get(j).(int) != j { + t.Errorf("index %d doesn't contain %d", j, j) + } + } + } +} + +func TestQueueGetNegative(t *testing.T) { + q := New() + + for i := 0; i < 1000; i++ { + q.Add(i) + for j := 1; j <= q.Length(); j++ { + if q.Get(-j).(int) != q.Length()-j { + t.Errorf("index %d doesn't contain %d", -j, q.Length()-j) + } + } + } +} + +func TestQueueGetOutOfRangePanics(t *testing.T) { + q := New() + + q.Add(1) + q.Add(2) + q.Add(3) + + assertPanics(t, "should panic when negative index", func() { + q.Get(-4) + }) + + assertPanics(t, "should panic when index greater than length", func() { + q.Get(4) + }) +} + +func TestQueuePeekOutOfRangePanics(t *testing.T) { + q := New() + + assertPanics(t, "should panic when peeking empty queue", func() { + q.Peek() + }) + + q.Add(1) + q.Remove() + + assertPanics(t, "should panic when peeking emptied queue", func() { + q.Peek() + }) +} + +func TestQueueRemoveOutOfRangePanics(t *testing.T) { + q := New() + + assertPanics(t, "should panic when removing empty queue", func() { + q.Remove() + }) + + q.Add(1) + q.Remove() + + assertPanics(t, "should panic when removing emptied queue", func() { + q.Remove() + }) +} + +func assertPanics(t *testing.T, name string, f func()) { + defer func() { + if r := recover(); r == nil { + t.Errorf("%s: didn't panic as expected", name) + } + }() + + f() +} + +// General warning: Go's benchmark utility (go test -bench .) increases the number of +// iterations until the benchmarks take a reasonable amount of time to run; memory usage +// is *NOT* considered. On my machine, these benchmarks hit around ~1GB before they've had +// enough, but if you have less than that available and start swapping, then all bets are off. + +func BenchmarkQueueSerial(b *testing.B) { + q := New() + for i := 0; i < b.N; i++ { + q.Add(nil) + } + for i := 0; i < b.N; i++ { + q.Peek() + q.Remove() + } +} + +func BenchmarkQueueGet(b *testing.B) { + q := New() + for i := 0; i < b.N; i++ { + q.Add(i) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + q.Get(i) + } +} + +func BenchmarkQueueTickTock(b *testing.B) { + q := New() + for i := 0; i < b.N; i++ { + q.Add(nil) + q.Peek() + q.Remove() + } +}