Skip to content

Commit

Permalink
use app.logf() for internal packages for nsqd nsqlookupd nsqadmin
Browse files Browse the repository at this point in the history
Instead of setting a Logger for github.com/nsqio/nsq/internal packages,
pass a logf() function, so it is called with and honors a LogLevel.
 * internal/clusterinfo/
 * internal/http_api/
 * internal/protocol/

nsqd lookupPeer also needed to be converted

Get rid of interal.app.Logger type, but internal/test/ needs
its own Logger definition to avoid circular import with
internal/lg/ tests.
  • Loading branch information
ploxiln committed May 27, 2017
1 parent 2b4a610 commit c2c139d
Show file tree
Hide file tree
Showing 15 changed files with 64 additions and 70 deletions.
5 changes: 0 additions & 5 deletions internal/app/logger.go

This file was deleted.

14 changes: 5 additions & 9 deletions internal/clusterinfo/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/blang/semver"
"github.com/nsqio/nsq/internal/http_api"
"github.com/nsqio/nsq/internal/lg"
"github.com/nsqio/nsq/internal/stringy"
)

Expand All @@ -33,27 +34,22 @@ func (l ErrList) Errors() []error {
return l
}

type logger interface {
Output(maxdepth int, s string) error
}

type ClusterInfo struct {
log logger
log lg.AppLogFunc
client *http_api.Client
}

func New(log logger, client *http_api.Client) *ClusterInfo {
func New(log lg.AppLogFunc, client *http_api.Client) *ClusterInfo {
return &ClusterInfo{
log: log,
client: client,
}
}

func (c *ClusterInfo) logf(f string, args ...interface{}) {
if c.log == nil {
return
if c.log != nil {
c.log(lg.INFO, f, args...)
}
c.log.Output(2, fmt.Sprintf(f, args...))
}

// GetVersion returns a semver.Version object by querying /info
Expand Down
22 changes: 11 additions & 11 deletions internal/http_api/api_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"time"

"github.com/julienschmidt/httprouter"
"github.com/nsqio/nsq/internal/app"
"github.com/nsqio/nsq/internal/lg"
)

type Decorator func(APIHandler) APIHandler
Expand Down Expand Up @@ -112,7 +112,7 @@ func Decorate(f APIHandler, ds ...Decorator) httprouter.Handle {
}
}

func Log(l app.Logger) Decorator {
func Log(logf lg.AppLogFunc) Decorator {
return func(f APIHandler) APIHandler {
return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
start := time.Now()
Expand All @@ -122,34 +122,34 @@ func Log(l app.Logger) Decorator {
if e, ok := err.(Err); ok {
status = e.Code
}
l.Output(2, fmt.Sprintf("%d %s %s (%s) %s",
status, req.Method, req.URL.RequestURI(), req.RemoteAddr, elapsed))
logf(lg.INFO, "%d %s %s (%s) %s",
status, req.Method, req.URL.RequestURI(), req.RemoteAddr, elapsed)
return response, err
}
}
}

func LogPanicHandler(l app.Logger) func(w http.ResponseWriter, req *http.Request, p interface{}) {
func LogPanicHandler(logf lg.AppLogFunc) func(w http.ResponseWriter, req *http.Request, p interface{}) {
return func(w http.ResponseWriter, req *http.Request, p interface{}) {
l.Output(2, fmt.Sprintf("ERROR: panic in HTTP handler - %s", p))
logf(lg.ERROR, "panic in HTTP handler - %s", p)
Decorate(func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
return nil, Err{500, "INTERNAL_ERROR"}
}, Log(l), V1)(w, req, nil)
}, Log(logf), V1)(w, req, nil)
}
}

func LogNotFoundHandler(l app.Logger) http.Handler {
func LogNotFoundHandler(logf lg.AppLogFunc) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
Decorate(func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
return nil, Err{404, "NOT_FOUND"}
}, Log(l), V1)(w, req, nil)
}, Log(logf), V1)(w, req, nil)
})
}

