Skip to content

Commit

Permalink
Merge pull request #610 from mreiferson/http_logs_610
Browse files Browse the repository at this point in the history
nsq*: log HTTP requests
  • Loading branch information
jehiah committed Jul 28, 2015

Verified

This commit was signed with the committer’s verified signature. The key has expired.
nowseemee Daniel Beck
2 parents 44bf385 + 0381117 commit df41bf9
Showing 5 changed files with 297 additions and 222 deletions.
87 changes: 81 additions & 6 deletions internal/http_api/api_response.go
Original file line number Diff line number Diff line change
@@ -3,12 +3,17 @@ package http_api
import (
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"
"time"

"github.com/bitly/nsq/internal/app"
"github.com/julienschmidt/httprouter"
)

type Decorator func(APIHandler) APIHandler

type APIHandler func(http.ResponseWriter, *http.Request, httprouter.Params) (interface{}, error)

type Err struct {
@@ -28,8 +33,24 @@ func acceptVersion(req *http.Request) int {
return 0
}

func NegotiateVersion(f APIHandler) httprouter.Handle {
return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
func PlainText(f APIHandler) APIHandler {
return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
code := 200
data, err := f(w, req, ps)
if err != nil {
code = err.(Err).Code
data = err.Error()
}
response := data.(string)
w.Header().Set("Content-Length", strconv.Itoa(len(response)))
w.WriteHeader(code)
io.WriteString(w, response)
return nil, nil
}
}

func NegotiateVersion(f APIHandler) APIHandler {
return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
data, err := f(w, req, ps)
if err != nil {
if acceptVersion(req) == 1 {
@@ -38,24 +59,26 @@ func NegotiateVersion(f APIHandler) httprouter.Handle {
// this handler always returns 500 for backwards compatibility
Respond(w, 500, err.Error(), nil)
}
return
return nil, nil
}
if acceptVersion(req) == 1 {
RespondV1(w, 200, data)
} else {
Respond(w, 200, "OK", data)
}
return nil, nil
}
}

func V1(f APIHandler) httprouter.Handle {
return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
func V1(f APIHandler) APIHandler {
return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
data, err := f(w, req, ps)
if err != nil {
RespondV1(w, err.(Err).Code, err)
return
return nil, nil
}
RespondV1(w, 200, data)
return nil, nil
}
}

@@ -125,3 +148,55 @@ func RespondV1(w http.ResponseWriter, code int, data interface{}) {
w.WriteHeader(code)
w.Write(response)
}

func Decorate(f APIHandler, ds ...Decorator) httprouter.Handle {
decorated := f
for _, decorate := range ds {
decorated = decorate(decorated)
}
return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
decorated(w, req, ps)
}
}

func Log(l app.Logger) Decorator {
return func(f APIHandler) APIHandler {
return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
start := time.Now()
response, err := f(w, req, ps)
elapsed := time.Since(start)
status := 200
if e, ok := err.(Err); ok {
status = e.Code
}
l.Output(2, fmt.Sprintf("%d %s %s (%s) %s",
status, req.Method, req.URL.RequestURI(), req.RemoteAddr, elapsed))
return response, err
}
}
}

func LogPanicHandler(l app.Logger) func(w http.ResponseWriter, req *http.Request, p interface{}) {
return func(w http.ResponseWriter, req *http.Request, p interface{}) {
l.Output(2, fmt.Sprintf("ERROR: panic in HTTP handler - %s", p))
Decorate(func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
return nil, Err{500, "INTERNAL_ERROR"}
}, Log(l), V1)(w, req, nil)
}
}

func LogNotFoundHandler(l app.Logger) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
Decorate(func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
return nil, Err{404, "NOT_FOUND"}
}, Log(l), V1)(w, req, nil)
})
}

func LogMethodNotAllowedHandler(l app.Logger) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
Decorate(func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
return nil, Err{405, "METHOD_NOT_ALLOWED"}
}, Log(l), V1)(w, req, nil)
})
}
259 changes: 114 additions & 145 deletions nsqadmin/http.go
Original file line number Diff line number Diff line change
@@ -3,7 +3,6 @@ package nsqadmin
import (
"fmt"
"html/template"
"io"
"io/ioutil"
"net"
"net/http"
@@ -74,38 +73,43 @@ func NewHTTPServer(ctx *Context) *httpServer {
proxy = NewSingleHostReverseProxy(ctx.nsqadmin.graphiteURL, 20*time.Second)
}

log := http_api.Log(ctx.nsqadmin.opts.Logger)

router := httprouter.New()
router.HandleMethodNotAllowed = true
router.PanicHandler = http_api.LogPanicHandler(ctx.nsqadmin.opts.Logger)
router.NotFound = http_api.LogNotFoundHandler(ctx.nsqadmin.opts.Logger)
router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqadmin.opts.Logger)
s := &httpServer{
ctx: ctx,
counters: make(map[string]map[string]int64),
proxy: proxy,
router: router,
}

router.Handle("GET", "/ping", s.pingHandler)

router.Handle("GET", "/", s.indexHandler)
router.Handle("GET", "/nodes", s.nodesHandler)
router.Handle("GET", "/node/:node", s.nodeHandler)
router.Handle("GET", "/topic/:topic", s.topicHandler)
router.Handle("GET", "/topic/:topic/:channel", s.channelHandler)
router.Handle("GET", "/static/:asset", s.embeddedAssetHandler)
router.Handle("GET", "/counter", s.counterHandler)
router.Handle("GET", "/counter/data", s.counterDataHandler)
router.Handle("GET", "/lookup", s.lookupHandler)
router.Handle("GET", "/graphite_data", s.graphiteDataHandler)

