Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/go kit logging #18

Merged
merged 2 commits into from
Aug 31, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 20 additions & 25 deletions src/glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 1 addition & 10 deletions src/glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,5 @@ package: .
homepage: https://github.com/Comcast/talaria
import:
- package: github.com/Comcast/webpa-common
version: 7696a5eab9625e508963a5a2e9343214060f9718
subpackages:
- device
- logging
- wrp
- concurrent
- server
- service
- package: github.com/gorilla/mux
version: v1.3.0
version: 9f0810dec1058c339282960f3e34d9d94f8dd6be

12 changes: 7 additions & 5 deletions src/talaria/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/Comcast/webpa-common/device"
"github.com/Comcast/webpa-common/logging"
"github.com/Comcast/webpa-common/wrp"
"github.com/go-kit/kit/log"
)

// outboundEnvelope is a tuple of information related to handling an asynchronous HTTP request
Expand All @@ -30,7 +31,7 @@ type Dispatcher interface {

// dispatcher is the internal Dispatcher implementation
type dispatcher struct {
logger logging.Logger
errorLog log.Logger
urlFilter URLFilter
method string
timeout time.Duration
Expand All @@ -51,8 +52,9 @@ func NewDispatcher(o *Outbounder, urlFilter URLFilter) (Dispatcher, <-chan *outb
}

outbounds := make(chan *outboundEnvelope, o.outboundQueueSize())
logger := o.logger()
return &dispatcher{
logger: o.logger(),
errorLog: logging.Error(logger),
urlFilter: urlFilter,
method: o.method(),
timeout: o.requestTimeout(),
Expand Down Expand Up @@ -150,15 +152,15 @@ func (d *dispatcher) OnDeviceEvent(event *device.Event) {
if strings.HasPrefix(destination, EventPrefix) {
eventType := destination[len(EventPrefix):]
if err := d.dispatchEvent(eventType, contentType, event.Contents); err != nil {
d.logger.Error("Error dispatching event [%s]: %s", destination, err)
d.errorLog.Log(logging.MessageKey(), "Error dispatching event", "destination", destination, logging.ErrorKey(), err)
}
} else if strings.HasPrefix(destination, DNSPrefix) {
unfilteredURL := destination[len(DNSPrefix):]
if err := d.dispatchTo(unfilteredURL, contentType, event.Contents); err != nil {
d.logger.Error("Error dispatching to [%s]: %s", destination, err)
d.errorLog.Log(logging.MessageKey(), "Error dispatching to endpoint", "destination", destination, logging.ErrorKey(), err)
}
} else {
d.logger.Error("Unable to route to [%s]", destination)
d.errorLog.Log(logging.MessageKey(), "Unroutable destination", "destination", destination)
}
}
}
9 changes: 5 additions & 4 deletions src/talaria/outbounder.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/Comcast/webpa-common/device"
"github.com/Comcast/webpa-common/logging"
"github.com/go-kit/kit/log"
"github.com/spf13/viper"
)

Expand Down Expand Up @@ -48,13 +49,13 @@ type Outbounder struct {
MaxIdleConnsPerHost int `json:"maxIdleConnsPerHost"`
IdleConnTimeout time.Duration `json:"idleConnTimeout"`
AuthKey []string `json:"authKey"`
Logger logging.Logger `json:"-"`
Logger log.Logger `json:"-"`
}

// NewOutbounder returns an Outbounder unmarshalled from a Viper environment.
// This function allows the Viper instance to be nil, in which case a default
// Outbounder is returned.
func NewOutbounder(logger logging.Logger, v *viper.Viper) (o *Outbounder, err error) {
func NewOutbounder(logger log.Logger, v *viper.Viper) (o *Outbounder, err error) {
o = &Outbounder{
Method: DefaultMethod,
RequestTimeout: DefaultRequestTimeout,
Expand Down Expand Up @@ -85,7 +86,7 @@ func (o *Outbounder) String() string {
}
}

func (o *Outbounder) logger() logging.Logger {
func (o *Outbounder) logger() log.Logger {
if o != nil && o.Logger != nil {
return o.Logger
}
Expand Down Expand Up @@ -198,7 +199,7 @@ func (o *Outbounder) clientTimeout() time.Duration {

// Start spawns all necessary goroutines and returns a device.Listener
func (o *Outbounder) Start() (device.Listener, error) {
o.logger().Info("Starting outbounder: %s", o)
logging.Info(o.logger()).Log(logging.MessageKey(), "Starting outbounder", "outbounder", o)
dispatcher, outbounds, err := NewDispatcher(o, nil)
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions src/talaria/primaryHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"net/http"

"github.com/Comcast/webpa-common/device"
"github.com/Comcast/webpa-common/logging"
"github.com/Comcast/webpa-common/wrp"
"github.com/go-kit/kit/log"
"github.com/gorilla/mux"
"github.com/spf13/viper"
)
Expand All @@ -17,7 +17,7 @@ const (
version = "v2"
)

func NewPrimaryHandler(logger logging.Logger, connectedUpdates <-chan []byte, manager device.Manager, v *viper.Viper) (http.Handler, error) {
func NewPrimaryHandler(logger log.Logger, connectedUpdates <-chan []byte, manager device.Manager, v *viper.Viper) (http.Handler, error) {
poolFactory, err := wrp.NewPoolFactory(v.Sub(wrp.ViperKey))
if err != nil {
return nil, err
Expand Down
14 changes: 9 additions & 5 deletions src/talaria/talaria.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/Comcast/webpa-common/logging"
"github.com/Comcast/webpa-common/server"
"github.com/Comcast/webpa-common/service"
"github.com/go-kit/kit/log"
"github.com/spf13/pflag"
"github.com/spf13/viper"
)
Expand All @@ -28,7 +29,7 @@ const (
// startDeviceManagement handles the configuration and initialization of the device management subsystem
// for talaria. The returned HTTP handler can be used for device connections and messages, while the returned
// Manager can be used to route and administer the set of connected devices.
func startDeviceManagement(logger logging.Logger, h *health.Health, v *viper.Viper) (http.Handler, device.Manager, error) {
func startDeviceManagement(logger log.Logger, h *health.Health, v *viper.Viper) (http.Handler, device.Manager, error) {
deviceOptions, err := device.NewOptions(logger, v.Sub(device.DeviceManagerKey))
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -80,7 +81,10 @@ func talaria(arguments []string) int {
return 1
}

logger.Info("Using configuration file: %s", v.ConfigFileUsed())
var (
infoLog = logging.Info(logger)
errorLog = logging.Error(logger)
)

//
// Initialize the manager first, as if it fails we don't want to advertise this service
Expand Down Expand Up @@ -110,7 +114,7 @@ func talaria(arguments []string) int {
return 2
}

logger.Info("Service options: %#v", serviceOptions)
infoLog.Log("configurationFile", v.ConfigFileUsed(), "serviceOptions", serviceOptions)

var (
accessor = service.NewUpdatableAccessor(serviceOptions, nil)
Expand All @@ -122,14 +126,14 @@ func talaria(arguments []string) int {
manager.DisconnectIf(func(candidate device.ID) bool {
hashedEndpoint, err := accessor.Get(candidate.Bytes())
if err != nil {
logger.Error("Error while attempting to rehash device id %s: %s", candidate, err)
errorLog.Log(logging.MessageKey(), "Error while attempting to rehash device", "deviceID", candidate, logging.ErrorKey(), err)
return true
}

// disconnect if hashedEndpoint was NOT found in the endpoints that this talaria instance registered under
disconnect := !registeredEndpoints.Has(hashedEndpoint)
if disconnect {
logger.Info("Disconnecting %s due to service discovery rehash", candidate)
infoLog.Log(logging.MessageKey(), "Service discovery rehash", "deviceID", candidate)
}

return disconnect
Expand Down
5 changes: 3 additions & 2 deletions src/talaria/talaria.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
},

"log" : {
"file" : "console",
"level" : "DEBUG"
"file" : "stdout",
"level" : "DEBUG",
"json": true
}
}
17 changes: 11 additions & 6 deletions src/talaria/workerPool.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package main

import (
"github.com/Comcast/webpa-common/logging"
"io"
"io/ioutil"
"net/http"
"sync"

"github.com/Comcast/webpa-common/logging"
"github.com/go-kit/kit/log"
)

// NewTransactor returns a closure which can handle HTTP transactions.
Expand All @@ -25,7 +27,8 @@ func NewTransactor(o *Outbounder) func(*http.Request) (*http.Response, error) {
// WorkerPool describes a pool of goroutines that dispatch http.Request objects to
// a transactor function
type WorkerPool struct {
logger logging.Logger
errorLog log.Logger
debugLog log.Logger
outbounds <-chan *outboundEnvelope
workerPoolSize uint
transactor func(*http.Request) (*http.Response, error)
Expand All @@ -34,8 +37,10 @@ type WorkerPool struct {
}

func NewWorkerPool(o *Outbounder, outbounds <-chan *outboundEnvelope) *WorkerPool {
logger := o.logger()
return &WorkerPool{
logger: o.logger(),
errorLog: logging.Error(logger),
debugLog: logging.Debug(logger),
outbounds: outbounds,
workerPoolSize: o.workerPoolSize(),
transactor: NewTransactor(o),
Expand All @@ -59,14 +64,14 @@ func (wp *WorkerPool) transact(e *outboundEnvelope) {

response, err := wp.transactor(e.request)
if err != nil {
wp.logger.Error("HTTP error: %s", err)
wp.errorLog.Log(logging.MessageKey(), "HTTP transaction error", logging.ErrorKey(), err)
return
}

if response.StatusCode < 400 {
wp.logger.Debug("HTTP response status: %s, target: %s", response.Status, e.request.URL)
wp.debugLog.Log(logging.MessageKey(), "HTTP response", "status", response.Status, "url", e.request.URL)
} else {
wp.logger.Error("HTTP response status: %s, target: %s", response.Status, e.request.URL)
wp.errorLog.Log(logging.MessageKey(), "HTTP response", "status", response.Status, "url", e.request.URL)
}

io.Copy(ioutil.Discard, response.Body)
Expand Down