Skip to content

Commit

Permalink
Pipeline metrics
Browse files Browse the repository at this point in the history
- report pipeline metrics on:
  - libbeat.pipeline....
  - xpack.monitoring.pipeline...
  • Loading branch information
urso committed Jul 13, 2017
1 parent c407f56 commit a3adc49
Show file tree
Hide file tree
Showing 35 changed files with 575 additions and 254 deletions.
2 changes: 1 addition & 1 deletion libbeat/cmd/test/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func GenTestOutputCmd(name, beatVersion string) *cobra.Command {
os.Exit(1)
}

output, err := outputs.Load(b.Info, b.Config.Output.Name(), b.Config.Output.Config())
output, err := outputs.Load(b.Info, nil, b.Config.Output.Name(), b.Config.Output.Config())
if err != nil {
fmt.Fprintf(os.Stderr, "Error initializing output: %s\n", err)
os.Exit(1)
Expand Down
13 changes: 9 additions & 4 deletions libbeat/monitoring/report/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,15 @@ func makeReporter(beat common.BeatInfo, cfg *common.Config) (report.Reporter, er
Events: 20,
}), nil
}
pipeline, err := pipeline.New(brokerFactory, out, pipeline.Settings{
WaitClose: 0,
WaitCloseMode: pipeline.NoWaitOnClose,
})

monitoring := monitoring.Default.NewRegistry("xpack.monitoring")

pipeline, err := pipeline.New(
monitoring,
brokerFactory, out, pipeline.Settings{
WaitClose: 0,
WaitCloseMode: pipeline.NoWaitOnClose,
})
if err != nil {
return nil, err
}
Expand Down
41 changes: 31 additions & 10 deletions libbeat/outputs/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