router.Handle("POST", "/tombstone_topic_producer", s.tombstoneTopicProducerHandler)
router.Handle("POST", "/empty_topic", s.emptyTopicHandler)
router.Handle("POST", "/delete_topic", s.deleteTopicHandler)
router.Handle("POST", "/pause_topic", s.pauseTopicHandler)
router.Handle("POST", "/unpause_topic", s.pauseTopicHandler)
router.Handle("POST", "/empty_channel", s.emptyChannelHandler)
router.Handle("POST", "/delete_channel", s.deleteChannelHandler)
router.Handle("POST", "/pause_channel", s.pauseChannelHandler)
router.Handle("POST", "/unpause_channel", s.pauseChannelHandler)
router.Handle("POST", "/create_topic_channel", s.createTopicChannelHandler)
router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))

router.Handle("GET", "/", http_api.Decorate(s.indexHandler, log))
router.Handle("GET", "/nodes", http_api.Decorate(s.nodesHandler, log))
router.Handle("GET", "/node/:node", http_api.Decorate(s.nodeHandler, log))
router.Handle("GET", "/topic/:topic", http_api.Decorate(s.topicHandler, log))
router.Handle("GET", "/topic/:topic/:channel", http_api.Decorate(s.channelHandler, log))
router.Handle("GET", "/static/:asset", http_api.Decorate(s.embeddedAssetHandler, log))
router.Handle("GET", "/counter", http_api.Decorate(s.counterHandler, log))
router.Handle("GET", "/counter/data", http_api.Decorate(s.counterDataHandler, log))
router.Handle("GET", "/lookup", http_api.Decorate(s.lookupHandler, log))
router.Handle("GET", "/graphite_data", http_api.Decorate(s.graphiteDataHandler, log))

router.Handle("POST", "/tombstone_topic_producer", http_api.Decorate(s.tombstoneTopicProducerHandler, log))
router.Handle("POST", "/empty_topic", http_api.Decorate(s.emptyTopicHandler, log))
router.Handle("POST", "/delete_topic", http_api.Decorate(s.deleteTopicHandler, log))
router.Handle("POST", "/pause_topic", http_api.Decorate(s.pauseTopicHandler, log))
router.Handle("POST", "/unpause_topic", http_api.Decorate(s.pauseTopicHandler, log))
router.Handle("POST", "/empty_channel", http_api.Decorate(s.emptyChannelHandler, log))
router.Handle("POST", "/delete_channel", http_api.Decorate(s.deleteChannelHandler, log))
router.Handle("POST", "/pause_channel", http_api.Decorate(s.pauseChannelHandler, log))
router.Handle("POST", "/unpause_channel", http_api.Decorate(s.pauseChannelHandler, log))
router.Handle("POST", "/create_topic_channel", http_api.Decorate(s.createTopicChannelHandler, log))

if s.ctx.nsqadmin.opts.ProxyGraphite {
router.Handler("GET", "/render", s.proxy)
@@ -118,38 +122,31 @@ func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
s.router.ServeHTTP(w, req)
}

func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
w.Header().Set("Content-Length", "2")
io.WriteString(w, "OK")
func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
return "OK", nil
}

func (s *httpServer) embeddedAssetHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
func (s *httpServer) embeddedAssetHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
assetName := ps.ByName("asset")
s.ctx.nsqadmin.logf("INFO: Requesting embedded asset - %s", assetName)

asset, error := templates.Asset(assetName)
if error != nil {
s.ctx.nsqadmin.logf("ERROR: embedded asset access - %s : %s", assetName, error)
http.NotFound(w, req)
return
return nil, http_api.Err{404, "NOT_FOUND"}
}
assetLen := len(asset)

if strings.HasSuffix(assetName, ".js") {
w.Header().Set("Content-Type", "text/javascript")
} else if strings.HasSuffix(assetName, ".css") {
w.Header().Set("Content-Type", "text/css")
}
w.Header().Set("Content-Length", fmt.Sprintf("%d", assetLen))
w.Write(asset)

return asset, nil
}

func (s *httpServer) indexHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
func (s *httpServer) indexHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
reqParams, err := http_api.NewReqParams(req)
if err != nil {
s.ctx.nsqadmin.logf("ERROR: failed to parse request params - %s", err)
http.Error(w, "INVALID_REQUEST", 500)
return
return nil, http_api.Err{400, "INVALID_REQUEST"}
}

var topics []string
@@ -172,19 +169,18 @@ func (s *httpServer) indexHandler(w http.ResponseWriter, req *http.Request, ps h
}
err = templates.T.ExecuteTemplate(w, "index.html", p)
if err != nil {
s.ctx.nsqadmin.logf("Template Error %s", err)
http.Error(w, "Template Error", 500)
s.ctx.nsqadmin.logf("ERROR: executing template - %s", err)
return nil, http_api.Err{500, "INTERNAL_ERROR"}
}
return nil, nil
}

func (s *httpServer) topicHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
func (s *httpServer) topicHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
topicName := ps.ByName("topic")

reqParams, err := http_api.NewReqParams(req)
if err != nil {
s.ctx.nsqadmin.logf("ERROR: failed to parse request params - %s", err)
http.Error(w, "INVALID_REQUEST", 500)
return
return nil, http_api.Err{400, "INVALID_REQUEST"}
}

