Skip to content

Commit

Permalink
Updated logging; updated sample configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
johnabass committed Aug 31, 2017
1 parent 8982b15 commit 0b5ed42
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 24 deletions.
12 changes: 7 additions & 5 deletions src/talaria/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/Comcast/webpa-common/device"
"github.com/Comcast/webpa-common/logging"
"github.com/Comcast/webpa-common/wrp"
"github.com/go-kit/kit/log"
)

// outboundEnvelope is a tuple of information related to handling an asynchronous HTTP request
Expand All @@ -30,7 +31,7 @@ type Dispatcher interface {

// dispatcher is the internal Dispatcher implementation
type dispatcher struct {
logger logging.Logger
errorLog log.Logger
urlFilter URLFilter
method string
timeout time.Duration
Expand All @@ -51,8 +52,9 @@ func NewDispatcher(o *Outbounder, urlFilter URLFilter) (Dispatcher, <-chan *outb
}

outbounds := make(chan *outboundEnvelope, o.outboundQueueSize())
logger := o.logger()
return &dispatcher{
logger: o.logger(),
errorLog: logging.Error(logger),
urlFilter: urlFilter,
method: o.method(),
timeout: o.requestTimeout(),
Expand Down Expand Up @@ -150,15 +152,15 @@ func (d *dispatcher) OnDeviceEvent(event *device.Event) {
if strings.HasPrefix(destination, EventPrefix) {
eventType := destination[len(EventPrefix):]
if err := d.dispatchEvent(eventType, contentType, event.Contents); err != nil {
d.logger.Error("Error dispatching event [%s]: %s", destination, err)
d.errorLog.Log(logging.MessageKey(), "Error dispatching event", "destination", destination, logging.ErrorKey(), err)
}
} else if strings.HasPrefix(destination, DNSPrefix) {
unfilteredURL := destination[len(DNSPrefix):]
if err := d.dispatchTo(unfilteredURL, contentType, event.Contents); err != nil {
d.logger.Error("Error dispatching to [%s]: %s", destination, err)
d.errorLog.Log(logging.MessageKey(), "Error dispatching to endpoint", "destination", destination, logging.ErrorKey(), err)
}
} else {
d.logger.Error("Unable to route to [%s]", destination)
d.errorLog.Log(logging.MessageKey(), "Unroutable destination", "destination", destination)
}
}
}
9 changes: 5 additions & 4 deletions src/talaria/outbounder.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/Comcast/webpa-common/device"
"github.com/Comcast/webpa-common/logging"
"github.com/go-kit/kit/log"
"github.com/spf13/viper"
)

Expand Down Expand Up @@ -48,13 +49,13 @@ type Outbounder struct {
MaxIdleConnsPerHost int `json:"maxIdleConnsPerHost"`
IdleConnTimeout time.Duration `json:"idleConnTimeout"`
AuthKey []string `json:"authKey"`
Logger logging.Logger `json:"-"`
Logger log.Logger `json:"-"`
}

// NewOutbounder returns an Outbounder unmarshalled from a Viper environment.
// This function allows the Viper instance to be nil, in which case a default
// Outbounder is returned.
func NewOutbounder(logger logging.Logger, v *viper.Viper) (o *Outbounder, err error) {
func NewOutbounder(logger log.Logger, v *viper.Viper) (o *Outbounder, err error) {
o = &Outbounder{
Method: DefaultMethod,
RequestTimeout: DefaultRequestTimeout,
Expand Down Expand Up @@ -85,7 +86,7 @@ func (o *Outbounder) String() string {
}
}

func (o *Outbounder) logger() logging.Logger {
func (o *Outbounder) logger() log.Logger {
if o != nil && o.Logger != nil {
return o.Logger
}
Expand Down Expand Up @@ -198,7 +199,7 @@ func (o *Outbounder) clientTimeout() time.Duration {

// Start spawns all necessary goroutines and returns a device.Listener
func (o *Outbounder) Start() (device.Listener, error) {
o.logger().Info("Starting outbounder: %s", o)
logging.Info(o.logger()).Log(logging.MessageKey(), "Starting outbounder", "outbounder", o)
dispatcher, outbounds, err := NewDispatcher(o, nil)
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions src/talaria/primaryHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"net/http"

"github.com/Comcast/webpa-common/device"
"github.com/Comcast/webpa-common/logging"
"github.com/Comcast/webpa-common/wrp"
"github.com/go-kit/kit/log"
"github.com/gorilla/mux"
"github.com/spf13/viper"
)
Expand All @@ -17,7 +17,7 @@ const (
version = "v2"
)

func NewPrimaryHandler(logger logging.Logger, connectedUpdates <-chan []byte, manager device.Manager, v *viper.Viper) (http.Handler, error) {
func NewPrimaryHandler(logger log.Logger, connectedUpdates <-chan []byte, manager device.Manager, v *viper.Viper) (http.Handler, error) {
poolFactory, err := wrp.NewPoolFactory(v.Sub(wrp.ViperKey))
if err != nil {
return nil, err
Expand Down
14 changes: 9 additions & 5 deletions src/talaria/talaria.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/Comcast/webpa-common/logging"
"github.com/Comcast/webpa-common/server"
"github.com/Comcast/webpa-common/service"
"github.com/go-kit/kit/log"
"github.com/spf13/pflag"
"github.com/spf13/viper"
)
Expand All @@ -28,7 +29,7 @@ const (
// startDeviceManagement handles the configuration and initialization of the device management subsystem
// for talaria. The returned HTTP handler can be used for device connections and messages, while the returned
// Manager can be used to route and administer the set of connected devices.
func startDeviceManagement(logger logging.Logger, h *health.Health, v *viper.Viper) (http.Handler, device.Manager, error) {
func startDeviceManagement(logger log.Logger, h *health.Health, v *viper.Viper) (http.Handler, device.Manager, error) {
deviceOptions, err := device.NewOptions(logger, v.Sub(device.DeviceManagerKey))
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -80,7 +81,10 @@ func talaria(arguments []string) int {
return 1
}

logger.Info("Using configuration file: %s", v.ConfigFileUsed())
var (
infoLog = logging.Info(logger)
errorLog = logging.Error(logger)
)

//
// Initialize the manager first, as if it fails we don't want to advertise this service
Expand Down Expand Up @@ -110,7 +114,7 @@ func talaria(arguments []string) int {
return 2
}

logger.Info("Service options: %#v", serviceOptions)
infoLog.Log("configurationFile", v.ConfigFileUsed(), "serviceOptions", serviceOptions)

var (
accessor = service.NewUpdatableAccessor(serviceOptions, nil)
Expand All @@ -122,14 +126,14 @@ func talaria(arguments []string) int {
manager.DisconnectIf(func(candidate device.ID) bool {
hashedEndpoint, err := accessor.Get(candidate.Bytes())
if err != nil {
logger.Error("Error while attempting to rehash device id %s: %s", candidate, err)
errorLog.Log(logging.MessageKey(), "Error while attempting to rehash device", "deviceID", candidate, logging.ErrorKey(), err)
return true
}

// disconnect if hashedEndpoint was NOT found in the endpoints that this talaria instance registered under
disconnect := !registeredEndpoints.Has(hashedEndpoint)
if disconnect {
logger.Info("Disconnecting %s due to service discovery rehash", candidate)
infoLog.Log(logging.MessageKey(), "Service discovery rehash", "deviceID", candidate)
}

return disconnect
Expand Down
5 changes: 3 additions & 2 deletions src/talaria/talaria.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
},

"log" : {
"file" : "console",
"level" : "DEBUG"
"file" : "stdout",
"level" : "DEBUG",
"json": true
}
}
17 changes: 11 additions & 6 deletions src/talaria/workerPool.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package main

import (
"github.com/Comcast/webpa-common/logging"
"io"
"io/ioutil"
"net/http"
"sync"

"github.com/Comcast/webpa-common/logging"
"github.com/go-kit/kit/log"
)

// NewTransactor returns a closure which can handle HTTP transactions.
Expand All @@ -25,7 +27,8 @@ func NewTransactor(o *Outbounder) func(*http.Request) (*http.Response, error) {
// WorkerPool describes a pool of goroutines that dispatch http.Request objects to
// a transactor function
type WorkerPool struct {
logger logging.Logger
errorLog log.Logger
debugLog log.Logger
outbounds <-chan *outboundEnvelope
workerPoolSize uint
transactor func(*http.Request) (*http.Response, error)
Expand All @@ -34,8 +37,10 @@ type WorkerPool struct {
}

func NewWorkerPool(o *Outbounder, outbounds <-chan *outboundEnvelope) *WorkerPool {
logger := o.logger()
return &WorkerPool{
logger: o.logger(),
errorLog: logging.Error(logger),
debugLog: logging.Debug(logger),
outbounds: outbounds,
workerPoolSize: o.workerPoolSize(),
transactor: NewTransactor(o),
Expand All @@ -59,14 +64,14 @@ func (wp *WorkerPool) transact(e *outboundEnvelope) {

response, err := wp.transactor(e.request)
if err != nil {
wp.logger.Error("HTTP error: %s", err)
wp.errorLog.Log(logging.MessageKey(), "HTTP transaction error", logging.ErrorKey(), err)
return
}

if response.StatusCode < 400 {
wp.logger.Debug("HTTP response status: %s, target: %s", response.Status, e.request.URL)
wp.debugLog.Log(logging.MessageKey(), "HTTP response", "status", response.Status, "url", e.request.URL)
} else {
wp.logger.Error("HTTP response status: %s, target: %s", response.Status, e.request.URL)
wp.errorLog.Log(logging.MessageKey(), "HTTP response", "status", response.Status, "url", e.request.URL)
}

io.Copy(ioutil.Discard, response.Body)
Expand Down

0 comments on commit 0b5ed42

Please sign in to comment.