func LogMethodNotAllowedHandler(l app.Logger) http.Handler {
func LogMethodNotAllowedHandler(logf lg.AppLogFunc) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
Decorate(func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
return nil, Err{405, "METHOD_NOT_ALLOWED"}
}, Log(l), V1)(w, req, nil)
}, Log(logf), V1)(w, req, nil)
})
}
17 changes: 8 additions & 9 deletions internal/http_api/http_server.go
Original file line number Diff line number Diff line change
@@ -1,36 +1,35 @@
package http_api

import (
"fmt"
"log"
"net"
"net/http"
"strings"

"github.com/nsqio/nsq/internal/app"
"github.com/nsqio/nsq/internal/lg"
)

type logWriter struct {
app.Logger
logf lg.AppLogFunc
}

func (l logWriter) Write(p []byte) (int, error) {
l.Logger.Output(2, string(p))
l.logf(lg.WARN, "%s", string(p))
return len(p), nil
}

func Serve(listener net.Listener, handler http.Handler, proto string, l app.Logger) {
l.Output(2, fmt.Sprintf("%s: listening on %s", proto, listener.Addr()))
func Serve(listener net.Listener, handler http.Handler, proto string, logf lg.AppLogFunc) {
logf(lg.INFO, "%s: listening on %s", proto, listener.Addr())

server := &http.Server{
Handler: handler,
ErrorLog: log.New(logWriter{l}, "", 0),
ErrorLog: log.New(logWriter{logf}, "", 0),
}
err := server.Serve(listener)
// theres no direct way to detect this error because it is not exposed
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
l.Output(2, fmt.Sprintf("ERROR: http.Serve() - %s", err))
logf(lg.ERROR, "http.Serve() - %s", err)
}

l.Output(2, fmt.Sprintf("%s: closing %s", proto, listener.Addr()))
logf(lg.INFO, "%s: closing %s", proto, listener.Addr())
}
2 changes: 2 additions & 0 deletions internal/lg/lg.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ const (
FATAL = LogLevel(5)
)

type AppLogFunc func(lvl LogLevel, f string, args ...interface{})

type Logger interface {
Output(maxdepth int, s string) error
}
Expand Down
13 changes: 6 additions & 7 deletions internal/protocol/tcp_server.go
Original file line number Diff line number Diff line change
@@ -1,37 +1,36 @@
package protocol

import (
"fmt"
"net"
"runtime"
"strings"

"github.com/nsqio/nsq/internal/app"
"github.com/nsqio/nsq/internal/lg"
)

type TCPHandler interface {
Handle(net.Conn)
}

func TCPServer(listener net.Listener, handler TCPHandler, l app.Logger) {
l.Output(2, fmt.Sprintf("TCP: listening on %s", listener.Addr()))
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) {
logf(lg.INFO, "TCP: listening on %s", listener.Addr())

for {
clientConn, err := listener.Accept()
if err != nil {
if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
l.Output(2, fmt.Sprintf("NOTICE: temporary Accept() failure - %s", err))
logf(lg.WARN, "temporary Accept() failure - %s", err)
runtime.Gosched()
continue
}
// theres no direct way to detect this error because it is not exposed
if !strings.Contains(err.Error(), "use of closed network connection") {
l.Output(2, fmt.Sprintf("ERROR: listener.Accept() - %s", err))
logf(lg.ERROR, "listener.Accept() - %s", err)
}
break
}
go handler.Handle(clientConn)
}

l.Output(2, fmt.Sprintf("TCP: closing %s", listener.Addr()))
logf(lg.INFO, "TCP: closing %s", listener.Addr())
}
8 changes: 4 additions & 4 deletions internal/test/logger.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package test

import (
"github.com/nsqio/nsq/internal/app"
)
type Logger interface {
Output(maxdepth int, s string) error
}