producers := s.getProducers(topicName)
@@ -228,30 +224,26 @@ func (s *httpServer) topicHandler(w http.ResponseWriter, req *http.Request, ps h
}
err = templates.T.ExecuteTemplate(w, "topic.html", p)
if err != nil {
s.ctx.nsqadmin.logf("Template Error %s", err)
http.Error(w, "Template Error", 500)
s.ctx.nsqadmin.logf("ERROR: executing template - %s", err)
return nil, http_api.Err{500, "INTERNAL_ERROR"}
}
return nil, nil
}

func (s *httpServer) channelHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
func (s *httpServer) channelHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
topicName := ps.ByName("topic")
channelName := ps.ByName("channel")

reqParams, err := http_api.NewReqParams(req)
if err != nil {
s.ctx.nsqadmin.logf("ERROR: failed to parse request params - %s", err)
http.Error(w, "INVALID_REQUEST", 500)
return
return nil, http_api.Err{400, "INVALID_REQUEST"}
}

producers := s.getProducers(topicName)
_, allChannelStats, _ := lookupd.GetNSQDStats(producers, topicName)
channelStats, ok := allChannelStats[channelName]

if !ok {
s.ctx.nsqadmin.logf("ERROR: channel stats do not exist")
http.Error(w, "INVALID_REQUEST", 500)
return
return nil, http_api.Err{404, "NOT_FOUND"}
}

hasE2eLatency := channelStats.E2eProcessingLatency != nil &&
@@ -286,17 +278,16 @@ func (s *httpServer) channelHandler(w http.ResponseWriter, req *http.Request, ps

err = templates.T.ExecuteTemplate(w, "channel.html", p)
if err != nil {
s.ctx.nsqadmin.logf("Template Error %s", err)
http.Error(w, "Template Error", 500)
s.ctx.nsqadmin.logf("ERROR: executing template - %s", err)
return nil, http_api.Err{500, "INTERNAL_ERROR"}
}
return nil, nil
}

func (s *httpServer) lookupHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
func (s *httpServer) lookupHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
reqParams, err := http_api.NewReqParams(req)
if err != nil {
s.ctx.nsqadmin.logf("ERROR: failed to parse request params - %s", err)
http.Error(w, "INVALID_REQUEST", 500)
return
return nil, http_api.Err{400, "INVALID_REQUEST"}
}

channels := make(map[string][]string)
@@ -325,24 +316,23 @@ func (s *httpServer) lookupHandler(w http.ResponseWriter, req *http.Request, ps
}
err = templates.T.ExecuteTemplate(w, "lookup.html", p)
if err != nil {
s.ctx.nsqadmin.logf("Template Error %s", err)
http.Error(w, "Template Error", 500)
s.ctx.nsqadmin.logf("ERROR: executing template - %s", err)
return nil, http_api.Err{500, "INTERNAL_ERROR"}
}
return nil, nil
}

func (s *httpServer) createTopicChannelHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
func (s *httpServer) createTopicChannelHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
reqParams := &http_api.PostParams{req}

topicName, err := reqParams.Get("topic")
if err != nil || !protocol.IsValidTopicName(topicName) {
http.Error(w, "INVALID_TOPIC", 500)
return
return nil, http_api.Err{400, "INVALID_ARG_TOPIC"}
}

channelName, err := reqParams.Get("channel")
if err != nil || (len(channelName) > 0 && !protocol.IsValidChannelName(channelName)) {
http.Error(w, "INVALID_CHANNEL", 500)
return
return nil, http_api.Err{400, "INVALID_ARG_CHANNEL"}
}

for _, addr := range s.ctx.nsqadmin.opts.NSQLookupdHTTPAddresses {
@@ -402,21 +392,20 @@ func (s *httpServer) createTopicChannelHandler(w http.ResponseWriter, req *http.
}

http.Redirect(w, req, "/lookup", 302)
return nil, nil
}

func (s *httpServer) tombstoneTopicProducerHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
func (s *httpServer) tombstoneTopicProducerHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
reqParams := &http_api.PostParams{req}

topicName, err := reqParams.Get("topic")
if err != nil {
http.Error(w, "MISSING_ARG_TOPIC", 500)
return
return nil, http_api.Err{400, "INVALID_ARG_TOPIC"}
}

node, err := reqParams.Get("node")
if err != nil {
http.Error(w, "MISSING_ARG_NODE", 500)
return
return nil, http_api.Err{400, "INVALID_ARG_NODE"}
}

rd, _ := reqParams.Get("rd")
@@ -468,15 +457,15 @@ func (s *httpServer) tombstoneTopicProducerHandler(w http.ResponseWriter, req *h
s.notifyAdminAction("tombstone_topic_producer", topicName, "", node, req)

http.Redirect(w, req, rd, 302)
return nil, nil
}

func (s *httpServer) deleteTopicHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
func (s *httpServer) deleteTopicHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
reqParams := &http_api.PostParams{req}

topicName, err := reqParams.Get("topic")
if err != nil {
http.Error(w, "MISSING_ARG_TOPIC", 500)
return
return nil, http_api.Err{400, "INVALID_ARG_TOPIC"}
}

rd, _ := reqParams.Get("rd")
@@ -518,15 +507,15 @@ func (s *httpServer) deleteTopicHandler(w http.ResponseWriter, req *http.Request
s.notifyAdminAction("delete_topic", topicName, "", "", req)

http.Redirect(w, req, rd, 302)
return nil, nil
}

