diff --git a/src/talaria/dispatcher.go b/src/talaria/dispatcher.go index 25d7d54b..b9dc4a61 100644 --- a/src/talaria/dispatcher.go +++ b/src/talaria/dispatcher.go @@ -4,11 +4,12 @@ import ( "bytes" "context" "fmt" - "github.com/Comcast/webpa-common/device" - "github.com/Comcast/webpa-common/logging" "net/http" "strings" "time" + + "github.com/Comcast/webpa-common/device" + "github.com/Comcast/webpa-common/logging" ) // outboundEnvelope is a tuple of information related to handling an asynchronous HTTP request @@ -27,13 +28,12 @@ type Dispatcher interface { // dispatcher is the internal Dispatcher implementation type dispatcher struct { - logger logging.Logger - urlFilter URLFilter - method string - timeout time.Duration - defaultEventEndpoints []string - eventEndpoints map[string][]string - outbounds chan<- *outboundEnvelope + logger logging.Logger + urlFilter URLFilter + method string + timeout time.Duration + eventEndpoints map[string][]string + outbounds chan<- *outboundEnvelope } // NewDispatcher constructs a Dispatcher which sends envelopes via the returned channel. @@ -49,13 +49,12 @@ func NewDispatcher(o *Outbounder, urlFilter URLFilter) (Dispatcher, <-chan *outb outbounds := make(chan *outboundEnvelope, o.outboundQueueSize()) return &dispatcher{ - logger: o.logger(), - urlFilter: urlFilter, - method: o.method(), - timeout: o.requestTimeout(), - defaultEventEndpoints: o.defaultEventEndpoints(), - eventEndpoints: o.eventEndpoints(), - outbounds: outbounds, + logger: o.logger(), + urlFilter: urlFilter, + method: o.method(), + timeout: o.requestTimeout(), + eventEndpoints: o.eventEndpoints(), + outbounds: outbounds, }, outbounds, nil } @@ -81,7 +80,7 @@ func (d *dispatcher) send(request *http.Request) error { func (d *dispatcher) dispatchEvent(eventType, contentType string, contents []byte) error { endpoints := d.eventEndpoints[eventType] if len(endpoints) == 0 { - endpoints = d.defaultEventEndpoints + endpoints = d.eventEndpoints[DefaultEventType] } if len(endpoints) == 0 { diff --git a/src/talaria/dispatcher_test.go b/src/talaria/dispatcher_test.go index fdbef607..de32c973 100644 --- a/src/talaria/dispatcher_test.go +++ b/src/talaria/dispatcher_test.go @@ -2,13 +2,14 @@ package main import ( "errors" + "io/ioutil" + "testing" + "time" + "github.com/Comcast/webpa-common/device" "github.com/Comcast/webpa-common/wrp" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "io/ioutil" - "testing" - "time" ) func testDispatcherIgnoredEvent(t *testing.T) { @@ -86,22 +87,22 @@ func testDispatcherOnDeviceEventDispatchEvent(t *testing.T) { expectedEndpoints: map[string]bool{}, }, { - outbounder: &Outbounder{DefaultEventEndpoints: []string{"http://endpoint1.com"}}, + outbounder: &Outbounder{EventEndpoints: map[string][]string{"default": []string{"http://endpoint1.com"}}}, destination: "event:iot", expectedEndpoints: map[string]bool{"http://endpoint1.com": true}, }, { - outbounder: &Outbounder{Method: "PATCH", DefaultEventEndpoints: []string{"http://endpoint1.com"}}, + outbounder: &Outbounder{Method: "PATCH", EventEndpoints: map[string][]string{"default": []string{"http://endpoint1.com"}}}, destination: "event:iot", expectedEndpoints: map[string]bool{"http://endpoint1.com": true}, }, { - outbounder: &Outbounder{DefaultEventEndpoints: []string{"http://endpoint1.com", "http://endpoint2.com"}}, + outbounder: &Outbounder{EventEndpoints: map[string][]string{"default": []string{"http://endpoint1.com", "http://endpoint2.com"}}}, destination: "event:iot", expectedEndpoints: map[string]bool{"http://endpoint1.com": true, "http://endpoint2.com": true}, }, { - outbounder: &Outbounder{Method: "PATCH", DefaultEventEndpoints: []string{"http://endpoint1.com", "http://endpoint2.com"}}, + outbounder: &Outbounder{Method: "PATCH", EventEndpoints: map[string][]string{"default": []string{"http://endpoint1.com", "http://endpoint2.com"}}}, destination: "event:iot", expectedEndpoints: map[string]bool{"http://endpoint1.com": true, "http://endpoint2.com": true}, }, @@ -182,8 +183,8 @@ func testDispatcherOnDeviceEventEventTimeout(t *testing.T) { var ( require = require.New(t) outbounder = &Outbounder{ - RequestTimeout: 100 * time.Millisecond, - DefaultEventEndpoints: []string{"nowhere.com"}, + RequestTimeout: 100 * time.Millisecond, + EventEndpoints: map[string][]string{"default": []string{"nowhere.com"}}, } d, _, err = NewDispatcher(outbounder, nil) diff --git a/src/talaria/outbounder.go b/src/talaria/outbounder.go index e3b52c23..bf51273d 100644 --- a/src/talaria/outbounder.go +++ b/src/talaria/outbounder.go @@ -2,10 +2,11 @@ package main import ( "encoding/json" + "time" + "github.com/Comcast/webpa-common/device" "github.com/Comcast/webpa-common/logging" "github.com/spf13/viper" - "time" ) const ( @@ -21,6 +22,7 @@ const ( DefaultDefaultScheme = "https" DefaultAllowedScheme = "https" + DefaultEventType = "default" DefaultMethod = "POST" DefaultWorkerPoolSize uint = 100 DefaultOutboundQueueSize uint = 1000 @@ -34,19 +36,18 @@ const ( // Outbounder encapsulates the configuration necessary for handling outbound traffic // and grants the ability to start the outbounding infrastructure. type Outbounder struct { - Method string `json:"method"` - RequestTimeout time.Duration `json:"requestTimeout"` - DefaultScheme string `json:"defaultScheme"` - AllowedSchemes []string `json:"allowedSchemes"` - DefaultEventEndpoints []string `json:"defaultEventEndpoints"` - EventEndpoints map[string][]string `json:"eventEndpoints"` - OutboundQueueSize uint `json:"outboundQueueSize"` - WorkerPoolSize uint `json:"workerPoolSize"` - ClientTimeout time.Duration `json:"clientTimeout"` - MaxIdleConns int `json:"maxIdleConns"` - MaxIdleConnsPerHost int `json:"maxIdleConnsPerHost"` - IdleConnTimeout time.Duration `json:"idleConnTimeout"` - Logger logging.Logger `json:"-"` + Method string `json:"method"` + RequestTimeout time.Duration `json:"requestTimeout"` + DefaultScheme string `json:"defaultScheme"` + AllowedSchemes []string `json:"allowedSchemes"` + EventEndpoints map[string][]string `json:"eventEndpoints"` + OutboundQueueSize uint `json:"outboundQueueSize"` + WorkerPoolSize uint `json:"workerPoolSize"` + ClientTimeout time.Duration `json:"clientTimeout"` + MaxIdleConns int `json:"maxIdleConns"` + MaxIdleConnsPerHost int `json:"maxIdleConnsPerHost"` + IdleConnTimeout time.Duration `json:"idleConnTimeout"` + Logger logging.Logger `json:"-"` } // NewOutbounder returns an Outbounder unmarshalled from a Viper environment. @@ -128,16 +129,6 @@ func (o *Outbounder) allowedSchemes() map[string]bool { return map[string]bool{DefaultAllowedScheme: true} } -func (o *Outbounder) defaultEventEndpoints() []string { - if o != nil && len(o.DefaultEventEndpoints) > 0 { - return o.DefaultEventEndpoints - } - - // no reason not to return nil here, as client code - // only iterates over this slice and gets the length - return nil -} - func (o *Outbounder) eventEndpoints() map[string][]string { if o != nil { return o.EventEndpoints diff --git a/src/talaria/outbounder_test.go b/src/talaria/outbounder_test.go index 6ad0d4b9..6b3e6a2b 100644 --- a/src/talaria/outbounder_test.go +++ b/src/talaria/outbounder_test.go @@ -3,18 +3,19 @@ package main import ( "bytes" "fmt" - "github.com/Comcast/webpa-common/device" - "github.com/Comcast/webpa-common/logging" - "github.com/Comcast/webpa-common/wrp" - "github.com/spf13/viper" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" "io/ioutil" "net/http" "net/http/httptest" "sync" "testing" "time" + + "github.com/Comcast/webpa-common/device" + "github.com/Comcast/webpa-common/logging" + "github.com/Comcast/webpa-common/wrp" + "github.com/spf13/viper" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func ExampleOutbounder() { @@ -38,7 +39,7 @@ func ExampleOutbounder() { `{ "defaultScheme": "http", "allowedSchemes": ["http", "https"], - "defaultEventEndpoints": ["%s"], + "eventEndpoints": {"default": ["%s"]}, "workerPoolSize": 1 }`, server.URL, @@ -105,7 +106,6 @@ func testOutbounderDefaults(t *testing.T) { assert.Equal(DefaultRequestTimeout, o.requestTimeout()) assert.Equal(DefaultDefaultScheme, o.defaultScheme()) assert.Equal(map[string]bool{DefaultAllowedScheme: true}, o.allowedSchemes()) - assert.Nil(o.defaultEventEndpoints()) assert.Empty(o.eventEndpoints()) assert.Equal(DefaultOutboundQueueSize, o.outboundQueueSize()) assert.Equal(DefaultWorkerPoolSize, o.workerPoolSize()) @@ -154,7 +154,6 @@ func testOutbounderConfiguration(t *testing.T) { assert.Equal(30*time.Second, o.requestTimeout()) assert.Equal("ftp", o.defaultScheme()) assert.Equal(map[string]bool{"nntp": true, "ftp": true}, o.allowedSchemes()) - assert.Equal([]string{"https://default.endpoint.com"}, o.defaultEventEndpoints()) assert.Equal( map[string][]string{ "iot": []string{"https://endpoint1.com", "https://endpoint2.com"},