type tbLog interface {
Log(...interface{})
Expand All @@ -17,6 +17,6 @@ func (tl *testLogger) Output(maxdepth int, s string) error {
return nil
}

func NewTestLogger(tbl tbLog) app.Logger {
func NewTestLogger(tbl tbLog) Logger {
return &testLogger{tbl}
}
10 changes: 5 additions & 5 deletions nsqadmin/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,21 @@ type httpServer struct {
}

func NewHTTPServer(ctx *Context) *httpServer {
log := http_api.Log(ctx.nsqadmin.getOpts().Logger)
log := http_api.Log(ctx.nsqadmin.logf)

client := http_api.NewClient(ctx.nsqadmin.httpClientTLSConfig, ctx.nsqadmin.getOpts().HTTPClientConnectTimeout,
ctx.nsqadmin.getOpts().HTTPClientRequestTimeout)

router := httprouter.New()
router.HandleMethodNotAllowed = true
router.PanicHandler = http_api.LogPanicHandler(ctx.nsqadmin.getOpts().Logger)
router.NotFound = http_api.LogNotFoundHandler(ctx.nsqadmin.getOpts().Logger)
router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqadmin.getOpts().Logger)
router.PanicHandler = http_api.LogPanicHandler(ctx.nsqadmin.logf)
router.NotFound = http_api.LogNotFoundHandler(ctx.nsqadmin.logf)
router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqadmin.logf)
s := &httpServer{
ctx: ctx,
router: router,
client: client,
ci: clusterinfo.New(ctx.nsqadmin.getOpts().Logger, client),
ci: clusterinfo.New(ctx.nsqadmin.logf, client),
}

router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))
Expand Down
2 changes: 1 addition & 1 deletion nsqadmin/nsqadmin.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (n *NSQAdmin) Main() {
n.Unlock()
httpServer := NewHTTPServer(&Context{n})
n.waitGroup.Wrap(func() {
http_api.Serve(n.httpListener, http_api.CompressHandler(httpServer), "HTTP", n.getOpts().Logger)
http_api.Serve(n.httpListener, http_api.CompressHandler(httpServer), "HTTP", n.logf)
})
n.waitGroup.Wrap(func() { n.handleAdminActions() })
}
Expand Down
8 changes: 4 additions & 4 deletions nsqd/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ type httpServer struct {
}