func (s *httpServer) deleteChannelHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
func (s *httpServer) deleteChannelHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
reqParams := &http_api.PostParams{req}

topicName, channelName, err := http_api.GetTopicChannelArgs(reqParams)
if err != nil {
http.Error(w, err.Error(), 500)
return
return nil, http_api.Err{400, err.Error()}
}

rd, _ := reqParams.Get("rd")
@@ -569,15 +558,15 @@ func (s *httpServer) deleteChannelHandler(w http.ResponseWriter, req *http.Reque
s.notifyAdminAction("delete_channel", topicName, channelName, "", req)

http.Redirect(w, req, rd, 302)
return nil, nil
}

func (s *httpServer) emptyTopicHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
func (s *httpServer) emptyTopicHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
reqParams := &http_api.PostParams{req}

topicName, err := reqParams.Get("topic")
if err != nil {
http.Error(w, "MISSING_ARG_TOPIC", 500)
return
return nil, http_api.Err{400, "INVALID_ARG_TOPIC"}
}

producerAddrs := s.getProducers(topicName)
@@ -592,15 +581,15 @@ func (s *httpServer) emptyTopicHandler(w http.ResponseWriter, req *http.Request,
s.notifyAdminAction("empty_topic", topicName, "", "", req)

http.Redirect(w, req, fmt.Sprintf("/topic/%s", url.QueryEscape(topicName)), 302)
return nil, nil
}

func (s *httpServer) pauseTopicHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
func (s *httpServer) pauseTopicHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
reqParams := &http_api.PostParams{req}

topicName, err := reqParams.Get("topic")
if err != nil {
http.Error(w, "MISSING_ARG_TOPIC", 500)
return
return nil, http_api.Err{400, "INVALID_ARG_TOPIC"}
}

verb := "pause"
@@ -620,15 +609,15 @@ func (s *httpServer) pauseTopicHandler(w http.ResponseWriter, req *http.Request,
s.notifyAdminAction(verb+"_topic", topicName, "", "", req)

http.Redirect(w, req, fmt.Sprintf("/topic/%s", url.QueryEscape(topicName)), 302)
return nil, nil
}

func (s *httpServer) emptyChannelHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
func (s *httpServer) emptyChannelHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
reqParams := &http_api.PostParams{req}

topicName, channelName, err := http_api.GetTopicChannelArgs(reqParams)
if err != nil {
http.Error(w, err.Error(), 500)
return
return nil, http_api.Err{400, err.Error()}
}

producerAddrs := s.getProducers(topicName)
@@ -644,15 +633,15 @@ func (s *httpServer) emptyChannelHandler(w http.ResponseWriter, req *http.Reques

http.Redirect(w, req, fmt.Sprintf("/topic/%s/%s",
url.QueryEscape(topicName), url.QueryEscape(channelName)), 302)
return nil, nil
}

func (s *httpServer) pauseChannelHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
func (s *httpServer) pauseChannelHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
reqParams := &http_api.PostParams{req}

topicName, channelName, err := http_api.GetTopicChannelArgs(reqParams)
if err != nil {
http.Error(w, err.Error(), 500)
return
return nil, http_api.Err{400, err.Error()}
}

verb := "pause"
@@ -672,16 +661,15 @@ func (s *httpServer) pauseChannelHandler(w http.ResponseWriter, req *http.Reques
s.notifyAdminAction(verb+"_channel", topicName, channelName, "", req)

http.Redirect(w, req, fmt.Sprintf("/topic/%s/%s", url.QueryEscape(topicName), url.QueryEscape(channelName)), 302)
return nil, nil
}

func (s *httpServer) nodeHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
func (s *httpServer) nodeHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
node := ps.ByName("node")

reqParams, err := http_api.NewReqParams(req)
if err != nil {
s.ctx.nsqadmin.logf("ERROR: failed to parse request params - %s", err)
http.Error(w, "INVALID_REQUEST", 500)
return
return nil, http_api.Err{400, "INVALID_REQUEST"}
}

found := false
@@ -699,8 +687,7 @@ func (s *httpServer) nodeHandler(w http.ResponseWriter, req *http.Request, ps ht
}
}
if !found {
http.Error(w, "INVALID_NODE", 500)
return
return nil, http_api.Err{404, "NOT_FOUND"}
}

topicStats, channelStats, _ := lookupd.GetNSQDStats([]string{node}, "")
@@ -735,17 +722,16 @@ func (s *httpServer) nodeHandler(w http.ResponseWriter, req *http.Request, ps ht
}
err = templates.T.ExecuteTemplate(w, "node.html", p)
if err != nil {
s.ctx.nsqadmin.logf("Template Error %s", err)
http.Error(w, "Template Error", 500)
s.ctx.nsqadmin.logf("ERROR: executing template - %s", err)
return nil, http_api.Err{500, "INTERNAL_ERROR"}
}
return nil, nil
}

func (s *httpServer) nodesHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
func (s *httpServer) nodesHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
reqParams, err := http_api.NewReqParams(req)
if err != nil {
s.ctx.nsqadmin.logf("ERROR: failed to parse request params - %s", err)
http.Error(w, "INVALID_REQUEST", 500)
return
return nil, http_api.Err{400, "INVALID_REQUEST"}
}
producers, _ := lookupd.GetLookupdProducers(s.ctx.nsqadmin.opts.NSQLookupdHTTPAddresses)

