Skip to content

Commit

Permalink
Merge pull request #10 from Comcast/feature/merge_default_events
Browse files Browse the repository at this point in the history
Removed DefaultEventEndpoints; Get the default endpoints from eventEn…
  • Loading branch information
johnabass authored Jun 28, 2017
2 parents 0fba71e + a9b42cf commit fa92060
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 59 deletions.
33 changes: 16 additions & 17 deletions src/talaria/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"bytes"
"context"
"fmt"
"github.com/Comcast/webpa-common/device"
"github.com/Comcast/webpa-common/logging"
"net/http"
"strings"
"time"

"github.com/Comcast/webpa-common/device"
"github.com/Comcast/webpa-common/logging"
)

// outboundEnvelope is a tuple of information related to handling an asynchronous HTTP request
Expand All @@ -27,13 +28,12 @@ type Dispatcher interface {

// dispatcher is the internal Dispatcher implementation
type dispatcher struct {
logger logging.Logger
urlFilter URLFilter
method string
timeout time.Duration
defaultEventEndpoints []string
eventEndpoints map[string][]string
outbounds chan<- *outboundEnvelope
logger logging.Logger
urlFilter URLFilter
method string
timeout time.Duration
eventEndpoints map[string][]string
outbounds chan<- *outboundEnvelope
}

// NewDispatcher constructs a Dispatcher which sends envelopes via the returned channel.
Expand All @@ -49,13 +49,12 @@ func NewDispatcher(o *Outbounder, urlFilter URLFilter) (Dispatcher, <-chan *outb

outbounds := make(chan *outboundEnvelope, o.outboundQueueSize())
return &dispatcher{
logger: o.logger(),
urlFilter: urlFilter,
method: o.method(),
timeout: o.requestTimeout(),
defaultEventEndpoints: o.defaultEventEndpoints(),
eventEndpoints: o.eventEndpoints(),
outbounds: outbounds,
logger: o.logger(),
urlFilter: urlFilter,
method: o.method(),
timeout: o.requestTimeout(),
eventEndpoints: o.eventEndpoints(),
outbounds: outbounds,
}, outbounds, nil
}

Expand All @@ -81,7 +80,7 @@ func (d *dispatcher) send(request *http.Request) error {
func (d *dispatcher) dispatchEvent(eventType, contentType string, contents []byte) error {
endpoints := d.eventEndpoints[eventType]
if len(endpoints) == 0 {
endpoints = d.defaultEventEndpoints
endpoints = d.eventEndpoints[DefaultEventType]
}

if len(endpoints) == 0 {
Expand Down
19 changes: 10 additions & 9 deletions src/talaria/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package main

import (
"errors"
"io/ioutil"
"testing"
"time"

"github.com/Comcast/webpa-common/device"
"github.com/Comcast/webpa-common/wrp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"io/ioutil"
"testing"
"time"
)

func testDispatcherIgnoredEvent(t *testing.T) {
Expand Down Expand Up @@ -86,22 +87,22 @@ func testDispatcherOnDeviceEventDispatchEvent(t *testing.T) {
expectedEndpoints: map[string]bool{},
},
{
outbounder: &Outbounder{DefaultEventEndpoints: []string{"http://endpoint1.com"}},
outbounder: &Outbounder{EventEndpoints: map[string][]string{"default": []string{"http://endpoint1.com"}}},
destination: "event:iot",
expectedEndpoints: map[string]bool{"http://endpoint1.com": true},
},
{
outbounder: &Outbounder{Method: "PATCH", DefaultEventEndpoints: []string{"http://endpoint1.com"}},
outbounder: &Outbounder{Method: "PATCH", EventEndpoints: map[string][]string{"default": []string{"http://endpoint1.com"}}},
destination: "event:iot",
expectedEndpoints: map[string]bool{"http://endpoint1.com": true},
},
{
outbounder: &Outbounder{DefaultEventEndpoints: []string{"http://endpoint1.com", "http://endpoint2.com"}},
outbounder: &Outbounder{EventEndpoints: map[string][]string{"default": []string{"http://endpoint1.com", "http://endpoint2.com"}}},
destination: "event:iot",
expectedEndpoints: map[string]bool{"http://endpoint1.com": true, "http://endpoint2.com": true},
},
{
outbounder: &Outbounder{Method: "PATCH", DefaultEventEndpoints: []string{"http://endpoint1.com", "http://endpoint2.com"}},
outbounder: &Outbounder{Method: "PATCH", EventEndpoints: map[string][]string{"default": []string{"http://endpoint1.com", "http://endpoint2.com"}}},
destination: "event:iot",
expectedEndpoints: map[string]bool{"http://endpoint1.com": true, "http://endpoint2.com": true},
},
Expand Down Expand Up @@ -182,8 +183,8 @@ func testDispatcherOnDeviceEventEventTimeout(t *testing.T) {
var (
require = require.New(t)
outbounder = &Outbounder{
RequestTimeout: 100 * time.Millisecond,
DefaultEventEndpoints: []string{"nowhere.com"},
RequestTimeout: 100 * time.Millisecond,
EventEndpoints: map[string][]string{"default": []string{"nowhere.com"}},
}

d, _, err = NewDispatcher(outbounder, nil)
Expand Down
39 changes: 15 additions & 24 deletions src/talaria/outbounder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package main

import (
"encoding/json"
"time"

"github.com/Comcast/webpa-common/device"
"github.com/Comcast/webpa-common/logging"
"github.com/spf13/viper"
"time"
)

const (
Expand All @@ -21,6 +22,7 @@ const (
DefaultDefaultScheme = "https"
DefaultAllowedScheme = "https"

DefaultEventType = "default"
DefaultMethod = "POST"
DefaultWorkerPoolSize uint = 100
DefaultOutboundQueueSize uint = 1000
Expand All @@ -34,19 +36,18 @@ const (
// Outbounder encapsulates the configuration necessary for handling outbound traffic
// and grants the ability to start the outbounding infrastructure.
type Outbounder struct {
Method string `json:"method"`
RequestTimeout time.Duration `json:"requestTimeout"`
DefaultScheme string `json:"defaultScheme"`
AllowedSchemes []string `json:"allowedSchemes"`
DefaultEventEndpoints []string `json:"defaultEventEndpoints"`
EventEndpoints map[string][]string `json:"eventEndpoints"`
OutboundQueueSize uint `json:"outboundQueueSize"`
WorkerPoolSize uint `json:"workerPoolSize"`
ClientTimeout time.Duration `json:"clientTimeout"`
MaxIdleConns int `json:"maxIdleConns"`
MaxIdleConnsPerHost int `json:"maxIdleConnsPerHost"`
IdleConnTimeout time.Duration `json:"idleConnTimeout"`
Logger logging.Logger `json:"-"`
Method string `json:"method"`
RequestTimeout time.Duration `json:"requestTimeout"`
DefaultScheme string `json:"defaultScheme"`
AllowedSchemes []string `json:"allowedSchemes"`
EventEndpoints map[string][]string `json:"eventEndpoints"`
OutboundQueueSize uint `json:"outboundQueueSize"`
WorkerPoolSize uint `json:"workerPoolSize"`
ClientTimeout time.Duration `json:"clientTimeout"`
MaxIdleConns int `json:"maxIdleConns"`
MaxIdleConnsPerHost int `json:"maxIdleConnsPerHost"`
IdleConnTimeout time.Duration `json:"idleConnTimeout"`
Logger logging.Logger `json:"-"`
}

// NewOutbounder returns an Outbounder unmarshalled from a Viper environment.
Expand Down Expand Up @@ -128,16 +129,6 @@ func (o *Outbounder) allowedSchemes() map[string]bool {
return map[string]bool{DefaultAllowedScheme: true}
}

func (o *Outbounder) defaultEventEndpoints() []string {
if o != nil && len(o.DefaultEventEndpoints) > 0 {
return o.DefaultEventEndpoints
}

// no reason not to return nil here, as client code
// only iterates over this slice and gets the length
return nil
}

func (o *Outbounder) eventEndpoints() map[string][]string {
if o != nil {
return o.EventEndpoints
Expand Down
17 changes: 8 additions & 9 deletions src/talaria/outbounder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,19 @@ package main
import (
"bytes"
"fmt"
"github.com/Comcast/webpa-common/device"
"github.com/Comcast/webpa-common/logging"
"github.com/Comcast/webpa-common/wrp"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"io/ioutil"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"

"github.com/Comcast/webpa-common/device"
"github.com/Comcast/webpa-common/logging"
"github.com/Comcast/webpa-common/wrp"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func ExampleOutbounder() {
Expand All @@ -38,7 +39,7 @@ func ExampleOutbounder() {
`{
"defaultScheme": "http",
"allowedSchemes": ["http", "https"],
"defaultEventEndpoints": ["%s"],
"eventEndpoints": {"default": ["%s"]},
"workerPoolSize": 1
}`,
server.URL,
Expand Down Expand Up @@ -105,7 +106,6 @@ func testOutbounderDefaults(t *testing.T) {
assert.Equal(DefaultRequestTimeout, o.requestTimeout())
assert.Equal(DefaultDefaultScheme, o.defaultScheme())
assert.Equal(map[string]bool{DefaultAllowedScheme: true}, o.allowedSchemes())
assert.Nil(o.defaultEventEndpoints())
assert.Empty(o.eventEndpoints())
assert.Equal(DefaultOutboundQueueSize, o.outboundQueueSize())
assert.Equal(DefaultWorkerPoolSize, o.workerPoolSize())
Expand Down Expand Up @@ -154,7 +154,6 @@ func testOutbounderConfiguration(t *testing.T) {
assert.Equal(30*time.Second, o.requestTimeout())
assert.Equal("ftp", o.defaultScheme())
assert.Equal(map[string]bool{"nntp": true, "ftp": true}, o.allowedSchemes())
assert.Equal([]string{"https://default.endpoint.com"}, o.defaultEventEndpoints())
assert.Equal(
map[string][]string{
"iot": []string{"https://endpoint1.com", "https://endpoint2.com"},
Expand Down

0 comments on commit fa92060

Please sign in to comment.