func newHTTPServer(ctx *context, tlsEnabled bool, tlsRequired bool) *httpServer {
log := http_api.Log(ctx.nsqd.getOpts().Logger)
log := http_api.Log(ctx.nsqd.logf)

router := httprouter.New()
router.HandleMethodNotAllowed = true
router.PanicHandler = http_api.LogPanicHandler(ctx.nsqd.getOpts().Logger)
router.NotFound = http_api.LogNotFoundHandler(ctx.nsqd.getOpts().Logger)
router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqd.getOpts().Logger)
router.PanicHandler = http_api.LogPanicHandler(ctx.nsqd.logf)
router.NotFound = http_api.LogNotFoundHandler(ctx.nsqd.logf)
router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqd.logf)
s := &httpServer{
ctx: ctx,
tlsEnabled: tlsEnabled,
Expand Down
2 changes: 1 addition & 1 deletion nsqd/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (n *NSQD) lookupLoop() {
continue
}
n.logf(LOG_INFO, "LOOKUP(%s): adding peer", host)
lookupPeer := newLookupPeer(host, n.getOpts().MaxBodySize, n.getOpts().Logger,
lookupPeer := newLookupPeer(host, n.getOpts().MaxBodySize, n.logf,
connectCallback(n, hostname, syncTopicChan))
lookupPeer.Command(nil) // start the connection
lookupPeers = append(lookupPeers, lookupPeer)
Expand Down
9 changes: 5 additions & 4 deletions nsqd/lookup_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/nsqio/go-nsq"
"github.com/nsqio/nsq/internal/lg"
)

// lookupPeer is a low-level type for connecting/reading/writing to nsqlookupd
Expand All @@ -16,7 +17,7 @@ import (
// gracefully (i.e. it is all handled by the library). Clients can simply use the
// Command interface to perform a round-trip.
type lookupPeer struct {
l Logger
logf lg.AppLogFunc
addr string
conn net.Conn
state int32
Expand All @@ -36,9 +37,9 @@ type peerInfo struct {
// newLookupPeer creates a new lookupPeer instance connecting to the supplied address.
//
// The supplied connectCallback will be called *every* time the instance connects.
func newLookupPeer(addr string, maxBodySize int64, l Logger, connectCallback func(*lookupPeer)) *lookupPeer {
func newLookupPeer(addr string, maxBodySize int64, l lg.AppLogFunc, connectCallback func(*lookupPeer)) *lookupPeer {
return &lookupPeer{
l: l,
logf: l,
addr: addr,
state: stateDisconnected,
maxBodySize: maxBodySize,
Expand All @@ -48,7 +49,7 @@ func newLookupPeer(addr string, maxBodySize int64, l Logger, connectCallback fun

// Connect will Dial the specified address, with timeouts
func (lp *lookupPeer) Connect() error {
lp.l.Output(2, fmt.Sprintf("LOOKUP connecting to %s", lp.addr))
lp.logf(lg.INFO, "LOOKUP connecting to %s", lp.addr)
conn, err := net.DialTimeout("tcp", lp.addr, time.Second)
if err != nil {
return err
Expand Down
10 changes: 6 additions & 4 deletions nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,11 @@ func New(opts *Options) *NSQD {
exitChan: make(chan int),
notifyChan: make(chan interface{}),
optsNotificationChan: make(chan struct{}, 1),
ci: clusterinfo.New(opts.Logger, http_api.NewClient(nil, opts.HTTPClientConnectTimeout, opts.HTTPClientRequestTimeout)),
dl: dirlock.New(dataPath),
}
httpcli := http_api.NewClient(nil, opts.HTTPClientConnectTimeout, opts.HTTPClientRequestTimeout)
n.ci = clusterinfo.New(n.logf, httpcli)

n.swapOpts(opts)
n.errValue.Store(errStore{})

Expand Down Expand Up @@ -226,7 +228,7 @@ func (n *NSQD) Main() {
n.Unlock()
tcpServer := &tcpServer{ctx: ctx}
n.waitGroup.Wrap(func() {
protocol.TCPServer(n.tcpListener, tcpServer, n.getOpts().Logger)
protocol.TCPServer(n.tcpListener, tcpServer, n.logf)
})

if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
Expand All @@ -240,7 +242,7 @@ func (n *NSQD) Main() {
n.Unlock()
httpsServer := newHTTPServer(ctx, true, true)
n.waitGroup.Wrap(func() {
http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.getOpts().Logger)
http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf)
})
}
httpListener, err = net.Listen("tcp", n.getOpts().HTTPAddress)
Expand All @@ -253,7 +255,7 @@ func (n *NSQD) Main() {
n.Unlock()
httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
n.waitGroup.Wrap(func() {
http_api.Serve(n.httpListener, httpServer, "HTTP", n.getOpts().Logger)
http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf)
})

n.waitGroup.Wrap(func() { n.queueScanLoop() })
Expand Down
8 changes: 4 additions & 4 deletions nsqlookupd/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ type httpServer struct {
}

func newHTTPServer(ctx *Context) *httpServer {
log := http_api.Log(ctx.nsqlookupd.opts.Logger)
log := http_api.Log(ctx.nsqlookupd.logf)

router := httprouter.New()
router.HandleMethodNotAllowed = true
router.PanicHandler = http_api.LogPanicHandler(ctx.nsqlookupd.opts.Logger)
router.NotFound = http_api.LogNotFoundHandler(ctx.nsqlookupd.opts.Logger)
router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqlookupd.opts.Logger)
router.PanicHandler = http_api.LogPanicHandler(ctx.nsqlookupd.logf)
router.NotFound = http_api.LogNotFoundHandler(ctx.nsqlookupd.logf)
router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqlookupd.logf)
s := &httpServer{
ctx: ctx,
router: router,
Expand Down
Loading

0 comments on commit c2c139d

Please sign in to comment.