@@ -764,9 +750,10 @@ func (s *httpServer) nodesHandler(w http.ResponseWriter, req *http.Request, ps h
}
err = templates.T.ExecuteTemplate(w, "nodes.html", p)
if err != nil {
s.ctx.nsqadmin.logf("Template Error %s", err)
http.Error(w, "Template Error", 500)
s.ctx.nsqadmin.logf("ERROR: executing template - %s", err)
return nil, http_api.Err{500, "INTERNAL_ERROR"}
}
return nil, nil
}

type counterTarget struct{}
@@ -779,12 +766,10 @@ func (c counterTarget) Host() string {
return "*"
}

func (s *httpServer) counterHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
func (s *httpServer) counterHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
reqParams, err := http_api.NewReqParams(req)
if err != nil {
s.ctx.nsqadmin.logf("ERROR: failed to parse request params - %s", err)
http.Error(w, "INVALID_REQUEST", 500)
return
return nil, http_api.Err{400, "INVALID_REQUEST"}
}
p := struct {
Title string
@@ -799,21 +784,20 @@ func (s *httpServer) counterHandler(w http.ResponseWriter, req *http.Request, ps
}
err = templates.T.ExecuteTemplate(w, "counter.html", p)
if err != nil {
s.ctx.nsqadmin.logf("Template Error %s", err)
http.Error(w, "Template Error", 500)
s.ctx.nsqadmin.logf("ERROR: executing template - %s", err)
return nil, http_api.Err{500, "INTERNAL_ERROR"}
}
return nil, nil
}

// this endpoint works by giving out an ID that maps to a stats dictionary
// The initial request is the number of messages processed since each nsqd started up.
// Subsequent requsts pass that ID and get an updated delta based on each individual channel/nsqd message count
// That ID must be re-requested or it will be expired.
func (s *httpServer) counterDataHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
func (s *httpServer) counterDataHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
reqParams, err := http_api.NewReqParams(req)
if err != nil {
s.ctx.nsqadmin.logf("ERROR: failed to parse request params - %s", err)
http_api.Respond(w, 500, "INVALID_REQUEST", nil)
return
return nil, http_api.Err{400, "INVALID_REQUEST"}
}

statsID, _ := reqParams.Get("id")
@@ -852,61 +836,46 @@ func (s *httpServer) counterDataHandler(w http.ResponseWriter, req *http.Request
}
s.counters[statsID] = newStats

data := make(map[string]interface{})
data["new_messages"] = newMessages
data["total_messages"] = totalMessages
data["id"] = statsID
http_api.Respond(w, 200, "OK", data)
return struct {
NewMessages int64 `json:"new_messages"`
TotalMessages int64 `json:"total_messages"`
ID string `json:"id"`
}{newMessages, totalMessages, statsID}, nil
}

func (s *httpServer) graphiteDataHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
func (s *httpServer) graphiteDataHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
reqParams, err := http_api.NewReqParams(req)
if err != nil {
s.ctx.nsqadmin.logf("ERROR: failed to parse request params - %s", err)
http.Error(w, "INVALID_REQUEST", 500)
return
return nil, http_api.Err{400, "INVALID_REQUEST"}
}

metric, err := reqParams.Get("metric")
if err != nil {
s.ctx.nsqadmin.logf("ERROR: missing metric param - %s", err)
http.Error(w, "MISSING_METRIC_PARAM", 500)
return
if err != nil || metric != "rate" {
return nil, http_api.Err{404, "INVALID_ARG_METRIC"}
}

target, err := reqParams.Get("target")
if err != nil {
s.ctx.nsqadmin.logf("ERROR: missing target param - %s", err)
http.Error(w, "MISSING_TARGET_PARAM", 500)
return
}

if metric != "rate" {
s.ctx.nsqadmin.logf("ERROR: unknown metric value %s", metric)
http.Error(w, "INVALID_METRIC_PARAM", 500)
return
return nil, http_api.Err{404, "INVALID_ARG_TARGET"}
}

query := rateQuery(target, s.ctx.nsqadmin.opts.StatsdInterval)
url := s.ctx.nsqadmin.opts.GraphiteURL + query
s.ctx.nsqadmin.logf("GRAPHITE: %s", url)
response, err := graphiteGet(url)
if err != nil {
s.ctx.nsqadmin.logf("ERROR: graphite request failed %s", err)
http.Error(w, "GRAPHITE_FAILED", 500)
return
s.ctx.nsqadmin.logf("ERROR: graphite request failed - %s", err)
return nil, http_api.Err{500, "INTERNAL_ERROR"}
}

resp, err := parseRateResponse(response, s.ctx.nsqadmin.opts.StatsdInterval)
if err != nil {
s.ctx.nsqadmin.logf("ERROR: response formating failed - %s", err)
http.Error(w, "INVALID_GRAPHITE_RESPONSE", 500)
return
s.ctx.nsqadmin.logf("ERROR: response formatting failed - %s", err)
return nil, http_api.Err{500, "INTERNAL_ERROR"}
}

w.Header().Set("Content-Type", "application/json")
w.Write(resp)
return
return resp, nil
}

func graphiteGet(url string) ([]byte, error) {
93 changes: 48 additions & 45 deletions nsqd/http.go
Original file line number Diff line number Diff line change
@@ -30,59 +30,65 @@ type httpServer struct {
}

func newHTTPServer(ctx *context, tlsEnabled bool, tlsRequired bool) *httpServer {
log := http_api.Log(ctx.nsqd.getOpts().Logger)

router := httprouter.New()
router.HandleMethodNotAllowed = true
router.PanicHandler = http_api.LogPanicHandler(ctx.nsqd.getOpts().Logger)
router.NotFound = http_api.LogNotFoundHandler(ctx.nsqd.getOpts().Logger)
router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqd.getOpts().Logger)
s := &httpServer{
ctx: ctx,
tlsEnabled: tlsEnabled,
tlsRequired: tlsRequired,
router: router,
}

router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))