type console struct {
out *os.File
stats *outputs.Stats
writer *bufio.Writer
codec codec.Codec
index string
Expand All @@ -33,7 +34,11 @@ func init() {
outputs.RegisterType("console", makeConsole)
}

func makeConsole(beat common.BeatInfo, cfg *common.Config) (outputs.Group, error) {
func makeConsole(
beat common.BeatInfo,
stats *outputs.Stats,
cfg *common.Config,
) (outputs.Group, error) {
config := defaultConfig
err := cfg.Unpack(&config)
if err != nil {
Expand All @@ -51,7 +56,7 @@ func makeConsole(beat common.BeatInfo, cfg *common.Config) (outputs.Group, error
}

index := beat.Beat
c, err := newConsole(index, enc)
c, err := newConsole(index, stats, enc)
if err != nil {
return outputs.Fail(fmt.Errorf("console output initialization failed with: %v", err))
}
Expand All @@ -67,46 +72,62 @@ func makeConsole(beat common.BeatInfo, cfg *common.Config) (outputs.Group, error
return outputs.Success(config.BatchSize, 0, c)
}

func newConsole(index string, codec codec.Codec) (*console, error) {
c := &console{out: os.Stdout, codec: codec, index: index}
func newConsole(index string, stats *outputs.Stats, codec codec.Codec) (*console, error) {
c := &console{out: os.Stdout, codec: codec, stats: stats, index: index}
c.writer = bufio.NewWriterSize(c.out, 8*1024)
return c, nil
}

func (c *console) Close() error { return nil }
func (c *console) Publish(batch publisher.Batch) error {
st := c.stats
events := batch.Events()
st.NewBatch(len(events))

dropped := 0
for i := range events {
c.publishEvent(&events[i])
ok := c.publishEvent(&events[i])
if !ok {
dropped++
}
}

c.writer.Flush()
batch.ACK()

st.Dropped(dropped)
st.Acked(len(events) - dropped)

return nil
}

var nl = []byte("\n")

func (c *console) publishEvent(event *publisher.Event) {
func (c *console) publishEvent(event *publisher.Event) bool {
serializedEvent, err := c.codec.Encode(c.index, &event.Content)
if err != nil {
if !event.Guaranteed() {
return
return false
}

logp.Critical("Unable to encode event: %v", err)
return
return false
}

if err := c.writeBuffer(serializedEvent); err != nil {
c.stats.WriteError()
logp.Critical("Unable to publish events to console: %v", err)
return
return false
}

if err := c.writeBuffer(nl); err != nil {
c.stats.WriteError()
logp.Critical("Error when appending newline to event: %v", err)
return
return false
}

c.stats.WriteBytes(len(serializedEvent) + 1)
return true
}

func (c *console) writeBuffer(buf []byte) error {
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/console/console_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func TestConsoleOutput(t *testing.T) {

func run(codec codec.Codec, batches ...publisher.Batch) (string, error) {
return withStdout(func() {
c, _ := newConsole("test", codec)
c, _ := newConsole("test", nil, codec)
for _, b := range batches {
c.Publish(b)
}
Expand Down
46 changes: 22 additions & 24 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"time"

"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/outputs/outil"
"github.com/elastic/beats/libbeat/outputs/transport"
Expand Down Expand Up @@ -41,7 +40,7 @@ type Client struct {
compressionLevel int
proxyURL *url.URL

stats *ClientStats
stats *outputs.Stats
}

// ClientSettings contains the settings for a client.
Expand All @@ -56,14 +55,7 @@ type ClientSettings struct {
Pipeline *outil.Selector
Timeout time.Duration
CompressionLevel int
Stats *ClientStats
}

type ClientStats struct {
PublishCallCount *monitoring.Int
EventsACKed *monitoring.Int
EventsFailed *monitoring.Int
IO *transport.IOStats
Stats *outputs.Stats
}

type connectCallback func(client *Client) error
Expand Down Expand Up @@ -139,9 +131,9 @@ func NewClient(
return nil, err
}

if st := s.Stats; st != nil && st.IO != nil {
dialer = transport.StatsDialer(dialer, st.IO)
tlsDialer = transport.StatsDialer(tlsDialer, st.IO)
if st := s.Stats; st != nil {
dialer = transport.StatsDialer(dialer, st)
tlsDialer = transport.StatsDialer(tlsDialer, st)
}

params := s.Parameters
Expand Down Expand Up @@ -251,8 +243,10 @@ func (client *Client) publishEvents(
data []publisher.Event,
) ([]publisher.Event, error) {
begin := time.Now()
if st := client.stats; st != nil && st.PublishCallCount != nil {
st.PublishCallCount.Add(1)
st := client.stats

if st != nil {
st.NewBatch(len(data))
}

if len(data) == 0 {
Expand All @@ -264,8 +258,14 @@ func (client *Client) publishEvents(

// encode events into bulk request buffer, dropping failed elements from
// events slice

origCount := len(data)
data = bulkEncodePublishRequest(body, client.index, client.pipeline, data)
if len(data) == 0 {
newCount := len(data)
if st != nil && origCount > newCount {
st.Dropped(origCount - newCount)
}
if newCount == 0 {
return nil, nil
}

Expand All @@ -290,22 +290,20 @@ func (client *Client) publishEvents(
failedEvents = bulkCollectPublishFails(&client.json, data)
}

failed := len(failedEvents)
if st := client.stats; st != nil {
countOK := int64(len(data) - len(failedEvents))
st.EventsACKed.Add(countOK)
outputs.AckedEvents.Add(countOK)
if failed := int64(len(failedEvents)); failed > 0 {
st.EventsFailed.Add(failed)
}
acked := len(data) - failed

st.Acked(acked)
st.Failed(failed)
}

if len(failedEvents) > 0 {
if failed > 0 {
if sendErr == nil {
sendErr = errTempBulkFailure
}
return failedEvents, sendErr
}

return nil, nil
}

Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/elasticsearch/client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func connectTestEs(t *testing.T, cfg interface{}) (outputs.Client, *Client) {
t.Fatal(err)
}

output, err := makeES(common.BeatInfo{Beat: "libbeat"}, config)
output, err := makeES(common.BeatInfo{Beat: "libbeat"}, nil, config)
if err != nil {
t.Fatal(err)
}
Expand Down
36 changes: 5 additions & 31 deletions libbeat/outputs/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@ import (

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/outputs/outil"
"github.com/elastic/beats/libbeat/outputs/transport"
)

func init() {
Expand Down Expand Up @@ -40,20 +38,6 @@ type callbacksRegistry struct {
// XXX: it would be fantastic to do this without a package global
var connectCallbackRegistry callbacksRegistry

// Metrics that can retrieved through the expvar web interface.
var (
esMetrics = outputs.Metrics.NewRegistry("elasticsearch")

ackedEvents = monitoring.NewInt(esMetrics, "events.acked")
eventsNotAcked = monitoring.NewInt(esMetrics, "events.not_acked")
publishEventsCallCount = monitoring.NewInt(esMetrics, "publishEvents.call.count")

statReadBytes = monitoring.NewInt(esMetrics, "read.bytes")
statWriteBytes = monitoring.NewInt(esMetrics, "write.bytes")
statReadErrors = monitoring.NewInt(esMetrics, "read.errors")
statWriteErrors = monitoring.NewInt(esMetrics, "write.errors")
)

// RegisterConnectCallback registers a callback for the elasticsearch output
// The callback is called each time the client connects to elasticsearch.
func RegisterConnectCallback(callback connectCallback) {
Expand All @@ -62,7 +46,11 @@ func RegisterConnectCallback(callback connectCallback) {
connectCallbackRegistry.callbacks = append(connectCallbackRegistry.callbacks, callback)
}

func makeES(beat common.BeatInfo, cfg *common.Config) (outputs.Group, error) {
func makeES(
beat common.BeatInfo,
stats *outputs.Stats,
cfg *common.Config,
) (outputs.Group, error) {
if !cfg.HasField("bulk_max_size") {
cfg.SetInt("bulk_max_size", -1, defaultBulkSize)
}
Expand Down Expand Up @@ -125,20 +113,6 @@ func makeES(beat common.BeatInfo, cfg *common.Config) (outputs.Group, error) {
params = nil
}

stats := &ClientStats{
PublishCallCount: publishEventsCallCount,
EventsACKed: ackedEvents,
EventsFailed: eventsNotAcked,
IO: &transport.IOStats{
Read: statReadBytes,
Write: statWriteBytes,
ReadErrors: statReadErrors,
WriteErrors: statWriteErrors,
OutputsWrite: outputs.WriteBytes,
OutputsWriteErrors: outputs.WriteErrors,
},
}

clients := make([]outputs.NetworkClient, len(hosts))
for i, host := range hosts {
esURL, err := common.MakeURL(config.Protocol, config.Path, host, 9200)
Expand Down
24 changes: 22 additions & 2 deletions libbeat/outputs/fileout/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,17 @@ func init() {

type fileOutput struct {
beat common.BeatInfo
stats *outputs.Stats
rotator logp.FileRotator
codec codec.Codec
}

// New instantiates a new file output instance.
func makeFileout(beat common.BeatInfo, cfg *common.Config) (outputs.Group, error) {
func makeFileout(
beat common.BeatInfo,
stats *outputs.Stats,
cfg *common.Config,
) (outputs.Group, error) {
config := defaultConfig
if err := cfg.Unpack(&config); err != nil {
return outputs.Fail(err)
Expand All @@ -29,7 +34,7 @@ func makeFileout(beat common.BeatInfo, cfg *common.Config) (outputs.Group, error
cfg.SetInt("flush_interval", -1, -1)
cfg.SetInt("bulk_max_size", -1, -1)

fo := &fileOutput{beat: beat}
fo := &fileOutput{beat: beat, stats: stats}
if err := fo.init(config); err != nil {
return outputs.Fail(err)
}
Expand Down Expand Up @@ -87,7 +92,11 @@ func (out *fileOutput) Publish(
) error {
defer batch.ACK()

st := out.stats
events := batch.Events()
st.NewBatch(len(events))

dropped := 0
for i := range events {
event := &events[i]

Expand All @@ -98,19 +107,30 @@ func (out *fileOutput) Publish(
} else {
logp.Warn("Failed to serialize the event: %v", err)
}

dropped++
continue
}

err = out.rotator.WriteLine(serializedEvent)
if err != nil {
st.WriteError()

if event.Guaranteed() {
logp.Critical("Writing event to file failed with: %v", err)
} else {
logp.Warn("Writing event to file failed with: %v", err)
}

dropped++
continue
}

st.WriteBytes(len(serializedEvent) + 1)
}

st.Dropped(dropped)
st.Acked(len(events) - dropped)

return nil
}
Loading

0 comments on commit a3adc49

Please sign in to comment.