Skip to content

Commit

Permalink
Update elasticsearch output to use vendored elastigo and fix #331. (#333
Browse files Browse the repository at this point in the history
)

* add elastigo submodule

* add gou submodule

* add go-hostpool submodule

* use vendored elasticgo

* update ReqUrl to ReqURL
  • Loading branch information
joekiller authored and buger committed Jul 7, 2016
1 parent c128d46 commit e65035d
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 51 deletions.
9 changes: 9 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[submodule "vendor/github.com/mattbaird/elastigo"]
path = vendor/github.com/mattbaird/elastigo
url = https://github.com/mattbaird/elastigo
[submodule "vendor/github.com/araddon/gou"]
path = vendor/github.com/araddon/gou
url = https://github.com/araddon/gou
[submodule "vendor/github.com/bitly/go-hostpool"]
path = vendor/github.com/bitly/go-hostpool
url = https://github.com/bitly/go-hostpool
103 changes: 52 additions & 51 deletions elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ package main

import (
"encoding/json"
"github.com/buger/elastigo/api"
"github.com/buger/elastigo/core"
"github.com/mattbaird/elastigo/lib"
"github.com/buger/gor/proto"
"log"
"regexp"
Expand All @@ -19,33 +18,34 @@ func (e *ESUriErorr) Error() string {
type ESPlugin struct {
Active bool
ApiPort string
eConn *elastigo.Conn
Host string
Index string
indexor *core.BulkIndexer
indexor *elastigo.BulkIndexer
done chan bool
}

type ESRequestResponse struct {
ReqUrl []byte `json:"Req_URL"`
ReqMethod []byte `json:"Req_Method"`
ReqUserAgent []byte `json:"Req_User-Agent"`
ReqAcceptLanguage []byte `json:"Req_Accept-Language,omitempty"`
ReqAccept []byte `json:"Req_Accept,omitempty"`
ReqAcceptEncoding []byte `json:"Req_Accept-Encoding,omitempty"`
ReqIfModifiedSince []byte `json:"Req_If-Modified-Since,omitempty"`
ReqConnection []byte `json:"Req_Connection,omitempty"`
ReqCookies []byte `json:"Req_Cookies,omitempty"`
RespStatus []byte `json:"Resp_Status"`
RespStatusCode []byte `json:"Resp_Status-Code"`
RespProto []byte `json:"Resp_Proto,omitempty"`
RespContentLength []byte `json:"Resp_Content-Length,omitempty"`
RespContentType []byte `json:"Resp_Content-Type,omitempty"`
RespTransferEncoding []byte `json:"Resp_Transfer-Encoding,omitempty"`
RespContentEncoding []byte `json:"Resp_Content-Encoding,omitempty"`
RespExpires []byte `json:"Resp_Expires,omitempty"`
RespCacheControl []byte `json:"Resp_Cache-Control,omitempty"`
RespVary []byte `json:"Resp_Vary,omitempty"`
RespSetCookie []byte `json:"Resp_Set-Cookie,omitempty"`
ReqURL string `json:"Req_URL"`
ReqMethod string `json:"Req_Method"`
ReqUserAgent string `json:"Req_User-Agent"`
ReqAcceptLanguage string `json:"Req_Accept-Language,omitempty"`
ReqAccept string `json:"Req_Accept,omitempty"`
ReqAcceptEncoding string `json:"Req_Accept-Encoding,omitempty"`
ReqIfModifiedSince string `json:"Req_If-Modified-Since,omitempty"`
ReqConnection string `json:"Req_Connection,omitempty"`
ReqCookies string `json:"Req_Cookies,omitempty"`
RespStatus string `json:"Resp_Status"`
RespStatusCode string `json:"Resp_Status-Code"`
RespProto string `json:"Resp_Proto,omitempty"`
RespContentLength string `json:"Resp_Content-Length,omitempty"`
RespContentType string `json:"Resp_Content-Type,omitempty"`
RespTransferEncoding string `json:"Resp_Transfer-Encoding,omitempty"`
RespContentEncoding string `json:"Resp_Content-Encoding,omitempty"`
RespExpires string `json:"Resp_Expires,omitempty"`
RespCacheControl string `json:"Resp_Cache-Control,omitempty"`
RespVary string `json:"Resp_Vary,omitempty"`
RespSetCookie string `json:"Resp_Set-Cookie,omitempty"`
Rtt int64 `json:"RTT"`
Timestamp time.Time
}
Expand Down Expand Up @@ -76,24 +76,24 @@ func (p *ESPlugin) Init(URI string) {
if err != nil {
log.Fatal("Can't initialize ElasticSearch plugin.", err)
}
p.eConn = elastigo.NewConn()
p.eConn.SetPort(p.ApiPort)
p.eConn.SetHosts([]string{p.Host})

api.Domain = p.Host
api.Port = p.ApiPort

p.indexor = core.NewBulkIndexerErrors(50, 60)
p.indexor = p.eConn.NewBulkIndexerErrors(50, 60)
p.done = make(chan bool)
p.indexor.Run(p.done)
p.indexor.Start()

// Only start the ErrorHandler goroutine when in verbose mode
// no need to burn ressources otherwise
// go p.ErrorHandler()
go p.ErrorHandler()

log.Println("Initialized Elasticsearch Plugin")
return
}

func (p *ESPlugin) IndexerShutdown() {
p.done <- true
p.indexor.Stop()
return
}

Expand All @@ -118,36 +118,37 @@ func (p *ESPlugin) ResponseAnalyze(req, resp []byte, start, stop time.Time) {
}
t := time.Now()
rtt := p.RttDurationToMs(stop.Sub(start))
req = payloadBody(req)

esResp := ESRequestResponse{
ReqUrl: proto.Path(req),
ReqMethod: proto.Method(req),
ReqUserAgent: proto.Header(req, []byte("User-Agent")),
ReqAcceptLanguage: proto.Header(req, []byte("Accept-Language")),
ReqAccept: proto.Header(req, []byte("Accept")),
ReqAcceptEncoding: proto.Header(req, []byte("Accept-Encoding")),
ReqIfModifiedSince: proto.Header(req, []byte("If-Modified-Since")),
ReqConnection: proto.Header(req, []byte("Connection")),
ReqCookies: proto.Header(req, []byte("Cookie")),
RespStatus: proto.Status(resp),
RespStatusCode: proto.Status(resp),
RespProto: proto.Method(resp),
RespContentLength: proto.Header(resp, []byte("Content-Length")),
RespContentType: proto.Header(resp, []byte("Content-Type")),
RespTransferEncoding: proto.Header(resp, []byte("Transfer-Encoding")),
RespContentEncoding: proto.Header(resp, []byte("Content-Encoding")),
RespExpires: proto.Header(resp, []byte("Expires")),
RespCacheControl: proto.Header(resp, []byte("Cache-Control")),
RespVary: proto.Header(resp, []byte("Vary")),
RespSetCookie: proto.Header(resp, []byte("Set-Cookie")),
ReqURL: string(proto.Path(req)),
ReqMethod: string(proto.Method(req)),
ReqUserAgent: string(proto.Header(req, []byte("User-Agent"))),
ReqAcceptLanguage: string(proto.Header(req, []byte("Accept-Language"))),
ReqAccept: string(proto.Header(req, []byte("Accept"))),
ReqAcceptEncoding: string(proto.Header(req, []byte("Accept-Encoding"))),
ReqIfModifiedSince: string(proto.Header(req, []byte("If-Modified-Since"))),
ReqConnection: string(proto.Header(req, []byte("Connection"))),
ReqCookies: string(proto.Header(req, []byte("Cookie"))),
RespStatus: string(proto.Status(resp)),
RespStatusCode: string(proto.Status(resp)),
RespProto: string(proto.Method(resp)),
RespContentLength: string(proto.Header(resp, []byte("Content-Length"))),
RespContentType: string(proto.Header(resp, []byte("Content-Type"))),
RespTransferEncoding: string(proto.Header(resp, []byte("Transfer-Encoding"))),
RespContentEncoding: string(proto.Header(resp, []byte("Content-Encoding"))),
RespExpires: string(proto.Header(resp, []byte("Expires"))),
RespCacheControl: string(proto.Header(resp, []byte("Cache-Control"))),
RespVary: string(proto.Header(resp, []byte("Vary"))),
RespSetCookie: string(proto.Header(resp, []byte("Set-Cookie"))),
Rtt: rtt,
Timestamp: t,
}
j, err := json.Marshal(&esResp)
if err != nil {
log.Println(err)
} else {
p.indexor.Index(p.Index, "RequestResponse", "", "", &t, j, true)
p.indexor.Index(p.Index, "RequestResponse", "", "", "", &t, j)
}
return
}
1 change: 1 addition & 0 deletions vendor/github.com/araddon/gou
Submodule gou added at 50a94a
1 change: 1 addition & 0 deletions vendor/github.com/bitly/go-hostpool
Submodule go-hostpool added at d0e59c
1 change: 1 addition & 0 deletions vendor/github.com/mattbaird/elastigo
Submodule elastigo added at 34c4c4

0 comments on commit e65035d

Please sign in to comment.