// v1 negotiate
router.Handle("POST", "/pub", http_api.NegotiateVersion(s.doPUB))
router.Handle("POST", "/mpub", http_api.NegotiateVersion(s.doMPUB))
router.Handle("GET", "/stats", http_api.NegotiateVersion(s.doStats))
router.Handle("GET", "/ping", s.pingHandler)
router.Handle("POST", "/pub", http_api.Decorate(s.doPUB, http_api.NegotiateVersion))
router.Handle("POST", "/mpub", http_api.Decorate(s.doMPUB, http_api.NegotiateVersion))
router.Handle("GET", "/stats", http_api.Decorate(s.doStats, log, http_api.NegotiateVersion))

// only v1
router.Handle("POST", "/topic/create", http_api.V1(s.doCreateTopic))
router.Handle("POST", "/topic/delete", http_api.V1(s.doDeleteTopic))
router.Handle("POST", "/topic/empty", http_api.V1(s.doEmptyTopic))
router.Handle("POST", "/topic/pause", http_api.V1(s.doPauseTopic))
router.Handle("POST", "/topic/unpause", http_api.V1(s.doPauseTopic))
router.Handle("POST", "/channel/create", http_api.V1(s.doCreateChannel))
router.Handle("POST", "/channel/delete", http_api.V1(s.doDeleteChannel))
router.Handle("POST", "/channel/empty", http_api.V1(s.doEmptyChannel))
router.Handle("POST", "/channel/pause", http_api.V1(s.doPauseChannel))
router.Handle("POST", "/channel/unpause", http_api.V1(s.doPauseChannel))
router.Handle("GET", "/config/:opt", http_api.V1(s.doConfig))
router.Handle("PUT", "/config/:opt", http_api.V1(s.doConfig))
router.Handle("POST", "/topic/create", http_api.Decorate(s.doCreateTopic, log, http_api.V1))
router.Handle("POST", "/topic/delete", http_api.Decorate(s.doDeleteTopic, log, http_api.V1))
router.Handle("POST", "/topic/empty", http_api.Decorate(s.doEmptyTopic, log, http_api.V1))
router.Handle("POST", "/topic/pause", http_api.Decorate(s.doPauseTopic, log, http_api.V1))
router.Handle("POST", "/topic/unpause", http_api.Decorate(s.doPauseTopic, log, http_api.V1))
router.Handle("POST", "/channel/create", http_api.Decorate(s.doCreateChannel, log, http_api.V1))
router.Handle("POST", "/channel/delete", http_api.Decorate(s.doDeleteChannel, log, http_api.V1))
router.Handle("POST", "/channel/empty", http_api.Decorate(s.doEmptyChannel, log, http_api.V1))
router.Handle("POST", "/channel/pause", http_api.Decorate(s.doPauseChannel, log, http_api.V1))
router.Handle("POST", "/channel/unpause", http_api.Decorate(s.doPauseChannel, log, http_api.V1))
router.Handle("GET", "/config/:opt", http_api.Decorate(s.doConfig, log, http_api.V1))
router.Handle("PUT", "/config/:opt", http_api.Decorate(s.doConfig, log, http_api.V1))

