diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 00000000..fef1eaa6 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,9 @@ +[submodule "vendor/github.com/mattbaird/elastigo"] + path = vendor/github.com/mattbaird/elastigo + url = https://github.com/mattbaird/elastigo +[submodule "vendor/github.com/araddon/gou"] + path = vendor/github.com/araddon/gou + url = https://github.com/araddon/gou +[submodule "vendor/github.com/bitly/go-hostpool"] + path = vendor/github.com/bitly/go-hostpool + url = https://github.com/bitly/go-hostpool diff --git a/elasticsearch.go b/elasticsearch.go index a930ac35..2ff4ab93 100644 --- a/elasticsearch.go +++ b/elasticsearch.go @@ -2,8 +2,7 @@ package main import ( "encoding/json" - "github.com/buger/elastigo/api" - "github.com/buger/elastigo/core" + "github.com/mattbaird/elastigo/lib" "github.com/buger/gor/proto" "log" "regexp" @@ -19,33 +18,34 @@ func (e *ESUriErorr) Error() string { type ESPlugin struct { Active bool ApiPort string + eConn *elastigo.Conn Host string Index string - indexor *core.BulkIndexer + indexor *elastigo.BulkIndexer done chan bool } type ESRequestResponse struct { - ReqUrl []byte `json:"Req_URL"` - ReqMethod []byte `json:"Req_Method"` - ReqUserAgent []byte `json:"Req_User-Agent"` - ReqAcceptLanguage []byte `json:"Req_Accept-Language,omitempty"` - ReqAccept []byte `json:"Req_Accept,omitempty"` - ReqAcceptEncoding []byte `json:"Req_Accept-Encoding,omitempty"` - ReqIfModifiedSince []byte `json:"Req_If-Modified-Since,omitempty"` - ReqConnection []byte `json:"Req_Connection,omitempty"` - ReqCookies []byte `json:"Req_Cookies,omitempty"` - RespStatus []byte `json:"Resp_Status"` - RespStatusCode []byte `json:"Resp_Status-Code"` - RespProto []byte `json:"Resp_Proto,omitempty"` - RespContentLength []byte `json:"Resp_Content-Length,omitempty"` - RespContentType []byte `json:"Resp_Content-Type,omitempty"` - RespTransferEncoding []byte `json:"Resp_Transfer-Encoding,omitempty"` - RespContentEncoding []byte `json:"Resp_Content-Encoding,omitempty"` - RespExpires []byte `json:"Resp_Expires,omitempty"` - RespCacheControl []byte `json:"Resp_Cache-Control,omitempty"` - RespVary []byte `json:"Resp_Vary,omitempty"` - RespSetCookie []byte `json:"Resp_Set-Cookie,omitempty"` + ReqURL string `json:"Req_URL"` + ReqMethod string `json:"Req_Method"` + ReqUserAgent string `json:"Req_User-Agent"` + ReqAcceptLanguage string `json:"Req_Accept-Language,omitempty"` + ReqAccept string `json:"Req_Accept,omitempty"` + ReqAcceptEncoding string `json:"Req_Accept-Encoding,omitempty"` + ReqIfModifiedSince string `json:"Req_If-Modified-Since,omitempty"` + ReqConnection string `json:"Req_Connection,omitempty"` + ReqCookies string `json:"Req_Cookies,omitempty"` + RespStatus string `json:"Resp_Status"` + RespStatusCode string `json:"Resp_Status-Code"` + RespProto string `json:"Resp_Proto,omitempty"` + RespContentLength string `json:"Resp_Content-Length,omitempty"` + RespContentType string `json:"Resp_Content-Type,omitempty"` + RespTransferEncoding string `json:"Resp_Transfer-Encoding,omitempty"` + RespContentEncoding string `json:"Resp_Content-Encoding,omitempty"` + RespExpires string `json:"Resp_Expires,omitempty"` + RespCacheControl string `json:"Resp_Cache-Control,omitempty"` + RespVary string `json:"Resp_Vary,omitempty"` + RespSetCookie string `json:"Resp_Set-Cookie,omitempty"` Rtt int64 `json:"RTT"` Timestamp time.Time } @@ -76,24 +76,24 @@ func (p *ESPlugin) Init(URI string) { if err != nil { log.Fatal("Can't initialize ElasticSearch plugin.", err) } + p.eConn = elastigo.NewConn() + p.eConn.SetPort(p.ApiPort) + p.eConn.SetHosts([]string{p.Host}) - api.Domain = p.Host - api.Port = p.ApiPort - - p.indexor = core.NewBulkIndexerErrors(50, 60) + p.indexor = p.eConn.NewBulkIndexerErrors(50, 60) p.done = make(chan bool) - p.indexor.Run(p.done) + p.indexor.Start() // Only start the ErrorHandler goroutine when in verbose mode // no need to burn ressources otherwise - // go p.ErrorHandler() + go p.ErrorHandler() log.Println("Initialized Elasticsearch Plugin") return } func (p *ESPlugin) IndexerShutdown() { - p.done <- true + p.indexor.Stop() return } @@ -118,28 +118,29 @@ func (p *ESPlugin) ResponseAnalyze(req, resp []byte, start, stop time.Time) { } t := time.Now() rtt := p.RttDurationToMs(stop.Sub(start)) + req = payloadBody(req) esResp := ESRequestResponse{ - ReqUrl: proto.Path(req), - ReqMethod: proto.Method(req), - ReqUserAgent: proto.Header(req, []byte("User-Agent")), - ReqAcceptLanguage: proto.Header(req, []byte("Accept-Language")), - ReqAccept: proto.Header(req, []byte("Accept")), - ReqAcceptEncoding: proto.Header(req, []byte("Accept-Encoding")), - ReqIfModifiedSince: proto.Header(req, []byte("If-Modified-Since")), - ReqConnection: proto.Header(req, []byte("Connection")), - ReqCookies: proto.Header(req, []byte("Cookie")), - RespStatus: proto.Status(resp), - RespStatusCode: proto.Status(resp), - RespProto: proto.Method(resp), - RespContentLength: proto.Header(resp, []byte("Content-Length")), - RespContentType: proto.Header(resp, []byte("Content-Type")), - RespTransferEncoding: proto.Header(resp, []byte("Transfer-Encoding")), - RespContentEncoding: proto.Header(resp, []byte("Content-Encoding")), - RespExpires: proto.Header(resp, []byte("Expires")), - RespCacheControl: proto.Header(resp, []byte("Cache-Control")), - RespVary: proto.Header(resp, []byte("Vary")), - RespSetCookie: proto.Header(resp, []byte("Set-Cookie")), + ReqURL: string(proto.Path(req)), + ReqMethod: string(proto.Method(req)), + ReqUserAgent: string(proto.Header(req, []byte("User-Agent"))), + ReqAcceptLanguage: string(proto.Header(req, []byte("Accept-Language"))), + ReqAccept: string(proto.Header(req, []byte("Accept"))), + ReqAcceptEncoding: string(proto.Header(req, []byte("Accept-Encoding"))), + ReqIfModifiedSince: string(proto.Header(req, []byte("If-Modified-Since"))), + ReqConnection: string(proto.Header(req, []byte("Connection"))), + ReqCookies: string(proto.Header(req, []byte("Cookie"))), + RespStatus: string(proto.Status(resp)), + RespStatusCode: string(proto.Status(resp)), + RespProto: string(proto.Method(resp)), + RespContentLength: string(proto.Header(resp, []byte("Content-Length"))), + RespContentType: string(proto.Header(resp, []byte("Content-Type"))), + RespTransferEncoding: string(proto.Header(resp, []byte("Transfer-Encoding"))), + RespContentEncoding: string(proto.Header(resp, []byte("Content-Encoding"))), + RespExpires: string(proto.Header(resp, []byte("Expires"))), + RespCacheControl: string(proto.Header(resp, []byte("Cache-Control"))), + RespVary: string(proto.Header(resp, []byte("Vary"))), + RespSetCookie: string(proto.Header(resp, []byte("Set-Cookie"))), Rtt: rtt, Timestamp: t, } @@ -147,7 +148,7 @@ func (p *ESPlugin) ResponseAnalyze(req, resp []byte, start, stop time.Time) { if err != nil { log.Println(err) } else { - p.indexor.Index(p.Index, "RequestResponse", "", "", &t, j, true) + p.indexor.Index(p.Index, "RequestResponse", "", "", "", &t, j) } return } diff --git a/vendor/github.com/araddon/gou b/vendor/github.com/araddon/gou new file mode 160000 index 00000000..50a94aa4 --- /dev/null +++ b/vendor/github.com/araddon/gou @@ -0,0 +1 @@ +Subproject commit 50a94aa4a3fb69e8fbde05df290fcb49fa685e07 diff --git a/vendor/github.com/bitly/go-hostpool b/vendor/github.com/bitly/go-hostpool new file mode 160000 index 00000000..d0e59c22 --- /dev/null +++ b/vendor/github.com/bitly/go-hostpool @@ -0,0 +1 @@ +Subproject commit d0e59c22a56e8dadfed24f74f452cea5a52722d2 diff --git a/vendor/github.com/mattbaird/elastigo b/vendor/github.com/mattbaird/elastigo new file mode 160000 index 00000000..34c4c4d8 --- /dev/null +++ b/vendor/github.com/mattbaird/elastigo @@ -0,0 +1 @@ +Subproject commit 34c4c4d8425cbdcbc8e257943a2044d5e9f7dab5