// deprecated, v1 negotiate
router.Handle("POST", "/put", http_api.NegotiateVersion(s.doPUB))
router.Handle("POST", "/mput", http_api.NegotiateVersion(s.doMPUB))
router.Handle("GET", "/info", http_api.NegotiateVersion(s.doInfo))
router.Handle("POST", "/create_topic", http_api.NegotiateVersion(s.doCreateTopic))
router.Handle("POST", "/delete_topic", http_api.NegotiateVersion(s.doDeleteTopic))
router.Handle("POST", "/empty_topic", http_api.NegotiateVersion(s.doEmptyTopic))
router.Handle("POST", "/pause_topic", http_api.NegotiateVersion(s.doPauseTopic))
router.Handle("POST", "/unpause_topic", http_api.NegotiateVersion(s.doPauseTopic))
router.Handle("POST", "/create_channel", http_api.NegotiateVersion(s.doCreateChannel))
router.Handle("POST", "/delete_channel", http_api.NegotiateVersion(s.doDeleteChannel))
router.Handle("POST", "/empty_channel", http_api.NegotiateVersion(s.doEmptyChannel))
router.Handle("POST", "/pause_channel", http_api.NegotiateVersion(s.doPauseChannel))
router.Handle("POST", "/unpause_channel", http_api.NegotiateVersion(s.doPauseChannel))
router.Handle("GET", "/create_topic", http_api.NegotiateVersion(s.doCreateTopic))
router.Handle("GET", "/delete_topic", http_api.NegotiateVersion(s.doDeleteTopic))
router.Handle("GET", "/empty_topic", http_api.NegotiateVersion(s.doEmptyTopic))
router.Handle("GET", "/pause_topic", http_api.NegotiateVersion(s.doPauseTopic))
router.Handle("GET", "/unpause_topic", http_api.NegotiateVersion(s.doPauseTopic))
router.Handle("GET", "/create_channel", http_api.NegotiateVersion(s.doCreateChannel))
router.Handle("GET", "/delete_channel", http_api.NegotiateVersion(s.doDeleteChannel))
router.Handle("GET", "/empty_channel", http_api.NegotiateVersion(s.doEmptyChannel))
router.Handle("GET", "/pause_channel", http_api.NegotiateVersion(s.doPauseChannel))
router.Handle("GET", "/unpause_channel", http_api.NegotiateVersion(s.doPauseChannel))
router.Handle("POST", "/put", http_api.Decorate(s.doPUB, http_api.NegotiateVersion))
router.Handle("POST", "/mput", http_api.Decorate(s.doMPUB, http_api.NegotiateVersion))
router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.NegotiateVersion))
router.Handle("POST", "/create_topic", http_api.Decorate(s.doCreateTopic, log, http_api.NegotiateVersion))
router.Handle("POST", "/delete_topic", http_api.Decorate(s.doDeleteTopic, log, http_api.NegotiateVersion))
router.Handle("POST", "/empty_topic", http_api.Decorate(s.doEmptyTopic, log, http_api.NegotiateVersion))
router.Handle("POST", "/pause_topic", http_api.Decorate(s.doPauseTopic, log, http_api.NegotiateVersion))
router.Handle("POST", "/unpause_topic", http_api.Decorate(s.doPauseTopic, log, http_api.NegotiateVersion))
router.Handle("POST", "/create_channel", http_api.Decorate(s.doCreateChannel, log, http_api.NegotiateVersion))
router.Handle("POST", "/delete_channel", http_api.Decorate(s.doDeleteChannel, log, http_api.NegotiateVersion))
router.Handle("POST", "/empty_channel", http_api.Decorate(s.doEmptyChannel, log, http_api.NegotiateVersion))
router.Handle("POST", "/pause_channel", http_api.Decorate(s.doPauseChannel, log, http_api.NegotiateVersion))
router.Handle("POST", "/unpause_channel", http_api.Decorate(s.doPauseChannel, log, http_api.NegotiateVersion))
router.Handle("GET", "/create_topic", http_api.Decorate(s.doCreateTopic, log, http_api.NegotiateVersion))
router.Handle("GET", "/delete_topic", http_api.Decorate(s.doDeleteTopic, log, http_api.NegotiateVersion))
router.Handle("GET", "/empty_topic", http_api.Decorate(s.doEmptyTopic, log, http_api.NegotiateVersion))
router.Handle("GET", "/pause_topic", http_api.Decorate(s.doPauseTopic, log, http_api.NegotiateVersion))
router.Handle("GET", "/unpause_topic", http_api.Decorate(s.doPauseTopic, log, http_api.NegotiateVersion))
router.Handle("GET", "/create_channel", http_api.Decorate(s.doCreateChannel, log, http_api.NegotiateVersion))
router.Handle("GET", "/delete_channel", http_api.Decorate(s.doDeleteChannel, log, http_api.NegotiateVersion))
router.Handle("GET", "/empty_channel", http_api.Decorate(s.doEmptyChannel, log, http_api.NegotiateVersion))
router.Handle("GET", "/pause_channel", http_api.Decorate(s.doPauseChannel, log, http_api.NegotiateVersion))
router.Handle("GET", "/unpause_channel", http_api.Decorate(s.doPauseChannel, log, http_api.NegotiateVersion))

// debug
router.HandlerFunc("GET", "/debug/pprof", pprof.Index)
@@ -105,15 +111,12 @@ func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
s.router.ServeHTTP(w, req)
}

func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
health := s.ctx.nsqd.GetHealth()
code := 200
if !s.ctx.nsqd.IsHealthy() {
code = 500
return nil, http_api.Err{500, health}
}
w.Header().Set("Content-Length", strconv.Itoa(len(health)))
w.WriteHeader(code)
io.WriteString(w, health)
return health, nil
}

func (s *httpServer) doInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
24 changes: 24 additions & 0 deletions nsqd/http_test.go
Original file line number Diff line number Diff line change
@@ -760,3 +760,27 @@ func TestHTTPconfig(t *testing.T) {
equal(t, resp.StatusCode, 200)
equal(t, string(body), addrs)
}

func TestHTTPerrors(t *testing.T) {
opts := NewOptions()
opts.Logger = newTestLogger(t)
_, httpAddr, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

url := fmt.Sprintf("http://%s/stats", httpAddr)
resp, err := http.Post(url, "text/plain", nil)
equal(t, err, nil)
defer resp.Body.Close()
body, _ := ioutil.ReadAll(resp.Body)
equal(t, resp.StatusCode, 405)
equal(t, string(body), `{"message":"METHOD_NOT_ALLOWED"}`)

url = fmt.Sprintf("http://%s/not_found", httpAddr)
resp, err = http.Get(url)
equal(t, err, nil)
defer resp.Body.Close()
body, _ = ioutil.ReadAll(resp.Body)
equal(t, resp.StatusCode, 404)
equal(t, string(body), `{"message":"NOT_FOUND"}`)
}
56 changes: 30 additions & 26 deletions nsqlookupd/http.go
Original file line number Diff line number Diff line change
@@ -2,7 +2,6 @@ package nsqlookupd

import (
"fmt"
"io"
"net/http"
"net/http/pprof"
"sync/atomic"
@@ -19,40 +18,46 @@ type httpServer struct {
}

func newHTTPServer(ctx *Context) *httpServer {
log := http_api.Log(ctx.nsqlookupd.opts.Logger)

router := httprouter.New()
router.HandleMethodNotAllowed = true
router.PanicHandler = http_api.LogPanicHandler(ctx.nsqlookupd.opts.Logger)
router.NotFound = http_api.LogNotFoundHandler(ctx.nsqlookupd.opts.Logger)
router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqlookupd.opts.Logger)
s := &httpServer{
ctx: ctx,
router: router,
}

router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))

// v1 negotiate
router.Handle("GET", "/ping", s.pingHandler)
router.Handle("GET", "/debug", http_api.NegotiateVersion(s.doDebug))
router.Handle("GET", "/lookup", http_api.NegotiateVersion(s.doLookup))
router.Handle("GET", "/topics", http_api.NegotiateVersion(s.doTopics))
router.Handle("GET", "/channels", http_api.NegotiateVersion(s.doChannels))
router.Handle("GET", "/nodes", http_api.NegotiateVersion(s.doNodes))
router.Handle("GET", "/debug", http_api.Decorate(s.doDebug, log, http_api.NegotiateVersion))
router.Handle("GET", "/lookup", http_api.Decorate(s.doLookup, log, http_api.NegotiateVersion))
router.Handle("GET", "/topics", http_api.Decorate(s.doTopics, log, http_api.NegotiateVersion))
router.Handle("GET", "/channels", http_api.Decorate(s.doChannels, log, http_api.NegotiateVersion))
router.Handle("GET", "/nodes", http_api.Decorate(s.doNodes, log, http_api.NegotiateVersion))

// only v1
router.Handle("POST", "/topic/create", http_api.V1(s.doCreateTopic))
router.Handle("POST", "/topic/delete", http_api.V1(s.doDeleteTopic))
router.Handle("POST", "/channel/create", http_api.V1(s.doCreateChannel))
router.Handle("POST", "/channel/delete", http_api.V1(s.doDeleteChannel))
router.Handle("POST", "/topic/tombstone", http_api.V1(s.doTombstoneTopicProducer))
router.Handle("POST", "/topic/create", http_api.Decorate(s.doCreateTopic, log, http_api.V1))
router.Handle("POST", "/topic/delete", http_api.Decorate(s.doDeleteTopic, log, http_api.V1))
router.Handle("POST", "/channel/create", http_api.Decorate(s.doCreateChannel, log, http_api.V1))
router.Handle("POST", "/channel/delete", http_api.Decorate(s.doDeleteChannel, log, http_api.V1))
router.Handle("POST", "/topic/tombstone", http_api.Decorate(s.doTombstoneTopicProducer, log, http_api.V1))

// deprecated, v1 negotiate
router.Handle("GET", "/info", http_api.NegotiateVersion(s.doInfo))
router.Handle("POST", "/create_topic", http_api.NegotiateVersion(s.doCreateTopic))
router.Handle("POST", "/delete_topic", http_api.NegotiateVersion(s.doDeleteTopic))
router.Handle("POST", "/create_channel", http_api.NegotiateVersion(s.doCreateChannel))
router.Handle("POST", "/delete_channel", http_api.NegotiateVersion(s.doDeleteChannel))
router.Handle("POST", "/tombstone_topic_producer", http_api.NegotiateVersion(s.doTombstoneTopicProducer))
router.Handle("GET", "/create_topic", http_api.NegotiateVersion(s.doCreateTopic))
router.Handle("GET", "/delete_topic", http_api.NegotiateVersion(s.doDeleteTopic))
router.Handle("GET", "/create_channel", http_api.NegotiateVersion(s.doCreateChannel))
router.Handle("GET", "/delete_channel", http_api.NegotiateVersion(s.doDeleteChannel))
router.Handle("GET", "/tombstone_topic_producer", http_api.NegotiateVersion(s.doTombstoneTopicProducer))
router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.NegotiateVersion))
router.Handle("POST", "/create_topic", http_api.Decorate(s.doCreateTopic, log, http_api.NegotiateVersion))
router.Handle("POST", "/delete_topic", http_api.Decorate(s.doDeleteTopic, log, http_api.NegotiateVersion))
router.Handle("POST", "/create_channel", http_api.Decorate(s.doCreateChannel, log, http_api.NegotiateVersion))
router.Handle("POST", "/delete_channel", http_api.Decorate(s.doDeleteChannel, log, http_api.NegotiateVersion))
router.Handle("POST", "/tombstone_topic_producer", http_api.Decorate(s.doTombstoneTopicProducer, log, http_api.NegotiateVersion))
router.Handle("GET", "/create_topic", http_api.Decorate(s.doCreateTopic, log, http_api.NegotiateVersion))
router.Handle("GET", "/delete_topic", http_api.Decorate(s.doDeleteTopic, log, http_api.NegotiateVersion))
router.Handle("GET", "/create_channel", http_api.Decorate(s.doCreateChannel, log, http_api.NegotiateVersion))
router.Handle("GET", "/delete_channel", http_api.Decorate(s.doDeleteChannel, log, http_api.NegotiateVersion))
router.Handle("GET", "/tombstone_topic_producer", http_api.Decorate(s.doTombstoneTopicProducer, log, http_api.NegotiateVersion))

// debug
router.HandlerFunc("GET", "/debug/pprof", pprof.Index)
@@ -71,9 +76,8 @@ func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
s.router.ServeHTTP(w, req)
}

func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
w.Header().Set("Content-Length", "2")
io.WriteString(w, "OK")
func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
return "OK", nil
}

func (s *httpServer) doInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {

0 comments on commit df41bf9

Please sign in to comment.